Jdk1.6 JUC源码解析(17)-ThreadPoolExecutor
作者:大飞
功能简介:
- ThreadPoolExecutor是JUC包中提供的线程池,使用ThreadPoolExecutor的好处一方面是能重用线程资源,避免重复创建线程带来的开销;另一方面是ThreadPoolExecutor提供了内部资源(线程、任务)的管理功能,方便我们监控线程池工作状态。
源码分析:
- 首先,ThreadPoolExecutor继承了AbstractExecutorService,实现了ExecutorService和Executor,先自顶向下简单分析下这些类:
public interface Executor {
/**
* 在未来的某个时刻执行给定的命令。命令可能在一个新线程中执行,
* 也可能在一个线程池中执行,也可能被调用线程执行,取决于实现。
*/
void execute(Runnable command);
}
public interface ExecutorService extends Executor {
/**
* 关闭当前执行器,关闭之前提交的任务仍然会被执行,但不
* 会接收新的任务。如果当前执行服务已经关闭,那么调用这
* 个方法不会产生额外的影响。
*/
void shutdown();
/**
* 尝试停止所有正在执行的任务,停止处理正在等待的任务,
* 并返回等待执行任务列表。
*
* 并不能确保一定可以停止正在执行的任务。比如,通常的实现
* 方式是中断执行任务的线程,但如果任务执行过程中并不响应
* 中断,那就无法停止这个任务。
*/
List<Runnable> shutdownNow();
/**
* 判断当前执行器是否已经关闭。
*/
boolean isShutdown();
/**
* 判断在执行器关闭后,是否所有的任务都执行完毕。
*/
boolean isTerminated();
/**
* 在一个关闭请求后,阻塞等待,直到所有任务都执行完毕,或者
* 超时,或者当前线程被中断。
*/
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 提交一个有返回结果的任务,并返回一个Future。
*/
<T> Future<T> submit(Callable<T> task);
/**
* 提交一个Runnable任务和返回结果,并返回一个Future。
*/
<T> Future<T> submit(Runnable task, T result);
/**
* 提交一个Runnable任务和返回结果,并返回一个Future。
* 如果任务执行完毕,调用Future的get方法返回null。
*/
Future<?> submit(Runnable task);
/**
* 执行一批任务,当所有任务都执行完毕后,以Future集合
* 集合的形式返回结果。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException;
/**
* 执行一批任务,当所有任务都执行完毕或者超时,以Future集合
* 集合的形式返回结果。注意如果超时的话,没有执行完的任务会
* 被取消。
*/
<T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 执行一批任务,如果其中有一个任务完成,就返回结果。
* 其他没有完成的任务会被取消。
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException;
/**
* 执行一批任务,如果其中有一个任务完成或者超时,就返回结果。
* 其他没有完成的任务会被取消。
*/
<T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException;
}
AbstractExecutorService类中有一些方法实现,看下:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
这两个方法是把Runnable和Callable转换成了RunnableFuture。
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Object> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
上面是一系列提交任务的方法,内部都是先转成RunnableFuture,然后提交到执行器,然后返回这些RunnableFuture(异步任务)。
/**
* invokeAny方法的主流程。
*/
private <T> T doInvokeAny(Collection<? extends Callable<T>> tasks,
boolean timed, long nanos)
throws InterruptedException, ExecutionException, TimeoutException {
if (tasks == null)
throw new NullPointerException();
int ntasks = tasks.size();
if (ntasks == 0)
throw new IllegalArgumentException();
List<Future<T>> futures= new ArrayList<Future<T>>(ntasks);
ExecutorCompletionService<T> ecs =
new ExecutorCompletionService<T>(this);
// 处于性能考虑,尤其在并行有限的情况下(比如单核处理器),
// 下面的程序会放一个任务到ecs,然后查看这个任务是否完成,
// 如果没完成,再放一个。。。(而不是一起放进去)。
// 这个过程还会记录异常,如果都没有执行成功,会抛出最后
// 记录的异常。最后会把没执行完的任务取消掉。
try {
ExecutionException ee = null;
long lastTime = (timed)? System.nanoTime() : 0;
Iterator<? extends Callable<T>> it = tasks.iterator();
// Start one task for sure; the rest incrementally
futures.add(ecs.submit(it.next()));
--ntasks;
int active = 1;
for (;;) {
Future<T> f = ecs.poll();
if (f == null) {
if (ntasks > 0) {
--ntasks;
futures.add(ecs.submit(it.next()));
++active;
}
else if (active == 0)
break;
else if (timed) {
f = ecs.poll(nanos, TimeUnit.NANOSECONDS);
if (f == null)
throw new TimeoutException();
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
else
f = ecs.take();
}
if (f != null) {
--active;
try {
return f.get();
} catch (InterruptedException ie) {
throw ie;
} catch (ExecutionException eex) {
ee = eex;
} catch (RuntimeException rex) {
ee = new ExecutionException(rex);
}
}
}
if (ee == null)
ee = new ExecutionException();
throw ee;
} finally {
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks)
throws InterruptedException, ExecutionException {
try {
return doInvokeAny(tasks, false, 0);
} catch (TimeoutException cannotHappen) {
assert false;
return null;
}
}
public <T> T invokeAny(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
return doInvokeAny(tasks, true, unit.toNanos(timeout));
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks)
throws InterruptedException {
if (tasks == null)
throw new NullPointerException();
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
//把所有的任务一起提交到执行器。
for (Callable<T> t : tasks) {
RunnableFuture<T> f = newTaskFor(t);
futures.add(f);
execute(f);
}
//等待所有的任务都执行完成。
for (Future<T> f : futures) {
if (!f.isDone()) {
try {
f.get();
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
}
}
}
done = true;
return futures;
} finally {
if (!done) //如果没执行完成(比如被中断),那么取消没完成的任务。
for (Future<T> f : futures)
f.cancel(true);
}
}
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks,
long timeout, TimeUnit unit)
throws InterruptedException {
if (tasks == null || unit == null)
throw new NullPointerException();
long nanos = unit.toNanos(timeout);
List<Future<T>> futures = new ArrayList<Future<T>>(tasks.size());
boolean done = false;
try {
for (Callable<T> t : tasks)
futures.add(newTaskFor(t));
long lastTime = System.nanoTime();
// 和上面invokeAll的逻辑相似,只是加入了超时判断
// 注意如果方法超时的话,返回的任务中也可能有完成的。
Iterator<Future<T>> it = futures.iterator();
while (it.hasNext()) {
execute((Runnable)(it.next()));
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0)
return futures;
}
for (Future<T> f : futures) {
if (!f.isDone()) {
if (nanos <= 0)
return futures;
try {
f.get(nanos, TimeUnit.NANOSECONDS);
} catch (CancellationException ignore) {
} catch (ExecutionException ignore) {
} catch (TimeoutException toe) {
return futures;
}
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
}
}
done = true;
return futures;
} finally {
if (!done)
for (Future<T> f : futures)
f.cancel(true);
}
}
}
- 接着就是重头戏,ThreadPoolExecutor本身的源码分析了,先看一下内部结构:
public class ThreadPoolExecutor extends AbstractExecutorService {
/**
* Permission for checking shutdown
*/
private static final RuntimePermission shutdownPerm =
new RuntimePermission("modifyThread");
volatile int runState; //线程池运行状态。
//下面是一系列线程池状态定义。
static final int RUNNING = 0; //表示正在运行。会接受新任务,并会处理任务队列中的任务。
static final int SHUTDOWN = 1; //表示已经关闭。不接受新任务,但仍然会处理任务队列中的任务。
static final int STOP = 2; //表示已经停止。不接受新任务,不处理任务队列中的任务,会中断正在执行的任务。
static final int TERMINATED = 3; //在STOP的基础上,在加上所有的任务都已经结束。
/**
* 任务队列,负责保存任务并将任务交给工作线程处理。
*/
private final BlockingQueue<Runnable> workQueue;
/**
* 在更新内部数据(如:线程数量,运行状态,工作线程集等)时要使用的锁。
*/
private final ReentrantLock mainLock = new ReentrantLock();
/**
* 用于支持awaitTermination的等待条件。
*/
private final Condition termination = mainLock.newCondition();
/**
* 包含所有工作类的集合。只能在持有mainLock的情况下使用。
*/
private final HashSet<Worker> workers = new HashSet<Worker>();
/**
* 空闲工作线程等待任务的超时时间,单位:纳秒。
* 当前线程数大于核心线程数时,超出的线程会使用这个超时时间。
* 如果设置了allowCoreThreadTimeOut,核心线程数也会使用这个
* 超时时间。否则,线程会一直等待新任务,不会超时。
*/
private volatile long keepAliveTime;
/**
* 如果为false(默认情况下),核心线程就算空闲也会一直存活。
* 如果为true,等待任务的核心线程会使用keepAliveTime作为
* 超时时间,如果超时,线程被回收。
*/
private volatile boolean allowCoreThreadTimeOut;
/**
* 核心线程数量,只能在持有mainLock的情况下修改。
* volatile可以保证可见性。
*/
private volatile int corePoolSize;
/**
* 最大线程数量,只能在持有mainLock的情况下修改。
* volatile可以保证可见性。
*/
private volatile int maximumPoolSize;
/**
* 当前线程数量,只能在持有mainLock的情况下修改。
* volatile可以保证可见性。
*/
private volatile int poolSize;
/**
* 当线程池饱和或者关闭时,负责处理新来任务的处理器,称为拒绝任务处理器。
*/
private volatile RejectedExecutionHandler handler;
/**
* 线程工厂。用于创建新线程。
*/
private volatile ThreadFactory threadFactory;
/**
* 记录曾经达到的最大的线程数量。
*/
private int largestPoolSize;
/**
* 统计任务完成数量的计数器。在工作线程终止的时候才会更新。
*/
private long completedTaskCount;
/**
* 默认的拒绝任务处理器。
*/
private static final RejectedExecutionHandler defaultHandler =
new AbortPolicy();
可见,ThreadPoolExecutor内部结构大体上是一个任务队列,一个工作类集合,一些状态数据还有一个拒绝任务处理器。
先看下构造方法,看看上面这些数据怎么初始化:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
threadFactory, defaultHandler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
RejectedExecutionHandler handler) {
this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
Executors.defaultThreadFactory(), handler);
}
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
构造方法都一目了然,只需要注意2点:1.内部的keepAliveTime都使用纳秒,所以构造方法中会有一个时间转换。2.如果不指定线程工厂,会使用Executors.defaultThreadFactory(),Executors后续文章会分析,这里简单看一下,线程工厂默认会创建相同线程组、非daemon、相同线程优先级、线程名称为[pool-线程池序列-thread-线程序列]的线程:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/**
* The default thread factory
*/
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);
final ThreadGroup group;
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = "pool-" +
poolNumber.getAndIncrement() +
"-thread-";
}
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),
0);
if (t.isDaemon())
t.setDaemon(false);
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
构造方法看完了,下面来从使用线程池的主流程-提交任务到线程池来入手,开始分析线程池内部逻辑:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
主流程很清晰,过程如下:
1.如果当前线程数量大于等于核心线程数量,进入第2步;如果当前线程数量小于核心线程数量,那么尝试添加一个工作线程,同时让这个工作线程处理当前提交的任务,提交任务流程结束;如果添加工作线程失败,那么进入第2步。
2.首先判断当前线程池状态是否为正在运行,如果正在运行,就将当前任务放入任务队列中,然后进入第4步。
3.如果当前线程池状态不是正在运行,或者第2步中将任务放入任务队列失败(任务队列饱和),那么尝试添加一个工作线程,同时让这个工作线程处理当前提交的任务,但不能超时最大工作线程数。如果添加成功,提交任务流程结束;如果添加失败,使用拒绝任务处理器来处理任务。
4.再次检测线程池状态,确保刚添加的任务能被处理,提交任务流程结束。 看一下提交任务流程中调用到的addIfUnderCorePoolSize方法:
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask); //如果当前线程数量小于核心线程数量,并且当前线程池处于运行状态,那么添加一个工作线程。
} finally {
mainLock.unlock();
}
return t != null;
}
继续看一下添加工作线程的方法addThread:
private Thread addThread(Runnable firstTask) {
//创建一个工作类。
Worker w = new Worker(firstTask);
//基于工作类创建一个工作线程。
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
if (t.isAlive()) // 检查线程状态。
throw new IllegalThreadStateException();
w.thread = t;
workers.add(w); //将工作类添加到工作类集合。
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt; //记录曾经达到过的最大线程数量。
try {
t.start(); //启动工作类。
workerStarted = true;
}
finally {
if (!workerStarted)
workers.remove(w); //如果没启动成功,移除这个工作类。
}
}
return t;
}
重点看一下这个Worker类:
private final class Worker implements Runnable {
/**
* 这个锁放在任务执行前后,目的是防止中断要取消worker的线程。
*/
private final ReentrantLock runLock = new ReentrantLock();
/**
* 在Worker进入主循环后首先要执行的初始任务,可能为null。
*/
private Runnable firstTask;
/**
* 记录单个线程完成的任务数量,当前worker终止时会累计到线程池中的总完成任务数量上。
*/
volatile long completedTasks;
/**
* 运行当前worker的线程。只有线程在被创建时才能设置到这个域上,行为类似final域。
*/
Thread thread;
/**
* 记录赋予当前worker线程是否已经运行过run方法。
* 这样的线程才能被中断。
*/
volatile boolean hasRun = false;
Worker(Runnable firstTask) {
this.firstTask = firstTask;
}
boolean isActive() {
return runLock.isLocked();
}
/**
* 如果当前worker没有在执行任务,中断worker的线程。
*/
void interruptIfIdle() {
final ReentrantLock runLock = this.runLock;
if (runLock.tryLock()) {
try {
if (hasRun && thread != Thread.currentThread())
thread.interrupt();
} finally {
runLock.unlock();
}
}
}
/**
* 中断worker线程,不管worker是否在执行任务。
*/
void interruptNow() {
if (hasRun)
thread.interrupt();
}
/**
* Runs a single task between before/after methods.
*/
private void runTask(Runnable task) {
final ReentrantLock runLock = this.runLock;
runLock.lock();
try {
/*
* 如果线程池停止了,确保当前工作线程被中断。否则,确保
* 当前工作线程不被中断(清空下中断标记)
* 这里需要两次检测,以避免中断标记同时被一个shutdownNow
* 调用清空(如果是这种情况,需要重新中断线程)。
*/
if ((runState >= STOP ||
(Thread.interrupted() && runState >= STOP)) &&
hasRun)
thread.interrupt();
/*
* 这里记录一个运行完成标记,来确保afterExecute方法只
* 在任务完成后或者执行任务过程中抛错时被调用。否则,
* 捕获的运行时异常可能来自afterExecute本身,这样我们
* 就不能再次调用afterExecute方法了。
*/
boolean ran = false;
beforeExecute(thread, task);
try {
task.run();
ran = true;
afterExecute(task, null);
++completedTasks;
} catch (RuntimeException ex) {
if (!ran)
afterExecute(task, ex);
throw ex;
}
} finally {
runLock.unlock();
}
}
/**
* 主循环。
*/
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) {
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
}
可见,在worker运行方法的主循环内部逻辑如下:
1.首先判断有没有firstTask,有就执行这个任务。
2.如果没有firstTask,就通过getTask方法从任务队列中来获取任务(这个过程可能会阻塞),获取成功后执行任务。
3.如果通过getTask方法没能获取任务(返回null),那么当前worker退出。 还要注意,执行具体任务时,还会在执行前后调用两个方法beforeExecute和afterExecute,这两个方法在本类中都是空实现,作为钩子方法交由子类去实现。 重点看下这个getTask方法:
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null; // 线程池已经停止或终止,获取不到任务。
Runnable r;
if (state == SHUTDOWN)
r = workQueue.poll(); // 如果线程池已经关闭,还可以从任务队列中取任务。
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
/*
* 如果当前线程数量多于核心线程数量或者允许核心线程超时,
* 才会调用带超时的poll方法,如果超时时间内无法获取任务
* 那么当前worker后继就会被干掉了。
*/
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take(); //否则,调用take方法,阻塞直到有任务可以获取。
if (r != null)
return r;
//如果没有获取到任务,检测一下当前worker是否可以退出。
if (workerCanExit()) {
if (runState >= SHUTDOWN)
interruptIdleWorkers(); // 如果线程池已经关闭,中断空闲的worker。
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
private boolean workerCanExit() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean canExit;
try {
canExit = runState >= STOP ||
workQueue.isEmpty() ||
(allowCoreThreadTimeOut &&
poolSize > Math.max(1, corePoolSize));
} finally {
mainLock.unlock();
}
return canExit;
}
void interruptIdleWorkers() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (Worker w : workers)
w.interruptIfIdle();
} finally {
mainLock.unlock();
}
}
getTask中的逻辑很清晰了。在worker主循环中,如果getTask返回null,那么worker主循环会退出,然后执行finally块中的workerDone方法,看下这个方法:
void workerDone(Worker w) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//终结worker之前先将其内部统计的任务完成数量累计到线程池中的总数上来。
completedTaskCount += w.completedTasks;
workers.remove(w); //将其从worker集中移除。
if (--poolSize == 0)
tryTerminate(); //如果这时核心线程数为0了,那么尝试终结线程池。
} finally {
mainLock.unlock();
}
}
private void tryTerminate() {
if (poolSize == 0) {
int state = runState;
if (state < STOP && !workQueue.isEmpty()) {
state = RUNNING; // 把state设置成RUNNING是为了让代码不进下面的条件。
addThread(null); //如果线程池只是关闭,还没停止(还需要处理任务队列里的线程),那么添加一个工作线程。
}
if (state == STOP || state == SHUTDOWN) {
//任务队列为空了,工作线程也没有了,可以终结线程池了!
runState = TERMINATED;
termination.signalAll();
terminated();
}
}
}
可以看到,如果worker被移除后,当前没有工作线程了,那么会尝试终结线程池。如果成功的终结了线程池,会唤醒一个条件上等待的线程(比如其他调用awaitTermination后,在这个条件上等待的线程),还会调一个钩子方法terminated。 好了,继续回到execute方法,继续看里面调用的ensureQueuedTaskHandled方法:
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
//如果当前线程池状态不是正在运行,尝试从任务队列中删除当前任务。
if (state != RUNNING && workQueue.remove(command))
reject = true; //如果删除任务成功,设置拒绝任务标记。
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
/*
* 如果线程池的任务状态表示还需要处理任务队列中的任务,
* 且当前线程数量小于核心线程数量,且任务队列中还有任务。
* 那么添加一个工作线程。
*/
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command); //有拒绝任务标记的话,执行拒绝任务处理。
}
void reject(Runnable command) {
handler.rejectedExecution(command, this);
}
再看一下execute方法中调用的addIfUnderMaximumPoolSize方法:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//这个逻辑很简单,如果当前线程正在运行且当前线程数小于最大线程数,那么添加一个工作线程。
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
我们平时使用ThreadPoolExecutor,除了execute方法以外,最常使用的就是关闭系列的方法了,首先看下shutdown方法:
public void shutdown() {
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
int state = runState;
if (state < SHUTDOWN)
runState = SHUTDOWN; //设置线程池运行状态为SHUTDOWN。
try {
for (Worker w : workers) {
w.interruptIfIdle(); //注意关闭后还能处理任务队列中的任务,所以这里只将空闲的工作线程中断。
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
tryTerminate(); // 尝试终止线程池。
} finally {
mainLock.unlock();
}
}
再看下shutdownNow方法:
public List<Runnable> shutdownNow() {
/*
* shutdownNow和shutdown方法在几个地方有所区别:
* 1. 运行状态设置为STOP。
* 2. 中断所有的工作线程,不仅仅是空闲的工作线程。
* 3. 清空任务队列,并返回任务队列中的任务。
*/
SecurityManager security = System.getSecurityManager();
if (security != null)
security.checkPermission(shutdownPerm);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (security != null) { // Check if caller can modify our threads
for (Worker w : workers)
security.checkAccess(w.thread);
}
int state = runState;
if (state < STOP)
runState = STOP;
try {
for (Worker w : workers) {
w.interruptNow();
}
} catch (SecurityException se) { // Try to back out
runState = state;
// tryTerminate() here would be a no-op
throw se;
}
List<Runnable> tasks = drainQueue();
tryTerminate(); // Terminate now if pool and queue empty
return tasks;
} finally {
mainLock.unlock();
}
}
/**
* 将任务队列中的任务清除,放到一个新建的集合中。
*/
private List<Runnable> drainQueue() {
List<Runnable> taskList = new ArrayList<Runnable>();
workQueue.drainTo(taskList);
/*
* 如果当前的任务队列是延迟队列或其他类型的队列,
* 可能通过drainTo方法并不能清空所有的任务。
* 所以这里需要验证一下队列是否为空,不为空的话,
* 需要手工迭代。注意由于一般阻塞队列的迭代器都
* 是弱一致的,所以这里为了保证原子性,需要每次
* 创建一个新的迭代器。
*/
while (!workQueue.isEmpty()) {
Iterator<Runnable> it = workQueue.iterator();
try {
if (it.hasNext()) {
Runnable r = it.next();
if (workQueue.remove(r))
taskList.add(r);
}
} catch (ConcurrentModificationException ignore) {
}
}
return taskList;
}
最后看下awaitTermination方法:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
for (;;) {
if (runState == TERMINATED)
return true;
if (nanos <= 0)
return false;
// 这里会在终结条件上等待(或等待超时),直到线程池完全终结后会唤醒这里等待的线程。。
nanos = termination.awaitNanos(nanos);
}
} finally {
mainLock.unlock();
}
}
ThreadPoolExecutor中主要流程涉及到的方法都分析完毕了,看看其他的一些方法。 平时我们使用ThreadPoolExecutor的时候,貌似使用之后也没有显示的调用关闭方法,但线程池最终也还是关闭了(不在对其进行引用),怎么回事呢?看看下面这个方法:
protected void finalize() {
shutdown();
}
这个方法一定比较熟悉,Java对象的析构方法,对象要被收回时这个方法就会被调用。 还有另外一个疑问,从上面的分析可知,只有我们调用execute时才会创建工作线程。那如果初始的任务队列里就有任务,而我们想在提交新任务到线程池之前就让它先执行初始队列里的任务的话,该怎么做呢?看看下面的方法:
public boolean prestartCoreThread() {
return addIfUnderCorePoolSize(null);
}
public int prestartAllCoreThreads() {
int n = 0;
while (addIfUnderCorePoolSize(null))
++n;
return n;
}
prestartCoreThread方法会(预先)启动一个核心线程,prestartAllCoreThreads会将所有的核心线程启动起来,并返回启动的数量(假设核心线程为6,已经有2个已经启动,这个方法就会启动4个核心线程,并返回4)。 ThreadPoolExecutor还提供了一个清理任务队列中已取消任务的方法:
public void purge() {
// Fail if we encounter interference during traversal
try {
Iterator<Runnable> it = getQueue().iterator();
while (it.hasNext()) {
Runnable r = it.next();
if (r instanceof Future<?>) {
Future<?> c = (Future<?>)r;
if (c.isCancelled())
it.remove();
}
}
}
catch (ConcurrentModificationException ex) {
return;
}
}
还有一些内部状态监控方法:
public boolean isShutdown() {
return runState != RUNNING;
}
boolean isStopped() {
return runState == STOP;
}
public boolean isTerminating() {
int state = runState;
return state == SHUTDOWN || state == STOP;
}
public boolean isTerminated() {
return runState == TERMINATED;
}
public void setThreadFactory(ThreadFactory threadFactory) {
if (threadFactory == null)
throw new NullPointerException();
this.threadFactory = threadFactory;
}
public ThreadFactory getThreadFactory() {
return threadFactory;
}
public void setRejectedExecutionHandler(RejectedExecutionHandler handler) {
if (handler == null)
throw new NullPointerException();
this.handler = handler;
}
public RejectedExecutionHandler getRejectedExecutionHandler() {
return handler;
}
public void setCorePoolSize(int corePoolSize) {
if (corePoolSize < 0)
throw new IllegalArgumentException();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int extra = this.corePoolSize - corePoolSize;
this.corePoolSize = corePoolSize;
if (extra < 0) {
//如果核心线程数量扩充,且任务队列中有足够的任务,那么增加工作线程数量(到核心线程数量)。
int n = workQueue.size(); // don't add more threads than tasks
while (extra++ < 0 && n-- > 0 && poolSize < corePoolSize) {
Thread t = addThread(null);
if (t == null)
break;
}
}
else if (extra > 0 && poolSize > corePoolSize) {
//如果核心线程数量缩减,且任务队列中没有任务,且当前线程数量大于核心线程数量,那么移除一些工作线程(到核心线程数量)。
try {
Iterator<Worker> it = workers.iterator();
while (it.hasNext() &&
extra-- > 0 &&
poolSize > corePoolSize &&
workQueue.remainingCapacity() == 0)
it.next().interruptIfIdle();
} catch (SecurityException ignore) {
// Not an error; it is OK if the threads stay live
}
}
} finally {
mainLock.unlock();
}
}
public int getCorePoolSize() {
return corePoolSize;
}
/**
* 判断是否允许核心线程超时。
*/
public boolean allowsCoreThreadTimeOut() {
return allowCoreThreadTimeOut;
}
/**
* 设置是否允许核心线程超时。
*/
public void allowCoreThreadTimeOut(boolean value) {
if (value && keepAliveTime <= 0)
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
allowCoreThreadTimeOut = value;
}
public void setMaximumPoolSize(int maximumPoolSize) {
if (maximumPoolSize <= 0 || maximumPoolSize < corePoolSize)
throw new IllegalArgumentException();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int extra = this.maximumPoolSize - maximumPoolSize;
this.maximumPoolSize = maximumPoolSize;
//如果最大线程数缩减了,且当前线程数量大于最大线程数量,那么将多出的工作线程移除。
if (extra > 0 && poolSize > maximumPoolSize) {
try {
Iterator<Worker> it = workers.iterator();
while (it.hasNext() &&
extra > 0 &&
poolSize > maximumPoolSize) {
it.next().interruptIfIdle();
--extra;
}
} catch (SecurityException ignore) {
// Not an error; it is OK if the threads stay live
}
}
} finally {
mainLock.unlock();
}
}
public int getMaximumPoolSize() {
return maximumPoolSize;
}
public void setKeepAliveTime(long time, TimeUnit unit) {
if (time < 0)
throw new IllegalArgumentException();
if (time == 0 && allowsCoreThreadTimeOut())
throw new IllegalArgumentException("Core threads must have nonzero keep alive times");
this.keepAliveTime = unit.toNanos(time);
}
public long getKeepAliveTime(TimeUnit unit) {
return unit.convert(keepAliveTime, TimeUnit.NANOSECONDS);
}
public BlockingQueue<Runnable> getQueue() {
return workQueue;
}
public boolean remove(Runnable task) {
return getQueue().remove(task);
}
/* Statistics */
public int getPoolSize() {
return poolSize;
}
/**
* 获取当前正在执行任务的工作线程数量。
*/
public int getActiveCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int n = 0;
for (Worker w : workers) {
if (w.isActive())
++n;
}
return n;
} finally {
mainLock.unlock();
}
}
public int getLargestPoolSize() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
return largestPoolSize;
} finally {
mainLock.unlock();
}
}
/**
* 获取线程池中所有的任务数量。
*/
public long getTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//包括已完成的、正在执行的和等待执行的。
long n = completedTaskCount;
for (Worker w : workers) {
n += w.completedTasks;
if (w.isActive())
++n;
}
return n + workQueue.size();
} finally {
mainLock.unlock();
}
}
/**
* 获取线程池中已完成的任务数量。
*/
public long getCompletedTaskCount() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
long n = completedTaskCount;
for (Worker w : workers)
n += w.completedTasks;
return n;
} finally {
mainLock.unlock();
}
}
- 最后的最后是3个钩子方法和4种拒绝策略:
protected void beforeExecute(Thread t, Runnable r) { }
protected void afterExecute(Runnable r, Throwable t) { }
protected void terminated() { }
这些钩子方法的调用时机上面已经分析过。
/**
* 使用当前线程(提交任务到线程池的线程)来执行任务。
* 同时能延迟后续任务的提交,相当于一个平滑过渡的过程。
*/
public static class CallerRunsPolicy implements RejectedExecutionHandler {
public CallerRunsPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
r.run();
}
}
}
/**
* 拒绝任务,抛出RejectedExecutionException异常,默认的拒绝任务处理器就是这个。
*/
public static class AbortPolicy implements RejectedExecutionHandler {
public AbortPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
throw new RejectedExecutionException();
}
}
/**
* 丢弃任务,默默丢弃,什么都不做。
*/
public static class DiscardPolicy implements RejectedExecutionHandler {
public DiscardPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
}
}
/**
* 将当前任务队列中最老的任务丢弃,并在此尝试将当前任务提交到线程池。
*/
public static class DiscardOldestPolicy implements RejectedExecutionHandler {
public DiscardOldestPolicy() { }
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (!e.isShutdown()) {
e.getQueue().poll();
e.execute(r);
}
}
}
当然,可以自己按需定制拒绝任务处理器。
ThreadPoolExecutor的代码解析完毕!
参见:Jdk1.6 JUC源码解析(16)-FutureTask
参见:Jdk1.6 JUC源码解析(7)-locks-ReentrantLock