接上文:java线程池的原理学习(二)
ThreadPoolExecutor深入剖析
线程池的五种状态
ThreadPoolExecutor
类中将线程状态( runState
)分为了以下五种:
RUNNING
:可以接受新任务并且处理进入队列中的任务SHUTDOWN
:不接受新任务,但是仍然执行队列中的任务STOP
:不接受新任务也不执行队列中的任务TIDYING
:所有任务中止,队列为空,进入该状态下的任务会执行terminated()
方法TERMINATED
:terminated()
方法执行完成后进入该状态
状态之间的转换
RUNNING
->SHUTDOWN
调用了 shutdown()
方法,可能是在 finalize()
方法中被隐式调用
(RUNNING or SHUTDOWN)
->STOP
调用 shutdownNow()
SHUTDOWN
->TIDYING
当队列和线程池都为空时
STOP
->TIDYING
线程池为空时
TIDYING
->TERMINATED
terminated()
方法执行完成
线程池状态实现
如果查看 ThreadPoolExecutor
的源码,会发现开头定义了这几个变量来代表线程状态和活动线程的数量:
//原子变量
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY = (1 << COUNT_BITS) - 1;
// runState is stored in the high-order bits
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
这个类中将二进制数分为了两部分,高位代表线程池状态( runState
),低位代表活动线程数( workerCount
), CAPACITY
代表最大的活动线程数,为2^29-1,下面为了更直观的看到这些数我做了些打印:
public class Test1 {
public static void main(String[] args) {
final int COUNT_BITS = Integer.SIZE - 3;
final int CAPACITY = (1 << COUNT_BITS) - 1;
final int RUNNING = -1 << COUNT_BITS;
final int SHUTDOWN = 0 << COUNT_BITS;
final int STOP = 1 << COUNT_BITS;
final int TIDYING = 2 << COUNT_BITS;
final int TERMINATED = 3 << COUNT_BITS;
System.out.println(Integer.toBinaryString(CAPACITY));
System.out.println(Integer.toBinaryString(RUNNING));
System.out.println(Integer.toBinaryString(SHUTDOWN));
System.out.println(Integer.toBinaryString(STOP));
System.out.println(Integer.toBinaryString(TIDYING));
System.out.println(Integer.toBinaryString(TERMINATED));
}
}
输出:
11111111111111111111111111111
11100000000000000000000000000000
0
100000000000000000000000000000
1000000000000000000000000000000
1100000000000000000000000000000
打印的时候会将高位0省略
可以看到,第一行代表线程容量,后面5行提取高3位得到:
111 - RUNNING
000 - SHUTDOWN
001 - STOP
010 - TIDYING
011 - TERMINATED
分别对应5种状态,可以看到这样定义之后,只需要通过简单的移位操作就可以进行状态的转换。
重要方法
execute
方法:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
/**分三步执行
* 如果workerCount<corePoolSize,则创建一个新线程执行该任务
*/
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true)) //创建成功则return
return;
c = ctl.get(); //创建失败重新读取状态,随时保持状态的最新
}
/**
* workerCount>=corePoolSize,判断线程池是否处于运行状态,再将任务加入队列
* */
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get(); //用于double check
//如果线程池处于非运行态,则将任务从缓存队列中删除
if (! isRunning(recheck) && remove(command))
reject(command); //拒绝任务
else if (workerCountOf(recheck) == 0) //如果活动线程数为0,则创建新线程
addWorker(null, false);
}
//如果线程池不处于RUNNING状态,或者workQueue满了,则执行以下代码
else if (!addWorker(command, false))
reject(command);
}
可以看到,在类中使用了 Work
类来代表任务,下面是 Work
类的简单摘要:
private final class Worker extends AbstractQueuedSynchronizer
implements Runnable
{
/** Thread this worker is running in. Null if factory fails. */
final Thread thread;
/** Initial task to run. Possibly null. */
Runnable firstTask;
/** Per-thread task counter */
volatile long completedTasks;
/**
* Creates with given first task and thread from ThreadFactory.
* @param firstTask the first task (null if none)
*/
Worker(Runnable firstTask) {
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
Work
类实现了 Runnable
接口,使用了线程工厂创建线程,使用 runWork
方法来运行任务
创建新线程时用到了 addWorker()
方法:
/**
* 检查在当前线程池状态和限制下能否创建一个新线程,如果可以,会相应改变workerCount,
* 每个worker都会运行他们的firstTask
* @param firstTask 第一个任务
* @param core true使用corePoolSize作为边界,false使用maximumPoolSize
* @return false 线程池关闭或者已经具备关闭的条件或者线程工厂没有创建新线程
*/
private boolean addWorker(Runnable firstTask, boolean core) {
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 只有当rs < SHUTDOWN才有可能接受新任务
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c); //工作线程数量
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) //不合法则返回
return false;
if (compareAndIncrementWorkerCount(c)) //将工作线程数量+1
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs) //判断线程池状态有没有改变,改变了则进行外循环,否则只进行内循环
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
//创建新线程
Worker w = new Worker(firstTask);
Thread t = w.thread;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
//再次检查状态,防止ThreadFactory创建线程失败或者状态改变了
int c = ctl.get();
int rs = runStateOf(c);
if (t == null ||
(rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null))) {
decrementWorkerCount(); //减少线程数量
tryTerminate();//尝试中止线程
return false;
}
workers.add(w);//添加到工作线程Set集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
} finally {
mainLock.unlock();
}
t.start();//执行任务
//状态变成了STOP(调用了shutdownNow方法)
if (runStateOf(ctl.get()) == STOP && ! t.isInterrupted())
t.interrupt();
return true;
}
再看 Work中
的 runWork
方法:
final void runWorker(Worker w) {
Runnable task = w.firstTask;
w.firstTask = null;
boolean completedAbruptly = true;//线程是否异常中止
try {
//先取firstTask,再从队列中取任务直到为null
while (task != null || (task = getTask()) != null) {
w.lock();
clearInterruptsForTaskRun();
try {
beforeExecute(w.thread, task);//实现钩子方法
Throwable thrown = null;
try {
task.run();//运行任务
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);//实现钩子方法
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;//成功运行,说明没有异常中止
} finally {
processWorkerExit(w, completedAbruptly);
}
}