FutureTask 源码分析

FutureTask:一个可取消的异步任务执行类,这个类提供了Future接口的基本实现,主要有以下功能:

  • 异步执行任务
  • 可以开始、取消以及查看任务是否完成
  • 如果任务没有执行完,get方法会导致线程阻塞
  • 一旦一个执行任务已经完成就不能再次开始和结束(除非执行时通过runAndReset()方法)

类关系

先看一下类关系图:
《FutureTask 源码分析》FutureTask类关系图.jpg

  • Future为Java Tuture模式接口,Runable是实现异步操作的接口
  • RunnableFuture同时继承了以上两个接口,所以同时具有两种功能
  • 而FutureTask为RunnableFuture的实现类

成员变量

修饰符变量名描述
private Callablecallable任务的执行体
private Objectoutcone最终输出的结果
private volatile Threadrunner异步执行任务的线程
private volatile WaitNodewaiters获取任务结果的等待线程(是一个链式列表)
private volatile intstate当前一步任务的状态
private static final intNEW任务初始化状态
private static final intCOMPLETING任务已经完成,但结果还没有赋值给outcome
private static final intNORMAL任务执行完成
private static final intEXCEPTIONAL任务执行异常
private static final intINTERRUPTING任务被中断中
private static final intINTERRUPTED任务被中断

构造函数及方法

通过传入Callable来构造一个任务

public FutureTask(Callable<V> callable) {
    if (callable == null)
        throw new NullPointerException();
    this.callable = callable;
    this.state = NEW;       // ensure visibility of callable
}

通过传人Runnable来构造一个任务

public FutureTask(Runnable runnable, V result) {
    this.callable = Executors.callable(runnable, result);
    this.state = NEW;       // ensure visibility of callable
}

以上是两个构造函数,有构造函数可知,回家传入参数转换成Callable,然后放到callable变量里,将任务执行状态置为NEW。

公共方法概要如下:

修饰符方法描述
booleancancel(boolean mayInterruptIfRunning)取消或者中断任务(true为中断,false为取消)
Vget()返回执行结果,当为完成执行时,则阻塞线程
Vget(long timeout, TimeUnit unit)获得执行结果,当时间超出设定时间时,则返回超时
booleanisCancelled()返回任务是否已经取消
booleanisDone()判断任务是否执行完毕

执行状态转换

执行任务时可能有4种状态转换:

  1. 任务顺利执行:NEW -> COMPLETING -> NORMAL
  2. 任务执行异常:NEW -> COMPLETING -> EXCEPTIONAL
  3. 任务取消:NEW -> CANCELLED
  4. 任务中断:NEW -> INTERRUPTING -> INTERRUPTED

主要方法

run()

