Java 5 开始引入 Conccurent 软件包,提供完备的并发能力,对线程池有了更好的支持。其中,Executor 框架是最值得称道的。
Executor框架是指java 5中引入的一系列并发库中与executor相关的一些功能类,其中包括线程池,Executor,Executors,ExecutorService,CompletionService,Future,Callable等。并发编程的一种编程方式是把任务拆分为一些列的小任务,即Runnable,然后在提交给一个Executor执行,Executor.execute(Runnalbe) 。Executor在执行时使用内部的线程池完成操作。
一、创建线程池
Executors类,提供了一系列工厂方法用于创先线程池,返回的线程池都实现了ExecutorService接口。
public static ExecutorService newFixedThreadPool(int nThreads)
创建固定数目线程的线程池。
public static ExecutorService newCachedThreadPool()
创建一个可缓存的线程池,调用execute 将重用以前构造的线程(如果线程可用)。如果现有线程没有可用的,则创建一个新线程并添加到池中。终止并从缓存中移除那些已有 60 秒钟未被使用的线程。
public static ExecutorService newSingleThreadExecutor()
创建一个单线程化的Executor。
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize)
创建一个支持定时及周期性的任务执行的线程池,多数情况下可用来替代Timer类。
1 Executor executor = Executors.newFixedThreadPool(10); 2 Runnable task = new Runnable() { 3 4 @Override 5 6 public void run() { 7 8 System.out.println("task over"); 9 10 } 11 12 }; 13 14 executor.execute(task);
或者
1 executor = Executors.newScheduledThreadPool(10); 2 ScheduledExecutorService scheduler = (ScheduledExecutorService) executor; 3 scheduler.scheduleAtFixedRate(task, 10, 10, TimeUnit.SECONDS);
二、ExecutorService与生命周期
ExecutorService扩展了Executor并添加了一些生命周期管理的方法。一个Executor的生命周期有三种状态,运行 ,关闭 ,终止 。Executor创建时处于运行状态。当调用ExecutorService.shutdown()后,处于关闭状态,isShutdown()方法返回true。这时,不应该再想Executor中添加任务,所有已添加的任务执行完毕后,Executor处于终止状态,isTerminated()返回true。
如果Executor处于关闭状态,往Executor提交任务会抛出unchecked exception RejectedExecutionException。
1 ExecutorService executorService = (ExecutorService) executor; 2 3 while (!executorService.isShutdown()) { 4 5 try { 6 7 executorService.execute(task); 8 9 } catch (RejectedExecutionException ignored) { 10 11 12 13 } 14 15 } 16 17 executorService.shutdown();
三、使用Callable,Future返回结果
Future代表一个异步执行的操作,通过get()方法可以获得操作的结果,如果异步操作还没有完成,则,get()会使当前线程阻塞。FutureTask实现了Future和Runable。Callable代表一个有返回值得操作。
1 Callable func = new Callable(){ 2 3 public Integer call() throws Exception { 4 5 System.out.println("inside callable"); 6 7 Thread.sleep(1000); 8 9 return new Integer(8); 10 11 } 12 13 }; 14 15 FutureTask futureTask = new FutureTask(func); 16 17 Thread newThread = new Thread(futureTask); 18 19 newThread.start(); 20 21 22 23 try { 24 25 System.out.println("blocking here"); 26 27 Integer result = futureTask.get(); 28 29 System.out.println(result); 30 31 } catch (InterruptedException ignored) { 32 33 } catch (ExecutionException ignored) { 34 35 }
ExecutoreService提供了submit()方法,传递一个Callable,或Runnable,返回Future。如果Executor后台线程池还没有完成Callable的计算,这调用返回Future对象的get()方法,会阻塞直到计算完成。
例子:并行计算数组的和。
1 package executorservice; 2 3 4 5 import java.util.ArrayList; 6 7 import java.util.List; 8 9 import java.util.concurrent.Callable; 10 11 import java.util.concurrent.ExecutionException; 12 13 import java.util.concurrent.ExecutorService; 14 15 import java.util.concurrent.Executors; 16 17 import java.util.concurrent.Future; 18 19 import java.util.concurrent.FutureTask; 20 21 22 23 public class ConcurrentCalculator { 24 25 26 27 private ExecutorService exec; 28 29 private int cpuCoreNumber; 30 31 private List> tasks = new ArrayList>(); 32 33 34 35 // 内部类 36 37 class SumCalculator implements Callable { 38 39 private int[] numbers; 40 41 private int start; 42 43 private int end; 44 45 46 47 public SumCalculator(final int[] numbers, int start, int end) { 48 49 this.numbers = numbers; 50 51 this.start = start; 52 53 this.end = end; 54 55 } 56 57 58 59 public Long call() throws Exception { 60 61 Long sum = 0l; 62 63 for (int i = start; i < end; i++) { 64 65 sum += numbers[i]; 66 67 } 68 69 return sum; 70 71 } 72 73 } 74 75 76 77 public ConcurrentCalculator() { 78 79 cpuCoreNumber = Runtime.getRuntime().availableProcessors(); 80 81 exec = Executors.newFixedThreadPool(cpuCoreNumber); 82 83 } 84 85 86 87 public Long sum(final int[] numbers) { 88 89 // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor 90 91 for (int i = 0; i < cpuCoreNumber; i++) { 92 93 int increment = numbers.length / cpuCoreNumber + 1; 94 95 int start = increment * i; 96 97 int end = increment * i + increment; 98 99 if (end > numbers.length) 100 101 end = numbers.length; 102 103 SumCalculator subCalc = new SumCalculator(numbers, start, end); 104 105 FutureTask task = new FutureTask(subCalc); 106 107 tasks.add(task); 108 109 if (!exec.isShutdown()) { 110 111 exec.submit(task); 112 113 } 114 115 } 116 117 return getResult(); 118 119 } 120 121 122 123 124 125 public Long getResult() { 126 127 Long result = 0l; 128 129 for (Future task : tasks) { 130 131 try { 132 133 // 如果计算未完成则阻塞 134 135 Long subSum = task.get(); 136 137 result += subSum; 138 139 } catch (InterruptedException e) { 140 141 e.printStackTrace(); 142 143 } catch (ExecutionException e) { 144 145 e.printStackTrace(); 146 147 } 148 149 } 150 151 return result; 152 153 } 154 155 156 157 public void close() { 158 159 exec.shutdown(); 160 161 } 162 163 }
Main
1 int[] numbers = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 10, 11 }; 2 3 ConcurrentCalculator calc = new ConcurrentCalculator(); 4 5 Long sum = calc.sum(numbers); 6 7 System.out.println(sum); 8 9 calc.close();
四、CompletionService
在刚在的例子中,getResult()方法的实现过程中,迭代了FutureTask的数组,如果任务还没有完成则当前线程会阻塞,如果我们希望任意字任务完成后就把其结果加到result中,而不用依次等待每个任务完成,可以使CompletionService。生产者submit()执行的任务。使用者take()已完成的任务,并按照完成这些任务的顺序处理它们的结果 。也就是调用CompletionService的take方法是,会返回按完成顺序放回任务的结果,CompletionService内部维护了一个阻塞队列BlockingQueue,如果没有任务完成,take()方法也会阻塞。修改刚才的例子使用CompletionService:
1 public class ConcurrentCalculator2 { 2 3 4 5 private ExecutorService exec; 6 7 private CompletionService completionService; 8 9 10 11 12 13 private int cpuCoreNumber; 14 15 16 17 // 内部类 18 19 class SumCalculator implements Callable { 20 21 ...... 22 23 } 24 25 26 27 public ConcurrentCalculator2() { 28 29 cpuCoreNumber = Runtime.getRuntime().availableProcessors(); 30 31 exec = Executors.newFixedThreadPool(cpuCoreNumber); 32 33 completionService = new ExecutorCompletionService(exec); 34 35 36 37 38 39 } 40 41 42 43 public Long sum(final int[] numbers) { 44 45 // 根据CPU核心个数拆分任务,创建FutureTask并提交到Executor 46 47 for (int i = 0; i < cpuCoreNumber; i++) { 48 49 int increment = numbers.length / cpuCoreNumber + 1; 50 51 int start = increment * i; 52 53 int end = increment * i + increment; 54 55 if (end > numbers.length) 56 57 end = numbers.length; 58 59 SumCalculator subCalc = new SumCalculator(numbers, start, end); 60 61 if (!exec.isShutdown()) { 62 63 completionService.submit(subCalc); 64 65 66 67 68 69 } 70 71 72 73 } 74 75 return getResult(); 76 77 } 78 79 80 81 82 83 public Long getResult() { 84 85 Long result = 0l; 86 87 for (int i = 0; i < cpuCoreNumber; i++) { 88 89 try { 90 91 Long subSum = completionService.take().get(); 92 93 result += subSum; 94 95 } catch (InterruptedException e) { 96 97 e.printStackTrace(); 98 99 } catch (ExecutionException e) { 100 101 e.printStackTrace(); 102 103 } 104 105 } 106 107 return result; 108 109 } 110 111 112 113 public void close() { 114 115 exec.shutdown(); 116 117 } 118 119 }