很多时候我们都希望能够最大的利用资源,比如在进行IO操作的时候尽可能的避免同步阻塞的等待,因为这会浪费CPU的资源。如果在有可读的数据的时候能够通知程序执行读操作甚至由操作系统内核帮助我们完成数据的拷贝,这再好不过了。从NIO到CompletableFuture、Lambda、Fork/Join,Java一直在努力让程序尽可能变的异步甚至拥有更高的并行度,这一点一些函数式语言做的比较好,因此java也或多或少的借鉴了某些特性。下面介绍一种非常常用的实现异步操作的方式。
考虑有一个耗时的操作,操作完后会返回一个结果(不管是正常结果还是异常),程序如果想拥有比较好的性能不可能由线程去等待操作的完成,而是应该采用listener模式。jdk并发包里的Future代表了未来的某个结果,当我们向线程池中提交任务的时候会返回该对象。代码例子:
[java]
view plain
copy
- /**
- * jdk1.8之前的Future
- *
- * @author Administrator
- *
- */
- public class JavaFuture {
- public static void main(String[] args) throws Throwable, ExecutionException {
- ExecutorService executor = Executors.newFixedThreadPool(1);
- // Future代表了线程执行完以后的结果,可以通过future获得执行的结果
- // 但是jdk1.8之前的Future有点鸡肋,并不能实现真正的异步,需要阻塞的获取结果,或者不断的轮询
- // 通常我们希望当线程执行完一些耗时的任务后,能够自动的通知我们结果,很遗憾这在原生jdk1.8之前
- // 是不支持的,但是我们可以通过第三方的库实现真正的异步回调
- Future<String> f = executor.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- System.out.println(“task started!”);
- Thread.sleep(3000);
- System.out.println(“task finished!”);
- return “hello”;
- }
- });
- //此处阻塞main线程
- System.out.println(f.get());
- System.out.println(“main thread is blocked”);
- }
- }
如果想获得耗时操作的结果,可以通过get方法获取,但是该方法会阻塞当前线程,我们可以在做完剩下的某些工作的时候调用get方法试图去获取结果,也可以调用非阻塞的方法isDone来确定操作是否完成,这种方式有点儿类似下面的过程:
这种方式对流程的控制很混乱,但是在jdk1.8之前只提供了这种笨拙的实现方式,以至于很多高性能的框架都实现了自己的一套异步框架,比如Netty和Guava,下面分别介绍下这三种异步的实现方式(包括jdk1.8)。首先是Guava中的实现方式:
[java]
view plain
copy
- package guava;
- import java.util.concurrent.Callable;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import com.google.common.util.concurrent.FutureCallback;
- import com.google.common.util.concurrent.Futures;
- import com.google.common.util.concurrent.ListenableFuture;
- import com.google.common.util.concurrent.ListeningExecutorService;
- import com.google.common.util.concurrent.MoreExecutors;
- /**
- * Guava中的Future
- *
- * @author Administrator
- *
- */
- public class GuavaFuture {
- public static void main(String[] args) {
- ExecutorService executor = Executors.newFixedThreadPool(1);
- // 使用guava提供的MoreExecutors工具类包装原始的线程池
- ListeningExecutorService listeningExecutor = MoreExecutors.listeningDecorator(executor);
- //向线程池中提交一个任务后,将会返回一个可监听的Future,该Future由Guava框架提供
- ListenableFuture<String> lf = listeningExecutor.submit(new Callable<String>() {
- @Override
- public String call() throws Exception {
- System.out.println(“task started!”);
- //模拟耗时操作
- Thread.sleep(3000);
- System.out.println(“task finished!”);
- return “hello”;
- }
- });
- //添加回调,回调由executor中的线程触发,但也可以指定一个新的线程
- Futures.addCallback(lf, new FutureCallback<String>() {
- //耗时任务执行失败后回调该方法
- @Override
- public void onFailure(Throwable t) {
- System.out.println(“failure”);
- }
- //耗时任务执行成功后回调该方法
- @Override
- public void onSuccess(String s) {
- System.out.println(“success ” + s);
- }
- });
- //主线程可以继续做其他的工作
- System.out.println(“main thread is running”);
- }
- }
Guava提供了一套完整的异步框架,核心是可监听的Future,通过注册监听器或者回调方法实现及时获取操作结果的能力。需要提一点的是,假设添加监听的时候耗时操作已经执行完了,此时回调方法会被立即执行并不会丢失。想探究其实现方式的话可以跟一下源码,底层的原理并不难。
谈到异步编程就不得不提一下Promise,很多函数式语言比如js原生支持Promise,但是在java界也有一些promise框架,其中就有大名鼎鼎的Netty。从Future、Callback到Promise甚至线程池,Netty实现了一套完整的异步框架,并且netty代码中也大量使用了Promise,下面是Netty中的例子:
[java]
view plain
copy
- package netty_promise;
- import io.netty.util.concurrent.DefaultEventExecutorGroup;
- import io.netty.util.concurrent.EventExecutorGroup;
- import io.netty.util.concurrent.Future;
- import io.netty.util.concurrent.FutureListener;
- /**
- * netty中的promise
- *
- * @author Administrator
- *
- */
- public class PromiseTest {
- @SuppressWarnings({ “unchecked”, “rawtypes” })
- public static void main(String[] args) throws Throwable {
- //线程池
- EventExecutorGroup group = new DefaultEventExecutorGroup(1);
- //向线程池中提交任务,并返回Future,该Future是netty自己实现的future
- //位于io.netty.util.concurrent包下,此处运行时的类型为PromiseTask
- Future<?> f = group.submit(new Runnable() {
- @Override
- public void run() {
- System.out.println(“任务正在执行”);
- //模拟耗时操作,比如IO操作
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- System.out.println(“任务执行完毕”);
- }
- });
- //增加监听
- f.addListener( new FutureListener() {
- @Override
- public void operationComplete(Future arg0) throws Exception {
- System.out.println(“ok!!!”);
- }
- });
- System.out.println(“main thread is running.”);
- }
- }
直到jdk1.8才算真正支持了异步操作,其中借鉴了某些框架的实现思想,但又有新的功能,同时在jdk1.8中提供了lambda表达式,使得java向函数式语言又靠近了一步。借助jdk原生的CompletableFuture可以实现异步的操作,同时结合lambada表达式大大简化了代码量。代码例子如下:
[java]
view plain
copy
- package netty_promise;
- import java.util.concurrent.CompletableFuture;
- import java.util.concurrent.ExecutionException;
- import java.util.concurrent.ExecutorService;
- import java.util.concurrent.Executors;
- import java.util.function.Supplier;
- /**
- * 基于jdk1.8实现任务异步处理
- *
- * @author Administrator
- *
- */
- public class JavaPromise {
- public static void main(String[] args) throws Throwable, ExecutionException {
- // 两个线程的线程池
- ExecutorService executor = Executors.newFixedThreadPool(2);
- //jdk1.8之前的实现方式
- CompletableFuture<String> future = CompletableFuture.supplyAsync(new Supplier<String>() {
- @Override
- public String get() {
- System.out.println(“task started!”);
- try {
- //模拟耗时操作
- Thread.sleep(2000);
- } catch (InterruptedException e) {
- e.printStackTrace();
- }
- return “task finished!”;
- }
- }, executor);
- //采用lambada的实现方式
- future.thenAccept(e -> System.out.println(e + ” ok”));
- System.out.println(“main thread is running”);
- }
- }
以上的三种实现方式类似下面的过程:
上面的图只是简单的表示了一下异步的实现流程,实际的调用中看似顺序的步骤会发生线程的切换。
- 顶
- 0
- 踩