JUC FutureTask 分析

基本介绍

FutureTask,JUC中提供的一个Future接口实现类,主要作用是为了支持ThreadPoolExecutor的submit操作。

经过前面对JDK线程池的分析JUC ThreadPoolExecutor详解,我们知道在AbstractExecutorService中实现了submit方法,以提交Callable为例:

public <T> Future<T> submit(Callable<T> task) {
    if (task == null) throw new NullPointerException();
    RunnableFuture<T> ftask = newTaskFor(task);
    execute(ftask);
    return ftask;
}

其中newTaskFor方法,返回的就是FutureTask:

protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
    return new FutureTask<T>(callable);
}

submit Runnable会用RunnableAdapter这个类先包装一把,RunnableAdapter实现了Callable接口,目的是为了后面FutureTask能够统一处理: 都是以Callable来处理

使用示例

一般不直接使用,在ThreadPoolExecutor(其实是在AbstractExecutorService中用到)的实现中可看到其使用方式(newTaskFor)

源码分析

Tips: 大多数分析采用源码+注释的方式进行,因此请关注下源码上面的注释

继承关系

先看看FutureTask的类继承关系:

FutureTask -> RunnableFuture -> Future
                             -> Runnable

RunnableFuture实际上是一个组合了Future和Runnable接口的接口类:

public interface RunnableFuture<V> extends Runnable, Future<V> {
    /** * Sets this Future to the result of its computation * unless it has been cancelled. */
    void run();
}

Runnable我们非常熟悉,Future我们在下一章节详细介绍

Future介绍

Future,JUC中提供的实现异步编程的核心基础接口。一个Future保留一个异步任务执行结果,真正获取结果需要调用其get方法,当异步任务没有执行完,那么get方法将会阻塞等待,否则get方法直接返回,同时Future还提供了其他一些方法:

public interface Future<V> {

    // 尝试取消异步任务
    boolean cancel(boolean mayInterruptIfRunning);

    // 判断异步任务是否被取消
    boolean isCancelled();

    // isDone代表异步任务是否结束,包括正常结束和异常结束(被中断取消,其他异常...)
    boolean isDone();

    // 获取异步任务执行结果,若任务没有结束,则阻塞等待,否则直接返回执行结果
    V get() throws InterruptedException, ExecutionException;

    // get的超时版本,若任务没有结束,则阻塞等待给定时间,期间若任务结束,则返回;若在超时时间内任务依然没有执行结束,则抛TimeoutException
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;

}

FutureTask内部属性

// 当前状态,与ThreadPoolExecutor类似,FutreTask内部也有状态转移的概念
private volatile int state;
// FutureTask持有的callable任务
private Callable<V> callable;
// 保存最后的执行结果
private Object outcome;
// 指向跑FutureTask的线程
private volatile Thread runner;
// get操作等待线程,由WaitNode组成一个链式栈,操作都是无锁的(CAS + 循环重试)
private volatile WaitNode waiters;

其中state, runner, waiters定义为volatile,是为了配合Unsafe做CAS操作,以实现原子更新:

// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> k = FutureTask.class;
        stateOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("state"));
        runnerOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("runner"));
        waitersOffset = UNSAFE.objectFieldOffset
            (k.getDeclaredField("waiters"));
    } catch (Exception e) {
        throw new Error(e);
    }
}

WaitNode比较简单,仅有两个内部属性:

static final class WaitNode {
    // 指向当前等待线程
    volatile Thread thread;
    // 指向下一个节点(另外一个等待线程)
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

FutureTask的构造

可以由Callable or Runnable来进行构造:

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

Executors.callable静态方法将Runnable和一个result构造成Callable,实现类是RunnableAdapter

状态及其转移

FutureTask内部定义的state有以下几种:

private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;
  • NEW: FutureTask构建时的初始化状态
  • COMPLETING: 趋向于结束(正常 or 异常)的中间状态
  • NORMAL: 正常结束的最终状态
  • EXCEPTIONAL: 异常结束的最终状态
  • CANCELLED: 已取消,成功调用cancel且不是interrupt方式的最终状态
  • INTERRUPTING: 正在执行中断,被中断(调用cancel且参数传递true)的中间状态
  • INTERRUPTED: 成功中断,被中断(调用cancel且参数传递true)的最终状态

根据正常结束,异常结束(exception、cancel、interrupt),具有以下多种状态转移情况:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

核心方法介绍

run

FutureTask作为一个task丢到线程池里面,最终线程池调用的就是其run方法,在run方法里面,FutureTask再调用原始的、被封装的真实task:

public void run() {
    // 避免重复执行,需要判断state以及runner是否是初始化状态(Tips: 这里会设置runner为当前线程)
    if (state != NEW ||
        !UNSAFE.compareAndSwapObject(this, runnerOffset,
                                     null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
                // 调用我们丢给线程池的真实task
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
                result = null;
                ran = false;
                // 遇到异常的处理
                setException(ex);
            }
            if (ran)
                set(result); // 成功执行完task的处理
        }
    } finally {
        // runner must be non-null until state is settled to
        // prevent concurrent calls to run()
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set / setException 与 cancel

触发正常的结束与异常的结束,set / setException 两个方法主要逻辑类似,区别只是状态转移方向有差异:

protected void set(V v) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // NEW -> COMPLETING
        outcome = v; // 设置outcome
        UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state COMPLETING -> NORMAL
        finishCompletion(); // 处理等待线程的唤醒
    }
}

