基本介绍
FutureTask
,JUC中提供的一个Future
接口实现类,主要作用是为了支持ThreadPoolExecutor
的submit操作。
经过前面对JDK线程池的分析JUC ThreadPoolExecutor详解,我们知道在AbstractExecutorService
中实现了submit方法,以提交Callable为例:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
其中newTaskFor方法,返回的就是FutureTask
:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
submit Runnable会用RunnableAdapter这个类先包装一把,RunnableAdapter实现了Callable接口,目的是为了后面FutureTask能够统一处理: 都是以Callable来处理
使用示例
一般不直接使用,在ThreadPoolExecutor(其实是在AbstractExecutorService中用到)的实现中可看到其使用方式(newTaskFor)
源码分析
Tips: 大多数分析采用源码+注释的方式进行,因此请关注下源码上面的注释
继承关系
先看看FutureTask
的类继承关系:
FutureTask -> RunnableFuture -> Future
-> Runnable
RunnableFuture实际上是一个组合了Future和Runnable接口的接口类:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/** * Sets this Future to the result of its computation * unless it has been cancelled. */
void run();
}
Runnable我们非常熟悉,Future我们在下一章节详细介绍
Future介绍
Future
,JUC中提供的实现异步编程的核心基础接口。一个Future
保留一个异步任务执行结果,真正获取结果需要调用其get方法,当异步任务没有执行完,那么get方法将会阻塞等待,否则get方法直接返回,同时Future
还提供了其他一些方法:
public interface Future<V> {
// 尝试取消异步任务
boolean cancel(boolean mayInterruptIfRunning);
// 判断异步任务是否被取消
boolean isCancelled();
// isDone代表异步任务是否结束,包括正常结束和异常结束(被中断取消,其他异常...)
boolean isDone();
// 获取异步任务执行结果,若任务没有结束,则阻塞等待,否则直接返回执行结果
V get() throws InterruptedException, ExecutionException;
// get的超时版本,若任务没有结束,则阻塞等待给定时间,期间若任务结束,则返回;若在超时时间内任务依然没有执行结束,则抛TimeoutException
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
FutureTask内部属性
// 当前状态,与ThreadPoolExecutor类似,FutreTask内部也有状态转移的概念
private volatile int state;
// FutureTask持有的callable任务
private Callable<V> callable;
// 保存最后的执行结果
private Object outcome;
// 指向跑FutureTask的线程
private volatile Thread runner;
// get操作等待线程,由WaitNode组成一个链式栈,操作都是无锁的(CAS + 循环重试)
private volatile WaitNode waiters;
其中state, runner, waiters定义为volatile,是为了配合Unsafe做CAS操作,以实现原子更新:
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
WaitNode比较简单,仅有两个内部属性:
static final class WaitNode {
// 指向当前等待线程
volatile Thread thread;
// 指向下一个节点(另外一个等待线程)
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}
FutureTask的构造
可以由Callable or Runnable来进行构造:
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}
public FutureTask(Runnable runnable, V result) {
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}
Executors.callable静态方法将Runnable和一个result构造成Callable,实现类是RunnableAdapter
状态及其转移
FutureTask
内部定义的state有以下几种:
private volatile int state;
private static final int NEW = 0;
private static final int COMPLETING = 1;
private static final int NORMAL = 2;
private static final int EXCEPTIONAL = 3;
private static final int CANCELLED = 4;
private static final int INTERRUPTING = 5;
private static final int INTERRUPTED = 6;
- NEW: FutureTask构建时的初始化状态
- COMPLETING: 趋向于结束(正常 or 异常)的中间状态
- NORMAL: 正常结束的最终状态
- EXCEPTIONAL: 异常结束的最终状态
- CANCELLED: 已取消,成功调用cancel且不是interrupt方式的最终状态
- INTERRUPTING: 正在执行中断,被中断(调用cancel且参数传递true)的中间状态
- INTERRUPTED: 成功中断,被中断(调用cancel且参数传递true)的最终状态
根据正常结束,异常结束(exception、cancel、interrupt),具有以下多种状态转移情况:
- NEW -> COMPLETING -> NORMAL
- NEW -> COMPLETING -> EXCEPTIONAL
- NEW -> CANCELLED
- NEW -> INTERRUPTING -> INTERRUPTED
核心方法介绍
run
当FutureTask
作为一个task丢到线程池里面,最终线程池调用的就是其run方法,在run方法里面,FutureTask
再调用原始的、被封装的真实task:
public void run() {
// 避免重复执行,需要判断state以及runner是否是初始化状态(Tips: 这里会设置runner为当前线程)
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
// 调用我们丢给线程池的真实task
result = c.call();
ran = true;
} catch (Throwable ex) {
result = null;
ran = false;
// 遇到异常的处理
setException(ex);
}
if (ran)
set(result); // 成功执行完task的处理
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}
set / setException 与 cancel
触发正常的结束与异常的结束,set / setException 两个方法主要逻辑类似,区别只是状态转移方向有差异:
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // NEW -> COMPLETING
outcome = v; // 设置outcome
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state COMPLETING -> NORMAL
finishCompletion(); // 处理等待线程的唤醒
}
}
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) { // NEW -> COMPLETING
outcome = t; // 设置outcome为Throwable t
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state COMPLETING -> EXCEPTIONAL
finishCompletion(); // 处理等待线程的唤醒
}
}
cancel方法有正常cancel与中断线程方式这两种,通过cencel的入参来区别,对于正常cancel,若任务已经执行并且趋于结束(COMPLETING),则cancel会失败(返回false)
public boolean cancel(boolean mayInterruptIfRunning) {
// 1. state必须为初始化状态
// 2. 两种情况:
// 2.1 正常取消, CAS NEW -> CANCELED要成功
// 2.2 中断取消,CAS NEW -> INTERRUPTING要成功
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { // in case call to interrupt throws exception
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt(); // 中断runner线程
} finally { // final state
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED); // 设置中断最终状态INTERRUPTED
}
}
} finally {
finishCompletion(); // 处理等待线程的唤醒
}
return true;
}
finishCompletion: 处理等待线程的唤醒
上面说了,set / setException 与 cancel 最后都会调用该方法,对等待线程进行唤醒操作
private void finishCompletion() {
// assert state > COMPLETING;
for (WaitNode q; (q = waiters) != null;) { // 外层for循环为了避免waiters还有新增的情况,那么新的waiters和当前q引用的waiters就不一致了
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
// 内层for循环处理waiters栈里面线程的唤醒
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break; // q == 当前waiters 且 CAS设置waiters为null成功,则跳出外层for
}
}
// done在FutureTask实现里面是个空实现,主要是为了给子类扩展使用
done();
callable = null; // to reduce footprint
}
get 与 超时get
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
return report(s);
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}
awaitDone: 处理等待线程的阻塞
前面的get or 超时的get,如果FutureTask
对应的task还没执行结束,则此时调用get的线程需要阻塞等待其执行结束,调用finishCompletion将其唤醒,而使等待线程阻塞等待的方法,正是awaitDone:
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
WaitNode q = null;
boolean queued = false;
for (;;) {
if (Thread.interrupted()) { // 如果当前等待线程被中断,则从等待栈中移除该线程,且上抛中断异常
removeWaiter(q); // 等待栈中移除该等待线程
throw new InterruptedException(); // 上抛异常
}
int s = state;
if (s > COMPLETING) { // s > COMPLETING,证明状态已经是最后的终结状态(可能是NORMAL/EXCEPTIONAL/CANCELED/INTERRUPTED),则返回
if (q != null)
q.thread = null;
return s;
}
else if (s == COMPLETING) // COMPLETING到终结状态时间通常很短,没必要休眠,适当调用Thread.yield()配合自旋(for循环)来等待其最终状态的到来
Thread.yield();
else if (q == null)
q = new WaitNode();
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,
q.next = waiters, q); // 当前等待线程还没入等待栈,先入了再说
else if (timed) { // 超时等待的处理
nanos = deadline - System.nanoTime();
if (nanos <= 0L) { // 达到超时时间,也需要将当前等待线程从等待栈中移除,然后返回
removeWaiter(q);
return state;
}
LockSupport.parkNanos(this, nanos); // 超时的阻塞等待
}
else
LockSupport.park(this); // 非超时的阻塞等待
}
}
removeWaiter: 处理awaitDone阻塞等待的线程被唤醒后从等待栈的移除
逻辑实际上是栈节点的移除,因为没有使用锁且在多线程并发下的移除,因此也是使用CAS + 不断自旋重试来实现栈节点的移除:
private void removeWaiter(WaitNode node) {
if (node != null) {
// 先设置待移除的节点中的thread属性为null,后面的操作就是找到这个thread为null的前继以及后继,进行unlink
// 当然有一种情况前继找不到,因为此时删除的节点就是栈顶元素
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next; // s指向待删除节点的后继节点
if (q.thread != null)
pred = q; // pred指向待删除节点的前继节点
// 下面的逻辑证明q.thread == null,则证明当前q就是要从栈中移除的节点
// 下面的分支处理就是处理(1)有前继 (2)无前继 的两种情况
else if (pred != null) { // 有前继节点的处理
pred.next = s;
if (pred.thread == null) // 排除其他线程也进行了removeWaiter操作的干扰,重来
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s)) // 这里是无前继节点的情况,q就是栈顶元素,CAS尝试直接设置栈顶为q的后继节点
continue retry; // 失败也重来
}
break; // 内层循环条件不满足,break
}
}
}
report: 处理执行结果的返回
前面的get操作,最终的结果返回,通过report方法来进行,需要根据不同的state来处理:
private V report(int s) throws ExecutionException {
// outcome在set和setException方法中被设置
// 对于set方法,outcome为callable的返回结果
// 对于setException方法,outcome指向异常Throwable t
Object x = outcome;
if (s == NORMAL) // 正常结束,返回outcome
return (V)x;
if (s >= CANCELLED) // 取消or中断的情况,抛CancellationException
throw new CancellationException();
throw new ExecutionException((Throwable)x); // EXCEPTIONAL的情况,抛ExecutionException
}
isCancelled 与 isDone
比较简单,实际上就是根据内部定义的状态就能够轻松判断是否FutureTask
被取消or已经执行结束:
public boolean isCancelled() {
return state >= CANCELLED;
}
public boolean isDone() {
return state != NEW;
}