浅谈 Select & Epoll 多路复用技术

传统阻塞 Socket 编程

传统的并发通信是如何进行的?

阻塞模式下使用阻塞模式下的多进程 + recv/send 进行通信。

  • 【主进程】依次调用 socket 创建和初始化 socket,调用 bind 将 socket 关联到 ip:port 二元组,调用 listen 开始被动监听。

  • 【主进程】然后开一个循环反复阻塞在 accept 上等待连接,一旦连接建立就通过 fork() 创建子进程

  • 【子进程】在子进程中通过 recvsend 和客户端通信。需要注意的是 TCP 是基于字节流的,因此需要在设计应用层协议时约定消息边界。往往需要在子进程中设计好状态机来处理。

  • 【主进程】通过 sigactionsignal(SIGCHLD, SIG_IGN) 处理子进程结束信号,避免僵尸进程。

send 调用将用户 buf 复制到内核 sndbuf,实际发送由协议层完成,如果失败会在下一个 socket 函数报错.

recv 调用实质只从内核 rcvbuf 复制数据,实际接收由协议完成。

如何知道 socket 连接断开?

可以用 setsockopt 设置超时时间。如果出现问题,recv 会返回 -1

非阻塞 Socket 编程

非阻塞模式下,只需要一个线程,从而大大减轻了运行开销。一个线程能在(宏观上)同时处理大量 socketfd,这是一种多路复用。

多路复用有 select,poll,epoll 三种方式。

而 Select 需要完整遍历,且 fdset 本质是位图,容量小。Poll 利用 pollfd 数组解决容量问题。二者都需要遍历 fd。

Epoll 解决低效遍历问题。只需要直接管理一个 fd,而各个 socketfd 通过内部的红黑树管理,利用事件监听

Select 、Poll 多路复用

  • select 方式:监视 writefds、readfds、和 exceptfds 三类 fdset。以读为例,调用 select 后,内核会遍历检查各个 fd 是否可读。三类描述符状态变化,或者超时,则将 fds 集合拷贝到用户内存,并唤醒用户进程。

  • poll 方式:采用 pollfd 描述 fd 集合,解决了 fdset 的大小限制。

问题总结:

  1. 用户态需要开个循环反复调用

  2. 反复调用导致反复传参:每次调用都要把 fdsets 从用户空间拷贝到内核空间

  3. 处理能力有限:只能监听有限个 fd

  4. 用户每次都只能挨个遍历每个 socket 来收集可读事件

这俩了解即可,只是作为历史的尘埃和反面教材罢了。重点是 epoll。

EPoll 方式

基础了解:

epoll 方式:采用 epoll 对象管理 fd。内部采用红黑树管理。按需得到 events,解决低效问题。提供三个函数:

  • epoll_create:创建一个 epoll 句柄

  • epoll_ctl:向 epoll 对象中添加/修改/删除要管理的连接

  • epoll_wait:等待其管理的连接上的 IO 事件

    • 按需拷贝:fd 首次调用 epoll_ctl 拷贝,每次调用 epoll_wait 不拷贝

    • 无限大小:其内部有 wq(等待队列链表)、rbr(红黑树,索引 socket)、rdllist(就绪 fd 链表)

    • 回调通知:不需要遍历所有 fd

  • epoll 水平触发和边缘触发

    • LT:只要内核缓冲区有数据就一直通知

    • ET:只有状态发生变化才通知,只有当 socket 由不可写可写或由不可读可读,才会返回其sockfd

Epoll 使用实践

API 和相关结构

1#include <sys/epoll.h>
2
3int epoll_create(int size);
  • 作用:创建一个 epoll 的句柄,size 用来告诉内核这个监听的数目一共有多大。

  • 注意:使用完 epoll 后,必须调用 close() 关闭,否则可能导致 fd 被耗尽。

  • 参数:

    • size:监听 socket 的数目
  • 返回值:

    • 如果成功,返回 epoll fd 句柄,否则返回 -1