protected void setException(Throwable t) {
    if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // NEW -> COMPLETING
        outcome = t; // 设置outcome为Throwable t
        UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state COMPLETING -> EXCEPTIONAL
        finishCompletion(); // 处理等待线程的唤醒
    }
}

cancel方法有正常cancel与中断线程方式这两种,通过cencel的入参来区别,对于正常cancel,若任务已经执行并且趋于结束(COMPLETING),则cancel会失败(返回false)

public boolean cancel(boolean mayInterruptIfRunning) {
    // 1. state必须为初始化状态
    // 2. 两种情况:
    // 2.1 正常取消, CAS NEW -> CANCELED要成功
    // 2.2 中断取消,CAS NEW -> INTERRUPTING要成功 
    if (!(state == NEW &&
          UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
                Thread t = runner;
                if (t != null)
                    t.interrupt(); // 中断runner线程
            } finally { // final state
                UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // 设置中断最终状态INTERRUPTED
            }
        }
    } finally {
        finishCompletion(); // 处理等待线程的唤醒
    }
    return true;
}

finishCompletion: 处理等待线程的唤醒

上面说了,set / setException 与 cancel 最后都会调用该方法,对等待线程进行唤醒操作

private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) { // 外层for循环为了避免waiters还有新增的情况,那么新的waiters和当前q引用的waiters就不一致了
        if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
            // 内层for循环处理waiters栈里面线程的唤醒
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break; // q == 当前waiters 且 CAS设置waiters为null成功,则跳出外层for
        }
    }

    // done在FutureTask实现里面是个空实现,主要是为了给子类扩展使用
    done();

    callable = null;        // to reduce footprint
}

get 与 超时get

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    return report(s);
}

public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException();
    return report(s);
}

awaitDone: 处理等待线程的阻塞

前面的get or 超时的get,如果FutureTask对应的task还没执行结束,则此时调用get的线程需要阻塞等待其执行结束,调用finishCompletion将其唤醒,而使等待线程阻塞等待的方法,正是awaitDone:

private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        if (Thread.interrupted()) { // 如果当前等待线程被中断,则从等待栈中移除该线程,且上抛中断异常
            removeWaiter(q); // 等待栈中移除该等待线程
            throw new InterruptedException(); // 上抛异常
        }

        int s = state;
        if (s > COMPLETING) { // s > COMPLETING,证明状态已经是最后的终结状态(可能是NORMAL/EXCEPTIONAL/CANCELED/INTERRUPTED),则返回
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING) // COMPLETING到终结状态时间通常很短,没必要休眠,适当调用Thread.yield()配合自旋(for循环)来等待其最终状态的到来
            Thread.yield();
        else if (q == null)
            q = new WaitNode();
        else if (!queued)
            queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                 q.next = waiters, q); // 当前等待线程还没入等待栈,先入了再说
        else if (timed) { // 超时等待的处理
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) { // 达到超时时间,也需要将当前等待线程从等待栈中移除,然后返回
                removeWaiter(q);
                return state;
            }
            LockSupport.parkNanos(this, nanos); // 超时的阻塞等待
        }
        else
            LockSupport.park(this); // 非超时的阻塞等待
    }
}

removeWaiter: 处理awaitDone阻塞等待的线程被唤醒后从等待栈的移除

逻辑实际上是栈节点的移除,因为没有使用锁且在多线程并发下的移除,因此也是使用CAS + 不断自旋重试来实现栈节点的移除:

private void removeWaiter(WaitNode node) {
    if (node != null) {
        // 先设置待移除的节点中的thread属性为null,后面的操作就是找到这个thread为null的前继以及后继,进行unlink
        // 当然有一种情况前继找不到,因为此时删除的节点就是栈顶元素
        node.thread = null;
        retry:
        for (;;) {          // restart on removeWaiter race
            for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
                s = q.next; // s指向待删除节点的后继节点
                if (q.thread != null)
                    pred = q; // pred指向待删除节点的前继节点

                // 下面的逻辑证明q.thread == null,则证明当前q就是要从栈中移除的节点
                // 下面的分支处理就是处理(1)有前继 (2)无前继 的两种情况
                else if (pred != null) { // 有前继节点的处理
                    pred.next = s;
                    if (pred.thread == null) // 排除其他线程也进行了removeWaiter操作的干扰,重来
                        continue retry;
                }
                else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
                                                      q, s)) // 这里是无前继节点的情况,q就是栈顶元素,CAS尝试直接设置栈顶为q的后继节点
                    continue retry; // 失败也重来
            }
            break; // 内层循环条件不满足,break
        }
    }
}

report: 处理执行结果的返回

前面的get操作,最终的结果返回,通过report方法来进行,需要根据不同的state来处理:

private V report(int s) throws ExecutionException {
    // outcome在set和setException方法中被设置
    // 对于set方法,outcome为callable的返回结果
    // 对于setException方法,outcome指向异常Throwable t
    Object x = outcome;
    if (s == NORMAL) // 正常结束,返回outcome
        return (V)x;
    if (s >= CANCELLED) // 取消or中断的情况,抛CancellationException
        throw new CancellationException();
    throw new ExecutionException((Throwable)x); // EXCEPTIONAL的情况,抛ExecutionException
}

isCancelled 与 isDone

比较简单,实际上就是根据内部定义的状态就能够轻松判断是否FutureTask被取消or已经执行结束:

public boolean isCancelled() {
    return state >= CANCELLED;
}

public boolean isDone() {
    return state != NEW;
}
    原文作者:JUC
    原文地址: https://blog.csdn.net/d6619309/article/details/81051434
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