Java Executors 源码分析

0 引言

前段时间需要把一个C++的项目port到Java中,因此时隔三年后重新熟悉了下Java。由于需要一个通用的线程池,自然而然就想到了Executors。

用了后,感觉很爽… 于是忍不住抠了下源码。因此就有了这篇学习笔记。

言归正传,Java Executor是一个功能丰富,接口设计很好的,基于生产者-消费者模式的通用线程池。这种线程池的设计思想也在很多地方被应用。

在这篇文章中,我并不打算介绍java线程池的使用,生产者-消费者模式,并发编程基本概念等。

通常来说,一个线程池的实现包括四个部分:

  1. 执行任务的线程
  2. 用于封装任务的task对象
  3. 存储任务的数据结构
  4. 线程池本身

1 Thread

Thread 并不是concurrent包的一部分。Thread包含着name, priority等成员和对应的操作方法。

它是继承自runable的,也就是说线程的入口函数是run。它的继承体系和重要操作函数如下图:

《Java Executors 源码分析》

它实现了一系列包括sleep, yield等静态方法。以及获取当前线程的静态方法currentThread()。这些都是native方法。

值得注意的是它的中断机制(虽然它也实现了suspend和resume方法,但是这两个方法已被弃用):

  1. 通过调用interrupt来触发一个中断
  2. isInterrupted() 用来查询线程的中断状态
  3. interrupted() 用来查询并清除线程的中断状态
public void interrupt() {
    if (this != Thread.currentThread())
        checkAccess();
    synchronized (blockerLock) {
        Interruptible b = blocker;
        if (b != null) {
            interrupt0();           // Just to set the interrupt flag
            b.interrupt(this);
            return;
        }
    }
    interrupt0();
}

在默认的情况下,blocker (Interruptible 成员变量)的值为null, 这时调用interrupt,仅仅是调用interrupt0设置一个标志位。

而如果blocker的值不为null,则会调用其interrupt方法实现真正的中断。

(关于blocker值何时被设置,在后面会看到一个使用场景。)

当线程处于可中断的阻塞状态时,比如说阻塞在sleep, wait, join,select等操作时,调用interrupt方法会让线程从阻塞状态退出,并抛出InterruptedException。

值得注意的一点是:interrupt让我们从阻塞的方法中退出,但线程的中断状态却并不会被设置

try {
    Thread.sleep(10);
}
catch (InterruptedException e) {
    System.out.println("IsInterrupted: " + Thread.currentThread().isInterrupted());
}

如上述示例代码,此时你得到的输出是: IsInterrupted : false 。这是一个有点令人意外的地方。
上述代码并不是一个好的示例,因为interrupt被我们“吃”掉了!除非你明确的知道这是你想要的。否则的话请考虑在异常捕获中(catch段中)加上:

Thread.currentThread.interrupt();

2. Task

Java可执行的接口类有两种,Runnable和Callable,它们的区别是Callable可以带返回值,一个需要实现Run()方法,另一个需要实现带返回值的Call() 方法。

在java.util.concurret中还有另外一个接口类Future。

Future表示一个异步任务的结果,就是user code向线程池提交一个任务后,它会返回对应的 Future对象。用以观察任务执行的状态(isCancelled, isDone),取消任务(Cancel)或者等待任务执行(get, timeout get)。

《Java Executors 源码分析》

如上图,RunnableFuture是一个中间类,它将Runnable和Future的功能糅合到一起。FutureTask 则是真正的实现。

FutureTask

FutureTask可以从一个Runnable和Callable构造,当通过Runnable构造时,它会调用Excutors.callable接口将其转为Callable对象保存起来。

从上面的类图中可以看出,FutureTask除了简单的状态查询等接口外,还具有两个重要的接口:get()get(long timeout, TimeUnit unit)), cancel(bool mayInterruptIfRunning)

它们分别提供两个重要的功能:阻塞(当前线程)等待(一段时间)直到task完成或者异常终止;取消任务。

任务取消

一个任务具有三种状态:尚未运行,正在运行,已经执行完毕。

