epoll--源码剖析

1.epoll_create()

在内核创建一个事件表,事件表用文件表示。所以epoll_create()返回的是一个文件描述符。主要源代码:

asmlinkage long sys_epoll_create(int size)
{
    int error, fd;
    struct inode *inode; //inode结构
    struct file *file;  //文件file结构

    if (size <= 0)
        goto eexit_1;

    error = ep_getfd(&fd, &inode, &file); //获得fd以及一个file和inode结构
    if (error)
        goto eexit_1;
        
    error = ep_file_init(file);  //file的初始化
    if (error)
        goto eexit_2;

    return fd;

eexit_2:
    sys_close(fd);
eexit_1:

    return error;
}

(1)ep_getfd()

为epoll获得相应file、inode等数据结构,并与新的fd绑定起来。

首先来看两个数据结构:

  • a.file结构: 定义了Linux中文件所需要的所有信息,代表一个打开的文件

file的部分数据结构:

struct file {
    
     /* 文件对应的目录项结构。除了用filp->f_dentry->d_inode的方式来访问索引节点结构之外,设备驱动程序的开发者们一般无需关心dentry结构。
     */
    struct dentry        *f_dentry;
    
    /**
     * 与文件相关的操作。内核在执行open操作时,对这个指针赋值,以后需要处理这些操作时就读取这个指针。
     * 不能为了方便而保存起来。也就是说,可以在任何需要的时候修改文件的关联操作。即"方法重载"。
     */
    struct file_operations    *f_op;


    /**
     * open系统调用在调用驱动程序的open方法前将这个指针置为NULL。驱动程序可以将这个字段用于任何目的或者忽略这个字段。
     * 驱动程序可以用这个字段指向已分配的数据,但是一定要在内核销毁file结构前在release方法中释放内存。
     * 它是跨系统调用时保存状态的非常有用的资源。
     */
    void            *private_data;

};
  • inode结构:

inode在内部表示一个文件,与file不同,file表示的是文件描述符。同一个文件被打开多次,会有多个file文件描述符,但是只会有一个inode。在后面的代码中你可以发现,Linux针对epoll的操作专门会有一个属于epoll的文件系统,这个可以在初始化inode的时候可以看到。

这是ep_getfd的代码:

static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{
    struct qstr this;
    char name[32];
    struct dentry *dentry;
    struct inode *inode;
    struct file *file;
    int error, fd;


    error = -ENFILE;
    file = get_empty_filp(); //获得新的空的file文件描述符
    if (!file)
        goto eexit_1;

    inode = ep_eventpoll_inode();//获得新的inode结构,代表一个属于epoll文件系统的文件。
    error = PTR_ERR(inode);
    if (IS_ERR(inode))
        goto eexit_2;

    error = get_unused_fd(); //获得一个未使用的fd
    if (error < 0)
        goto eexit_3;
    fd = error;


    error = -ENOMEM;
    sprintf(name, "[%lu]", inode->i_ino);
    this.name = name;
    this.len = strlen(name);
    this.hash = inode->i_ino;
    dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);//获得一个新的dentry文件目录项结构并初始化
    if (!dentry)
        goto eexit_4;
    dentry->d_op = &eventpollfs_dentry_operations;
    d_add(dentry, inode);//将inode和目录项结构绑定
    file->f_vfsmnt = mntget(eventpoll_mnt); //把该文件所属的文件系统置为epoll文件系统
    file->f_dentry = dentry;
    file->f_mapping = inode->i_mapping;

    file->f_pos = 0;
    file->f_flags = O_RDONLY;
    file->f_op = &eventpoll_fops; //这一步是很重要的一步,用f_op指针指向epoll的回调数
    file->f_mode = FMODE_READ;
    file->f_version = 0;
    file->private_data = NULL;

    fd_install(fd, file);

    *efd = fd;
    *einode = inode;
    *efile = file;
    return 0;

eexit_4:
    put_unused_fd(fd);
