Java中的Future

1 概述

Future代表异步计算返回的结果,提供了检查是否结束、等待结束以及获取计算结果的方法。Executor框架使用Runnable作为其任务表示形式,但它不能返回一个值。许多任务实际上都是存在延迟计算的:执行数据库查询,从网络上获取资源,或者某个复杂耗时的计算。对于这种任务,Callable是一个更好的抽象,他能返回一个值,并可能抛出一个异常。

/**
 * A task that returns a result and may throw an exception.
 * Implementors define a single method with no arguments called
 * <tt>call</tt>.
 *
 * <p>The <tt>Callable</tt> interface is similar to {@link
 * java.lang.Runnable}, in that both are designed for classes whose
 * instances are potentially executed by another thread.  A
 * <tt>Runnable</tt>, however, does not return a result and cannot
 * throw a checked exception.
 *
 * <p> The {@link Executors} class contains utility methods to
 * convert from other common forms to <tt>Callable</tt> classes.
 *
 * @see Executor
 * @since 1.5
 * @author Doug Lea
 * @param <V> the result type of method <tt>call</tt>
 */
public interface Callable<V> {
    /**
     * Computes a result, or throws an exception if unable to do so.
     *
     * @return computed result
     * @throws Exception if unable to compute a result
     */
    V call() throws Exception;
}
public interface Future<V> {

    /**
     * Attempts to cancel execution of this task.  This attempt will
     * fail if the task has already completed, has already been cancelled,
     * or could not be cancelled for some other reason. If successful,
     * and this task has not started when <tt>cancel</tt> is called,
     * this task should never run.  If the task has already started,
     * then the <tt>mayInterruptIfRunning</tt> parameter determines
     * whether the thread executing this task should be interrupted in
     * an attempt to stop the task.
     *
     * <p>After this method returns, subsequent calls to {@link #isDone} will
     * always return <tt>true</tt>.  Subsequent calls to {@link #isCancelled}
     * will always return <tt>true</tt> if this method returned <tt>true</tt>.
     *
     * @param mayInterruptIfRunning <tt>true</tt> if the thread executing this
     * task should be interrupted; otherwise, in-progress tasks are allowed
     * to complete
     * @return <tt>false</tt> if the task could not be cancelled,
     * typically because it has already completed normally;
     * <tt>true</tt> otherwise
     */
    boolean cancel(boolean mayInterruptIfRunning);

    /**
     * Returns <tt>true</tt> if this task was cancelled before it completed
     * normally.
     *
     * @return <tt>true</tt> if this task was cancelled before it completed
     */
    boolean isCancelled();

    /**
     * Returns <tt>true</tt> if this task completed.
     *
     * Completion may be due to normal termination, an exception, or
     * cancellation -- in all of these cases, this method will return
     * <tt>true</tt>.
     *
     * @return <tt>true</tt> if this task completed
     */
    boolean isDone();

    /**
     * Waits if necessary for the computation to complete, and then
     * retrieves its result.
     *
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     */
    V get() throws InterruptedException, ExecutionException;

    /**
     * Waits if necessary for at most the given time for the computation
     * to complete, and then retrieves its result, if available.
     *
     * @param timeout the maximum time to wait
     * @param unit the time unit of the timeout argument
     * @return the computed result
     * @throws CancellationException if the computation was cancelled
     * @throws ExecutionException if the computation threw an
     * exception
     * @throws InterruptedException if the current thread was interrupted
     * while waiting
     * @throws TimeoutException if the wait timed out
     */
    V get(long timeout, TimeUnit unit)
        throws InterruptedException, ExecutionException, TimeoutException;
}

可以通过多种方法来创建一个Future来描述任务。ExecutorService中的submit方法接受一个Runnable或者Callable,然后返回一个Future来获得任务的执行结果或者取消任务。

public interface ExecutorService extends Executor {

