Android源码:Handler, Looper和MessageQueue实现解析

做了6年的Android开发,此间做的事情非常杂包括ROM,SDK和APP,从来没有好好的研究过Android的基础代码。趁着这段时间项目没那么忙,把基础的东西仔细研究清楚并记录下来。

想到要分析Looper和Handler的时候其实正在看背光调节模块的代码,其中看到PowerManagerService中很多API最终是向Handler发送Message的形式进行实际操作。Handler在做App的时候用的很多,最常见的用法就是extend一个Handler类然后重载其handleMessage方法,然后要发消息的时候调用其sendMessage API即可。使用Handler还可以在不同的线程之间传递消息实现异步操作,下边我们扒一扒Android N的源码看看其实现以及Handler, Looper以及Thread之间的关系。

一,Looper, Thread之间的关系

先来看一段代码

@Override
protected void onCreate(Bundle savedInstanceState) {
    super.onCreate(savedInstanceState);
    setContentView(R.layout.activity_main);

    Handler h1 = new Handler();

    new Thread(new Runnable() {
        @Override public void run() {
            Handler h2 = new Handler();
        }
    }).start();
}

运行之后h1正常创建,但是创建h2的时候crash了:

--------- beginning of crash
E/AndroidRuntime: FATAL EXCEPTION: Thread-263
Process: com.example.stone.sfsandroidclient, PID: 32286
java.lang.RuntimeException: Can't create handler inside thread that has not called Looper.prepare()
   at android.os.Handler.<init>(Handler.java:200)
   at android.os.Handler.<init>(Handler.java:114)
   at com.example.stone.sfsandroidclient.MainActivity$1.run(MainActivity.java:71)
   at java.lang.Thread.run(Thread.java:818)

出错日志提示不能在一个没有调用过Looper.prepare()的Thread里边new Handler()。为什么Looper.prepare()会影响Handler的创建呢,先看源码哪里会抛出这个异常:

static final ThreadLocal<Looper> sThreadLocal = new ThreadLocal<Looper>();

public Handler(Callback callback, boolean async) {
    ... ...
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    ... ...
}

public static @Nullable Looper myLooper() {
    return sThreadLocal.get();
}

这个sThreadLocal是一个ThreadLocal(线程局部)对象,它有get()/set()这么一对方法。它的工作机制是,仅当一个线程调用了它的set()来设置值之后,get()才能获取到设置进去的对象,而且这个对象是线程唯一的。也就是说在ThreadA里面调用了sThreadLocal.set(),在ThreadB里面调用sThreadLocal.get()还是null。所以在Looper.prepare()里一定调用了set():

public static void prepare() {
    prepare(true);
}

private static void prepare(boolean quitAllowed) {
    if (sThreadLocal.get() != null) {
        throw new RuntimeException("Only one Looper may be created per thread");
    }
    sThreadLocal.set(new Looper(quitAllowed));
}

public static void prepareMainLooper() {
    prepare(false);
    synchronized (Looper.class) {
        if (sMainLooper != null) {
            throw new IllegalStateException("The main Looper has already been prepared.");
        }
        sMainLooper = myLooper();
    }
}

为什么主线程上来就有Looper而我们自己new的Thread就没有呢,因为在主线程里边有这段(代码):

public static void main(String[] args) {
    ... ...
    Looper.prepareMainLooper();
    ... ...
}

所以主线程是“生来”就有一个Looper跟它绑定在一起的,而且这个looper身份很特殊,它是整个进程里面的main looper,通过Looper.getMainLooper()就能拿到,接着就可以为所欲为的做些更新UI的活儿了。
那么其他的Thread要怎样才能也拥有一个Looper么,Android为我们提供了一个很好的例子 – HandlerThread类,用法如下:

HandlerThread ht = new HandlerThread("worker");
ht.start();
Handler handler = new Handler(ht.getLooper());

关键就在HandlerThread的run里面:

@Override
public void run() {
    mTid = Process.myTid();
    Looper.prepare();
    synchronized (this) {
        mLooper = Looper.myLooper();
        notifyAll();
    }
    Process.setThreadPriority(mPriority);
    onLooperPrepared();
    Looper.loop();
    mTid = -1;
}

