/图解 | 深入揭秘epoll是如何实现IO多路复用的!

图解 | 深入揭秘epoll是如何实现IO多路复用的!

导语 | 提起epoll,大家都不陌生,知道它性能不错。但是它内部是如何工作的,如何达到高性能的效果呢,鲜有文章能把原理介绍清楚,所以我就撸起袖子搞了一篇文章,献给大家!


引言

进程在Linux上是一个开销不小的家伙,先不说创建,光是上下文切换一次就得几个微秒。所以为了高效地对海量用户提供服务,必须要让一个进程能同时处理很多个tcp连接才行。现在假设一个进程保持了10000条连接,那么如何发现哪条连接上有数据可读了、哪条连接可写了?

我们当然可以采用循环遍历的方式来发现IO事件,但这种方式太低级了。我们希望有一种更高效的机制,在很多连接中的某条上有IO事件发生的时候直接快速把它找出来。其实这个事情Linux操作系统已经替我们都做好了,它就是我们所熟知的IO多路复用机制。这里的复用指的就是对进程的复用。


在Linux上多路复用方案有select、poll、epoll。它们三个中epoll的性能表现是最优秀的,能支持的并发量也最大。所以我们今天把epoll作为要拆解的对象,深入揭秘内核是如何实现多路的IO管理的。

为了方便讨论,我们举一个使用了epoll的简单示例(只是个例子,实践中不这么写):

int main(){    listen(lfd, ...);
cfd1 = accept(...); cfd2 = accept(...); efd = epoll_create(...);
epoll_ctl(efd, EPOLL_CTL_ADD, cfd1, ...); epoll_ctl(efd, EPOLL_CTL_ADD, cfd2, ...); epoll_wait(efd, ...)}

其中和epoll相关的函数是如下三个:

  • epoll_create:创建一个epoll对象

  • epoll_ctl:向epoll对象中添加要管理的连接

  • epoll_wait:等待其管理的连接上的IO事件

借助这个demo,我们来展开对epoll原理的深度拆解。相信等你理解了这篇文章以后,你对epoll的驾驭能力将变得炉火纯青!!

一、accept创建新的socket

我们直接从服务器端的accept讲起。当accept之后,进程会创建一个新的socket出来,专门用于和对应的客户端通信,然后把它放到当前进程的打开文件列表中。

接下来我们来看一下接收连接时socket内核对象的创建源码。accept的系统调用代码位于源文件net/socket.c下。

//file: net/socket.cSYSCALL_DEFINE4(accept4, int, fd, struct sockaddr __user *, upeer_sockaddr,        int __user *, upeer_addrlen, int, flags){    struct socket *sock, *newsock;
//根据 fd 查找到监听的 socket sock = sockfd_lookup_light(fd, &err, &fput_needed);
//1.1 申请并初始化新的 socket newsock = sock_alloc(); newsock->type = sock->type; newsock->ops = sock->ops;
//1.2 申请新的 file 对象,并设置到新 socket 上 newfile = sock_alloc_file(newsock, flags, sock->sk->sk_prot_creator->name); ......
//1.3 接收连接 err = sock->ops->accept(sock, newsock, sock->file->f_flags);
//1.4 添加新文件到当前进程的打开文件列表 fd_install(newfd, newfile);

(一)初始化struct socket对象


在上述的源码中,首先是调用sock_alloc申请一个struct socket对象出来。然后接着把listen状态的socket对象上的协议操作函数集合ops赋值给新的socket。(对于所有的AF_INET协议族下的socket来说,它们的ops方法都是一样的,所以这里可以直接复制过来)

其中inet_stream_ops的定义如下:

//file: net/ipv4/af_inet.cconst struct proto_ops inet_stream_ops = {    ...    .accept        = inet_accept,    .listen        = inet_listen,    .sendmsg       = inet_sendmsg,    .recvmsg       = inet_recvmsg,    ...}

(二)为新socket对象申请file

struct socket对象中有一个重要的成员-- file内核对象指针。这个指针初始化的时候是空的。在accept方法里会调用sock_alloc_file来申请内存并初始化。然后将新file对象设置到sock->file上。

来看sock_alloc_file的实现过程:

struct file *sock_alloc_file(struct socket *sock, int flags,     const char *dname){    struct file *file;    file = alloc_file(&path, FMODE_READ | FMODE_WRITE,            &socket_file_ops);    ......    sock->file = file;}

