JUC源码解析----- 线程池之 FutureTask类

1、FutureTask 概念

FutureTask一个可取消的异步计算,FutureTask 实现了Future的基本方法,提供了 start () 、cancel()操作,可以查询计算是否已经完成,并且可以获取计算的结果。结果只可以在计算完成之后获取,get方法会阻塞当计算没有完成的时候,一旦计算已经完成,那么计算就不能再次启动或是取消。

一个FutureTask 可以用来包装一个 Callable 或是一个runnable对象。因为FurtureTask实现了Runnable方法,所以一个 FutureTask可以提交(submit)给一个Excutor执行(excution)。

2、FutureTask 使用场景

FutureTask可用于异步获取执行结果或取消执行任务的场景。通过传入Runnable或者Callable的任务给FutureTask,直接调用其run方法或者放入线程池执行,之后可以在外部通过FutureTask的get方法异步获取执行结果,因此,FutureTask非常适合用于耗时的计算,主线程可以在完成自己的任务后,再去获取结果。另外,FutureTask还可以确保即使调用了多次run方法,它都只会执行一次Runnable或者Callable任务,或者通过cancel取消FutureTask的执行等。

在AbstractExecutorService 的 submit()方法中,利用FutureTask的来封装提交的任务。

《JUC源码解析----- 线程池之 FutureTask类》

结构图:

《JUC源码解析----- 线程池之 FutureTask类》

RunnableFuture继承了Runnable和Future接口,而FutureTask实现了RunnableFuture接口。

《JUC源码解析----- 线程池之 FutureTask类》

源码分析:

/**
 * 存在的状态变换
 * NEW -> COMPLETING -> NORMAL
 * NEW -> COMPLETING -> EXCEPTIONAL
 * NEW -> CANCELLED
 * NEW -> INTERRUPTING -> INTERRUPTED
 */
private volatile int state;
private static final int NEW          = 0;
private static final int COMPLETING   = 1;
private static final int NORMAL       = 2;
private static final int EXCEPTIONAL  = 3;
private static final int CANCELLED    = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED  = 6;

/** 构造赋值,要执行的任务 */
private Callable<V> callable;
/** get方法返回值,返回执行结果或者异常 */
private Object outcome; // non-volatile, protected by state reads/writes
/** 运行callable的线程 */
private volatile Thread runner;
/** outcome的等待栈(单向链表结构) */
private volatile WaitNode waiters;

/** 单向链表结构 */
static final class WaitNode {
    volatile Thread thread;
    volatile WaitNode next;
    WaitNode() { thread = Thread.currentThread(); }
}

/** 2种构造,state写在后,后面使用的时候先读state,再读callable,volatile语义保证可见性,infoQ和并发网上各有一篇文章讲volatile语义的,可以看下 */
public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

1、FutureTask在不同阶段拥有不同的状态state,初始化为NEW;
2、FutureTask类实现了Runnable接口,这样就可以通过Executor.execute方法提交FutureTask到线程池中等待被执行,最终执行的是FutureTask的run方法;

state初始化为NEW。只有在set, setException和cancel方法中state才可以转变为终态。在任务完成期间,state的值可能为COMPLETING或INTERRUPTING。 
state有四种可能的状态转换:

  • NEW -> COMPLETING -> NORMAL
  • NEW -> COMPLETING -> EXCEPTIONAL
  • NEW -> CANCELLED
  • NEW -> INTERRUPTING -> INTERRUPTED

内部结构,一个volatile语义的state标识状态,一个waitNode链表维护所有等待节点,最重要的是volatile的语义。 

FutureTask的运行的基本流程: 
1. submit一个task,获的返回的Future:提交task后,就是ThreadPoolExecutor的那一套流程addworker-runwork-gettask-task.run,最后会到提交的task的run()方法; 
2. 通过Future的get()获取执行的结果; 
3. 也可以cancel()方法取消未执行的task。

FutureTask.get() 实现

/**
 * @throws CancellationException {@inheritDoc}
 */
 public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING) //如果任务还未运行或者运行中,那就等
        s = awaitDone(false, 0L);
    return report(s); //根据状态返回
}