从上述分析可知Looper.prepare()之后这个Thread就有了一个跟它关联在一起的Looper了,这里头有两个需要注意的点:一是new Handler()要在ht.start()之后,二是如果要extend HandlerThread则记得要在run()里面调用super.run()。

二, Looper, Handler和MessageQueue

还是看Handler的构造函数

public Handler(Callback callback, boolean async) {
    ... ...
    mLooper = Looper.myLooper();
    if (mLooper == null) {
        throw new RuntimeException(
            "Can't create handler inside thread that has not called Looper.prepare()");
    }
    mQueue = mLooper.mQueue;
    mCallback = callback;
    mAsynchronous = async;
}

可知它们的关系为,一个HandlerThread带有一个Looper,Looper有一个MessageQueue和多个与之关联的Handler,而一个Handler只与一个Looper有关联。UML类图如下:

《Android源码:Handler, Looper和MessageQueue实现解析》 handler_looper.png

在使用中接触得最多的是Message,它在MessageQueue里面以单项链表的形式组织,mMessages指向链表头部。每个Java层的MessageQueue都有一个C++实现的NativeMessageQueue与之对应,关键的实现部分其实在native层中,下边的章节会分析。

三,工作机制

先借一张《Efficient Android Threading》里面的图:

《Android源码:Handler, Looper和MessageQueue实现解析》 looper.png

顾名思义,Looper的主要执行过程是无线循环的loop(),它在HandlerThread的run()运行起来:

public static void loop() {
    final Looper me = myLooper();
    final MessageQueue queue = me.mQueue;

    for (;;) {
        // queue.next()是阻塞调用,后面详细分析
        Message msg = queue.next(); // might block
        // 当next()返回null的时候,表示Looper.quit()被调用了所以退出无限循环
        if (msg == null) {
            // No message indicates that the message queue is quitting.
            return;
        }
        ... ...
        try {
            // 找到了msg就交给target(Handler)开干吧
            msg.target.dispatchMessage(msg);
        } finally {
            if (traceTag != 0) {
                Trace.traceEnd(traceTag);
            }
        }
        // Message可回收利用,这里不展开了
        msg.recycleUnchecked();
    }
}

Handler的dispatchMessage()就没什么可说的了:

public void dispatchMessage(Message msg) {
    if (msg.callback != null) {
        handleCallback(msg);
    } else {
        if (mCallback != null) {
            if (mCallback.handleMessage(msg)) {
                return;
            }
        }
        handleMessage(msg);
    }
}

里面要么就是执行msg自己定义的callback,要么就是在extend Handler时定义的handleMessage方法。要记得的就是这个方法的调用是在Handler关联的Looper所关联的线程里面执行的。所以如果你的Handler关联的是main Looper,就不要在handleMessage()里面做耗时的操作(网络请求,IO,密集计算等)了,否则很有可能造成ANR。
接着分析最关键的queue.next():