sock_alloc_file又会接着调用到alloc_file。注意在alloc_file 方法中,把socket_file_ops函数集合一并赋到了新file->f_op 里了。

//file: fs/file_table.cstruct file *alloc_file(struct path *path, fmode_t mode,        const struct file_operations *fop){    struct file *file;    file->f_op = fop;    ......}

socket_file_ops的具体定义如下:

//file: net/socket.cstatic const struct file_operations socket_file_ops = {    ...    .aio_read   = sock_aio_read,    .aio_write  = sock_aio_write,    .poll     = sock_poll,    .release  = sock_close,    ...};

这里看到,在accept里创建的新socket里的file->f_op->poll函数指向的是sock_poll。接下来我们会调用到它,后面我们再说。

其实file对象内部也有一个socket指针,指向socket对象。

(三)接收连接

在socket内核对象中除了file对象指针以外,有一个核心成员sock。

//file: include/linux/net.hstruct socket {    struct file     *file;    struct sock     *sk;}

这个struct sock数据结构非常大,是socket的核心内核对象。发送队列、接收队列、等待队列等核心数据结构都位于此。其定义位置文件include/net/sock.h,由于太长就不展示了。

在accept的源码中:

//file: net/socket.cSYSCALL_DEFINE4(accept4, ...)    ...    //1.3 接收连接    err = sock->ops->accept(sock, newsock, sock->file->f_flags);}

sock->ops->accept对应的方法是inet_accept。它执行的时候会从握手队列里直接获取创建好的sock。sock对象的完整创建过程涉及到三次握手,比较复杂,不展开了说了。咱们只看struct sock初始化过程中用到的一个函数:

void sock_init_data(struct socket *sock, struct sock *sk){    sk->sk_wq   =   NULL;    sk->sk_data_ready   =   sock_def_readable;}

在这里把sock对象的sk_data_ready函数指针设置为sock_def_readable。这个这里先记住就行了,后面会用到。

(四)添加新文件到当前进程的打开文件列表中

当file、socket、sock等关键内核对象创建完毕以后,剩下要做的一件事情就是把它挂到当前进程的打开文件列表中就行了。

//file: fs/file.cvoid fd_install(unsigned int fd, struct file *file){    __fd_install(current->files, fd, file);}
void __fd_install(struct files_struct *files, unsigned int fd, struct file *file){ ... fdt = files_fdtable(files); BUG_ON(fdt->fd[fd] != NULL); rcu_assign_pointer(fdt->fd[fd], file);}

二、epoll_create实现


在用户进程调用epoll_create时,内核会创建一个struct eventpoll的内核对象。并同样把它关联到当前进程的已打开文件列表中。

对于struct eventpoll对象,更详细的结构如下(同样只列出和今天主题相关的成员)。

epoll_create的源代码相对比较简单。在fs/eventpoll.c下

// file:fs/eventpoll.cSYSCALL_DEFINE1(epoll_create1, int, flags){    struct eventpoll *ep = NULL;
//创建一个 eventpoll 对象 error = ep_alloc(&ep);}

struct eventpoll的定义也在这个源文件中。

// file:fs/eventpoll.cstruct eventpoll {
//sys_epoll_wait用到的等待队列 wait_queue_head_t wq;
//接收就绪的描述符都会放到这里 struct list_head rdllist;
//每个epoll对象中都有一颗红黑树 struct rb_root rbr;
......}

eventpoll这个结构体中的几个成员的含义如下:

  • wq:等待队列链表。软中断数据就绪的时候会通过wq来找到阻塞在epoll对象上的用户进程。

  • rbr:一棵红黑树。为了支持对海量连接的高效查找、插入和删除,eventpoll内部使用了一棵红黑树。通过这棵树来管理用户进程下添加进来的所有socket连接。

  • rdllist:就绪的描述符的链表。当有的连接就绪的时候,内核会把就绪的连接放到rdllist链表里。这样应用进程只需要判断链表就能找出就绪进程,而不用去遍历整棵树。

当然这个结构被申请完之后,需要做一点点的初始化工作,这都在ep_alloc中完成。

//file: fs/eventpoll.cstatic int ep_alloc(struct eventpoll **pep){    struct eventpoll *ep;
//申请 epollevent 内存 ep = kzalloc(sizeof(*ep), GFP_KERNEL);
//初始化等待队列头 init_waitqueue_head(&ep->wq);
//初始化就绪列表 INIT_LIST_HEAD(&ep->rdllist);
//初始化红黑树指针 ep->rbr = RB_ROOT;
......}

