1.epoll_create()
在内核创建一个事件表,事件表用文件表示。所以epoll_create()返回的是一个文件描述符。主要源代码:
asmlinkage long sys_epoll_create(int size)
{
int error, fd;
struct inode *inode; //inode结构
struct file *file; //文件file结构
if (size <= 0)
goto eexit_1;
error = ep_getfd(&fd, &inode, &file); //获得fd以及一个file和inode结构
if (error)
goto eexit_1;
error = ep_file_init(file); //file的初始化
if (error)
goto eexit_2;
return fd;
eexit_2:
sys_close(fd);
eexit_1:
return error;
}
(1)ep_getfd()
为epoll获得相应file、inode等数据结构,并与新的fd绑定起来。
首先来看两个数据结构:
- a.file结构: 定义了Linux中文件所需要的所有信息,代表一个打开的文件
file的部分数据结构:
struct file {
/* 文件对应的目录项结构。除了用filp->f_dentry->d_inode的方式来访问索引节点结构之外,设备驱动程序的开发者们一般无需关心dentry结构。
*/
struct dentry *f_dentry;
/**
* 与文件相关的操作。内核在执行open操作时,对这个指针赋值,以后需要处理这些操作时就读取这个指针。
* 不能为了方便而保存起来。也就是说,可以在任何需要的时候修改文件的关联操作。即"方法重载"。
*/
struct file_operations *f_op;
/**
* open系统调用在调用驱动程序的open方法前将这个指针置为NULL。驱动程序可以将这个字段用于任何目的或者忽略这个字段。
* 驱动程序可以用这个字段指向已分配的数据,但是一定要在内核销毁file结构前在release方法中释放内存。
* 它是跨系统调用时保存状态的非常有用的资源。
*/
void *private_data;
};
- inode结构:
inode在内部表示一个文件,与file不同,file表示的是文件描述符。同一个文件被打开多次,会有多个file文件描述符,但是只会有一个inode。在后面的代码中你可以发现,Linux针对epoll的操作专门会有一个属于epoll的文件系统,这个可以在初始化inode的时候可以看到。
这是ep_getfd的代码:
static int ep_getfd(int *efd, struct inode **einode, struct file **efile)
{
struct qstr this;
char name[32];
struct dentry *dentry;
struct inode *inode;
struct file *file;
int error, fd;
error = -ENFILE;
file = get_empty_filp(); //获得新的空的file文件描述符
if (!file)
goto eexit_1;
inode = ep_eventpoll_inode();//获得新的inode结构,代表一个属于epoll文件系统的文件。
error = PTR_ERR(inode);
if (IS_ERR(inode))
goto eexit_2;
error = get_unused_fd(); //获得一个未使用的fd
if (error < 0)
goto eexit_3;
fd = error;
error = -ENOMEM;
sprintf(name, "[%lu]", inode->i_ino);
this.name = name;
this.len = strlen(name);
this.hash = inode->i_ino;
dentry = d_alloc(eventpoll_mnt->mnt_sb->s_root, &this);//获得一个新的dentry文件目录项结构并初始化
if (!dentry)
goto eexit_4;
dentry->d_op = &eventpollfs_dentry_operations;
d_add(dentry, inode);//将inode和目录项结构绑定
file->f_vfsmnt = mntget(eventpoll_mnt); //把该文件所属的文件系统置为epoll文件系统
file->f_dentry = dentry;
file->f_mapping = inode->i_mapping;
file->f_pos = 0;
file->f_flags = O_RDONLY;
file->f_op = &eventpoll_fops; //这一步是很重要的一步,用f_op指针指向epoll的回调数
file->f_mode = FMODE_READ;
file->f_version = 0;
file->private_data = NULL;
fd_install(fd, file);
*efd = fd;
*einode = inode;
*efile = file;
return 0;
eexit_4:
put_unused_fd(fd);
eexit_3:
iput(inode);
eexit_2:
put_filp(file);
eexit_1:
return error;
}
ep_getfd其实就做了三件事:获取新的file文件描述符,获取新的inode结构,获得新的fd,最后把三者连接绑定在一起,就表示了epoll在内核的事件表。
(2) ep_file_init(file)
前面讲到epoll的内核事件表已经创建完毕了,但是我们可以发现epoll是事件一次写入内核,多次监听的,然而在之前都没有发现可以存事件的数据结构红黑树。所以这一步ep_file_init就是来解决这个问题的。
下面根据代码来说它干了些什么
static int ep_file_init(struct file *file)
{
struct eventpoll *ep; //一个指向eventpoll的指针
if (!(ep = kmalloc(sizeof(struct eventpoll), GFP_KERNEL)))
return -ENOMEM;
/*下面是对eventpoll结构的一系列初始化*/
memset(ep, 0, sizeof(*ep));
rwlock_init(&ep->lock);
init_rwsem(&ep->sem);
init_waitqueue_head(&ep->wq);
init_waitqueue_head(&ep->poll_wait);
INIT_LIST_HEAD(&ep->rdllist);
ep->rbr = RB_ROOT; //这里可以看到红黑树的root的初始化
file->private_data = ep; //ep指针给file
DNPRINTK(3, (KERN_INFO "[%p] eventpoll: ep_file_init() ep=%p\n",
current, ep));
return 0;
}
这一步就是把一个eventpoll的结构创建并初始化,然后让file->private_data指向这个结构,这个结构中我们就可以找到rb_root,这一步之后epoll_creat()也就是epoll所有的准备工作就已经做完了。
2.epoll_ctl()
因为epoll对监听事件来说是一次写入多次监听的,所以必须要有对事件表的增删改操作接口,epoll_ctl就是提供给用户的可以进行事件表进行操作的接口。我们可以通过这个系统调用来添加删除和修改事件。
下面根据源码来走一遍:
asmlinkage long
sys_epoll_ctl(int epfd, int op, int fd, struct epoll_event __user *event)
{
/**一系列的数据结构的定义**/
int error;
struct file *file, *tfile;
struct eventpoll *ep;
struct epitem *epi;
struct epoll_event epds;
error = -EFAULT;
if (EP_OP_HASH_EVENT(op) &&
copy_from_user(&epds, event, sizeof(struct epoll_event)))
goto eexit_1;//把用户的事件从用户空间考到内核空间
error = -EBADF;
file = fget(epfd); //获得epoll内核事件表得文件描述符
if (!file)
goto eexit_1;
tfile = fget(fd); //获得要操作的事件得文件描述符
if (!tfile)
goto eexit_2;
error = -EPERM;
if (!tfile->f_op || !tfile->f_op->poll)
goto eexit_3; //如果fd文件描述符中f_op(指向所有文件操作指针得结构体)和这里需要用到的poll操作为空,就退出。因为poll是所有Io复用:select 、poll、epoll得底层实现。
error = -EINVAL;
if (file == tfile || !IS_FILE_EPOLL(file))
goto eexit_3; //判断需要操作的fd文件描述符是不是epfd文件描述符和 file是不是epoll文件系统的文件
ep = file->private_data; //拿到内核事件表的eventpoll结构体
down_write(&ep->sem); //为写获得读写内核事件表eventpoll的信号量。
epi = ep_find(ep, tfile, fd);//判断该事件在内核事件表中是否存在
error = -EINVAL;
switch (op) {
case EPOLL_CTL_ADD: //添加事件操作
if (!epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_insert(ep, &epds, tfile, fd);
} else
error = -EEXIST;
break;
case EPOLL_CTL_DEL: //删除事件
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD: //修改事件
if (epi) {
epds.events |= POLLERR | POLLHUP;
error = ep_modify(ep, epi, &epds);
} else
error = -ENOENT;
break;
}
if (epi)
ep_release_epitem(epi);
up_write(&ep->sem); //释放内核事件表eventpoll的读写信号量
eexit_3:
fput(tfile);
eexit_2:
fput(file);
eexit_1:
DNPRINTK(3, (KERN_INFO "[%p] eventpoll: sys_epoll_ctl(%d, %d, %d, %p) = %d\n",
current, epfd, op, fd, event, error));
return error;
}
在epoll_ctl中,主要分为以下几个步骤:
- 判断需要操作的事件在事件表中是否存在
- 判断需要进行的操作
- 使用更底层的代码对事件表进行增删改
epoll比poll和select高效的地方,其实在这里就可以 看出来,poll的系统调用会把该进程“挂”到fd所对应的设备的等待队列上,select和poll他们每一次都需要把current “挂”到fd对应设备的等待队列,同一个fd来说,你要不停监听的话,你就需要不停的进行poll,那么fd太多的话,这样的poll操作其实是很低效的,epoll则不同的是,每一个事件,它只在添加的时候调用poll,以后都不用去调用poll,避免了太多重复的poll操作,所以epoll对于poll和select来说是更高效的。
还有一个比较有趣的地方是,所有的事件的存储都是在红黑树里面,但是我们可以发现红黑树的节点其实是这样的:
struct rb_node
{
/**
* 红黑树节点的双亲。
*/
struct rb_node *rb_parent;
/**
* 红黑树节点颜色。
*/
int rb_color;
#define RB_RED 0
#define RB_BLACK 1
/**
* 右孩子
*/
struct rb_node *rb_right;
/**
* 左孩子
*/
struct rb_node *rb_left;
};
struct rb_root
{
struct rb_node *rb_node;
};
不管是根节点还是节点,你都看不到数据,那内核是怎么通过红黑树操作数据?
struct epitem {
struct rb_node rbn;
........
}
其实这才是真正的红黑树的节点,里面的东西很多,但是这里只列出来了相关的。一个fd对应一个这样的结构,同一个fd只能插入一次,这也是要采用一个红黑树这样的数据结构的其中一个原因,还有一个原因就是因为红黑树的查询等操作效率高。那么从rb_node到struct epitem就可以很容易的做到了,只需要做一个指针的强转就可以从rb_node到epitem:
epic = rb_entry(parent, struct epitem, rbn);
#define rb_entry(ptr, type, member) container_of(ptr, type, member)
#define container_of(ptr, type, member) ({ \
const typeof( ((type *)0)->member ) *__mptr = (ptr); \
(type *)( (char *)__mptr - offsetof(type,member) );})
这也就跟C++的模板类似,只不过这是C语言实现的,很多方法还是需要再重新定义。但是对C语言来说,在有限的条件下做到这样的代码复用还是非常厉害的。
3.epoll_wait()
之前的两个函数已经把事件添加到内核事件表,而且已经把当前进程“挂”到fd的所有设备上,这就相当于一个回调函数,当对应fd有可处理事件时,就会唤醒等待队列的进程,进程会把当前可处理的事件及有关信息记录到一个rdllist的链表中,接下来就是epoll_wait所要做的事了:
static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,
int maxevents, long timeout)
{
/*定义一些必要的数据结构*/
int res, eavail;
unsigned long flags;
long jtimeout;
wait_queue_t wait;
/*检查超时时间是否有效*/
jtimeout = timeout == -1 || timeout > (MAX_SCHEDULE_TIMEOUT - 1000) / HZ ?
MAX_SCHEDULE_TIMEOUT: (timeout * HZ + 999) / 1000;
retry:
/*获得eventpoll的写锁*/
write_lock_irqsave(&ep->lock, flags);
/*rdllist如果是空就阻塞循环等待回调函数往rdllist中写数据,一旦不为空或者超过超时时间就会退出循环*/
res = 0;
if (list_empty(&ep->rdllist)) {
init_waitqueue_entry(&wait, current);
add_wait_queue(&ep->wq, &wait);
for (;;) {
set_current_state(TASK_INTERRUPTIBLE);
if (!list_empty(&ep->rdllist) || !jtimeout)
break;
if (signal_pending(current)) {
res = -EINTR;
break;
}
write_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout);
write_lock_irqsave(&ep->lock, flags);
}
remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
}
/*判断rdllist是否为空*/
eavail = !list_empty(&ep->rdllist);
/*释放eventpoll的写锁*/
write_unlock_irqrestore(&ep->lock, flags);
/*这一步是把内核的rdllist中的事件copy到用户空间*/
if (!res && eavail &&
!(res = ep_events_transfer(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
}
简单总结一下epoll_wait:
- 检查超时事件和用来存就绪事件的用户空间的大小
- 循环等待就绪事件
- 把就绪事件从内核空间copy到用户空间
这就是epoll的整个流程和主要的步骤的分析。但是我们没能看出来它的ET和LT工作模式体现在哪里?所以下面就来说说epoll的ET模式具体实现。
4.ET模式
ET模式是epoll特有的高效工作模式。主要体现就是一次就绪事件只给用户提醒一次。ET模式的实现其实就是在epoll_wait的最后一步,从内核链表往用户空间考数据的时候,下面来看代码:
static int ep_events_transfer(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents)
{
int eventcnt = 0;
struct list_head txlist; //建立一个临时量,用以保存就绪的事件
INIT_LIST_HEAD(&txlist); //初始化
down_read(&ep->sem); //获得ep的读写信号量
/*从ep->rdllist中copy到txlist*/
if (ep_collect_ready_items(ep, &txlist, maxevents) > 0) {
/*从txlist到用户空间*/
eventcnt = ep_send_events(ep, &txlist, events);
/*从txlist再反过来copy给ep->rdllist,这一步是具体的ET实现*/
ep_reinject_items(ep, &txlist);
}
up_read(&ep->sem); //释放ep读写信号量
return eventcnt;
}
下面的函数就是ET的具体实现,也是ET和LT的区别
static void ep_reinject_items(struct eventpoll *ep, struct list_head *txlist)
{
int ricnt = 0, pwake = 0;
unsigned long flags;
struct epitem *epi;
/*获得ep的读写锁*/
write_lock_irqsave(&ep->lock, flags);
while (!list_empty(txlist)) {
/*这一步跟之前讲过的rb_node到epi的一步是一样的*/
epi = list_entry(txlist->next, struct epitem, txlink);
/*初始化*/
EP_LIST_DEL(&epi->txlink);
/*
1.内核事件表(红黑树)不为空
2.事件没有设置ET工作模式
3.就绪事件类型和监听事件类型相同
4.该事件的rdllink不为空
*/
if (EP_RB_LINKED(&epi->rbn) && !(epi->event.events & EPOLLET) &&
(epi->revents & epi->event.events) && !EP_IS_LINKED(&epi->rdllink)) {
/*把刚刚临时量txlist中的该事件继续添加到rdllist中*/
list_add_tail(&epi->rdllink, &ep->rdllist);
ricnt++;
}
}
if (ricnt) {
if (waitqueue_active(&ep->wq))
wake_up(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
}
/*释放读写锁*/
write_unlock_irqrestore(&ep->lock, flags);
if (pwake)
ep_poll_safewake(&psw, &ep->poll_wait);
}
先说LT模式对于同一个就绪事件会重复提醒,从上面可以看出来是因为它又把依旧就绪且未设置ET标志的事件重新copy到了rdllist中,所以下一次epoll_wait还是会把该事件返回给用户。那么ET这里就很好解释了,尽管该事件未处理完,但是你只要设置了ET标志,我就不会再次把该事件返回给用户。这就是ET的实现。