前言
可完成的Future
对于1.5提供的Future接口。有一定的局限性,很难直接表述多个future之间的依赖性。比如对于结果的获取只能通过阻塞或者轮询的方式获得,这违背了异步编程的初衷。
下面通过一个简单的例子说明CompletableFuture的使用。
例子
将List中元素,异步方式全部小写转大写
List<String> list = new ArrayList<>();
list.add("a");
list.add("b");
list.add("c");
//supplyAsync 构造一个CompletableFuture
//CompletableFuture.completedFuture() 构造一个已完成的future
//thenApply 相当于回调函数
List<CompletableFuture> completableFutures = list.stream().map(e->CompletableFuture.supplyAsync(()-> {
//异步任务执行
try {
//模拟耗时任务
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
return e;
}).thenApply(r->r.toUpperCase())).collect(Collectors.toList());
//阻塞主线程直到所有全部任务完成
CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[completableFutures.size()])).join();
completableFutures.stream().forEach(e->{
try {
//获得执行结果
System.out.println("执行结果" + e.get());
} catch (InterruptedException e1) {
e1.printStackTrace();
} catch (ExecutionException e1) {
e1.printStackTrace();
}
});
接口介绍
CompletableFuture实现了ComplationStage和Future接口。ComplationStage理解为一个执行阶段。
ComplationStage中的抽象方法大部分返回值是ComplationStage,参数都是函数式接口,这符合函数式编程的流式处理思想。
函数式编程 | 抽象行为 |
面向对象编程 | 抽象数据 |
CompletableFuture的创建
1)带返回结果
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier) //使用ForkJoinPool
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor) //使用外部线程池Executor
2)无返回结果
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
CompletableFuture.supplyAsync(()-> "hello") // 可以在里面调用执行的任务
回调执行thenApply
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn,Executor executor)
CompletableFuture.supplyAsync(()-> "hello")
.thenApply( e -> e.toUpperCase())
.whenComplete((e,r)-> System.out.println(e));//HELLO
thenAccept和thenRun 阶段处理
CompletableFuture.supplyAsync(()-> "hello")
.thenApply( e -> e.toUpperCase())
.thenAccept(e -> System.out.println(e));
thenAccpet接受上一阶段的返回值,进行处理。thenRun 不接受上一阶段的返回值。两者执行都返回CompletableFuture<Void>
合并两个计算结果thenCombine
CompletableFuture.supplyAsync(()-> "hello")
.thenCombine(CompletableFuture.completedFuture("world"),(s1,s2)->s1+s2)
.whenComplete((e,r)-> System.out.println(e)); // helloworld
等待所有异步任务执行完成 allOf
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
通过CompletableFuture的join方法来进行阻塞等待异步任务完成