并发编程的概念并不新鲜,每一种编程语言中都内置了相关的支持,而有些编程语言因为对并发提供了更有友好的支持而得到了更多的关注。
拥抱并发
使用并发编程并不仅仅是为了CPU多核从而使得程序能够并行执行,其本质其实就是为了消除延迟,例如访问硬盘、网络IO等慢速的设备相对单纯的CPU计算会有很高的延迟,进而导致线程阻塞在这里等待资源,这个时候CPU的资源就白白浪费了,因此我们会根据业务场景,选择开启多个线程,将这些比较耗时的IO任务丢到另外的线程中去处理,这样就不会因为某些慢请求而影响其他用户,从而提高响应时间。因此这里就涉及到了并发模型的选型,下面我选择几种并尝试总结其优劣。
Java Future
Java Future 代表一种异步计算的结果,调用方可以检查计算是否完成、等待计算完成、获取计算结果等,使用起来非常的直观,但当多个Future组合起来时,特别每个异步计算的耗时都不一样,代码通常会变的很复杂并且很容易出错。各种Future#get
和try/catch
充斥在代码中,导致可读性变的非常差。
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
Future<List<String>> blogsFuture = fetchBlogs();
try {
List<String> blogs = blogsFuture.get();
blogs.forEach(s -> {
Future<String> imageFuture = fetchImage(s);
try {
String image = imageFuture.get();
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
});
} catch (InterruptedException | ExecutionException e) {
e.printStackTrace();
}
}
public static Future<List<String>> fetchBlogs() {
return executorService.submit(
(Callable<List<String>>) Collections::emptyList);
}
public static Future<String> fetchImage(String title) {
return executorService.submit(() -> title);
}
What about callback?
提起回调我们都会想起回调地狱,随便Google一下都能找到很多类似的代码,但回调也有它自己的优势所在,异步计算的结果完成的那一刻我们就能够收到通知,但和Future一样当有多个不同的条件时,代码就会变的很不可控,充满了各种内嵌。 下面的示例只有两层而已,试想一下如果有多个需要组合处理,那么维护这段代码将会变的非常蛋疼。
public static void main(String[] args) {
fetchBlogs(new Callback<List<String>>() {
public void onComplete(List<String> data) {
data.forEach(s -> fetchImage(s, new Callback<String>() {
@Override
public void onComplete(String data) {
}
@Override
public void error(Throwable throwable) {
throwable.printStackTrace();
}
}));
}
public void error(Throwable throwable) {
throwable.printStackTrace();
}
});
}
public static void fetchBlogs(Callback<List<String>> callback) {
callback.onComplete(Collections.<String>emptyList());
}
public static void fetchImage(String title, Callback<String> callback) {
callback.onComplete(title);
}
}
interface Callback<T> {
void onComplete(T data);
void error(Throwable throwable);
}
Guava的解决方案
熟悉Guava的同学都知道,它提供了很多工具类从而使得我们可以更友好的编写并发代码,但是在处理多个异步任务之间的组合依赖关系时也有类似的问题, 因为本质上就是将Future的计算转换成了回调的方式提供给用户,同样会产生很多的内嵌代码。
private static final ListeningExecutorService executorService = MoreExecutors
.listeningDecorator(Executors.newCachedThreadPool());
public static void main(String[] args) {
Futures.addCallback(fetchBlogs(), new FutureCallback<List<String>>() {
@Override
public void onSuccess(List<String> result) {
result
.forEach(s -> Futures.addCallback(fetchImage(s), new FutureCallback<String>() {
@Override
public void onSuccess(String result) {
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, executorService));
}
@Override
public void onFailure(Throwable t) {
t.printStackTrace();
}
}, executorService);
}
public static ListenableFuture<List<String>> fetchBlogs() {
return executorService.submit(
(Callable<List<String>>) Collections::emptyList);
}
public static ListenableFuture<String> fetchImage(String title) {
return executorService.submit(() -> title);
}
And CompletableFuture?
CompletableFuture是在JDK1.8进入的,相当于Future
的升级版,其实灵感就是来自Guava的 Listenable Futures,它提供了一堆操作符让你可以将多个异步任务的处理组合成链式,从而让你可以方便处理多个future之间的依赖关系。
public static void main(String[] args) {
fetchBlogs().thenAccept(blogs ->
blogs.forEach(s ->
fetchImage(s)
.thenAccept(s1 -> System.out.println())
.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
})))
.exceptionally(throwable -> {
throwable.printStackTrace();
return null;
});
}
public static CompletableFuture<List<String>> fetchBlogs() {
CompletableFuture<List<String>> completableFuture = new CompletableFuture<>();
completableFuture.complete(Collections.emptyList());
return completableFuture;
}
public static CompletableFuture<String> fetchImage(String title) {
CompletableFuture<String> completableFuture = new CompletableFuture<>();
completableFuture.complete("");
return completableFuture;
}
RxJava
Rxjava最初是由Netflix开发的Reactive programming框架,库本身非常的强大,提供了大量的操作符、更友好的错误处理、事件驱动的编程模式,屏蔽了线程安全、同步、并发数据结构等底层并发原语。
所有的方法都返回Observable,对于调用方都是一直的方法签名,而方法内部,我们想基于同步/异步、从缓存读取还是数据库读取、采用NIO还是阻塞式IO,对于调用者来说都是透明的,你不需要关心,假设我们有这个方法Data getData()
,很明显目前是同步的方式,如果我们想改成异步的方式,那么就需要改动方法签名,例如返回Future
或者添加一个callback参数,这就涉及了很大的改造,那么如果采用RxJava的方法,这些对于上层都是透明的。
但是RxJava的也有其问题,学习成本很高,observeOn/subscribeOn等各种新鲜的概念需要去理解,如果系统完全基于Rxjava开发,那么会调试也是个很蛋疼的问题,特别是多线程的情况下,虽然说官网以及WIKI等都提供了相当丰富的资料,但问题也在于此,太丰富了,不利于初学者上手。当然Java 9也提供了Reactive programming的解决方案,Flow API 提供了一系列接口,具体的实现可以采用Rxjava、 Vertx等。
private static final ExecutorService executorService = Executors.newCachedThreadPool();
public static void main(String[] args) {
fetchBlogs()
.subscribe(blogs ->
blogs.forEach(s ->
fetchImage(s)
.subscribe(System.out::println,
Throwable::printStackTrace)),
Throwable::printStackTrace);
}
public static Observable<List<String>> fetchBlogs() {
return Observable.create(observable -> executorService.submit(() -> {
try {
observable.onNext(Collections.emptyList());
observable.onComplete();
} catch (Exception e) {
observable.onError(e);
}
}));
}
public static Observable<String> fetchImage(String title) {
return Observable.create(observable -> {
observable.onNext("");
observable.onComplete();
});
}
Coroutine
Kotlin目前已经支持了Coroutine,Java也有类似的解决方案,现在也已经有了提案给Java也增加Coroutine的机制,可能很多人有一个误解,coroutine并不是说有更高的性能,而是说让我们可以像调用其他方法一样去调用并发/阻塞的方法,而调度器则向你屏蔽了底层的细节。例如一个GetXXX
方法,其内部设计到了网络调用,那么我们可以中断在这里并让出CPU资源,从而使得其他coroutine可以运行,具体实现机制可以参考另一篇博客.
fun main(args: Array<String>) {
async {
val blogs = fetchBlogs()
blogs.forEach {
fetchImage(it)
}
}
}
suspend fun fetchBlogs(): List<String> {
return Collections.emptyList()
}
suspend fun fetchImage(title: String): String {
return ""
}
总结
这里简单总结了几种不同的并发编程模式,并不是说哪一种更好,而是要看看自己具体的场景,选择最合适的解决方案,万能药是不存在的,每一种药都有其用武之地、其擅长的领域。