Message next() {
    ... ...
    int nextPollTimeoutMillis = 0;
    for (;;) {
        if (nextPollTimeoutMillis != 0) {
            Binder.flushPendingCommands();
        }
        // 阻塞在native层
        nativePollOnce(ptr, nextPollTimeoutMillis);
        // mMessages有可能被其他线程修改(通过sendMessage),锁起来
        synchronized (this) {
            // Try to retrieve the next message.  Return if found.
            final long now = SystemClock.uptimeMillis();
            Message prevMsg = null;
            Message msg = mMessages;
            if (msg != null && msg.target == null) {
                // 这是一个有故事的msg因为target为null表示这个一个barrier(围栏)
                // barrier可以阻止卡住所有sync类型的msg的执行,它通过postSyncBarrier()插入,
                // 通过removeSyncBarrier()去除,当barrier去除后所有被拦住的sync msg都被依次执行
                // 目前源码中用到了barrier的就只有view的invalidation机制了。
                // 如果特别重要的msg还是有特权可以执行的,async msg就是这个例外。
                do {
                    prevMsg = msg;
                    msg = msg.next;
                } while (msg != null && !msg.isAsynchronous());
            }
            if (msg != null) {
                // 当没有barrier而且有sync msg,或者有barrier但是找到async msg的时候
                if (now < msg.when) {
                    // msg定义的when还没到,让native继续等nextPollTimeoutMillis时长
                    nextPollTimeoutMillis = (int) Math.min(msg.when - now, Integer.MAX_VALUE);
                } else {
                    // 有到点的msg,返回给Looper的loop()处理
                    mBlocked = false;
                    if (prevMsg != null) {
                        prevMsg.next = msg.next;
                    } else {
                        mMessages = msg.next;
                    }
                    msg.next = null;
                    if (DEBUG) Log.v(TAG, "Returning message: " + msg);
                    msg.markInUse();
                    return msg;
                }
            } else {
                // 没有msg了,在native层阻塞
                nextPollTimeoutMillis = -1;
            }

            // 如果quit()调用了,通知Looper.loop()退出无限循环
            if (mQuitting) {
                dispose();
                return null;
            }
        ... ...
        // 在idle handler执行的时候有可能来了新的mesasge,设为0不阻塞马上进行检查
        nextPollTimeoutMillis = 0;
    }
}

捎带讲讲barrier的插入/拔出,没兴趣的可以略过毕竟用得少:

private int postSyncBarrier(long when) {
    // 将barrier插入到mMessges队列中,回收回来的msg的target是空的
    // 把token返回给调用者,拔掉barrier的时候是需要token的
    synchronized (this) {
        final int token = mNextBarrierToken++;
        final Message msg = Message.obtain();
        msg.markInUse();
        msg.when = when;
        msg.arg1 = token;

        Message prev = null;
        Message p = mMessages;
        if (when != 0) {
            while (p != null && p.when <= when) {
                prev = p;
                p = p.next;
            }
        }
        if (prev != null) { // invariant: p == prev.next
            msg.next = p;
            prev.next = msg;
        } else {
            msg.next = p;
            mMessages = msg;
        }
        return token;
    }
}

public void removeSyncBarrier(int token) {
    synchronized (this) {
        Message prev = null;
        Message p = mMessages;
        // 对一下token
        while (p != null && (p.target != null || p.arg1 != token)) {
            prev = p;
            p = p.next;
        }
        if (p == null) {
            throw new IllegalStateException("The specified message queue synchronization "
                    + " barrier token has not been posted or has already been removed.");
        }
        final boolean needWake;
        if (prev != null) {
            prev.next = p.next;
            needWake = false;
        } else {
            mMessages = p.next;
            needWake = mMessages == null || mMessages.target != null;
        }
        p.recycleUnchecked();

        // 拔出barrier唤醒阻塞在native的looper所在线程
        if (needWake && !mQuitting) {
            nativeWake(mPtr);
        }
    }
}

四, native层的NativeMessageQueue机制

static void android_os_MessageQueue_nativePollOnce(JNIEnv* env, jobject obj,
        jlong ptr, jint timeoutMillis) {
    NativeMessageQueue* nativeMessageQueue = reinterpret_cast<NativeMessageQueue*>(ptr);
    nativeMessageQueue->pollOnce(env, obj, timeoutMillis);
}

void NativeMessageQueue::pollOnce(JNIEnv* env, jobject pollObj, int timeoutMillis) {
    mPollEnv = env;
    mPollObj = pollObj;
    // 交给native looper去poll了
    mLooper->pollOnce(timeoutMillis);
    mPollObj = NULL;
    mPollEnv = NULL;
    ... ...
}

native looper使用了linux的epoll机制:

int Looper::pollOnce(int timeoutMillis, int* outFd, int* outEvents, void** outData) {
    int result = 0;
    for (;;) {
        // native looper实现了很复杂的功能,可以从fd中读取远端响应并返回
        // 但是MessageQueue只用到了其阻塞的特点,所以这里忽略了一大段代码
        ... ...
        if (result != 0) {
#if DEBUG_POLL_AND_WAKE
            ALOGD("%p ~ pollOnce - returning result %d", this, result);
#endif
            if (outFd != NULL) *outFd = 0;
            if (outEvents != NULL) *outEvents = 0;
            if (outData != NULL) *outData = NULL;
            return result;
        }

        result = pollInner(timeoutMillis);
    }
}

int Looper::pollInner(int timeoutMillis) {
    ... ...
    // 同样,这样删了更大一段代码因为我们只关心阻塞
    int eventCount = epoll_wait(mEpollFd, eventItems, EPOLL_MAX_EVENTS, timeoutMillis);

    // 不管返回什么,不阻塞的返回就好
    if (eventCount < 0) {
        ... ...
        goto Done;
    }

    // Check for poll timeout.
    if (eventCount == 0) {
        ... ...
        goto Done;
    }
    ... ...
Done: ;
    ... ...
    return result;
}

这里最最重要的就是epoll_wait()调用,在linux shell里面man epoll_wait得到提示:

NAME
       epoll_wait, epoll_pwait - wait for an I/O event on an epoll file descriptor

SYNOPSIS
       #include <sys/epoll.h>

       int epoll_wait(int epfd, struct epoll_event *events,
                      int maxevents, int timeout);
... ...
RETURN VALUE
       When successful, epoll_wait() returns the number of file descriptors ready for the requested I/O, or zero if no file  descrip‐
       tor  became  ready  during the requested timeout milliseconds.  When an error occurs, epoll_wait() returns -1 and errno is set
       appropriately.

其中timeout即Looper中的timeoutMillis即Java层的MessageQueue.next()中的nextPollTimeoutMillis。重新看next()中nextPollTimeoutMillis有3种可能的选择:0, -1或(int) Math.min(msg.when – now, Integer.MAX_VALUE);即最近一个msg的到期时间,传到epoll_wait()中产生的效果分表是马上返回、一直阻塞直至mEpollFd可用、阻塞msg.when-now的时长或者mEpollFd可用。这样就达到了闲时线程阻塞交出CPU,msg到期之后Thread接着执行的效果。当Thread正在永久阻塞的时候,还可以通过让mEpollFd变为可用来通知线程跳出阻塞。

五,Native Looper的核心 – epoll

还是只关心阻塞和唤醒那部分,首先要先了解linux的epoll机制,epoll需要三个linux系统调用包括上边的epoll_wait还有epoll_create以及epoll_ctl,看看native Looper是使用这些系统调用的:

Looper::Looper(bool allowNonCallbacks) :......{
    // 通过eventfd生成一个专门用于发射event的fd
    mWakeEventFd = eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC);
    ... ...
    rebuildEpollLocked();
}

