Jdk1.6 JUC源码解析(16)-FutureTask
作者:大飞
功能简介:
- FutureTask是一种异步任务(或异步计算),举个栗子,主线程的逻辑中需要使用某个值,但这个值需要复杂的运算得来,那么主线程可以提前建立一个异步任务来计算这个值(在其他的线程中计算),然后去做其他事情,当需要这个值的时候再通过刚才建立的异步任务来获取这个值,有点并行的意思,这样可以缩短整个主线程逻辑的执行时间。
- FutureTask也是基于AQS来构建的,使用共享模式,使用AQS的状态来表示异步任务的运行状态。
源码分析:
- 先来看下FutureTask都实现了哪些接口。首先实现了RunnableFuture接口,先看下这个接口:
public interface RunnableFuture<V> extends Runnable, Future<V> {
/**
* Sets this Future to the result of its computation
* unless it has been cancelled.
*/
void run();
}
RunnableFuture又扩展了Runnable和Future:
public interface Runnable {
/**
* When an object implementing interface <code>Runnable</code> is used
* to create a thread, starting the thread causes the object's
* <code>run</code> method to be called in that separately executing
* thread.
* <p>
* The general contract of the method <code>run</code> is that it may
* take any action whatsoever.
*
* @see java.lang.Thread#run()
*/
public abstract void run();
}
public interface Future<V> {
/**
* 尝试取消任务的执行,如果任务已经完成或者已经被取消或者由于某种原因
* 无法取消,方法返回false。如果任务取消成功,或者任务开始执行之前调用
* 了取消方法,那么任务就永远不会执行了。mayInterruptIfRunning参数决定
* 了是否要中断执行任务的线程。
*/
boolean cancel(boolean mayInterruptIfRunning);
/**
* 判断任务是否在完成之前被取消。
*/
boolean isCancelled();
/**
* 判断任务是否完成。
*/
boolean isDone();
/**
* 等待,直到获取任务的执行结果。如果任务还没执行完,这个方法会阻塞。
*/
V get() throws InterruptedException, ExecutionException;
/**
* 等待,在给定的时间内获取任务的执行结果。
*/
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
Runnable接口经常写多线程程序的话一定非常熟悉了,这里不说了。看下Future接口,它提供了取消任务接口,并提供了查看任务状态的接口,最重要的是提供了有阻塞行为的获取任务执行结果的接口。
- 接下来看下FutureTask的实现,由于其基于AQS实现,那先看一下内部的同步机制:
private final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -7828117401763700385L;
/** 表示任务正在执行 */
private static final int RUNNING = 1;
/** 表示任务已经运行完毕 */
private static final int RAN = 2;
/** 表示任务被取消 */
private static final int CANCELLED = 4;
/** 内部的callable */
private final Callable<V> callable;
/** 执行结果 */
private V result;
/** 执行过程中发生的异常 */
private Throwable exception;
/**
* 执行当前任务的线程。在set/cancel之后置空,说明可以
* 了。必须使用volatile来修饰,以确保任务完成后的可见性。
*/
private volatile Thread runner;
Sync(Callable<V> callable) {
this.callable = callable;
}
内部同步器接中使用了一个callable来保存要执行的任务,看下这个接口:
public interface Callable<V> {
/**
* Computes a result, or throws an exception if unable to do so.
*
* @return computed result
* @throws Exception if unable to compute a result
*/
V call() throws Exception;
}
这个接口的行为和Runnable类似,不同的是有返回值,且能抛出异常,算是对Runnable的补充。
继续看下同步器的innerRun方法,这个方法用于支持FutureTask的run方法:
void innerRun() {
//尝试设置任务运行状态为正在执行。
if (!compareAndSetState(0, RUNNING))
return; //如果设置失败,直接返回。
try {
runner = Thread.currentThread(); //设置执行线程。
if (getState() == RUNNING) //再次检测任务状态
innerSet(callable.call()); //执行任务,然后设置执行结果。
else
releaseShared(0); //说明任务已取消。
} catch (Throwable ex) {
innerSetException(ex); //如果执行任务过程中发生异常,设置异常。
}
}
看下设置执行结果的innerSet方法:
void innerSet(V v) {
for (;;) {
int s = getState(); //获取任务执行状态。
if (s == RAN)
return; //如果任务已经执行完毕,退出。
if (s == CANCELLED) {
//这里释放AQS控制权并设置runner为null,
//为了避免正在和一个试图中断线程的取消请求竞
releaseShared(0);
return;
}
//尝试将任务状态设置为执行完成。
if (compareAndSetState(s, RAN)) {
result = v; //设置执行结果。
releaseShared(0); //释放AQS控制权。
done(); //这里调用一下done方法,子类可覆盖这个方法,做一些定制处理。
return;
}
}
}
AQS分析过,releaseShared方法中会调用tryReleaseShared方法,看一下当前同步器中这个方法的实现:
protected boolean tryReleaseShared(int ignore) {
runner = null;
return true;
}
innerRun方法中在执行抛异常后会调用innerSetException:
void innerSetException(Throwable t) {
for (;;) {
int s = getState();
if (s == RAN)
return;
if (s == CANCELLED) {
// aggressively release to set runner to null,
// in case we are racing with a cancel request
// that will try to interrupt runner
releaseShared(0);
return;
}
if (compareAndSetState(s, RAN)) {
exception = t;
result = null;
releaseShared(0);
done();
return;
}
}
}
过程和innerSet类似,只不过最后要设置异常,清空result。
和innerRun类似还有innerRunAndReset方法,看下实现:
boolean innerRunAndReset() {
if (!compareAndSetState(0, RUNNING))
return false;
try {
runner = Thread.currentThread();
if (getState() == RUNNING)
callable.call(); // don't set result
runner = null;
return compareAndSetState(RUNNING, 0);
} catch (Throwable ex) {
innerSetException(ex);
return false;
}
}
和innerRun的区别是不设置执行结果,最后执行完毕后重置异步任务状态为0。
再看下同步器的innerGet方法,这个方法用于支持FutureTask的get方法:
V innerGet() throws InterruptedException, ExecutionException {
//获取共享锁,无法获取时阻塞等待。
acquireSharedInterruptibly(0);
if (getState() == CANCELLED)
throw new CancellationException(); //如果任务状态为取消,那么抛出CancellationException
if (exception != null)
throw new ExecutionException(exception);//如果任务执行异常,抛出ExecutionException,并传递异常。
return result; //成功执行完成,返回执行结果。
}
类似的有带超时的innerGet:
V innerGet(long nanosTimeout) throws InterruptedException, ExecutionException, TimeoutException {
if (!tryAcquireSharedNanos(0, nanosTimeout))
throw new TimeoutException();
if (getState() == CANCELLED)
throw new CancellationException();
if (exception != null)
throw new ExecutionException(exception);
return result;
}
AQS分析过,acquireSharedInterruptibly和tryAcquireSharedNanos方法中会调用tryAcquireShared方法,看一下当前同步器中这个方法的实现:
protected int tryAcquireShared(int ignore) {
return innerIsDone()? 1 : -1;
}
boolean innerIsDone() {
return ranOrCancelled(getState()) && runner == null;
}
private boolean ranOrCancelled(int state) {
return (state & (RAN | CANCELLED)) != 0;
}
可见innerGet中会首先判断任务是否完成,要依据任务(完成或取消)状态来判断。
最后看下同步器的innerCancel方法,这个方法用于支持FutureTask的cancel方法:
boolean innerCancel(boolean mayInterruptIfRunning) {
for (;;) {
int s = getState();
if (ranOrCancelled(s))
return false; //如果任务已经执行完毕或者取消。
if (compareAndSetState(s, CANCELLED))//否则尝试设置任务状态为取消。
break;
}
if (mayInterruptIfRunning) {
Thread r = runner;
if (r != null)
r.interrupt(); //如果设置了mayInterruptIfRunning为true,需要中断线程,
}
releaseShared(0); //释放AQS的控制权。
done(); //这里也会调用done,定制子类时需要注意下。
return true;
}
- 有了内部同步机制,FutureTask的实现起来就很容易了,看下代码:
public class FutureTask<V> implements RunnableFuture<V> {
/** 内部同步器 */
private final Sync sync;
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
sync = new Sync(callable);
}
public FutureTask(Runnable runnable, V result) {
sync = new Sync(Executors.callable(runnable, result));
}
public boolean isCancelled() {
return sync.innerIsCancelled();
}
public boolean isDone() {
return sync.innerIsDone();
}
public boolean cancel(boolean mayInterruptIfRunning) {
return sync.innerCancel(mayInterruptIfRunning);
}
public V get() throws InterruptedException, ExecutionException {
return sync.innerGet();
}
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return sync.innerGet(unit.toNanos(timeout));
}
/**
* 本类中是一个空实现,子类可以覆盖这个方法,做回调或一些记录工作。
* 可以来实现里面通过任务状态来判断任务是否被取消。
*/
protected void done() { }
protected void set(V v) {
sync.innerSet(v);
}
protected void setException(Throwable t) {
sync.innerSetException(t);
}
public void run() {
sync.innerRun();
}
protected boolean runAndReset() {
return sync.innerRunAndReset();
}
...
实现都是基于上面分析过的方法,这里不啰嗦了。注意构造方法中有一个Runnable到callable的转换,使用了Executors中的方法,这个类后续会分析,这里简单看一下:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
就是简单的通过一个适配类在适配Runnable和Callable。
小总结一下:
1.当前线程建立异步任务后,异步任务处于初始状态(内部有一个数值表示状态,初始为0),一般交由其他线程执行任务(比如提交给线程池处理)。当前线程通过异步任务的get方法来获取执行结果时,如果异步任务此时还没执行完毕(内部状态既不是完成,也不是取消),那么当前线程会在get方法处阻塞。
2.当其他线程(比如线程池中的工作线程)执行了异步任务,那么会将异步任务的状态改成”完成”(根据情况也可能是取消),同时将在get除等待的线程唤醒。
FutureTask的代码解析完毕!
参见:Jdk1.6 JUC源码解析(6)-locks-AbstractQueuedSynchronizer