   /**
    * Submits a value-returning task for execution and returns a
    * Future representing the pending results of the task. The
    * Future's <tt>get</tt> method will return the task's result upon
    * successful completion.
    *
    * <p>
    * If you would like to immediately block waiting
    * for a task, you can use constructions of the form
    * <tt>result = exec.submit(aCallable).get();</tt>
    *
    * <p> Note: The {@link Executors} class includes a set of methods
    * that can convert some other common closure-like objects,
    * for example, {@link java.security.PrivilegedAction} to
    * {@link Callable} form so they can be submitted.
    *
    * @param task the task to submit
    * @return a Future representing pending completion of the task
    * @throws RejectedExecutionException if the task cannot be
    *         scheduled for execution
    * @throws NullPointerException if the task is null
    */
   <T> Future<T> submit(Callable<T> task);

   /**
    * Submits a Runnable task for execution and returns a Future
    * representing that task. The Future's <tt>get</tt> method will
    * return the given result upon successful completion.
    *
    * @param task the task to submit
    * @param result the result to return
    * @return a Future representing pending completion of the task
    * @throws RejectedExecutionException if the task cannot be
    *         scheduled for execution
    * @throws NullPointerException if the task is null
    */
   <T> Future<T> submit(Runnable task, T result);

   /**
    * Submits a Runnable task for execution and returns a Future
    * representing that task. The Future's <tt>get</tt> method will
    * return <tt>null</tt> upon <em>successful</em> completion.
    *
    * @param task the task to submit
    * @return a Future representing pending completion of the task
    * @throws RejectedExecutionException if the task cannot be
    *         scheduled for execution
    * @throws NullPointerException if the task is null
    */
   Future<?> submit(Runnable task);

   /**
    * Executes the given tasks, returning a list of Futures holding
    * their status and results when all complete.
    * {@link Future#isDone} is <tt>true</tt> for each
    * element of the returned list.
    * Note that a <em>completed</em> task could have
    * terminated either normally or by throwing an exception.
    * The results of this method are undefined if the given
    * collection is modified while this operation is in progress.
    *
    * @param tasks the collection of tasks
    * @return A list of Futures representing the tasks, in the same
    *         sequential order as produced by the iterator for the
    *         given task list, each of which has completed.
    * @throws InterruptedException if interrupted while waiting, in
    *         which case unfinished tasks are cancelled.
    * @throws NullPointerException if tasks or any of its elements are <tt>null</tt>
    * @throws RejectedExecutionException if any task cannot be
    *         scheduled for execution
    */

   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
       throws InterruptedException;

   /**
    * Executes the given tasks, returning a list of Futures holding
    * their status and results
    * when all complete or the timeout expires, whichever happens first.
    * {@link Future#isDone} is <tt>true</tt> for each
    * element of the returned list.
    * Upon return, tasks that have not completed are cancelled.
    * Note that a <em>completed</em> task could have
    * terminated either normally or by throwing an exception.
    * The results of this method are undefined if the given
    * collection is modified while this operation is in progress.
    *
    * @param tasks the collection of tasks
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @return a list of Futures representing the tasks, in the same
    *         sequential order as produced by the iterator for the
    *         given task list. If the operation did not time out,
    *         each task will have completed. If it did time out, some
    *         of these tasks will not have completed.
    * @throws InterruptedException if interrupted while waiting, in
    *         which case unfinished tasks are cancelled
    * @throws NullPointerException if tasks, any of its elements, or
    *         unit are <tt>null</tt>
    * @throws RejectedExecutionException if any task cannot be scheduled
    *         for execution
    */
   <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
                                 long timeout, TimeUnit unit)
       throws InterruptedException;

   /**
    * Executes the given tasks, returning the result
    * of one that has completed successfully (i.e., without throwing
    * an exception), if any do. Upon normal or exceptional return,
    * tasks that have not completed are cancelled.
    * The results of this method are undefined if the given
    * collection is modified while this operation is in progress.
    *
    * @param tasks the collection of tasks
    * @return the result returned by one of the tasks
    * @throws InterruptedException if interrupted while waiting
    * @throws NullPointerException if tasks or any element task
    *         subject to execution is <tt>null</tt>
    * @throws IllegalArgumentException if tasks is empty
    * @throws ExecutionException if no task successfully completes
    * @throws RejectedExecutionException if tasks cannot be scheduled
    *         for execution
    */
   <T> T invokeAny(Collection<? extends Callable<T>> tasks)
       throws InterruptedException, ExecutionException;

   /**
    * Executes the given tasks, returning the result
    * of one that has completed successfully (i.e., without throwing
    * an exception), if any do before the given timeout elapses.
    * Upon normal or exceptional return, tasks that have not
    * completed are cancelled.
    * The results of this method are undefined if the given
    * collection is modified while this operation is in progress.
    *
    * @param tasks the collection of tasks
    * @param timeout the maximum time to wait
    * @param unit the time unit of the timeout argument
    * @return the result returned by one of the tasks.
    * @throws InterruptedException if interrupted while waiting
    * @throws NullPointerException if tasks, or unit, or any element
    *         task subject to execution is <tt>null</tt>
    * @throws TimeoutException if the given timeout elapses before
    *         any task successfully completes
    * @throws ExecutionException if no task successfully completes
    * @throws RejectedExecutionException if tasks cannot be scheduled
    *         for execution
    */
   <T> T invokeAny(Collection<? extends Callable<T>> tasks,
                   long timeout, TimeUnit unit)
       throws InterruptedException, ExecutionException, TimeoutException;
}