在调用cancel后,如果任务处于已经执行完毕了,则不需要做任何事情直接返回;

如果任务尚未运行,将其状态设为cancelled;
如果任务正在执行,而且user以cancel(true)的方式取消这个任务。那么FutureTask会通过调用Thread.interrupt来终止当前任务。

public boolean cancel(boolean mayInterruptIfRunning) {
    // 任务已经完成或者被中断等其他状态
    if (state != NEW)
        return false;
    if (mayInterruptIfRunning) {
        // 正在运行,或者尚未运行
        if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, INTERRUPTING))
            return false;
        Thread t = runner;
        if (t != null)
            t.interrupt();
        UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // final state
    }
    // 设置cancel标志位
    else if (!UNSAFE.compareAndSwapInt(this, stateOffset, NEW, CANCELLED))
        return false;
    finishCompletion();
    return true;
}

注意到: FutureTask并没有一个RUNNING的状态来标识该任务正在执行。正常的情况下,任务从开始创建直到运行完毕,这段过程的状态都是NEW。

阻塞等待

user code可以调用get() 接口等待任务完成或者调用get(long, TimeUnit)等待一段时间。但get()接口被调用,当前的线程将被挂起,直到条件满足(任务完成或者异常退出)。

在前文中我们了解到,Thread并没有提供挂起和阻塞的方法。在这里,Java利用LockSupport类来实现目的。(我猜测其中用了类似条件变量的方法来实现)。

park

LockSupport也属于concurrent。FutureTask利用它的park (parkNanos)和unpark方法来实现线程的挂起和恢复:


public static void park(Object blocker) { Thread t = Thread.currentThread(); setBlocker(t, blocker); unsafe.park(false, 0L); setBlocker(t, null); } public static void unpark(Thread thread) { if (thread != null) unsafe.unpark(thread); }

其中parkNanos跟park方法并无本质区别,只是多了一个timeout参数。FutureTask分别用它们来实现get和timeout的get。

注意到上面的setBlocker方法了吗?没错,它就是给在上文Thread.interrupt方法中出现过的Thread成员变量blocker赋值。从这我们可以看出,它是可中断的。

而它真正实现挂起的则是依赖unsafe类。unsafe类在concurrent中频繁出现,但sun去并不建议使用它。

它除了提供park,unpark方法外,还提供了一些内存和同步原语。比如CAS等。

多个等待者

调用get()的线程可以是一个,也可以是多个。为了能够在恰当的时机将它们一一恢复,FutureTask内部需要维护一个链表来记录所有的等待线程:waiters.

static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

get 全貌

至此,我们终于了解get的全貌了。get会调用awaitDone方法来实现阻塞。当然,只有两个状态需要处理:NEW, COMPLETING。

NEW的状态在前文已经有介绍过。COMPLETING状态通常持续较短,在FutureTask 内部的callable 的call方法调用完毕后,会需要将call的返回值设置到outcome这个成员变量。随后将状态设为NORMAL。这期间的状态就是COMPLETING。

显而易见,对于这种状态,我们只需要调用yield让出线程资源,使得FutureTask完成这一过程即可。


