Executor接口
如果查看jdk文档,会发现java线程池都源自于这个超级接口Executor,但是这个接口本身比较简单:
public interface Executor {
/**
在未来某个时间执行给定的命令。该命令可能在新的线程、已入池的线程或者正调用的线程中执行,
这由 Executor 实现决定。
*
* @param command the runnable task
* @throws RejectedExecutionException if this task cannot be
* accepted for execution.
* @throws NullPointerException if command is null
*/
void execute(Runnable command);
}
可以看到Executor
中只有一个execute
方法。此接口提供一种将任务提交与每个任务将如何运行的机制分离开来的方法,相比较为每个人物调用new Thread(Runnable r).start()
,我们更偏向于使用Executor
(执行器)来运行任务:
Executor executor = anExecutor;
executor.execute(new RunnableTask1());
executor.execute(new RunnableTask2());
...
实现一个执行器也很简单:
class ThreadPerTaskExecutor implements Executor {
public void execute(Runnable r) {
new Thread(r).start();
}
}
ExecutorService接口
Executor
提供的方法太少了!根本不能满足日常所需,而从它派生下来的接口ExecutorService
则显得更通用,毕竟它也是个Service。
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
...
}
可以看到,ExecutorService
接口中包含了我们平常使用的线程池的绝大多数方法,其中的一些方法在上文已经介绍过了。
AbstractExecutorService
AbstractExecutorService是一个抽象类,并且实现了ExecutorService接口。
public abstract class AbstractExecutorService implements ExecutorService
在这个类中,提供了ExecutorService
一些方法的默认实现,比如submit
,invokeAll
,首先看submit
的实现:
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
其中使用了newTaskFor
方法:
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
newTaskFor
方法只是简单的将给定可调用任务包装成一个RunnableFuture
,使其具有取消运行的特性。而submit
中直接将任务交给execute()
运行.
再来看invokeAll()
:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
//创建一个list保存所有的结果
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f); //运行任务
}
for (Future<T> f : futures) {
if (!f.isDone()) { //依次取结果
try {
f.get(); //这里使用get是为了等待运行完成,如果没完成就会阻塞
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done) //如果发生异常,则取消所有任务
for (Future<T> f : futures)
f.cancel(true);
}
}