欢迎光临散文网 会员登陆 & 注册

epoll

2023-03-01 00:26 作者:小粉丝8  | 我要投稿

//server.cpp

// CXXFLAGS = -std=c++2a

#include <sys/epoll.h>

#include <stdio.h>

#include <sys/socket.h>

#include <netinet/in.h>

#include <stdlib.h>

#include <string.h>

#include <fcntl.h>

#include <arpa/inet.h>

#include <unistd.h>

#include <errno.h>

#include <signal.h>

#include <thread>

#include <mutex>

#include <string>

#include <vector>

#define PRINT_LOG(fmt, ...) do \

{ \

  printf("%s:%d:%s: " fmt "\n", \

      __FILE__, __LINE__, __func__, ##__VA_ARGS__); \

} while (0) /* ; no trailing semicolon here */

#define PERROR(fmt, ...) do \

{ \

  printf("%s:%d:%s: %s. " fmt "\n", \

      __FILE__, __LINE__, __func__, strerror(errno), ##__VA_ARGS__); \

} while (0) /* ; no trailing semicolon here */

int efd;

std::vector<int> vfd;

std::mutex lock;

int time_to_write = 0;

int set_nonblock(int fd)

{

  int flags = fcntl(fd, F_GETFL);

  if (flags == -1) {

    PERROR("fcntl");

    return -1;

  }

  flags |= O_NONBLOCK;

  if (fcntl(fd, F_SETFL, flags) == -1) {

    PERROR("fcntl");

    return -1;

  }

  return 0;

}

void update_events()

{

  struct epoll_event ev;

  int ret = 0;

  int err = 0;

  for (;;){

    sleep(1); //test

    time_to_write = !time_to_write;

    std::lock_guard<std::mutex> guard(lock);

    // PRINT_LOG("vfd: %lu, time_to_write: %d", vfd.size(), time_to_write);

    for (auto fd : vfd){

      // PRINT_LOG("fd: %d, time_to_write: %d", fd, time_to_write);

      ev.events = EPOLLIN | EPOLLET;

      ev.data.fd = fd;

      if (time_to_write){

        ev.events = EPOLLOUT | EPOLLET;

      }

      ret = epoll_ctl(efd, EPOLL_CTL_MOD, fd, &ev);

      err = errno;

      if (err == EBADF){

        continue;

      }

      if (ret == -1) {

        PERROR("epoll_ctl");

      }

    }

  }

}

int main()

{

  struct epoll_event ev, events[10];

  int listenfd, connfd, nfds;

  /* set up listening socket */

  int port = 8000;

  int ret = 0;

  struct sockaddr_in my_addr, peer_addr;

  socklen_t addrlen = sizeof(peer_addr);

  int sockopt = 1;

  int seq = 0;

  memset(&my_addr, 0, sizeof(my_addr));

  my_addr.sin_port = htons(port);

  my_addr.sin_addr.s_addr = htons(INADDR_ANY);

  my_addr.sin_family = AF_INET;

  listenfd = socket(AF_INET, SOCK_STREAM, 0);

  if (listenfd == -1) {

    PERROR("socket");

    return -1;

  }

  ret = setsockopt(listenfd, SOL_SOCKET, SO_REUSEADDR, &sockopt,

                              sizeof(sockopt));

  if (ret == -1){

    PERROR("setsockopt");

    close(listenfd);

    return -1;

  }

  ret = bind(listenfd, (struct sockaddr*)&my_addr, sizeof(my_addr));

  if (ret == -1) {

    PERROR("bind");

    close(listenfd);

    return -1;

  }

  ret = listen(listenfd, SOMAXCONN);

  if (ret == -1) {

    PERROR("listen");

    close(listenfd);

    return -1;

  }

  PRINT_LOG("listen on port: %d", port);

  signal(SIGPIPE, SIG_IGN);

  /* set up epoll */

  efd = epoll_create1(0);

  if (efd == -1) {

    PERROR("epoll_create1");

    return -1;

  }

  ev.events = EPOLLIN;

  ev.data.fd = listenfd;

  if (epoll_ctl(efd, EPOLL_CTL_ADD, listenfd, &ev) == -1) {

    PERROR("epoll_ctl");

    return -1;

  }

  PRINT_LOG("waiting for data input from clients");

  std::thread(update_events).detach();

  for (;;) {

    nfds = epoll_wait(efd, events, sizeof(events) / sizeof(events[0]),

                                1000); //1 sec

    // PRINT_LOG("epoll_wait");

    if (nfds == -1) {

      PERROR("epoll_wait");

      for (int i = 0; i != sizeof(events) / sizeof(events[0]); i++){

        PRINT_LOG();

        close(events[i].data.fd);

      }

      return -1;

    }

    for (int i = 0; i < nfds; i++) {

      if (events[i].data.fd == listenfd) {

        connfd = accept(listenfd,

                (struct sockaddr *) &peer_addr, &addrlen);

        if (connfd == -1) {

          PERROR("accept");

          continue;

        }

        if (set_nonblock(connfd)){

          PRINT_LOG("set_nonblock failed");

          continue;

        }

        PRINT_LOG("accept client %s:%u", inet_ntoa(peer_addr.sin_addr),

                              peer_addr.sin_port);

        ev.events = EPOLLIN | EPOLLET;

        ev.data.fd = connfd;

        if (epoll_ctl(efd, EPOLL_CTL_ADD, connfd, &ev) == -1) {

          PERROR("epoll_ctl");

        }

        {

          std::lock_guard<std::mutex> guard(lock);

          vfd.push_back(connfd);

        }

      } else if (events[i].events & (EPOLLHUP | EPOLLERR)) {

        int fd = events[i].data.fd;

        close(fd);

        {

          std::lock_guard<std::mutex> guard(lock);

          std::erase(vfd, fd); // -std=c++2a

        }

        PRINT_LOG("EPOLLHUP | EPOLLERR");

      } else if (events[i].events & EPOLLIN) {

        int fd = events[i].data.fd;

        enum {count = 1024};

        char buf[count + 1] = {'\0'}, *p = buf;

        size_t len = 0;

        while (len < count){

          ret = read(fd, p, count - len);

          int err = errno;

          if (ret > 0){

            len += ret;

            p += ret;

          }

          if (ret == 0){

            close(fd);

            {

              std::lock_guard<std::mutex> guard(lock);

              std::erase(vfd, fd);

            }

            break;

          }

          if (err == EAGAIN || err == EWOULDBLOCK || err == EPIPE ||

                              err == ECONNRESET){

            break;

          }

          if (err == EINTR){

            continue;

          }

          if (err != 0){

            PRINT_LOG("ret: %d, err: %d, %s", ret, err,

                                strerror(err));

          }

        }

        if (len > 0) {

          PRINT_LOG("%s", buf);

        }

      } else if (events[i].events & EPOLLOUT){

        int fd = events[i].data.fd;

        enum {count = 1024};

        char buf[count + 1] = {'\0'}, *p = buf;

        std::string msg = "hello client " + std::to_string(seq++);

        strncpy(buf, msg.c_str(), count);

        size_t len = 0;

        while (len < count){

          ret = write(fd, p, count - len);

          int err = errno;

          if (ret > 0){

            len += ret;

            p += ret;

          }

          if (ret == 0){

            close(fd);

            {

              std::lock_guard<std::mutex> guard(lock);

              std::erase(vfd, fd);

            }

            break;

          }

          if (err == EAGAIN || err == EWOULDBLOCK || err == EPIPE ||

                              err == ECONNRESET){

            break;

          }

          if (err == EINTR){

            continue;

          }

          if (err != 0){

            PRINT_LOG("ret: %d, err: %d, %s", ret, err,

                                strerror(err));

          }

        }

      } else {

        PRINT_LOG("unknown event");

        int fd = events[i].data.fd;

        close(fd);

        {

          std::lock_guard<std::mutex> guard(lock);

          std::erase(vfd, fd);

        }

      }

    }

  }

  return 0;

}

---

//client.cpp

#include <sys/socket.h>

#include <unistd.h>

#include <fcntl.h>

#include <stdio.h>

#include <string.h>

#include <errno.h>

#include <stdlib.h>

#include <arpa/inet.h>

#include <signal.h>

#include <string>

#define PRINT_LOG(fmt, ...) do \

{ \

  printf("%s:%d:%s: " fmt "\n", \

      __FILE__, __LINE__, __func__, ##__VA_ARGS__); \

} while (0) /* ; no trailing semicolon here */

#define PERROR(fmt, ...) do \

{ \

  printf("%s:%d:%s: %s. " fmt "\n", \

      __FILE__, __LINE__, __func__, strerror(errno), ##__VA_ARGS__); \

} while (0) /* ; no trailing semicolon here */

int set_nonblock(int fd)

{

  int flags = fcntl(fd, F_GETFL);

  if (flags == -1) {

    PERROR("fcntl");

    return -1;

  }

  flags |= O_NONBLOCK;

  if (fcntl(fd, F_SETFL, flags) == -1) {

    PERROR("fcntl");

    return -1;

  }

  return 0;

}

int main(int argc, char *argv[])

{

  int fd;

  int ret;

  struct sockaddr_in addr;

  const char *host;

  int port;

  int seq = 0;

  if (argc != 4) {

    PRINT_LOG("Usage: %s <host> <port> <msg>\n", argv[0]);

    return 1;

  }

  host = argv[1];

  port = atoi(argv[2]);

  fd = socket(AF_INET, SOCK_STREAM, 0);

  if (fd == -1){

    PERROR("socket");

    return -1;

  }

  memset(&addr, 0, sizeof(addr));

  addr.sin_port = htons(port);

  addr.sin_family = AF_INET;

  ret = inet_aton(host, (struct in_addr*)&addr.sin_addr.s_addr);

  if (ret == 0){

    PRINT_LOG("inet_aton failed");

    return -1;

  }

  ret = connect(fd, (struct sockaddr*)&addr, sizeof(addr));

  if (ret == -1){

    PERROR("connect");

    close(fd);

    return -1;

  }

  PRINT_LOG("connect to server %s:%d", host, port);

  set_nonblock(fd);

  // signal(SIGPIPE, SIG_IGN);

  for(;;){

    //write

    sleep(1); //test

    std::string msg = argv[3] + std::to_string(seq++);

    ret = write(fd, msg.c_str(), msg.size()); //

    //read

    enum {count = 1024};

    char buf[count + 1] = {'\0'};

    ret = read(fd, buf, count); //

    if (ret > 0){

      PRINT_LOG("%s", buf);

    }

  }

  ret = close(fd);

  return 0;

}


epoll的评论 (共 条)

分享到微博请遵守国家法律