因为后端支撑并发连接数的提高以及微服务化的趋势,Non-blocking IO编程越发的必要了。Java由于还没有官方的协程支持,比较主流的都是采用Future/Reactive-Stream之类的call back方式,程序逻辑被割裂,给程序编写、维护、重构都带来很重的心智负担。
实际上Java中还是有一些第三方的协程实现是可以用的,在对非阻塞IO依赖很高的业务中可以考虑尝试一下,目前可用性比较高的有Quasar和ea-async.
这些方案基本上都是通过byte code Instrument,把编译后同步程序class文件修改为异步的操作。这里面有两个关键因素,一个是现场(主要是stack, 操作数栈)的保存和恢复,一个是能够在某些点暂停执行和重入执行。其中前者通过Instrument 字节码进去很容易修改,后者虽然JVM中也有PC(程序计数器)的概念,但是却不能通过字节码修改,所以一般都要更麻烦一些,用一个状态机来实现,退出时保存状态,重入时根据状态跳转到不同的代码位置。Instrument可以在编译后来做(如果用maven配置个maven插件就可以),在可以在启动的之后,加在agent来做。
Quasar
puniverse/quasar 以及其为web开发提供的整合puniverse/comsat 应该是比较流行的了。
Quasar是一个比较庞大的项目,拥有完整的生态,提供了Fiber实现,调度器,甚至Channel,Actor编程范式这样的支持。底层通过Instrument替换了Thread.par为Fiber.park,因而对于java.util.concurrent包中的各种同步原语和容器都可以在Fiber中使用。使用Quasar的Fiber编程,和原先写多线程程序没有任何区别,IO(需要用XXXChannel)/同步操作(除了synchronized)/ThreadLocal这些也都和阻塞IO一样的使用方法,只是需要把Thread替换成Fiber,然后类需要加Suspendable注解或者抛出SuspendExecution异常就可以了。其中Suspendable和SuspendExecution都是为了告诉Quasar哪些类需要做Instrument。Quasar的Fiber调度,Fiber让出执行权给调度器,是要通过抛异常来退出的,所以调度器性能上会差一些。
// Fiber实现的简单“高并发”HTTP Server ServerSocketChannel s = SocketChannel.open().bind(new InetSocketAddress(8080));
new Fiber(() -> {
while (true) {
SocketChannel ch = s.accept();
new Fiber(() -> {
ByteBuffer buffer = ByteBuffer.allocateDirect(1024);
int n = ch.read(buffer);
String response = "HTTP/1.0 200 OK\r\n";
ch.write(encoder.encode(CharBuffer.wrap(response)));
ch.close();
}).start();
}
}).start();
Comsat则对数据库、HTTP、Tomcat/Jetty等常用的lib和容器等提供了集成和封装。
不过项目的维护人Ron Pressler已经加入Oracle,参与JDK官方的协程项目Project Loom,所以这个项目近两年都没有大的更新了,如果要Java9或者以上的版本最好就不要选择这个方案了。
ea-async
electronicarts/ea-async 是一个提供async-await 风格协程实现的工具。实现比较简单,主要的就是Instrument的代码,其他没有提供额外的支持了。简单有时候也是优点,ea-async项目容易理解,学习成本低,效率上损失也不大。使用ea-async编写的代码,方法必须返回CompletableFuture或者CompletionStage,这反而也易于和Spring以及其他现有第三方框架整合。
以官方的例子来看,使用CompletableFuture的话要这么写:
import static java.util.concurrent.CompletableFuture.completedFuture;
public class Store {
public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) {
return bank.decrement(cost)
.thenCompose(result -> {
if(!result) {
return completedFuture(false);
}
return inventory.giveItem(itemTypeId).thenApply(res -> true);
});
}
}
如果使用ea-async可以写成:
import static com.ea.async.Async.await;
import static java.util.concurrent.CompletableFuture.completedFuture;
public class Store{
public CompletableFuture<Boolean> buyItem(String itemTypeId, int cost) {
if(!await(bank.decrement(cost))) {
return completedFuture(false);
}
try {
await(inventory.giveItem(itemTypeId));
return completedFuture(true);
} catch (Exception ex) {
await(bank.refund(cost));
throw new AppException(ex);
}
}
}
虽然是target Java8开发的,ea-async目前在Java9和Java10上都可以很好的使用。在JDK官方的Fiber实现可用之前,ea-async是一个非常不错的替代选择。
然而其缺点也是很明显的,改写后的代码stack被破坏,debug,profiler都无法正常工作了。