基本介绍
ExecutorCompletionService
,JUC中提供的CompletionService
接口实现,用于实现有限任务的执行以及任务结果的有序返回(先执行完的先被取出)
想象这样一种场景,有N个给定的计算任务,每个计算任务耗时各不相同,同时每个计算任务会返回一个结果,应用程序需要做到当每个计算任务完成之后,尽快地对计算结果进行处理(展示,二次处理等等)。假设我们用普通的ThreadPoolExecutor
去submit这些任务并得到执行结果的future,但是此时调用future.get必然会阻塞等待,且当前阻塞等待的任务有可能刚好是耗时时间最长的任务,那么后面那些很快就执行完的任务反而没法对其执行结果进行快速处理
ExecutorCompletionService
的存在,就是为了解决这一类似的问题,它能够做到,最快完成的任务,最快拿到其执行结果
使用示例
public class ExecutorCompletionServiceTest {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(8);
((ThreadPoolExecutor) executor).prestartAllCoreThreads();
ExecutorCompletionService<String> completionService = new ExecutorCompletionService<String>(executor);
int taskSize = 8;
for (int i = 0; i < taskSize; i++) {
final int order = i;
completionService.submit(new Callable<String>() {
@Override
public String call()
throws Exception {
int num = new Random(System.nanoTime()).nextInt(10);
TimeUnit.SECONDS.sleep(num);
return String.format("task%s finish, cost %s s", order, num);
}
});
}
for (int i = 0; i < taskSize; i++) {
try {
System.out.println(completionService.take().get());
} catch (Exception e) {
e.printStackTrace();
}
}
executor.shutdown();
}
}
console输出:
task1 finish, cost 1 s
task4 finish, cost 1 s
task3 finish, cost 3 s
task0 finish, cost 5 s
task5 finish, cost 6 s
task6 finish, cost 7 s
task2 finish, cost 8 s
task7 finish, cost 9 s
当任务的耗时太短(ms级别)的时候,貌似极有可能出现耗时长的任务先被取出来?跟CPU有关?跟真正的threadpool task执行时间有关?
核心原理
前面已经提到了ExecutorCompletionService
的核心功能,那么实现的原理主要有以下几点:
- 聚合executor实现任务的提交
- 扩展FutureTask的实现,当executor执行完task之后,回调其done方法并将task结果输出到提供的阻塞队列中
源码分析
类继承关系
ExecutorCompletionService implements CompletionService
CompletionService
提供几个需要实现的方法,主要提供的方法是任务的提交(与ExecutorService提供的没啥区别)和Future的获取
public interface CompletionService<V> {
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
// 尝试获取已经执行结束的任务的future,若没有则阻塞等待
Future<V> take() throws InterruptedException;
// 尝试获取已经执行结束的任务的future,若没有则返回null
Future<V> poll();
// 尝试超时获取已经执行结束的任务的future,若在规定时间内没有获取到,则返回null
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}
下面的分析主要通过前面章节“核心原理”的讲述进行分析:
(1)聚合executor实现任务的提交
在其构造方法和submit方法中体现:
先看看构造方法,规定必须提供Executor(可以是JUC提供的executor实现or第三方实现),可选择是否提供阻塞队列(JUC任意阻塞队列实现or第三方实现)
public ExecutorCompletionService(Executor executor) {
if (executor == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = new LinkedBlockingQueue<Future<V>>();
}
public ExecutorCompletionService(Executor executor,
BlockingQueue<Future<V>> completionQueue) {
if (executor == null || completionQueue == null)
throw new NullPointerException();
this.executor = executor;
this.aes = (executor instanceof AbstractExecutorService) ?
(AbstractExecutorService) executor : null;
this.completionQueue = completionQueue;
}
再看看submit方法,其实就是调用了executor的execute方法:
public Future<V> submit(Callable<V> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task);
executor.execute(new QueueingFuture(f));
return f;
}
public Future<V> submit(Runnable task, V result) {
if (task == null) throw new NullPointerException();
RunnableFuture<V> f = newTaskFor(task, result);
executor.execute(new QueueingFuture(f));
return f;
}
注意一下newTaskFor的处理,若传入的executor是AbstractExecutorService
的实现,则使用其提供的newTaskFor,否则,则使用JUC自带的默认实现FutureTask
:
private RunnableFuture<V> newTaskFor(Callable<V> task) {
if (aes == null)
return new FutureTask<V>(task);
else
return aes.newTaskFor(task);
}
private RunnableFuture<V> newTaskFor(Runnable task, V result) {
if (aes == null)
return new FutureTask<V>(task, result);
else
return aes.newTaskFor(task, result);
}
在execute的时候,对task的包装使用的是内置的QueueingFuture
实现类
(2)扩展FutureTask的实现,当executor执行完task之后,回调其done方法并将task结果输出到提供的阻塞队列中
前面说到,对task的包装以及执行使用的是内置的QueueingFuture
实现:
private class QueueingFuture extends FutureTask<Void> {
QueueingFuture(RunnableFuture<V> task) {
super(task, null);
this.task = task;
}
protected void done() { completionQueue.add(task); }
private final Future<V> task;
}
最关键的点就是重写了FutureTask
的done方法,我们知道,在JUC的线程池实现中,当task(FutureTask及其子类)执行结束后,会回调task的done实现,这里重写了done,将futuretask丢到提供的阻塞队列
(3)取执行完的future
实际上就是直接调用了阻塞队列的相关方法:
public Future<V> take() throws InterruptedException {
return completionQueue.take();
}
public Future<V> poll() {
return completionQueue.poll();
}
public Future<V> poll(long timeout, TimeUnit unit)
throws InterruptedException {
return completionQueue.poll(timeout, unit);
}