void Looper::rebuildEpollLocked() {
    // 通过epoll_create生成用于epoll_wait调用的fd
    mEpollFd = epoll_create(EPOLL_SIZE_HINT);
    LOG_ALWAYS_FATAL_IF(mEpollFd < 0, "Could not create epoll instance: %s", strerror(errno));

    struct epoll_event eventItem;
    memset(& eventItem, 0, sizeof(epoll_event)); // zero out unused members of data field union
    eventItem.events = EPOLLIN;
    eventItem.data.fd = mWakeEventFd;
    // 通过epoll_ctl将mEpollFd和mWakeEventFd联系起来
    int result = epoll_ctl(mEpollFd, EPOLL_CTL_ADD, mWakeEventFd, & eventItem);

}

至此,mEpollFd和mWakeEventFd就变成了一对好基友,当向mWakeEventFd写入数据的时候 阻塞在epoll_wait(mEpollFd…);的线程将会被唤醒(向mWakeEventFd写入event的话epoll_wait还能读取到该event,我们只关心唤醒所以不作进一步解析)。所以native Looper的wake()是这样实现的:

void Looper::wake() {
#if DEBUG_POLL_AND_WAKE
    ALOGD("%p ~ wake", this);
#endif

    uint64_t inc = 1;
    // 向mWakeEventFd里面写了个1,用来唤醒足够了
    ssize_t nWrite = TEMP_FAILURE_RETRY(write(mWakeEventFd, &inc, sizeof(uint64_t)));
    if (nWrite != sizeof(uint64_t)) {
        if (errno != EAGAIN) {
            ALOGW("Could not write wake signal: %s", strerror(errno));
        }
    }
}

想要了解更多可以man epoll_ctl, man epoll_create以及查找linux epoll,管道pipe的实现机制。总的来说,Android的Looper设计的是相当的精巧。

    原文作者:darkengine
    原文地址: https://www.jianshu.com/p/10dd4d605d40
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