1#include <sys/epoll.h>
2
3int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
  • 作用:epoll 的事件注册函数,epoll_ctl 向 epoll 对象中添加、修改或者删除感兴趣的事件

  • 参数:

    • epfd:epoll 句柄

    • op:操作类型,EPOLL_CTL_ADD 添加、EPOLL_CTL_MOD 修改、EPOLL_CTL_DEL 删除

    • fd:要监听的 fd

    • event:监听事件,是告诉内核需要监听什么事。包含了要监听的 fd 的事件类型,以及回调函数

  • 返回值:返回 0 表示成功,返回 - 1 表示失败

epoll_event 和 epoll_event 结构体:

 1typedef union epoll_data {
 2    void *ptr;
 3    int fd;
 4    __uint32_t u32;
 5    __uint64_t u64;
 6} epoll_data_t;
 7
 8struct epoll_event {
 9    __uint32_t events; /* Epoll events */
10    epoll_data_t data; /* User data variable */
11};

events 的取值:

  • EPOLLIN:表示对应的 fd 可读

  • EPOLLOUT:表示对应的 fd 可写

  • EPOLLPRI:表示对应的 fd 有紧急数据可读

  • EPOLLERR:表示对应的 fd 发生错误

  • EPOLLHUP:表示对应的 fd 连接关闭

  • EPOLLET:表示对应的 fd 被 epoll 注册的时候,使用 edge-trigger 模式,也就是只有 fd 的状态发生变化时才触发回调(性能更高)

  • EPOLLONESHOT:表示一次性的事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个socket加入到EPOLL队列里

1#include <sys/epoll.h>
2
3int epoll_wait(int epfd, struct epoll_event *events,
4               int maxevents, int timeout);
  • 作用:epoll 的事件监听函数,epoll_wait 用来监听 epoll 对象上注册的事件

  • 参数:

    • epfd:epoll 句柄

    • events:用来保存监听到的事件

    • maxevents:最多监听多少个事件

    • timeout:超时时间,单位是毫秒

  • 返回值:返回监听到的事件数目,如果返回 -1 表示失败

C++ 实践

