Java8提供了一种函数风格的异步和事件驱动编程模型CompletableFuture,该模型不同于以往Java版本,不会造成堵塞Blocking。过去,Java 5并发包主要聚焦于异步任务处理,其模型特点是基于一个生产者线程,不断地创造任务,通过一个堵塞Blocking队列传递给任务的消费者,这个模型在Java 7和Java 8以后使用了另外一种任务执行模型,同时将一个任务的数据分解到子集中,这些子集能够分别被同样的子任务独立地处理。这种风格后面的基础库包就是 fork/join框架。
fork/join框架允许程序员指定一个数据集如何被切分多个子任务,将子任务提交一个标准默认的线程池:通用的ForkJoinPool。在Java 8中fork/join并行还可以通过并行流机制访问获得,但是这种方式的并行处理是有成本的和前提条件的:首先,元素处理必须能够独立进行,其次,数据集必须足够大,每个元素处理的消耗成本需要足够高,因为设置与启动fork/join框架本身也有一定的消耗,这些消耗相对于数据集合中每个元素处理的成本来说可以忽略不计,否则就不是很划算。
Java 8的CompletableFuture背后也是依靠fork/join框架启动新的线程实现异步与并发的,一般情况下,我们将一个任务放到另外一个线程执行,可能就无需等待那个线程处理完成的结果,而是直接在主线程中返回完成,但是有一些业务恰是需要等待新启动的线程中任务完成,然后和当前主线程中的处理进行合并再处理,比如下面代码我们需要在另外一个线程进行很长时间的运行。
CompletableFuture futureCount = CompletableFuture . supplyAsync (
() -> {
try {
// Simulate long running task 模拟长时间运行任务
Thread . sleep ( 5000 );
} catch ( InterruptedException e ) { }
return 10 ;
});
//现在可以同时在当前主线程做其他事情,不用等待上面长时间运行任务结束
CompletableFuture.supplyAsync 允许你基于ForkJoinPool 异步地运行一个任务,同时也有选项供你选择更多对线程池的控制。下面是获得长时间运行任务的返回结果:
try {
int count = futureCount . get ();
System . out . println ( count );
} catch ( InterruptedException | ExecutionException ex ) {
// Exceptions that occur in future will be thrown here.
}
当对CompletableFuture的实力进行.get()方法调用时,在计算过程中任何exception将被抛出。
创建和获得CompletableFuture有下面四个方法,主要是supplyAsunc和runAsync两种,后者提供的方法参数必须是线程的Runnable,因为Runnable是不会返回任何结果,所以,如果你需要异步处理并返回结果,应该使用Supplier<U>。
static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier); static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier, Executor executor); static CompletableFuture<Void> runAsync(Runnable runnable); static CompletableFuture<Void> runAsync(Runnable runnable, Executor executor);
我们需要将长时间任务放入supplyAsync方法体中,传统写法如下:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() { @Override public String get() { //…长时间运行… return “42”; } }, executor);
使用java8的lambda则是如下:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { //…长时间运行… return “42”; }, executor);
甚至简化如下:
final CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> longRunningTask(params), executor);
thenApply用法
从函数编程角度看,CompletableFuture 其实是一个monad和一个functor,如果你有Javacript编程经验,经常会返回或注册一个异步的callback回调函数,这样,我们就不必一直堵塞等待其处理完成再进行其他处理,这也是一种Future概念,意思是:当结果长时间计算出来以后,在结果上运行这个函数。我们可以运行多个这样的回调性质的函数,这可以使用CompletableFuture的 thenApply()方法:
<U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn); <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor);
第一个没有Async的thenApply方法是指在future完成的当前线程应用参数中函数,而后面两个带Async的方法则是在不同线程池异步地应用参数中的函数。
我们以字符串转换为整数功能为例,如下:
CompletableFuture<String> f1 = //… CompletableFuture<Integer> f2 = f1.thenApply(Integer::parseInt); CompletableFuture<Double> f3 = f2.thenApply(r -> r * r * Math.PI);
或者一句:
CompletableFuture<Double> f3 = f1.thenApply(Integer::parseInt).thenApply(r -> r * r * Math.PI);
这里定义了两个theApply转换功能,第一次是从字符串转换到Integer,然后再到Double。这些转换不是立即执行也不会堵塞,只是简单地记住,只有当f1完成以后才会执行这两个转换,如果一些转换动作很花费时间,你可以使用线程池异步处理。
thenAccept/thenRun用法
在长时间计算完成后可以经过上面转换,但是在最后阶段有两个方法:
CompletableFuture<Void> thenAccept(Consumer<? super T> block); CompletableFuture<Void> thenRun(Runnable action);
thenAccept()提供了final最后的值,而thenRun执行Runnable就不会返回任何计算好的值或结果了。
future.thenAcceptAsync(dbl -> log.debug(“Result: {}”, dbl), executor); log.debug(“Continuing”);
这两个方法是不会堵塞的,即使没有指定executor,可以将它们看成是对未来结果的监听者。
thenCompose()用法
CompletableFuture的异步处理非常不错,有时,你需要在一个future值结构运行某个函数,但是这个函数也是返回某种future,也就是说是两个future彼此依赖串联在一起,它类似于Scala的中的flatMap。
<U> CompletableFuture<U> thenCompose(Function<? super T,CompletableFuture<U>> fn);
下面是比较thenCompose和thenApply的不同案例(类似于Scala的map和flatMap的不同):
CompletableFuture<Document> docFuture = //… CompletableFuture<CompletableFuture<Double>> f = docFuture.thenApply(this::calculateRelevance); CompletableFuture<Double> relevanceFuture = docFuture.thenCompose(this::calculateRelevance); private CompletableFuture<Double> calculateRelevance(Document doc) //…
thenCompose()是能够建立健壮 异步管道pipeline的方法,没有任何堵塞或中间步骤。
thenCombine()用法
上面thenCompose()是用于多个彼此依赖的futrue进行串联起来,而thenCombine则是并联起两个独立的future,注意,这些future都是在长时间计算都完成以后。
<U,V> CompletableFuture<V> thenCombine(CompletableFuture<? extends U> other, BiFunction<? super T,? super U,? extends V> fn)
假设你有两个CompletableFuture,一个是加载Customer,而另外一个是加载最近商店Shop,它们两个都彼此独立,但是当都加载计算完毕以后,你需要使用它们的值计算路径:
CompletableFuture<Customer> customerFuture = loadCustomerDetails(123); CompletableFuture<Shop> shopFuture = closestShop(); CompletableFuture<Route> routeFuture = customerFuture.thenCombine(shopFuture, (cust, shop) -> findRoute(cust, shop)); //… private Route findRoute(Customer customer, Shop shop) //…
在Java8中可以使用this::findRoute 替代(cust, shop) -> findRoute(cust, shop) :
customerFuture.thenCombine(shopFuture, this::findRoute);
当我们有了customerFuture和shopFuture,那么routeFuture会包装它们两个并等待两个都完成计算,当都完成长时间计算以后,routeFuture会运行我们提供的函数findRoute(),routeFuture完成标志前两个前提future已经完成并且findRoute()也完成了。
当有多个CompletableFuture一起工作时,比如你希望在一个CompletableFuture返回的值和另外一个CompletableFuture返回值一起组合在一起再处理,可以使用thenCombine函数,