Android中的Looper与epoll
众所周知,Android的消息队列是通过Looper实现的,但是这与pipe有什么关系,为什么会用到epoll这样的Linux IO多路复用机制呢。
前言
在应用程序进程的入口函数ActivityThread的main函数中,会调用Looper的prepareMainLooper方法最终实例化一个MessageQueue对象,而MessageQueue.java的构造函数中会通过nativeinit调用到jni层方法,最终生成C++层的Looper对象(代码在frameworks/base/libs/utils/Looper.cpp)
读
在Looper.cpp的构造函数中:
Looper::Looper(bool allowNonCallbacks) :
mAllowNonCallbacks(allowNonCallbacks), mSendingMessage(false),
mResponseIndex(0), mNextMessageUptime(LLONG_MAX) {
int wakeFds[2];
int result = pipe(wakeFds);
mWakeReadPipeFd = wakeFds[0];
mWakeWritePipeFd = wakeFds[1];
result = fcntl(mWakeReadPipeFd, F_SETFL, O_NONBLOCK);
result = fcntl(mWakeWritePipeFd, F_SETFL, O_NONBLOCK);
// Allocate the epoll instance and register the wake pipe.
mEpollFd = epoll_create(EPOLL_SIZE_HINT);
struct epoll_event eventItem;
// zero out unused members of data field union
memset(& eventItem, 0, sizeof(epoll_event));
eventItem.events = EPOLLIN;
eventItem.data.fd = mWakeReadPipeFd;
result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeReadPipeFd, & eventItem);
}
首先通过管道pipe创建了读端与写端两个文件描述符,最后通过epoll_create创建epoll专用文件描述符,最后通过epoll_ctl告诉mEpollFd需要监控mWakeReadPipeFd描述符的EPOLLIN事件。
看到这里会有疑问了,为什么单独监控一个mWakeReadPipeFd描述符需要用到epoll呢,使用recvform这样的同步阻塞IO就行了呢。实际上Looper的功能是强大的,它提供了addfd的方法,外部可调用该方法动态添加需要监控的描述符与回调,Android的Choreographer中就是通过该方法监控VSYNC事件的。
在Java层的MessageQueue的next方法中:
int nextPollTimeoutMillis = 0;
for (;;) {
if (nextPollTimeoutMillis != 0) {
Binder.flushPendingCommands();
}
nativePollOnce(ptr, nextPollTimeoutMillis);
synchronized (this) {
final long now = SystemClock.uptimeMillis();
Message prevMsg = null;
Message msg = mMessages;
if (msg != null && msg.target == null) {
do {
prevMsg = msg;
msg = msg.next;
} while (msg != null && !msg.isAsynchronous());
}
if (msg != null) {
if (now < msg.when) {
nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
} else {
mBlocked = false;
if (prevMsg != null) {
prevMsg.next = msg.next;
} else {
mMessages = msg.next;
}
msg.next = null;
if (DEBUG) Log.v(TAG, "Returning message: " + msg);
msg.markInUse();
return msg;
}
} else {
nextPollTimeoutMillis = -1;
}
...
}
//nativePollOnce是个jni调用,在管道内无消息时有可能导致线程阻塞
//首次传入nativePollOnce的超时时间为0,表示不阻塞立即返回
//mMessages保存了下一条Message,如果存在直接返回,并将mMessages设置为当前Message.next,下次调用next可直接返回
//如果没有消息则设置超时时间为-1,表示无限期阻塞等待管道中有新的消息写入。
其中的nativePollOnce方法,会通过jni调用进入Looper.cpp的pollInner方法中:
struct epoll_event eventItems[EPOLL_MAX_EVENTS];
int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);
//这里通过epoll_wait调用,当mEpollFd监控的文件描述符没有IO事件发生时,线程会阻塞住且不会被CPU分配时间片了,如果有IO事件发生或事件超时了,该方法会返回了。
for (int i = 0; i < eventCount; i++) {
int fd = eventItems[i].data.fd;
uint32_t epollEvents = eventItems[i].events;
if (fd == mWakeReadPipeFd) {
if (epollEvents & EPOLLIN) {
awoken();
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on wake read pipe.", epollEvents);
}
} else {
ssize_t requestIndex = mRequests.indexOfKey(fd);
if (requestIndex >= 0) {
int events = 0;
if (epollEvents & EPOLLIN) events |= ALOOPER_EVENT_INPUT;
if (epollEvents & EPOLLOUT) events |= ALOOPER_EVENT_OUTPUT;
if (epollEvents & EPOLLERR) events |= ALOOPER_EVENT_ERROR;
if (epollEvents & EPOLLHUP) events |= ALOOPER_EVENT_HANGUP;
pushResponse(events, mRequests.valueAt(requestIndex));
} else {
ALOGW("Ignoring unexpected epoll events 0x%x on fd %d that is "
"no longer registered.", epollEvents, fd);
}
}
}
//遍历返回的消息,如果是管道mWakeReadPipeFd的消息,调用awoken();
//如果是其他描述符的消息,pushResponse进集合稍后执行,也就是通过addfd方法添加的描述符;
//另外Looper.cpp也支持sendmessage方法,会在Response之前执行。
而awoken方法:
void Looper::awoken() {
char buffer[16];
ssize_t nRead;
do {
nRead = read(mWakeReadPipeFd, buffer, sizeof(buffer));
} while ((nRead == -1 && errno == EINTR) || nRead == sizeof(buffer));
}
//该方法只是将管道中的内容清空
写
我们在应用中通过handler的sendmessage想消息队列发送消息,最终会调用到MessageQueue.java的enqueueMessage方法:
if (p == null || when == 0 || when < p.when) {
// New head, wake up the event queue if blocked.
msg.next = p;
mMessages = msg;
needWake = mBlocked;
} else {
// Inserted within the middle of the queue. Usually we don't have to wake
// up the event queue unless there is a barrier at the head of the queue
// and the message is the earliest asynchronous message in the queue.
needWake = mBlocked && p.target == null && msg.isAsynchronous();
Message prev;
for (;;) {
prev = p;
p = p.next;
if (p == null || when < p.when) {
break;
}
if (needWake && p.isAsynchronous()) {
needWake = false;
}
}
msg.next = p; // invariant: p == prev.next
prev.next = msg;
}
// We can assume mPtr != 0 because mQuitting is false.
if (needWake) {
nativeWake(mPtr);
}
//根据注释也可以知道,这里分两种情况
//第一是消息队列为空,第二是消息队列已经有新消息时,需要适当的位置放置消息。
//当线程needWake时,说明当前线程挂起,需要唤醒线程,会调用nativeWake方法,最终调用到Looper.cpp的wake方法。
wake方法:
void Looper::wake() {
ssize_t nWrite;
do {
nWrite = write(mWakeWritePipeFd, "W", 1);
} while (nWrite == -1 && errno == EINTR);
if (nWrite != 1) {
if (errno != EAGAIN) {
ALOGW("Could not write wake signal, errno=%d", errno);
}
}
}
//只是往管道写端写入W,唤醒当前应用主线程。
总结
Android的应用层通过Message.java实现队列,利用管道和epoll机制实现线程状态的管理,配合起来实现了Android主线程的消息队列模型,而这只是Android的一部分。epoll机制通过Looper.cpp的addfd实现了对其他描述符的监听,在4.1版本之后应用程序会等待VSYNC信号发起View的绘制操作,就是通过它实现的,通过分析SurfaceFlinger和Choreographer,可以很好的理解Android的绘制流程。。。