下面的代码,我们使用 epoll 实现一个简单的 echo server。

  1#include <arpa/inet.h>
  2#include <sys/epoll.h>
  3#include <unistd.h>
  4#include <csignal>
  5#include <cstring>
  6#include <iostream>
  7#include <string>
  8
  9namespace lb {
 10
 11class cli_options {
 12 private:
 13  // IPv4 Address
 14  std::string address_;
 15  u_int16_t port_;
 16
 17 public:
 18  cli_options(const std::string& address, const u_int16_t& port)
 19      : address_(address), port_(port) {}
 20  const std::string& address() const { return address_; }
 21  u_int16_t port() const { return port_; }
 22};
 23
 24class server {
 25 private:
 26  static server* instance_;
 27  static constexpr int max_events = 1024;
 28  lb::cli_options options_;
 29  int epoll_fd_;
 30
 31 public:
 32  server(const lb::cli_options& options) : options_(options) {
 33    instance_ = this;
 34  }
 35  void run();
 36  // signal handler
 37  static void handle_signal(int signo);
 38};
 39
 40}  // namespace lb
 41
 42lb::server* lb::server::instance_ = nullptr;
 43
 44lb::cli_options init_args(int argc, char const* const argv[]) {
 45  std::string address = "0.0.0.0";
 46  u_int16_t port = 8080;
 47  const char* usage = "usage: lb_server [--address=<address>] [--port=<port>]";
 48  const char* address_opt = "--address";
 49  const char* port_opt = "--port";
 50  for (int i = 1; i < argc; i++) {
 51    if (std::strcmp(argv[i], address_opt) == 0) {
 52      if (i + 1 < argc) {
 53        address = argv[i + 1];
 54      } else {
 55        std::cerr << usage << std::endl;
 56        exit(1);
 57      }
 58    } else if (std::strcmp(argv[i], port_opt) == 0) {
 59      if (i + 1 < argc) {
 60        port = atoi(argv[i + 1]);
 61      } else {
 62        std::cerr << usage << std::endl;
 63        exit(1);
 64      }
 65    } else {
 66      std::cerr << usage << std::endl;
 67      exit(1);
 68    }
 69  }
 70  lb::cli_options options(address, port);
 71  return options;
 72}
 73
 74void lb::server::handle_signal(int signo) {
 75  if (signo == SIGINT) {
 76    std::cout << "server is shutting down..." << std::endl;
 77    auto instance = lb::server::instance_;
 78    close(instance->epoll_fd_);
 79    exit(0);
 80  }
 81}
 82
 83void lb::server::run() {
 84  // 初始化信号处理函数
 85  struct sigaction sa;
 86  sa.sa_handler = lb::server::handle_signal;
 87  sigemptyset(&sa.sa_mask);
 88  sa.sa_flags = 0;
 89  sigaction(SIGINT, &sa, NULL);
 90
 91  // 初始化地址
 92  in_addr_t addr = inet_addr(options_.address().c_str());
 93  if (addr == INADDR_NONE) {
 94    std::cerr << "invalid address" << std::endl;
 95    exit(1);
 96  }
 97  sockaddr_in addr_in;
 98  addr_in.sin_family = AF_INET;
 99  addr_in.sin_port = htons(options_.port());
100  addr_in.sin_addr.s_addr = addr;
101
102  // 初始化 socket_fd,并进行 bind 和 listen
103  auto listen_fd = socket(AF_INET, SOCK_STREAM, 0);
104  if (listen_fd == -1) {
105    std::cerr << "socket error" << std::endl;
106    exit(1);
107  }
108  int opt = 1;
109  setsockopt(listen_fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
110
111  int ret;
112  ret = bind(listen_fd, (sockaddr*)&addr_in, sizeof(addr_in));
113  if (ret == -1) {
114    std::cerr << "bind error: " << strerror(errno) << std::endl;
115    exit(1);
116  }
117  ret = listen(listen_fd, SOMAXCONN);
118  if (ret == -1) {
119    std::cerr << "listen error" << std::endl;
120    exit(1);
121  }
122
123  std::cout << "server is listening on " << options_.address() << ":"
124            << options_.port() << std::endl;
125
126  // 初始化 epoll
127
128  epoll_fd_ = epoll_create(max_events);
129  if (epoll_fd_ == -1) {
130    std::cerr << "epoll_create error" << std::endl;
131    exit(1);
132  }
133
134  epoll_event event;
135  event.data.fd = listen_fd;
136  event.events = EPOLLIN | EPOLLET;
137
138  ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, listen_fd, &event);
139  if (ret == -1) {
140    std::cerr << "epoll_ctl error when initialize: " << strerror(errno)
141              << std::endl;
142    exit(1);
143  }
144  epoll_event events[max_events];
145
146  // 事件循环,接收到的事件会被放到 events 中
147  while (true) {
148    int n = epoll_wait(epoll_fd_, events, max_events, -1);
149    if (n == -1) {
150      std::cerr << "epoll_wait error" << std::endl;
151      exit(1);
152    }
153    if (n == 0) {
154      std::cout << ".";
155      continue;
156    }
157    // 处理每个事件
158    for (int i = 0; i < n; i++) {
159      // 如果是新连接,则 accept
160      if (events[i].data.fd == listen_fd) {
161        sockaddr_in client_addr;
162        socklen_t client_addr_len = sizeof(client_addr);
163        // 新的连接
164        int client_fd =
165            accept(listen_fd, (sockaddr*)&client_addr, &client_addr_len);
166        if (client_fd == -1) {
167          std::cerr << "accept error" << std::endl;
168          exit(1);
169        }
170        std::cout << "client connected (from "
171                  << inet_ntoa(client_addr.sin_addr) << ":"
172                  << ntohs(client_addr.sin_port) << ")" << std::endl;
173
174        // 将新的 client_fd 添加到 epoll 中
175        event.data.fd = client_fd;
176        // event.events = EPOLLIN ;
177        event.events = EPOLLIN | EPOLLET;
178        ret = epoll_ctl(epoll_fd_, EPOLL_CTL_ADD, client_fd, &event);
179        if (ret == -1) {
180          std::cerr << "epoll_ctl error when add client: " << strerror(errno)
181                    << std::endl;
182          exit(1);
183        }
184        continue;
185      }
186
187      // 处理 EPOLLIN 事件
188      if (!(events[i].events & EPOLLIN)) {
189        continue;
190      }
191
192      // 对于已经连接的 client,如果有数据到来,则读取数据
193      char buf[32];
194      int client_fd = events[i].data.fd;
195      int n = recv(client_fd, buf, sizeof(buf), 0);
196      if (n == -1) {
197        // 查询错误码
198        int err = errno;
199        // 如果是不能立即完成,则继续等待
200        if (err == EAGAIN || err == EWOULDBLOCK) {
201          continue;
202        } else {
203          // 其他错误,关闭连接
204          std::cerr << "recv error" << std::endl;
205          close(client_fd);
206          // 删除 epoll 中的连接
207          epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, NULL);
208          // 打印错误信息
209          std::cerr << "recv error when recv: " << strerror(err) << std::endl;
210          continue;
211        }
212      }
213      // 获取 client 的地址,这一步只是为了输出地址
214      sockaddr_in client_addr;
215      socklen_t client_addr_len = sizeof(client_addr);
216      ret = getsockname(client_fd, (sockaddr*)&client_addr, &client_addr_len);
217      if (ret == -1) {
218        std::cerr << "getsockname error" << std::endl;
219        exit(1);
220      }
221
222      // 收到 0 字节的数据,则表示 client 正常地断开连接
223      if (n == 0) {
224        // https://stackoverflow.com/questions/8707601/is-it-necessary-to-deregister-a-socket-from-epoll-before-closing-it
225        std::cout << "client disconnected (from "
226                  << inet_ntoa(client_addr.sin_addr) << ":"
227                  << ntohs(client_addr.sin_port) << ")" << std::endl;
228        close(client_fd);
229        // ret = epoll_ctl(epoll_fd_, EPOLL_CTL_DEL, client_fd, NULL);
230        // if (ret == -1) {
231        // std::cerr << "epoll_ctl error when remove fd: " << strerror(errno) <<
232        // std::endl; exit(1);
233        // }
234      }
235      // 正常收到数据,则打印出来
236      else {
237        // 打印收到的 bytes
238        std::cout << "time: " << time(NULL) << std::endl
239                  << "client addr: " << inet_ntoa(client_addr.sin_addr) << ":"
240                  << ntohs(client_addr.sin_port) << std::endl
241                  << "received " << n << " bytes" << std::endl
242                  << "data: " << std::string(buf, 0, n) << std::endl
243                  << std::endl;
244        // echo back
245        send(client_fd, buf, n, 0);
246      }
247    }
248  }
249}
250
251int main(int argc, char const* argv[]) {
252  auto options = init_args(argc, argv);
253  lb::server server(options);
254  server.run();
255  return 0;
256}

当然,还有一些不足之处:

  1. 程序轻易使用 exit(1) 来退出,导致健壮性不是很好。

  2. TCP 提供的是字节流服务,我们没有处理消息边界,所以收到的数据可能会被分割。

  3. 安全性不是很好,无论谁来连接我们都允许了。

其他问题

epoll 可以用回调函数处理吗?

答:epoll_ctl 虽然可以设置要监听的 fd,但 epoll_event 并未提供函数指针之类的接口,因此无法直接设置回调。需要的话得自己封装。

epoll 可以处理 UDP 吗?

答:可以,但没必要。多路复用是用于面向连接的服务,而 UDP 已经提供了面向消息的服务,并且不可靠,所以内核不会像 TCP 那样进行复杂的连接管理,自然也不需要多路复用。

对于 UDP,我们迭代+状态机就足矣处理了。

HTTP/3 中的底层支撑协议 QUIC,是基于 UDP 的协议,在应用层面实现了连接,因此又引入了多路复用。

epoll 如果想结合多线程如何使用?

可以。Reactor 模型了解一下?