Jdk1.6 JUC源码解析(20)-Executors
作者:大飞
功能简介:
- Executors是JUC包提供的一个工具性质的帮助类,它针对ExecutorService、ScheduledExecutorService、ThreadFactory和Callable提供了一系列工厂方法和工具方法。
源码分析:
- 首先看下针对ExecutorService提供的一些工厂方法:
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
通过之前文章中对ThreadPoolExecutor的分析可知:
1.这个方法创建了一个核心线程数量和最大线程数量一致的,并且任务队列是无界队列的线程池。 2.由于默认核心线程不会超时,所以超时相关的参数也没有意义。 3.如果在线程关闭之前,一个工作线程由于某种原因挂了,那么线程池会自动补上一个新的工作线程。
public static ExecutorService newFixedThreadPool(int nThreads, ThreadFactory threadFactory) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory);
除了能定制ThreadFactory之外,和上个方法一样。
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
这个工厂方法看上去有点类似newFixedThreadPool(1) ,但有一点儿区别,这个不能重新调整配置(比如动态增大核心线程数量)了,由于方法内返回的不是ThreadPoolExecutor实例,而是一个包装类:
static class FinalizableDelegatedExecutorService
extends DelegatedExecutorService {
FinalizableDelegatedExecutorService(ExecutorService executor) {
super(executor);
}
protected void finalize() {
super.shutdown();//被垃圾回收时,关闭线程池。
}
}
/**
* 包装类,方法代理到内部的ExecutorService,只暴漏ExecutorService定义的方法。
*/
static class DelegatedExecutorService extends AbstractExecutorService {
private final ExecutorService e;
DelegatedExecutorService(ExecutorService executor) { e = executor; }
public void execute(Runnable command) { e.execute(command); }
public void shutdown() { e.shutdown(); }
public List<Runnable> shutdownNow() { return e.shutdownNow(); }
public boolean isShutdown() { return e.isShutdown(); }
public boolean isTerminated() { return e.isTerminated(); }
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
return e.awaitTermination(timeout, unit);
}
public Future<?> submit(Runnable task) {
return e.submit(task);
}
public <T> Future<T> submit(Callable<T> task) {
return e.submit(task);
}
public <T> Future<T> submit(Runnable task, T result) {
return e.submit(task, result);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
return e.invokeAll(tasks);
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
return e.invokeAll(tasks, timeout, unit);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
return e.invokeAny(tasks);
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return e.invokeAny(tasks, timeout, unit);
}
}
public static ExecutorService newSingleThreadExecutor(ThreadFactory threadFactory) {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>(),
threadFactory));
}
除了能定制ThreadFactory之外,和上个方法一样。
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
通过之前文章中对ThreadPoolExecutor的分析可知: 1.这个方法创建了一个核心线程数量为0,最大线程(可以认为)无上限,并且任务队列是同步队列(无实际容量)的线程池。 2.针对每一个新任务,如果当前没有空闲线程,都会创建一个新的工作线程来处理任务。工作线程默认空闲超过60秒超时被回收。
public static ExecutorService newCachedThreadPool(ThreadFactory threadFactory) {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>(),
threadFactory);
除了能定制ThreadFactory之外,和上个方法一样。
public static ExecutorService unconfigurableExecutorService(ExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedExecutorService(executor);
}
DelegatedExecutorService这面已经看到过,这个方法就相当于将一个ExecutorService包装成一个不可配置的ExecutorService。
- 继续看下针对ScheduledExecutorService提供的一些工厂方法:
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
public static ScheduledExecutorService newScheduledThreadPool(
int corePoolSize, ThreadFactory threadFactory) {
return new ScheduledThreadPoolExecutor(corePoolSize, threadFactory);
}
通过之前文章中对ScheduledThreadPoolExecutor的分析可知:
1.这个方法创建了一个给定(核心)线程数量的ScheduledThreadPoolExecutor(由于其内部的任务队列是无界的,所以尽管继承自ThreadPoolExecutor,但最大线程数量无意义)。
public static ScheduledExecutorService newSingleThreadScheduledExecutor() {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1));
}
public static ScheduledExecutorService newSingleThreadScheduledExecutor(ThreadFactory threadFactory) {
return new DelegatedScheduledExecutorService
(new ScheduledThreadPoolExecutor(1, threadFactory));
}
类似newScheduledThreadPool(1) (这个其实看起来更像是一个加强版的Timer),但不能调整配置:
static class DelegatedScheduledExecutorService
extends DelegatedExecutorService
implements ScheduledExecutorService {
private final ScheduledExecutorService e;
DelegatedScheduledExecutorService(ScheduledExecutorService executor) {
super(executor);
e = executor;
}
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
return e.schedule(command, delay, unit);
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) {
return e.schedule(callable, delay, unit);
}
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, TimeUnit unit) {
return e.scheduleAtFixedRate(command, initialDelay, period, unit);
}
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, TimeUnit unit) {
return e.scheduleWithFixedDelay(command, initialDelay, delay, unit);
}
}
public static ScheduledExecutorService unconfigurableScheduledExecutorService(ScheduledExecutorService executor) {
if (executor == null)
throw new NullPointerException();
return new DelegatedScheduledExecutorService(executor);
}
相当于将一个ScheduledExecutorService包装成一个不可配置的ScheduledExecutorService。
- 再看下针对ThreadFactory提供的一些工厂方法:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
返回一个DefaultThreadFactory实例。在创建ThreadPoolExecutor和ScheduledThreadPoolExecutor时如果没有显式指定ThreadFactory,会默认使用这个,看下实现:
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
public static ThreadFactory privilegedThreadFactory() {
return new PrivilegedThreadFactory();
}
static class PrivilegedThreadFactory extends DefaultThreadFactory {
private final ClassLoader ccl;
private final AccessControlContext acc;
PrivilegedThreadFactory() {
super();
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}
public Thread newThread(final Runnable r) {
return super.newThread(new Runnable() {
public void run() {
AccessController.doPrivileged(new PrivilegedAction<Object>() {
public Object run() {
Thread.currentThread().setContextClassLoader(ccl);
r.run();
return null;
}
}, acc);
}
});
}
}
privilegedThreadFactory和defaultThreadFactory返回的工厂类会创建设置相同的Thread,只是PrivilegedThreadFactory创建的Thread会使用和当前线程(创建线程)相同的访问控制和类加载器。
- 最后看下针对Callable提供的一些工具方法:
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<T>(task, result);
}
将一个Runnable和一个返回值包装成一个Callable,返回的这个适配类之前也见过:
static final class RunnableAdapter<T> implements Callable<T> {
final Runnable task;
final T result;
RunnableAdapter(Runnable task, T result) {
this.task = task;
this.result = result;
}
public T call() {
task.run();
return result;
}
}
public static Callable<Object> callable(Runnable task) {
if (task == null)
throw new NullPointerException();
return new RunnableAdapter<Object>(task, null);
}
上面方法的重载版本,返回值默认为null。
当然也会涉及到有访问控制和类加载器设定的工具方法:
public static Callable<Object> callable(final PrivilegedAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() { return action.run(); }};
}
public static Callable<Object> callable(final PrivilegedExceptionAction<?> action) {
if (action == null)
throw new NullPointerException();
return new Callable<Object>() {
public Object call() throws Exception { return action.run(); }};
}
public static <T> Callable<T> privilegedCallable(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallable<T>(callable);
}
public static <T> Callable<T> privilegedCallableUsingCurrentClassLoader(Callable<T> callable) {
if (callable == null)
throw new NullPointerException();
return new PrivilegedCallableUsingCurrentClassLoader<T>(callable);
}
static final class PrivilegedCallable<T> implements Callable<T> {
private final AccessControlContext acc;
private final Callable<T> task;
private T result;
private Exception exception;
PrivilegedCallable(Callable<T> task) {
this.task = task;
this.acc = AccessController.getContext();
}
public T call() throws Exception {
AccessController.doPrivileged(new PrivilegedAction<T>() {
public T run() {
try {
result = task.call();
} catch (Exception ex) {
exception = ex;
}
return null;
}
}, acc);
if (exception != null)
throw exception;
else
return result;
}
}
static final class PrivilegedCallableUsingCurrentClassLoader<T> implements Callable<T> {
private final ClassLoader ccl;
private final AccessControlContext acc;
private final Callable<T> task;
private T result;
private Exception exception;
PrivilegedCallableUsingCurrentClassLoader(Callable<T> task) {
this.task = task;
this.ccl = Thread.currentThread().getContextClassLoader();
this.acc = AccessController.getContext();
acc.checkPermission(new RuntimePermission("getContextClassLoader"));
acc.checkPermission(new RuntimePermission("setContextClassLoader"));
}
public T call() throws Exception {
AccessController.doPrivileged(new PrivilegedAction<T>() {
public T run() {
Thread t = Thread.currentThread();
try {
ClassLoader cl = t.getContextClassLoader();
if (ccl == cl) {
result = task.call();
} else {
t.setContextClassLoader(ccl);
try {
result = task.call();
} finally {
t.setContextClassLoader(cl);
}
}
} catch (Exception ex) {
exception = ex;
}
return null;
}
}, acc);
if (exception != null)
throw exception;
else
return result;
}
}
Executors的代码解析完毕! 参见:
Jdk1.6 JUC源码解析(17)-ThreadPoolExecutor
Jdk1.6 JUC源码解析(19)-ScheduledThreadPoolExecutor