public void run() {
	//如果状态不是初始化后,则返回(避免重复执行)
    if (state != NEW ||
        !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
        return;
    try {
        Callable<V> c = callable;
        if (c != null && state == NEW) {
            V result;
            boolean ran;
            try {
            	//执行主要方法
                result = c.call();
                ran = true;
            } catch (Throwable ex) {
            	//异常处理
                result = null;
                ran = false;
                setException(ex);
            }
            //如果执行成功,则设置结果和状态
            if (ran)
                set(result);
        }
    } finally {
        // 将执行任务的执行线程置空
        runner = null;
        // state must be re-read after nulling runner to prevent
        // leaked interrupts
        int s = state;
        //中断处理
        if (s >= INTERRUPTING)
            handlePossibleCancellationInterrupt(s);
    }
}

set()

/**
* 执行结果的赋值操作, 子类可重写
**/
protected void set(V v) {
	//为state赋值COMPLETING
    if (U.compareAndSwapInt(this, STATE, NEW, COMPLETING)) {
        outcome = v;
        //赋值完成后,为state赋值NORMAL
        U.putOrderedInt(this, STATE, NORMAL);
        //完成处理
        finishCompletion();
    }
}

finishCompletion()

/**
* 在任务执行完成(包括取消、正常结束、发生异常), 将等待线程列表唤醒
* 同时让任务执行体置空
**/
private void finishCompletion() {
    // assert state > COMPLETING;
    for (WaitNode q; (q = waiters) != null;) {
        if (U.compareAndSwapObject(this, WAITERS, q, null)) {
            for (;;) {
                Thread t = q.thread;
                if (t != null) {
                    q.thread = null;
                    //释放许可
                    LockSupport.unpark(t);
                }
                WaitNode next = q.next;
                if (next == null)
                    break;
                q.next = null; // unlink to help gc
                q = next;
            }
            break;
        }
    }
	//完成之后需要执行的方法,该方法可重写
    done();

    callable = null;        // to reduce footprint
}

cancel(boolean mayInterruptIfRunning)

public boolean cancel(boolean mayInterruptIfRunning) {
	//如果任务状态不是初始化状态,则取消任务
    if (!(state == NEW &&
          U.compareAndSwapInt(this, STATE, NEW,
              mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
        return false;
    try {    // in case call to interrupt throws exception
        if (mayInterruptIfRunning) {
            try {
            	//则让执行线程中断(在这个过程中执行线程可能会改变任务的状态)
                Thread t = runner;
                if (t != null)
                    t.interrupt();
            } finally { // final state
            	//改变任务状态为INTERRUPTED
                U.putOrderedInt(this, STATE, INTERRUPTED);
            }
        }
    } finally {
    	//处理任务完成的结果
        finishCompletion();
    }
    return true;
}

get()

public V get() throws InterruptedException, ExecutionException {
    int s = state;
    if (s <= COMPLETING)
        s = awaitDone(false, 0L);
    //返回结果值
    return report(s);
}

awaitDone(boolean timed, long nanos)

/**
* 等待任务的执行结果
* timed: 是否有时间限制  nanos: 限制的时间
**/ 
private int awaitDone(boolean timed, long nanos)
    throws InterruptedException {
    long startTime = 0L;    // Special value 0L means not yet parked
    WaitNode q = null;
    boolean queued = false;
    for (;;) {
        int s = state;
        /**
        * 1. 首先判断任务状态是否是完成状态, 是就直接返回结果
        * 2. 如果1为false,并且任务的状态是COMPLETING, 也就是在set()任务结果时被阻塞了,则让出当前线程cpu资源
        * 3. 如果前两步false,并且q==null,则初始化一个当前线程的等待节点
        * 4. 下一次循环体, 如果前3步依然是false,并且当前节点没有加入到等待列表,
        *     则将当前线程节点放在等待列表的第一个位置
        * 5. 在下一次循环, 如果前4步为false, 如果是时间范围内等待的,则判断当前时间是否过期,
        *    过期则将线程节点移出等待队列并返回任务状态结果, 如果没过期,则让当前线程阻塞一定时间
        * 6. 如果不是时间范围内等待, 并且前5步均为false,则让线程阻塞,直到被唤醒
        **/
        if (s > COMPLETING) {
            if (q != null)
                q.thread = null;
            return s;
        }
        else if (s == COMPLETING)
            Thread.yield();
        else if (Thread.interrupted()) {
            removeWaiter(q);
            throw new InterruptedException();
        }
        else if (q == null) {
            if (timed && nanos <= 0L)
                return s;
            q = new WaitNode();
        }
        else if (!queued)
            queued = U.compareAndSwapObject(this, WAITERS,
                                            q.next = waiters, q);
        else if (timed) {
            final long parkNanos;
            if (startTime == 0L) { // first time
                startTime = System.nanoTime();
                if (startTime == 0L)
                    startTime = 1L;
                parkNanos = nanos;
            } else {
                long elapsed = System.nanoTime() - startTime;
                if (elapsed >= nanos) {
                    removeWaiter(q);
                    return state;
                }
                parkNanos = nanos - elapsed;
            }
            // nanoTime may be slow; recheck before parking
            if (state < COMPLETING)
                LockSupport.parkNanos(this, parkNanos);
        }
        else
            LockSupport.park(this);
    }
}

注:如果你在源码中看到sun.misc.Unsafe,这个类是Java中直接操作内存的,可以理解为C语言中的指针,由于安全原因,Java并不向外部透露此类,使用时可以通过反射的方法。如果想要了解具体使用方法,请自行百度~

用法示例

ExecutorService executor = new ScheduledThreadPoolExecutor(2);
FutureTask<String> future = new FutureTask<>(() -> {
    System.out.println("FutureTask sleep..., Time is " + System.currentTimeMillis());
    Thread.sleep(5000);
    return "OK";
});
executor.execute(future);
System.out.println("MainThread sleep..., Time is " + System.currentTimeMillis());
try {
    Thread.sleep(4000);
} catch (InterruptedException e) {
    e.printStackTrace();
}
String result = "";
try {
    result = future.get();
} catch (InterruptedException e) {
    e.printStackTrace();
} catch (ExecutionException e) {
    e.printStackTrace();
}
System.out.println("result is " + result + ", Time is " + System.currentTimeMillis());

执行结果如下:

MainThread sleep..., Time is 1491225495421
FutureTask sleep..., Time is 1491225495421
result is OK, Time is 1491225500426

    原文作者:Android源码分析
    原文地址: https://juejin.im/entry/58e46461ac502e4957a28e42
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