/** get超时等待 */
public V get(long timeout, TimeUnit unit)
    throws InterruptedException, ExecutionException, TimeoutException {
    if (unit == null)
        throw new NullPointerException();
    int s = state;
    if (s <= COMPLETING &&
        (s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
        throw new TimeoutException(); //超时后还未到最终状态那就timeout异常
    return report(s);
}

/** 根据状态返回 */
private V report(int s) throws ExecutionException {
    Object x = outcome;
    if (s == NORMAL)
        return (V)x;
    if (s >= CANCELLED)
        throw new CancellationException();
    throw new ExecutionException((Throwable)x);
}

1、首先判断FutureTask的状态是否为完成状态,如果是完成状态,说明已经执行过set或setException方法,返回report(s)。

在report() 方法中,如果FutureTask的状态是NORMAL, 即正确执行了set方法,get方法直接返回处理的结果, 如果是取消状态,即执行了setException,则抛出CancellationException异常。

2、如果还没到最终状态,则调用awaitDone方法进行阻塞。(之前说过到最终状态时会调用finishCompletion),否则就根据状态返回。

内部通过awaitDone方法对主线程进行阻塞,具体实现如下:

《JUC源码解析----- 线程池之 FutureTask类》

1、如果执行get的线程被中断,则移除FutureTask的所有阻塞队列中的线程(waiters),并抛出中断异常;
2、判断FutureTask当前的state,如果大于COMPLETING,说明任务已经执行完成,则直接返回完成状态;
3、如果当前state等于COMPLETING,说明正在set结果或者任务已经执行完,这时主线程只需通过yield方法让出cpu资源,等待state变成NORMAL;
4、如果FutureTask的状态为初始态NEW,则将当前线程加入到FutureTask的阻塞线程中去;

5、通过WaitNode类封装当前线程,并通过UNSAFE添加到waiters链表;

6、判断get() 方法是否设置了超时时间,如果是超时时间get,判断当前是否超时,如果已经超时,将当前等待节点q从waiters中移出,返回FutureTask状态state,如果还未超时,调用LockSupport.parkNanos方法阻塞当前线程。

否则,调用LockSupport.park方法,阻塞当前线程。

FutureTask.run实现

《JUC源码解析----- 线程池之 FutureTask类》

FutureTask.run方法是在线程池中被执行的,而非主线程
1、通过执行Callable任务的call方法;
2、如果call执行成功,则通过set方法保存结果;
3、如果call执行有异常,则通过setException保存异常;

首先判断任务的状态,如果任务状态不是new,说明任务状态已经改变(说明他已经走了上面4种可能变化的一种,比如caller调用了cancel,此时状态为Interrupting, 也说明了上面的cancel方法,task没运行时,就interrupt, task得不到运行,总是返回);

如果状态是new, 判断runner是否为null, 如果为null, 则把当前执行任务的线程赋值给runner,如果runner不为null, 说明已经有线程在执行,返回。此处使用cas来赋值worker thread是保证多个线程同时提交同一个FutureTask时,确保该FutureTask的run只被调用一次, 如果想运行多次,使用runAndReset()方法。

接着开始执行任务,如果要执行的任务不为空,并且state为New就执行,可以看到这里调用了Callable的call方法。如果执行成功则set结果,如果出现异常则setException。最后把runner设为null。

set

《JUC源码解析----- 线程池之 FutureTask类》

如果现在的状态是NEW就把状态设置成COMPLETING,然后设置成NORMAL。这个执行流程的状态变化就是: NEW->COMPLETING->NORMAL。

正常执行完成状态变化步骤是 :首先执行 run() 并且Task正常完成而且在这其间没有调用cancel(),如果有异常的话会调用setException()方法。

setException

《JUC源码解析----- 线程池之 FutureTask类》

set和setException方法中,都会通过UnSAFE修改FutureTask的状态,并执行finishCompletion方法通知主线程任务已经执行完成;

finishCompletion

《JUC源码解析----- 线程池之 FutureTask类》

1、执行FutureTask类的get方法时,会把主线程封装成WaitNode节点并保存在waiters链表中;
2、FutureTask任务执行完成后,通过UNSAFE设置waiters的值,并通过LockSupport类unpark方法唤醒主线程;

3、finishCompletion()会解除所有阻塞的worker thread, 调用done()方法,将成员变量callable设为null。

cancel():

《JUC源码解析----- 线程池之 FutureTask类》

1.如果是cancel(false) 那么Task的状态变化就是

 NEW->=CANCELLED

2.如果是cancel(true)那么Task的状态化就是

 NEW->INTERRUPTING ->INTERRUPTED

我们在分析下两个方法交叉执行的情况( run()->cancel() ):

1.如果Task已经执行然后再调用cancel():

     A:调用cancel(false)情况
           a:如果Task已经在执行而callable.call()没有返回 或是 call()已经返回但是state状态还没有改变
              那么任务调用cancel(false) 不会对任务的执行造成影响 只会影响task的状态  

           b:如果callable.call()已经返回并且状态已经变成COMPLETING或是 COMPLED 那么对任务执行 和任务状态都没有影响
   

B:调用cancel(true)

          a:如果任务已经在执行而callable.call()没有返回 会把state设置成 INTERRUPTING然后调用执行线程的中断请求 然后把状态设置成INTERRUPTED,这里 如果
                callable.call()方法可以响应中断 可能对任务执行产生影响,如果方法不会响应中断不会对任务运行产生影响。影响任务的状态    
          b:.如果任务已经在执行并且 call()已经返回但是state状态还没有改变  不会对任务的执行造成影响 只会影响任务的状态 。

2。调用cancel()后 在执行任务 ( cancel() -> run() )

      先调用cancel()无论是那种调用方式都会引起state状态的变化。在run()方法执行的时候发现state已经不是new了 就会放弃任务的执行

    原文作者:JUC
    原文地址: https://blog.csdn.net/qingtian211/article/details/81870466
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