private int awaitDone(boolean timed, long nanos) throws InterruptedException { final long deadline = timed ? System.nanoTime() + nanos : 0L; WaitNode q = null; boolean queued = false; for (;;) { if (Thread.interrupted()) { removeWaiter(q); throw new InterruptedException(); } int s = state; if (s > COMPLETING) { // 1 if (q != null) q.thread = null; return s; } else if (s == COMPLETING) // cannot time out yet 2 Thread.yield(); else if (q == null) // 3 q = new WaitNode(); else if (!queued) // 4 queued = UNSAFE.compareAndSwapObject(this, waitersOffset, q.next = waiters, q); else if (timed) { // 5 nanos = deadline - System.nanoTime(); if (nanos <= 0L) { removeWaiter(q); return state; } LockSupport.parkNanos(this, nanos); } else // 6 LockSupport.park(this); } }

当任务处于NEW状态正在被执行时,其他线程调用get而进入awaitdone函数。

此时的流程是 3 -> 4 -> 5 或者 3 -> 4 -> 6。

它会首先分配一个WaitNode对象 –> 把它插入到waiters链表的表头 –> 然后开始等待。那么park函数何时返回呢?

  1. 对应的unpark被调用(或者在这之前已经被调用)
  2. 如果设置了timeout的,会在时间到达后退出。
  3. 被中断。
  4. 其他异常。

等待线程恢复

当任务执行完毕(或者被cancel)时,FutureTask会调用最终调用finishcompletion,改函数会改变FutureTask状态,并调用LockSupport.unpark方法。

此时,awaitDone线程从park中返回,然后检查当前的状态已经被改变,随后退出for循环。

线程安全

FutureTask是会被多个线程访问的,涉及到临界区的保护,但是其内部却并没有任何的锁操作。而在该类定义的末尾,有这样的代码。

private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

这段代码会在类被加载时执行一次。注意到它利用getDeclaredField反射机制来保存了三个offset:
stateOffset,runnerOffset,waitersOffset分别对应着state,runner,waiters这三个成员的偏移量。

FutureTask真是对这三个成员变量进行CAS操作来保证原子性和无锁化的。实现CAS的类正是上文出现过的sun.misc.Unsafe类。

UNSAFE.compareAndSwapObject(this, runnerOffset, null, Thread.currentThread())

第一个参数是对象指针,第二个是偏移量,第三个是旧值,最后一个是新值。详细可参考Unsafe文档。

3. BlockingQueue

java实现了生产者-消费者模式的队列。由于队列的容量有限,因此涉及到在队列为空的时候取task和在队列已满的时候存task的策略,连同一系列的查询函数一起,BlockingQueue包含着11个静态方法。

BlockingQueue只是一个interface,它的实现类包括链表方式的LinkedBlockingQueue 、数组方式的ArrayBlockingQueue以及PriorityBlockingQueue等。

LinkedBlockingQueue

下面以LinkedBlockingQueue为例来了解一下它的实现。

LinkedBlockingQueue是一个FIFO的队列,它真正用来存储元素的节点类型是Node :

static class Node<E> {
    E item;
    Node<E> next;
    Node(E x) { item = x; }
}

对应的,在LinkedBlockingQueue中保存了头节点和尾节点 :

/**
 * Head of linked list.
 * Invariant: head.item == null
 */
private transient Node<E> head;
/**
 * Tail of linked list.
 * Invariant: last.next == null
 */
private transient Node<E> last;

在LinkedBlockingQueue中,Java使用了双锁机制,分别对头节点和尾节点加锁。这样取和存的操作就可以同时进行了。


/** Lock held by take, poll, etc */ private final ReentrantLock takeLock = new ReentrantLock(); /** Wait queue for waiting takes */ private final Condition notEmpty = takeLock.newCondition(); /** Lock held by put, offer, etc */ private final ReentrantLock putLock = new ReentrantLock(); /** Wait queue for waiting puts */ private final Condition notFull = putLock.newCondition();

以take为例,获取并移除此队列的头部,在元素变得可用之前一直等待(可被打断)。


public E take() throws InterruptedException { E x; int c = -1; final AtomicInteger count = this.count; final ReentrantLock takeLock = this.takeLock; takeLock.lockInterruptibly(); try { while (count.get() == 0) { notEmpty.await(); } x = dequeue(); c = count.getAndDecrement(); if (c > 1) notEmpty.signal(); } finally { takeLock.unlock(); } if (c == capacity) signalNotFull(); return x; }

它将会一直阻塞在notEmpty.await()上,直到信号到达或者被中断。注意到它只需要对takeLock加锁,而无需对putLock加锁。

相应的,put操作也只需要锁上putLock就可以了。

有的操作则需要两个锁都锁上,比如说remove,因为我们不确定要删除的元素的位置。

public boolean remove(Object o) {
    if (o == null) return false;
    fullyLock();
    try {
        for (Node<E> trail = head, p = trail.next;
             p != null;
             trail = p, p = p.next) {
            if (o.equals(p.item)) {
                unlink(p, trail);
                return true;
            }
        }
        return false;
    } finally {
        fullyUnlock();
    }
}

可以看到LinkedBlockingQueue 并没有直接调用lock,而是通过fullyLock和fullyUnLock来加解锁以保证一致性,避免死锁:


/** * Lock to prevent both puts and takes. */ void fullyLock() { putLock.lock(); takeLock.lock(); } /** * Unlock to allow both puts and takes. */ void fullyUnlock() { takeLock.unlock(); putLock.unlock(); }

当然,双锁队列在插入第一个元素和最后一个元素出队的时候会有冲突。这里的解决办法是加了一个哨兵,开始的时候,头尾节点都指向这个哨兵,在随后的操作中,头结点始终指向哨兵,而尾节点指向真正有效的值。

4. Executors

类结构

有了前面这些零件,我们就可以开始组装线程池对象了。java里面Executors的真正实现类主要包括两个ThreadPollExecutors和ScheduledThreadPoolExecutor。其中ScheduledThreadPoolExecutor通过实现其基类ScheduledExecutorService扩展了ThreadPoolExecutor类。

《Java Executors 源码分析》

SheduledExecutorsService主要用于执行周期性的或者定时的任务。其他情况下我们更多使用ThreadPoolExecutor。

ThreadPoolExecutor

ThreadPoolExecutor总共有七个构造参数:

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {

从其注释和参数名不难猜测各个参数的用途。唯一有点麻烦的是corePoolSize, maximumPoolSize这两个参数的区别。你可以参考这里或者这里。

但大多数情况我们并不需要直接调用它的构造函数,在Executors里面定义了一系列的静态方法供我们使用。包括newFixedThreadPool、newSingleThreadExecutor等。

由于ThreadPoolExecutor是一个通用的线程池,因此它需要为各种各样的情况预留足够的接口。ThreadPoolExecutor除了提供丰富的接口外,还提供了一些“什么都不做”的函数,为user预留接口。
比如每个任务在执行之前会调用beforeExecute,执行完毕后又会调用afterExecute。又比如terminate用来通知用户代码该线程将要结束。

这些接口java都提供了及其丰富的文档。

Executor接口设计的目的或许也在于此,为简单的情况提供尽量简单的使用方法,同时为复杂的情况或者说高级用户提供足够多的接口。

一个不用担心的问题

在最初使用ThreadPoolExecutor 时候,用到FutrueTask的cancel接口,我总是担心一个问题:

由于cancel是依赖线程的interrupt方法来实现的,也就是说cancel的状态保持在线程中而不是task中。那么当这个线程执行下一个task会不会被影响?为了验证这一点,我做了个小小的实验:

public class InterruptTest
{
    public static class MyTask implements Runnable {
        @Override
        public void run() {
            System.out.println(Thread.currentThread());
            System.out.println("before interrupt " + Thread.currentThread().isInterrupted());
            Thread.currentThread().interrupt();

            System.out.println("after interrupt " + Thread.currentThread().isInterrupted());
        }
    }
    public static void main(String[] str)
    {
        ExecutorService service = Executors.newFixedThreadPool(1);
        // MyTask task1 = new MyTask();
        Future<?> future1 = service.submit(new InterruptTest.MyTask());
        Future<?> future2 = service.submit(new InterruptTest.MyTask());
    }
}  

输出结果说明,我的担心是多余的:

Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true
Thread[pool-1-thread-1,5,main]
before interrupt false
after interrupt true

其关键代码就在ThreadPoolExecutor.runWorker 方法中,线程的中断状态会被清除(shutDown例外)。

final void runWorker(Worker w) {
     ...
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
    ...
}

参见 SO 的提问

其中Executors还有很多的东西,但是看看文章的长度,我决定把那些关于Executors的笔记先“藏”起来。

如果感兴趣的可以翻看源码: ThreadFactory, RejectHandler, worker, task, shutDown策略,锁机制… 看看ThreadPoolExecutor 把这些积木堆成一个房子的吧。

    原文作者:HashMap源码分析
    原文地址: https://segmentfault.com/a/1190000002733812
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