说到这儿,这些成员其实只是刚被定义或初始化了,还都没有被使用。它们会在下面被用到。

三、epoll_ctl添加socket


理解这一步是理解整个epoll的关键

为了简单,我们只考虑使用EPOLL_CTL_ADD添加socket,先忽略删除和更新。

假设我们现在和客户端们的多个连接的socket都创建好了,也创建好了epoll内核对象。在使用epoll_ctl注册每一个socket的时候,内核会做如下三件事情:

  • 分配一个红黑树节点对象epitem。

  • 添加等待事件到socket的等待队列中,其回调函数是ep_poll_callback。

  • 将epitem插入到epoll对象的红黑树里。

通过epoll_ctl添加两个socket以后,这些内核数据结构最终在进程中的关系图大致如下:

我们来详细看看socket是如何添加到epoll对象里的,找到epoll_ctl的源码。

// file:fs/eventpoll.cSYSCALL_DEFINE4(epoll_ctl, int, epfd, int, op, int, fd,        struct epoll_event __user *, event){    struct eventpoll *ep;    struct file *file, *tfile;
//根据 epfd 找到 eventpoll 内核对象 file = fget(epfd); ep = file->private_data;
//根据 socket 句柄号, 找到其 file 内核对象 tfile = fget(fd);
switch (op) { case EPOLL_CTL_ADD: if (!epi) { epds.events |= POLLERR | POLLHUP; error = ep_insert(ep, &epds, tfile, fd); } else error = -EEXIST; clear_tfile_check_list(); break;}

在epoll_ctl中首先根据传入fd找到eventpoll、socket相关的内核对象。对于EPOLL_CTL_ADD操作来说,会然后执行到ep_insert函数。所有的注册都是在这个函数中完成的。

//file: fs/eventpoll.cstatic int ep_insert(struct eventpoll *ep,                 struct epoll_event *event,                struct file *tfile, int fd){    //3.1 分配并初始化 epitem    //分配一个epi对象    struct epitem *epi;    if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))        return -ENOMEM;
//对分配的epi进行初始化 //epi->ffd中存了句柄号和struct file对象地址 INIT_LIST_HEAD(&epi->pwqlist); epi->ep = ep; ep_set_ffd(&epi->ffd, tfile, fd);
//3.2 设置 socket 等待队列 //定义并初始化 ep_pqueue 对象 struct ep_pqueue epq; epq.epi = epi; init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
//调用 ep_ptable_queue_proc 注册回调函数 //实际注入的函数为 ep_poll_callback revents = ep_item_poll(epi, &epq.pt);
...... //3.3 将epi插入到 eventpoll 对象中的红黑树中 ep_rbtree_insert(ep, epi); ......}

(一)分配并初始化epitem

对于每一个socket,调用epoll_ctl的时候,都会为之分配一个epitem。该结构的主要数据如下:

//file: fs/eventpoll.cstruct epitem {
//红黑树节点 struct rb_node rbn;
//socket文件描述符信息 struct epoll_filefd ffd;
//所归属的 eventpoll 对象 struct eventpoll *ep;
//等待队列 struct list_head pwqlist;}

对epitem进行了一些初始化,首先在epi->ep=ep这行代码中将其ep指针指向eventpoll对象。另外用要添加的socket的file、fd来填充epitem->ffd。

其中使用到的ep_set_ffd函数如下。


static inline void ep_set_ffd(struct epoll_filefd *ffd, struct file *file, int fd){ ffd->file = file; ffd->fd = fd;}

(二)设置socket等待队列

在创建epitem并初始化之后,ep_insert中第二件事情就是设置socket对象上的等待任务队列。并把函数fs/eventpoll.c文件下的ep_poll_callback设置为数据就绪时候的回调函数。

这一块的源代码稍微有点绕,没有耐心的话直接跳到下面的加粗字体来看。首先来看ep_item_poll。

static inline unsigned int ep_item_poll(struct epitem *epi, poll_table *pt){    pt->_key = epi->event.events;
return epi->ffd.file->f_op->poll(epi->ffd.file, pt) & epi->event.events;}

看,这里调用到了socket下的file->f_op->poll。通过上面第一节的socket的结构图,我们知道这个函数实际上是sock_poll。