eexit_3:
    iput(inode);
eexit_2:
    put_filp(file);
eexit_1:
    return error;
}

ep_getfd其实就做了三件事:获取新的file文件描述符,获取新的inode结构,获得新的fd,最后把三者连接绑定在一起,就表示了epoll在内核的事件表。

(2) ep_file_init(file)

前面讲到epoll的内核事件表已经创建完毕了,但是我们可以发现epoll是事件一次写入内核,多次监听的,然而在之前都没有发现可以存事件的数据结构红黑树。所以这一步ep_file_init就是来解决这个问题的。

下面根据代码来说它干了些什么

static int ep_file_init(struct file *file)
{
    struct eventpoll *ep;  //一个指向eventpoll的指针

    if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))
        return -ENOMEM;

    /*下面是对eventpoll结构的一系列初始化*/
    memset(ep, 0, sizeof(*ep));
    rwlock_init(&ep->lock);
    init_rwsem(&ep->sem);
    init_waitqueue_head(&ep->wq);
    init_waitqueue_head(&ep->poll_wait);
    INIT_LIST_HEAD(&ep->rdllist);
    ep->rbr = RB_ROOT; //这里可以看到红黑树的root的初始化

    file->private_data = ep;    //ep指针给file

    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n",
             current, ep));
    return 0;
}

这一步就是把一个eventpoll的结构创建并初始化,然后让file->private_data指向这个结构,这个结构中我们就可以找到rb_root,这一步之后epoll_creat()也就是epoll所有的准备工作就已经做完了。

2.epoll_ctl()

因为epoll对监听事件来说是一次写入多次监听的,所以必须要有对事件表的增删改操作接口,epoll_ctl就是提供给用户的可以进行事件表进行操作的接口。我们可以通过这个系统调用来添加删除和修改事件。

下面根据源码来走一遍:

asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
    /**一系列的数据结构的定义**/
    int error;
    struct file *file, *tfile;
    struct eventpoll *ep;
    struct epitem *epi;
    struct epoll_event epds;


    error = -EFAULT;
    if (EP_OP_HASH_EVENT(op) &&
        copy_from_user(&epds, event, sizeof(struct epoll_event)))
        goto eexit_1;//把用户的事件从用户空间考到内核空间


    error = -EBADF;
    file = fget(epfd); //获得epoll内核事件表得文件描述符
    if (!file)
        goto eexit_1;

    tfile = fget(fd); //获得要操作的事件得文件描述符
    if (!tfile)
        goto eexit_2;


    error = -EPERM;
    if (!tfile->f_op || !tfile->f_op->poll)
        goto eexit_3; //如果fd文件描述符中f_op(指向所有文件操作指针得结构体)和这里需要用到的poll操作为空,就退出。因为poll是所有Io复用:select 、poll、epoll得底层实现。

    
    error = -EINVAL;
    if (file == tfile || !IS_FILE_EPOLL(file))
        goto eexit_3;   //判断需要操作的fd文件描述符是不是epfd文件描述符和 file是不是epoll文件系统的文件

    
    ep = file->private_data;  //拿到内核事件表的eventpoll结构体

    down_write(&ep->sem); //为写获得读写内核事件表eventpoll的信号量。

    
    epi = ep_find(ep, tfile, fd);//判断该事件在内核事件表中是否存在

    error = -EINVAL;
    switch (op) {
    case EPOLL_CTL_ADD:  //添加事件操作
        if (!epi) {
            epds.events |= POLLERR | POLLHUP;

            error = ep_insert(ep, &epds, tfile, fd);
        } else
            error = -EEXIST;
        break;
    case EPOLL_CTL_DEL: //删除事件
        if (epi)
            error = ep_remove(ep, epi);
        else
            error = -ENOENT;
        break;
    case EPOLL_CTL_MOD:   //修改事件
        if (epi) {
            epds.events |= POLLERR | POLLHUP;
            error = ep_modify(ep, epi, &epds);
        } else
            error = -ENOENT;
        break;
    }

    
    if (epi)
        ep_release_epitem(epi);

    up_write(&ep->sem); //释放内核事件表eventpoll的读写信号量

