### java future
Runnable 的任务没有返回值,也不能抛出受检异常(checked exception)。
java.util.concurrent.Callable接口,可以返回一个对象或者抛出异常
使用 JDK 的这种方式提交 Callable 任务后,如果通过 Future.get() 等待返回结果,调用线程会被阻塞。因此,某一个耗时很长的任务可能会拖累整个主任务的执行。
/**
 * Demonstrates the plain JDK {@link Future}: tasks are submitted as {@link Callable}s and
 * their results are fetched with the blocking {@code get()} call. Also shows that the
 * outcome of {@code cancel(true)} on a freshly submitted task is a race.
 */
public class TestFuture {
    final static ExecutorService executorService = Executors.newCachedThreadPool();

    /**
     * Runs the demo: one task whose result is read twice, then one task that is
     * cancelled right after submission.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            Future<Boolean> booleanTask = executorService.submit(new Callable<Boolean>() {
                @Override
                public Boolean call() throws Exception {
                    return true;
                }
            });

            // get() blocks the calling thread until the task has finished, so there is
            // no need to spin on isDone() beforehand.
            printResult("BooleanTask : ", booleanTask);
            // A completed Future keeps its result: a second get() returns immediately.
            printResult("BooleanTask : ", booleanTask);
            System.out.println(booleanTask.isCancelled());
            System.out.println("BooleanTask is over");

            Future<String> stringTask = executorService.submit(new Callable<String>() {
                @Override
                public String call() throws Exception {
                    return "Hello world";
                }
            });

            // cancel() only succeeds while the task has not completed yet. Whether the
            // pool thread finishes the task before this call is a race, so the value
            // of stringTask.isCancelled() below is not deterministic.
            stringTask.cancel(true);

            // Once cancel() has returned, isDone() is always true: the task is either
            // cancelled (no result to read) or completed (get() returns at once).
            // Waiting for "done and not cancelled" would never end after a successful
            // cancel, so the cancelled case is skipped inside printResult.
            printResult("StringTask : ", stringTask);

            System.out.println("StringTask is over");
            System.out.println(stringTask.isDone());
            System.out.println(stringTask.isCancelled());
        } finally {
            // Release the pool's non-daemon worker threads so the JVM can exit promptly.
            executorService.shutdown();
        }
    }

    /**
     * Blocks until {@code task} completes and prints {@code label} followed by its result.
     * Prints nothing when the task was cancelled, because {@code get()} would only throw.
     *
     * @param label text printed in front of the result
     * @param task  the future to wait for
     */
    private static void printResult(String label, Future<?> task) {
        if (task.isCancelled()) {
            return;
        }
        try {
            System.out.println(label + task.get());
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
### 代码片段
1 List<Future<Integer>> futures = Lists.newArrayList(); 2 for (final AuditTask task : taskList) { 3 futures.add(ThreadUtils.CUSTOM_THREAD_EXECUTOR.submit(new Callable<Integer>() { 4 5 @Override 6 public Integer call() throws Exception { 7 try { 8 sendToAkaOperation(task); 9 return COUNT_ONE; 10 } catch (Exception e) { 11 return COUNT_ZERO; 12 } 13 } 14 })); 15 } 16 17 int count = 0; 18 for (Future<Integer> future : futures) { 19 try { 20 //阻塞,可能由于某个任务的执行影响整个主任务 21 count = count + future.get(); 22 } catch (Exception e) { 23 log.error("一次推送auditPageDataWide到AKA任务异常:" + e.getMessage()); 24 } 25 } 26 log.info("一次送审任务结束,本次送审任务完成送审AKA物料数目: " + count); 27 }
### guava future
ListenableFuture 是可以注册监听器的 Future:任务完成或者失败时,会自动调用已注册的回调函数。这样多个任务可以异步执行,并且可以无阻塞地获取每个任务的执行结果。
1 public class TestListenableFuture { 2 final static ListeningExecutorService LISTENING_EXECUTOR_SERVICE = MoreExecutors.listeningDecorator(Executors.newCachedThreadPool()); 3 4 public static void main(String[] args) { 5 ListenableFuture<Boolean> booleanTask = LISTENING_EXECUTOR_SERVICE.submit(new Callable<Boolean>() { 6 @Override 7 public Boolean call() throws Exception { 8 TimeUnit.SECONDS.sleep(5); 9 return true; 10 } 11 }); 12 13 Futures.addCallback(booleanTask, new FutureCallback<Boolean>() { 14 @Override 15 public void onSuccess( Boolean aBoolean) { 16 System.out.println("BooleanTask : " + aBoolean); 17 } 18 19 @Override 20 public void onFailure(Throwable throwable) { 21 22 } 23 }); 24 25 ListenableFuture<String> stringTask = LISTENING_EXECUTOR_SERVICE.submit(new Callable<String>() { 26 @Override 27 public String call() throws Exception { 28 return "Hello world"; 29 } 30 }); 31 32 Futures.addCallback(stringTask, new FutureCallback<String>() { 33 @Override 34 public void onSuccess(String aBoolean) { 35 System.out.println(Thread.currentThread().getName()); 36 System.out.println("StringTask : " + aBoolean); 37 } 38 39 @Override 40 public void onFailure(Throwable throwable) { 41 42 } 43 }); 44 45 } 46 }
输出结果:
main
StringTask : Hello world
BooleanTask : true