/* No kernel lock held - perfect */static unsigned int sock_poll(struct file *file, poll_table *wait){    ...    return sock->ops->poll(file, sock, wait);}

同样回看第一节里的socket的结构图,sock->ops->poll其实指向的是tcp_poll。

//file: net/ipv4/tcp.cunsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait){    struct sock *sk = sock->sk;
sock_poll_wait(file, sk_sleep(sk), wait);}

在sock_poll_wait的第二个参数传参前,先调用了sk_sleep函数。在这个函数里它获取了sock对象下的等待队列列表头wait_queue_head_t,待会等待队列项就插入这里。这里稍微注意下,是socket的等待队列,不是epoll对象的。来看sk_sleep源码:

//file: include/net/sock.hstatic inline wait_queue_head_t *sk_sleep(struct sock *sk){    BUILD_BUG_ON(offsetof(struct socket_wq, wait) != 0);    return &rcu_dereference_raw(sk->sk_wq)->wait;}

接着真正进入sock_poll_wait。

static inline void sock_poll_wait(struct file *filp,        wait_queue_head_t *wait_address, poll_table *p){    poll_wait(filp, wait_address, p);}

static inline void poll_wait(struct file * filp, wait_queue_head_t * wait_address, poll_table *p){    if (p && p->_qproc && wait_address)        p->_qproc(filp, wait_address, p);}

这里的qproc是个函数指针,它在前面的init_poll_funcptr调用时被设置成了ep_ptable_queue_proc函数。

static int ep_insert(...){    ...    init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);    ...}

//file: include/linux/poll.hstatic inline void init_poll_funcptr(poll_table *pt,     poll_queue_proc qproc){    pt->_qproc = qproc;    pt->_key   = ~0UL; /* all events enabled */}

敲黑板!!!注意,划重点:在ep_ptable_queue_proc函数中,新建了一个等待队列项,并注册其回调函数为ep_poll_callback函数。然后再将这个等待项添加到socket的等待队列中

//file: fs/eventpoll.cstatic void ep_ptable_queue_proc(struct file *file, wait_queue_head_t *whead,                 poll_table *pt){    struct eppoll_entry *pwq;    f (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL))) {                //初始化回调方法                init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
//将ep_poll_callback放入socket的等待队列whead(注意不是epoll的等待队列) add_wait_queue(whead, &pwq->wait);
}

我们今天的socket是交给epoll来管理的,不需要在一个socket就绪的时候就唤醒进程,所以这里的q->private没有什么用,就设置成了NULL。

//file:include/linux/wait.hstatic inline void init_waitqueue_func_entry(    wait_queue_t *q, wait_queue_func_t func){    q->flags = 0;    q->private = NULL;
//ep_poll_callback 注册到 wait_queue_t对象上 //有数据到达的时候调用 q->func q->func = func; }

如上,等待队列项中仅仅只设置了回调函数q->func为ep_poll_callback。在后面的第5节数据来啦中我们将看到,软中断将数据收到socket的接收队列后,会通过注册的这个ep_poll_callback函数来回调,进而通知到epoll对象。

(三)插入红黑树

分配完epitem对象后,紧接着并把它插入到红黑树中。一个插入了一些socket描述符的epoll里的红黑树的示意图如下:

这里我们再聊聊为啥要用红黑树,很多人说是因为效率高。其实我觉得这个解释不够全面,要说查找效率树哪能比的上HASHTABLE。我个人认为觉得更为合理的一个解释是为了让epoll在查找效率、插入效率、内存开销等等多个方面比较均衡,最后发现最适合这个需求的数据结构是红黑树。

四、epoll_wait等待接收

epoll_wait做的事情不复杂,当它被调用时它观察eventpoll->rdllist 链表里有没有数据即可。有数据就返回,没有数据就创建一个等待队列项,将其添加到eventpoll的等待队列上,然后把自己阻塞掉就完事。

注意:epoll_ctl添加socket时也创建了等待队列项。不同的是这里的等待队列项是挂在epoll对象上的,而前者是挂在socket对象上的。

其源代码如下:

//file: fs/eventpoll.cSYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,        int, maxevents, int, timeout){    ...    error = ep_poll(ep, events, maxevents, timeout);}
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events, int maxevents, long timeout){ wait_queue_t wait; ......
fetch_events: //4.1 判断就绪队列上有没有事件就绪 if (!ep_events_available(ep)) {
//4.2 定义等待事件并关联当前进程 init_waitqueue_entry(&wait, current);
//4.3 把新 waitqueue 添加到 epoll->wq 链表里 __add_wait_queue_exclusive(&ep->wq, &wait); for (;;) { ... //4.4 让出CPU 主动进入睡眠状态 if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS)) timed_out = 1; ... }

(一)判断就绪队列上有没有事件就绪

首先调用ep_events_available来判断就绪链表中是否有可处理的事件。

//file: fs/eventpoll.cstatic inline int ep_events_available(struct eventpoll *ep){    return !list_empty(&ep->rdllist) || ep->ovflist != EP_UNACTIVE_PTR;}

(二)定义等待事件并关联当前进程

假设确实没有就绪的连接,那接着会进入init_waitqueue_entry中定义等待任务,并把current(当前进程)添加到waitqueue上。

注意:当没有IO事件的时候,epoll也是会阻塞掉当前进程。这个是合理的,因为没有事情可做了占着CPU也没啥意义。网上的很多文章有个很不好的习惯,讨论阻塞、非阻塞等概念的时候都不说主语。这会导致你看的云里雾里。拿epoll来说,epoll本身是阻塞的,但一般会把socket设置成非阻塞。只有说了主语,这些概念才有意义。

//file: include/linux/wait.hstatic inline void init_waitqueue_entry(wait_queue_t *q, struct task_struct *p){    q->flags = 0;    q->private = p;    q->func = default_wake_function;}

注意这里的回调函数名称是default_wake_function。后续在第5节数据来啦时将会调用到该函数。

(三)添加到等待队列

static inline void __add_wait_queue_exclusive(wait_queue_head_t *q,                                wait_queue_t *wait){    wait->flags |= WQ_FLAG_EXCLUSIVE;    __add_wait_queue(q, wait);}

在这里,把上一小节定义的等待事件添加到了epoll对象的等待队列中。

(四)让出CPU主动进入睡眠状态

通过set_current_state把当前进程设置为可打断。调用schedule_hrtimeout_range让出CPU,主动进入睡眠状态。

//file: kernel/hrtimer.cint __sched schedule_hrtimeout_range(ktime_t *expires,     unsigned long delta, const enum hrtimer_mode mode){    return schedule_hrtimeout_range_clock(            expires, delta, mode, CLOCK_MONOTONIC);}
int __sched schedule_hrtimeout_range_clock(...){ schedule(); ...}

在schedule中选择下一个进程调度

//file: kernel/sched/core.cstatic void __sched __schedule(void){    next = pick_next_task(rq);    ...    context_switch(rq, prev, next);}

五、数据来啦


在前面epoll_ctl执行的时候,内核为每一个socket上都添加了一个等待队列项。在epoll_wait运行完的时候,又在event poll对象上添加了等待队列元素。在讨论数据开始接收之前,我们把这些队列项的内容再稍微总结一下。

  • socket->sock->sk_data_ready设置的就绪处理函数是sock_def_readable。


  • 在socket的等待队列项中,其回调函数是ep_poll_callback。另外其private没有用了,指向的是空指针null。


  • 在eventpoll的等待队列项中,回调函数是default_wake_function。其private指向的是等待该事件的用户进程。

在这一小节里,我们将看到软中断是怎么样在数据处理完之后依次进入各个回调函数,最后通知到用户进程的。

(一)接收数据到任务队列

关于软中断是怎么处理网络帧,为了避免篇幅过于臃肿,这里不再介绍。我们今天直接从tcp协议栈的处理入口函数tcp_v4_rcv开始说起。

// file: net/ipv4/tcp_ipv4.cint tcp_v4_rcv(struct sk_buff *skb){    ......    th = tcp_hdr(skb); //获取tcp header    iph = ip_hdr(skb); //获取ip header
//根据数据包 header 中的 ip、端口信息查找到对应的socket sk = __inet_lookup_skb(&tcp_hashinfo, skb, th->source, th->dest); ......
//socket 未被用户锁定 if (!sock_owned_by_user(sk)) { { if (!tcp_prequeue(sk, skb)) ret = tcp_v4_do_rcv(sk, skb); } }}

在tcp_v4_rcv中首先根据收到的网络包的header里的source和dest信息来在本机上查询对应的socket。找到以后,我们直接进入接收的主体函数tcp_v4_do_rcv来看。

//file: net/ipv4/tcp_ipv4.cint tcp_v4_do_rcv(struct sock *sk, struct sk_buff *skb){    if (sk->sk_state == TCP_ESTABLISHED) { 
//执行连接状态下的数据处理 if (tcp_rcv_established(sk, skb, tcp_hdr(skb), skb->len)) { rsk = sk; goto reset; } return 0; }
//其它非 ESTABLISH 状态的数据包处理 ......}

我们假设处理的是ESTABLISH状态下的包,这样就又进入tcp_rcv_established函数中进行处理。

//file: net/ipv4/tcp_input.cint tcp_rcv_established(struct sock *sk, struct sk_buff *skb,            const struct tcphdr *th, unsigned int len){    ......
//接收数据到队列中 eaten = tcp_queue_rcv(sk, skb, tcp_header_len, &fragstolen);
//数据 ready,唤醒 socket 上阻塞掉的进程 sk->sk_data_ready(sk, 0);

在tcp_rcv_established中通过调用tcp_queue_rcv函数中完成了将接收数据放到socket的接收队列上。

如下源码所示:

//file: net/ipv4/tcp_input.cstatic int __must_check tcp_queue_rcv(struct sock *sk, struct sk_buff *skb, int hdrlen,            bool *fragstolen){    //把接收到的数据放到 socket 的接收队列的尾部    if (!eaten) {        __skb_queue_tail(&sk->sk_receive_queue, skb);        skb_set_owner_r(skb, sk);    }    return eaten;}

(二)查找就绪回调函数

调用tcp_queue_rcv接收完成之后,接着再调用sk_data_ready来唤醒在socket上等待的用户进程。这又是一个函数指针。回想上面第一节我们在accept函数创建socket流程里提到的sock_init_data函数,在这个函数里已经把sk_data_ready设置成sock_def_readable函数了。它是默认的数据就绪处理函数。

当socket上数据就绪时候,内核将以sock_def_readable这个函数为入口,找到epoll_ctl添加socket时在其上设置的回调函数ep_poll_callback。

我们来详细看下细节:

//file: net/core/sock.cstatic void sock_def_readable(struct sock *sk, int len){    struct socket_wq *wq;
rcu_read_lock(); wq = rcu_dereference(sk->sk_wq);
//这个名字起的不好,并不是有阻塞的进程, //而是判断等待队列不为空 if (wq_has_sleeper(wq)) //执行等待队列项上的回调函数 wake_up_interruptible_sync_poll(&wq->wait, POLLIN | POLLPRI | POLLRDNORM | POLLRDBAND); sk_wake_async(sk, SOCK_WAKE_WAITD, POLL_IN); rcu_read_unlock();}

这里的函数名其实都有迷惑人的地方。

  • wq_has_sleeper,对于简单的recvfrom系统调用来说,确实是判断是否有进程阻塞。但是对于epoll下的socket只是判断等待队列不为空,不一定有进程阻塞的。

  • wake_up_interruptible_sync_poll,只是会进入到socket等待队列项上设置的回调函数,并不一定有唤醒进程的操作。

那接下来就是我们重点看wake_up_interruptible_sync_poll。

我们看一下内核是怎么找到等待队列项里注册的回调函数的。


//file: include/linux/wait.h#define wake_up_interruptible_sync_poll(x, m)       \    __wake_up_sync_key((x), TASK_INTERRUPTIBLE, 1, (void *) (m))

//file: kernel/sched/core.cvoid __wake_up_sync_key(wait_queue_head_t *q, unsigned int mode,            int nr_exclusive, void *key){    ...    __wake_up_common(q, mode, nr_exclusive, wake_flags, key);}

接着进入__wake_up_common

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,            int nr_exclusive, int wake_flags, void *key){    wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) { unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; }}