eexit_3:
    fput(tfile);
eexit_2:
    fput(file);
eexit_1:
    DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",
             current, epfd, op, fd, event, error));

    return error;
}

在epoll_ctl中,主要分为以下几个步骤:

  • 判断需要操作的事件在事件表中是否存在
  • 判断需要进行的操作
  • 使用更底层的代码对事件表进行增删改

epoll比poll和select高效的地方,其实在这里就可以 看出来,poll的系统调用会把该进程“挂”到fd所对应的设备的等待队列上,select和poll他们每一次都需要把current “挂”到fd对应设备的等待队列,同一个fd来说,你要不停监听的话,你就需要不停的进行poll,那么fd太多的话,这样的poll操作其实是很低效的,epoll则不同的是,每一个事件,它只在添加的时候调用poll,以后都不用去调用poll,避免了太多重复的poll操作,所以epoll对于poll和select来说是更高效的。
还有一个比较有趣的地方是,所有的事件的存储都是在红黑树里面,但是我们可以发现红黑树的节点其实是这样的:

struct rb_node
{
    /**
     * 红黑树节点的双亲。
     */
    struct rb_node *rb_parent;
    /**
     * 红黑树节点颜色。
     */
    int rb_color;
#define    RB_RED        0
#define    RB_BLACK    1
    /**
     * 右孩子
     */
    struct rb_node *rb_right;
    /**
     * 左孩子
     */
    struct rb_node *rb_left;
};

struct rb_root
{
    struct rb_node *rb_node;
};

不管是根节点还是节点,你都看不到数据,那内核是怎么通过红黑树操作数据?

struct epitem {
    
    struct rb_node rbn;
    ........
    }

其实这才是真正的红黑树的节点,里面的东西很多,但是这里只列出来了相关的。一个fd对应一个这样的结构,同一个fd只能插入一次,这也是要采用一个红黑树这样的数据结构的其中一个原因,还有一个原因就是因为红黑树的查询等操作效率高。那么从rb_node到struct epitem就可以很容易的做到了,只需要做一个指针的强转就可以从rb_node到epitem:


epic = rb_entry(parent, struct epitem, rbn);


#define    rb_entry(ptr, type, member) container_of(ptr, type, member)


#define container_of(ptr, type, member) ({            \
        const typeof( ((type *)0)->member ) *__mptr = (ptr);    \
        (type *)( (char *)__mptr - offsetof(type,member) );})

这也就跟C++的模板类似,只不过这是C语言实现的,很多方法还是需要再重新定义。但是对C语言来说,在有限的条件下做到这样的代码复用还是非常厉害的。

3.epoll_wait()