假设我们通过一个方法从远程获取一些计算结果,假设方法是List getDataFromRemote(),如果采用同步的方法,代码大概是List data = getDataFromRemote(),我们将一直等待getDataFromRemote返回,然后才能继续后面的工作,这个函数是从远程获取计算结果的,如果需要很长时间,后面的代码又和这个数据没有什么关系的话,阻塞在那里就会浪费很多时间。我们有什么办法可以改进呢???

能够想到的办法是调用函数后,立即返回,然后继续执行,等需要用数据的时候,再取或者等待这个数据。具体实现有两种方式,一个是用Future,另一个是回调。

Future<List> future = getDataFromRemoteByFuture();
        //do something....
List data = future.get();

可以看到我们返回的是一个Future对象,然后接着自己的处理后面通过future.get()来获得我们想要的值。也就是说在执行getDataFromRemoteByFuture的时候,就已经启动了对远程计算结果的获取,同时自己的线程还继续执行不阻塞。知道获取时候再拿数据就可以。看一下getDataFromRemoteByFuture的实现:

private Future<List> getDataFromRemoteByFuture() {

        return threadPool.submit(new Callable<List>() {
            @Override
            public List call() throws Exception {
                return getDataFromRemote();
            }
        });
    }

我们在这个方法中调用getDataFromRemote方法,并且用到了线程池。把任务加入线程池之后,理解返回Future对象。Future的get方法,还可以传入一个超时参数,用来设置等待时间,不会一直等下去。
也可以利用FutureTask来获取结果:

FutureTask<List> futureTask = new FutureTask<List>(new Callable<List>() {
            @Override
            public List call() throws Exception {
                return getDataFromRemote();
            }
        });

        threadPool.submit(futureTask);


        futureTask.get();

FutureTask是一个具体的实现类,ThreadPoolExecutor的submit方法返回的就是一个Future的实现,这个实现就是FutureTask的一个具体实例,FutureTask帮助实现了具体的任务执行,以及和Future接口中的get方法的关联。FutureTask除了帮助ThreadPool很好的实现了对加入线程池任务的Future支持外,也为我们提供了很大的便利,使得我们自己也可以实现支持Future的任务调度。

Java本身没有提供Promise方式实现,在Scala以及Netty中都有Promise模式的实现,可以让用户控制future的完成状态(成功、失败等)以及future的返回值,参考文档中是一个自定义Promise的简单实现。

Reference

    原文作者:薛定谔的猫Plus
    原文地址: https://www.jianshu.com/p/9ccd7ab0687d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