在__wake_up_common中,选出等待队列里注册某个元素curr,回调其curr->func。回忆我们ep_insert调用的时候,把这个func设置成ep_poll_callback了。

(三)执行socket就绪回调函数

在上一小节找到了socket等待队列项里注册的函数ep_poll_callback,软中断接着就会调用它。

//file: fs/eventpoll.cstatic int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key){    //获取 wait 对应的 epitem    struct epitem *epi = ep_item_from_wait(wait);
//获取 epitem 对应的 eventpoll 结构体 struct eventpoll *ep = epi->ep;
//1. 将当前epitem 添加到 eventpoll 的就绪队列中 list_add_tail(&epi->rdllink, &ep->rdllist);
//2. 查看 eventpoll 的等待队列上是否有在等待 if (waitqueue_active(&ep->wq)) wake_up_locked(&ep->wq);

在ep_poll_callback根据等待任务队列项上的额外的base指针可以找到epitem,进而也可以找到eventpoll对象。

首先它做的第一件事就是把自己的epitem添加到epoll的就绪队列中

接着它又会查看eventpoll对象上的等待队列里是否有等待项(epoll_wait执行的时候会设置)。

如果没执行软中断的事情就做完了。如果有等待项,那就查找到等待项里设置的回调函数。


调用:wake_up_locked()=>__wake_up_locked()=>__wake_up_common。