之前的两个函数已经把事件添加到内核事件表,而且已经把当前进程“挂”到fd的所有设备上,这就相当于一个回调函数,当对应fd有可处理事件时,就会唤醒等待队列的进程,进程会把当前可处理的事件及有关信息记录到一个rdllist的链表中,接下来就是epoll_wait所要做的事了:

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
           int maxevents, long timeout)
{
    /*定义一些必要的数据结构*/
    int res, eavail;
    unsigned long flags;
    long jtimeout;
    wait_queue_t wait;

    /*检查超时时间是否有效*/
    jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
        MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;

retry:
    /*获得eventpoll的写锁*/
    write_lock_irqsave(&ep->lock, flags);


    /*rdllist如果是空就阻塞循环等待回调函数往rdllist中写数据,一旦不为空或者超过超时时间就会退出循环*/
    res = 0;
    if (list_empty(&ep->rdllist)) {
        
        init_waitqueue_entry(&wait, current);
        add_wait_queue(&ep->wq, &wait);

        for (;;) {
        
            set_current_state(TASK_INTERRUPTIBLE);
            if (!list_empty(&ep->rdllist) || !jtimeout)
                break;
            if (signal_pending(current)) {
                res = -EINTR;
                break;
            }

            write_unlock_irqrestore(&ep->lock, flags);
            jtimeout = schedule_timeout(jtimeout);
            write_lock_irqsave(&ep->lock, flags);
        }
        remove_wait_queue(&ep->wq, &wait);

        set_current_state(TASK_RUNNING);
    }

    /*判断rdllist是否为空*/
    eavail = !list_empty(&ep->rdllist); 

    /*释放eventpoll的写锁*/
    write_unlock_irqrestore(&ep->lock, flags);


    /*这一步是把内核的rdllist中的事件copy到用户空间*/
    if (!res && eavail &&
        !(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
        goto retry;

    return res;
}

简单总结一下epoll_wait:

  • 检查超时事件和用来存就绪事件的用户空间的大小
  • 循环等待就绪事件
  • 把就绪事件从内核空间copy到用户空间

这就是epoll的整个流程和主要的步骤的分析。但是我们没能看出来它的ET和LT工作模式体现在哪里?所以下面就来说说epoll的ET模式具体实现。

4.ET模式

ET模式是epoll特有的高效工作模式。主要体现就是一次就绪事件只给用户提醒一次。ET模式的实现其实就是在epoll_wait的最后一步,从内核链表往用户空间考数据的时候,下面来看代码:

static int ep_events_transfer(struct eventpoll *ep,
                  struct epoll_event __user *events, int maxevents)
{
    int eventcnt = 0;
    struct list_head txlist;  //建立一个临时量,用以保存就绪的事件

    INIT_LIST_HEAD(&txlist); //初始化

    
    down_read(&ep->sem);  //获得ep的读写信号量


    /*从ep->rdllist中copy到txlist*/
    if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
        /*从txlist到用户空间*/
        eventcnt = ep_send_events(ep, &txlist, events);

        /*从txlist再反过来copy给ep->rdllist,这一步是具体的ET实现*/
        ep_reinject_items(ep, &txlist);
    }

    up_read(&ep->sem); //释放ep读写信号量

    return eventcnt;
}

下面的函数就是ET的具体实现,也是ET和LT的区别

static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
    int ricnt = 0, pwake = 0;
    unsigned long flags;
    struct epitem *epi;

    /*获得ep的读写锁*/
    write_lock_irqsave(&ep->lock, flags);

    while (!list_empty(txlist)) {
        /*这一步跟之前讲过的rb_node到epi的一步是一样的*/
        epi = list_entry(txlist->next, struct epitem, txlink);
        
        /*初始化*/
        EP_LIST_DEL(&epi->txlink);

        /*
        1.内核事件表(红黑树)不为空
        2.事件没有设置ET工作模式
        3.就绪事件类型和监听事件类型相同
        4.该事件的rdllink不为空
        */
        if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
            (epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
        /*把刚刚临时量txlist中的该事件继续添加到rdllist中*/
            list_add_tail(&epi->rdllink, &ep->rdllist);
            ricnt++;
        }
    }

    if (ricnt) {
    
        if (waitqueue_active(&ep->wq))
            wake_up(&ep->wq);
        if (waitqueue_active(&ep->poll_wait))
            pwake++;
    }

    /*释放读写锁*/
    write_unlock_irqrestore(&ep->lock, flags);


    if (pwake)
        ep_poll_safewake(&psw, &ep->poll_wait);
}

先说LT模式对于同一个就绪事件会重复提醒,从上面可以看出来是因为它又把依旧就绪且未设置ET标志的事件重新copy到了rdllist中,所以下一次epoll_wait还是会把该事件返回给用户。那么ET这里就很好解释了,尽管该事件未处理完,但是你只要设置了ET标志,我就不会再次把该事件返回给用户。这就是ET的实现。

    原文作者:算法小白
    原文地址: https://segmentfault.com/a/1190000014882854
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