异步编程的难点
如何优雅地实现异步编程一直都是一个难题,异步编程的通常做法就是采用callback
的方法,但是这种方法通常会把代码嵌套在正常流程的代码中,而且当有多层嵌套的时候代码更加难以维护。
另外还有一点,异步编程的异常处理也是难以未维护,特别是在Java中,异步编程通常由新的线程完成,而子线程的异常是无法在父线程捕获的,那么对于异步执行结果的获取就需要付出更大的代价,比如通过:轮询、事件驱动等来完成。
从Future说起
Java5之后就引入Future用于异步编程,通过get()
方法来对异步执行结果的同步等待和结果获取:
Future<String> doSomething = Executors.newSingleThreadExecutor().submit(() -> {
try {
Thread.sleep(1000 * 3);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "success";
});
String result = doSomething.get();
System.out.println(result);
Future的Api比较简单,而已对异常处理不友好,如果有同时有多个异步操作需要同时进行是就不好处理了
假设有这么一个场景,用户登录拿到登录凭证(token),登录之后获取用户信息
ExecutorService executors = Executors.newFixedThreadPool(10);
Future<String> login = executors.submit(()->login());
String token = login.get();
Future<String> userInfo = executors.submit(() -> userInfo(token));
String userInfoResult = userInfo.get();
System.out.println(userInfoResult);
这种实现方法还是不能实现真正的异步编程或者说不是我们所期望的,我们期望的是登录后获取用户信息,但这两件事情完成后统一对结果进行处理,而这种方式是先等待登录之后再取用户信息,和同步调用类似,这就与我们的设想不符。
CompletableFuture
初识CompletableFuture
在Java8中引入了CompletableFuture类,同时实现了Future接口和CompletionStage接口,提供了一套用于异步编程的Api接口并且提供了异步处理
CompletableFuture提供了许多异步编程的操作,可以说是Java中的Promise了,下面通过CompletableFuture来实现上面提到的例子:
String userInfo = CompletableFuture.supplyAsync(() -> login())
.thenApplyAsync(token -> userInfo(token))
.get();
System.out.println(userInfo);
CompletableFuture API
CompletableFuture方法很多,功能也很丰富,这里不一一说明,主要可以分为这几类来使用:
1.把CompletableFuture当Future使用
CompletableFuture实现了Future接口,也就是Future能做的CompletableFuture也同样能使用,加上complete
和completeExceptionally
方法可以控制结果的结束:
CompletableFuture<String> f = new CompletableFuture<>();
Executors.newSingleThreadExecutor().submit(()->{
f.complete("hello");
//f.completeExceptionally(new RuntimeException("error"));
});
String result = f.get();
System.out.println(result);
可以通过CompletableFuture来控制多个异步操作同时执行:
CompletableFuture<String> f = new CompletableFuture<>();
new Thread(() -> {
try {
System.out.println("thread1:" + f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
new Thread(() -> {
try {
System.out.println("thread2:" + f.get());
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}).start();
f.complete("hello");
2.异步操作
创建异步操作的方法主要是:
public static CompletableFuture<Void> runAsync(Runnable runnable)
public static CompletableFuture<Void> runAsync(Runnable runnable,Executor executor)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier)
public static <U> CompletableFuture<U> supplyAsync(Supplier<U> supplier,Executor executor)
使用如下:
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> {
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
e.printStackTrace();
}
return "hello";
});
String result = f.get();
System.out.println(result);
3.连续异步操作
public CompletableFuture<Void> thenRun(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action)
public CompletableFuture<Void> thenRunAsync(Runnable action,Executor executor)
public <U> CompletableFuture<U> thenApply(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn)
public <U> CompletableFuture<U> thenApplyAsync(Function<? super T,? extends U> fn, Executor executor)
public CompletableFuture<Void> thenAccept(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action)
public CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action,Executor executor)
使用如下:
CompletableFuture<Void> f = CompletableFuture
.supplyAsync(() -> "hello")
.thenApplyAsync(res -> res + " world!")
.thenAcceptAsync(System.out::println);
// wait for job done
f.get();
4.等待操作完成
public CompletableFuture<T> whenComplete(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action)
public CompletableFuture<T> whenCompleteAsync(BiConsumer<? super T, ? super Throwable> action, Executor executor)
使用如下:
CompletableFuture<String> f = CompletableFuture
.supplyAsync(() -> "hello")
.thenApplyAsync(res -> res + " world!")
.whenComplete((res, err) -> {
if (err != null) {
err.printStackTrace();
} else {
System.out.println(res);
}
});
// wait for job done
f.get();
5.组合
public <U> CompletableFuture<U> thenCompose(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn)
public <U> CompletableFuture<U> thenComposeAsync(Function<? super T, ? extends CompletionStage<U>> fn,Executor executor)
public <U,V> CompletableFuture<V> thenCombine(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn)
public <U,V> CompletableFuture<V> thenCombineAsync(CompletionStage<? extends U> other,BiFunction<? super T,? super U,? extends V> fn, Executor executor)
使用如下:
CompletableFuture<String> f = CompletableFuture.supplyAsync(() -> "Hello")
.thenCompose(res -> CompletableFuture.supplyAsync(() -> res + " World,"))
.thenCombine(CompletableFuture.supplyAsync(() -> "CompletableFuture!"), (a, b) -> a + b);
String result = f.get();
System.out.println(result);//Hello World,CompletableFuture!
6.结果&异常处理
public <U> CompletableFuture<U> handle(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn)
public <U> CompletableFuture<U> handleAsync(BiFunction<? super T, Throwable, ? extends U> fn, Executor executor)
public CompletableFuture<T> exceptionally(Function<Throwable, ? extends T> fn)
使用如下:
// 异常处理
CompletableFuture<Object> f = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(res -> res + "World")
.thenApplyAsync(res -> {
throw new RuntimeException("error");
})
.exceptionally(e -> {
//handle exception here
e.printStackTrace();
return null;
});
f.get();
// 执行结果处理
CompletableFuture<Object> f2 = CompletableFuture.supplyAsync(() -> "Hello")
.thenApplyAsync(res -> res + "World")
.thenApplyAsync(res -> {
throw new RuntimeException("error");
})
.handleAsync((res, err) -> {
if (err != null) {
//handle exception here
return null;
} else {
return res;
}
});
Object result = f2.get();
System.out.println(result);
7.并行执行异步操作并统一处理结果
public static CompletableFuture<Void> allOf(CompletableFuture<?>... cfs)
使用如下:
CompletableFuture<String> f1 = CompletableFuture.supplyAsync(() -> "hello");
CompletableFuture<String> f2 = CompletableFuture.supplyAsync(() -> "world");
CompletableFuture<String> f3 = CompletableFuture.supplyAsync(() -> "!");
// 使用allOf方法
CompletableFuture<Void> all = CompletableFuture.allOf(f1, f2, f3);
all.get();
System.out.println(f1.get());
System.out.println(f2.get());
System.out.println(f3.get());
// 结合StreamAPI
List<String> result = Stream.of(f1, f2, f3)
.map(CompletableFuture::join)
.collect(Collectors.toList());
System.out.println(result);
总结
在Java7之前,Java中对于异步编程的实现都可能比较复杂或者实现得不够优雅,而CompletableFuture的出现则提供了异步编程的强大能力,虽然API有点多但是只要稍加理解和使用还是很好应用的,通过链式调用使原本通过回调的方式变得更加优雅,代码的可阅读性和可维护性也得到一定的提高。