static void __wake_up_common(wait_queue_head_t *q, unsigned int mode,            int nr_exclusive, int wake_flags, void *key){    wait_queue_t *curr, *next;
list_for_each_entry_safe(curr, next, &q->task_list, task_list) { unsigned flags = curr->flags;
if (curr->func(curr, mode, wake_flags, key) && (flags & WQ_FLAG_EXCLUSIVE) && !--nr_exclusive) break; }}

在__wake_up_common里,调用curr->func。这里的func是在epoll_wait是传入的default_wake_function函数。

(四)执行epoll就绪通知

在default_wake_function中找到等待队列项里的进程描述符,然后唤醒之。

源代码如下:

//file:kernel/sched/core.cint default_wake_function(wait_queue_t *curr, unsigned mode, int wake_flags,                void *key){    return try_to_wake_up(curr->private, mode, wake_flags);}

等待队列项curr->private指针是在epoll对象上等待而被阻塞掉的进程。

将epoll_wait进程推入可运行队列,等待内核重新调度进程。然后epoll_wait对应的这个进程重新运行后,就从schedule恢复。

当进程醒来后,继续从epoll_wait时暂停的代码继续执行。把rdlist中就绪的事件返回给用户进程。

//file: fs/eventpoll.cstatic int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,             int maxevents, long timeout){
...... __remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING); }check_events: //返回就绪事件给用户进程 ep_send_events(ep, events, maxevents))}

从用户角度来看,epoll_wait只是多等了一会儿而已,但执行流程还是顺序的。

总结


我们来用一幅图总结一下epoll的整个工作路程。

其中软中断回调的时候回调函数也整理一下:sock_def_readable:sock对象初始化时设置的=>ep_poll_callback : epoll_ctl时添加到 socket上的=>default_wake_function: epoll_wait是设置到epoll上的。


总结下,epoll相关的函数里内核运行环境分两部分:


  • 用户进程内核态。进行调用epoll_wait等函数时会将进程陷入内核态来执行。这部分代码负责查看接收队列,以及负责把当前进程阻塞掉,让出CPU。


  • 硬软中断上下文。在这些组件中,将包从网卡接收过来进行处理,然后放到socket的接收队列。对于epoll来说,再找到socket关联的epitem,并把它添加到epoll对象的就绪链表中。这个时候再捎带检查一下epoll上是否有被阻塞的进程,如果有唤醒之。


为了介绍到每个细节,本文涉及到的流程比较多,把阻塞都介绍进来了。但其实在实践中,只要活儿足够的多,epoll_wait根本都不会让进程阻塞。用户进程会一直干活,一直干活,直到epoll_wait里实在没活儿可干的时候才主动让出CPU。这就是epoll高效的地方所在!

恭喜你没被内核源码劝退,一直能坚持到了现在。赶快给先自己鼓个掌,晚饭去加个鸡腿!

当然网络编程剩下还有一些概念我们没有讲到,比如Reactor和Proactor等。不过相对内核来讲,这些用户层的技术相对就很简单了。这些只是在讨论当多进程一起配合工作时谁负责查看IO事件、谁该负责计算、谁负责发送和接收,仅仅是用户进程的不同分工模式罢了。


 作者简介



张彦飞

腾讯开发工程师


腾讯开发工程师,有腾讯搜狗累计十多年的开发经验,目前负责腾讯浏览器业务后端开发。



 推荐阅读


这一次,再定义领导力!

再不Go就来不及了!Go高性能编程技法解读

揭秘Golang网络框架netpoll的源码实现!

C++反射:深入探究function实现机制!


本文来自微信公众号“腾讯云开发者”(ID:QcloudCommunity)。大作社经授权转载,该文观点仅代表作者本人,大作社平台仅提供信息存储空间服务。