上一篇从整体上介绍了Executor
接口,从上一篇我们知道了Executor
框架的最顶层实现是ThreadPoolExecutor
类,Executors
工厂类中提供的newScheduledThreadPool
、newFixedThreadPool
、newCachedThreadPool
方法其实也只是ThreadPoolExecutor
的构造函数参数不同而已。通过传入不同的参数,就可以构造出适用于不同应用场景下的线程池,那么它的底层原理是怎样实现的呢,这篇就来介绍下ThreadPoolExecutor
线程池的运行过程。
1.线程池状态
既然要讲运行过程,那么首先要了解下线程池的状态分为哪些?
volatile int runState;
static final int RUNNING = 0;
static final int SHUTDOWN = 1;
static final int STOP = 2;
static final int TERMINATED = 3;
ThreadPoolExecutor
代码中定义了上面几个变量:定义了一个volatile变量runState,以及其他几个表示状态的常量。
runState
:初始状态,表示当前线程池的运行状态,它的值就是上面的那4个常量值之一
RUNNING
:线程池接受新任务并执行队列任务中…
SHUTDOWN
:不再接受新任务,但是会继续执行等待队列Queued中的任务。当调用了shutdown()方法,会从 RUNNING -> SHUTDOWN
STOP
:不再接受新任务,同时也不执行等待队列Queued中的任务,并且会尝试终止正在执行中的任务。当调用了shutdownNow()方法, 会从(RUNNING or SHUTDOWN) -> STOP
TERMINATED
:线程池中所有线程已经停止运行,其他行为同 STOP状态。
- 当等待队列和线程池为空时,会从SHUTDOWN -> TERMINATED
- 当线程池为空时,会从STOP -> TERMINATED
2.线程池运行任务
2.1变量介绍
在讲解运行过程前,我们先看下ThreadPoolExecutor
中的几个比较重要的成员变量:
private final BlockingQueue<Runnable> workQueue; //任务缓存队列,用来保存等待中的任务,等待worker线程空闲时执行任务
private final ReentrantLock mainLock = new ReentrantLock(); //更新 poolSize, corePoolSize,maximumPoolSize, runState, and workers set 时需要持有这个锁
private final HashSet<Worker> workers = new HashSet<Worker>(); //用来保存工作中的执行线程
private volatile long keepAliveTime; //超过corePoolSize外的线程空闲存活之间
private volatile boolean allowCoreThreadTimeOut; //是否对corePoolSize内的线程设置空闲存活时间
private volatile int corePoolSize; //核心线程数
private volatile int maximumPoolSize; //最大线程数(即线程池中的线程数目大于这个参数时,提交的任务会被放进任务缓存队列)
private volatile int poolSize; //线程池中的当前线程数
private volatile RejectedExecutionHandler handler; //任务拒绝策略
private volatile ThreadFactory threadFactory; //线程工厂,用来新建线程
private int largestPoolSize; //记录线程池中出现过的最大线程数大小
private long completedTaskCount; //已经执行完的线程数
这边重点解释下 corePoolSize
、maximumPoolSize
、workQueue
两个变量,这两个变量涉及到线程池中创建线程个数的一个策略。
corePoolSize
: 这个变量我们可以理解为线程池的核心大小,举个例子来说明(corePoolSize假设等于10,maximumPoolSize等于20):
- 有一个部门,其中有10(corePoolSize)名工人,当有新任务来了后,领导就分配任务给工人去做,每个工人只能做一个任务。
- 当10个工人都在忙时,新来的任务就要放到队列(workQueue)中等待。
- 当任务越积累越多,远远超过工人做任务的速度时,领导就想了一个办法:从其他部门借10个工人来,借的数量有一个公式(maximumPoolSize – corePoolSize)来计算。然后把新来的任务分配给借来的工人来做。
- 但是如果速度还是还不急的话,可能就要采取措施来放弃一些任务了(RejectedExecutionHandler)。
等到一定时间后,任务都完成了,工人比较闲的情况下,就考虑把借来的10个工人还回去(根据keepAliveTime判断) - 也就是说corePoolSize就是线程池大小,maximumPoolSize在我看来是线程池的一种补救措施,即任务量突然过大时的一种补救措施。
2.2线程执行过程
先看下前一篇文章中的一个例子:
ExecutorService executor = Executors.newFixedThreadPool(3);
IntStream.range(0, 6).forEach(i -> executor.execute(() -> {
String threadName = Thread.currentThread().getName();
System.out.println("finished: " + threadName);
}));
上面代码就是新建6个任务,然后扔到线程池中运行,输出线程名称,直到运行完毕。其中最核心的方法就是execute()
方法,虽然submit()
也可以执行任务,但它底层也是调用execute()
方法,所以懂了execute()
的实现原理即可:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { //1.
if (runState == RUNNING && workQueue.offer(command)) { //2.
if (runState != RUNNING || poolSize == 0) //3.
ensureQueuedTaskHandled(command); //4.
}
else if (!addIfUnderMaximumPoolSize(command)) //5.
reject(command); // is shutdown or saturated //6
}
}
上面的代码看起来逻辑有点复杂,我们一个一个看,首先看上面1位置处:
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command))
是一个或表达式,它分成两部分
- 首先判断当前线程数是否大于等于核心线程数,是的话直接进入if语句块中,否则判断第二个部分
- 第二个部分
addIfUnderCorePoolSize(command)
,这个方法是当线程数小于核心线程数时,用来新建线程执行任务(因为线程数小于corePoolSize时,直接新建线程来运行任务,不管当前线程池里有没有空闲的线程)。如果新建失败,那么进入if语句块,成功了那么execute方法就执行结束了,因为线程已经新建成功了,任务已经开始在线程池中运行。
进入if语句块后,看上面代码2.if (runState == RUNNING && workQueue.offer(command))
- 判断当前线程池状态是否是RUNNING 而且 任务放入等待队列中成功,那么直接进入if语句块
- 否则到代码5.处
if (!addIfUnderMaximumPoolSize(command))
,判断新任务用新线程执行是否成功(注:这里的新线程就是我们上面讲的 “借来的工人” maximumPoolSize) - 如果“借来的工人”还是处理不了的话,执行任务拒绝策略
继续进到代码块3 的if语句块if (runState != RUNNING || poolSize == 0)
, 因为新任务加入到等待队列中了,这句判断是为了防止在将此任务添加进任务缓存队列的同时其他线程突然调用shutdown或者shutdownNow方法关闭了线程池的一种应急措施。如果是的话,应急处理加入的新任务 ensureQueuedTaskHandled(command)
。
我们看下两个关键方法的实现:
##### 1.addIfUnderCorePoolSize
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
首先获取锁,因为涉及到线程池状态的变化。然后再次判断 if (poolSize < corePoolSize && runState == RUNNING)
,在execute()方法中我们已经判断过一次,这边再次判断是为了防止其他线程又新增了新线程或者调用了shutdown、shutdownNow方法,这边起到了双重检查的一个效果。如果为true
的话,进行t = addThread(firstTask)
新增线程执行任务。addThread方法里面比较简单,就是通过线程工厂创建线程thread,然后封装到Worker对象中,加入到 workers队列中,并执行线程,可以把Worker对象看成是拥有一个线程的对象。
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);
Thread t = threadFactory.newThread(w);
boolean workerStarted = false;
if (t != null) {
w.thread = t;
workers.add(w);
int nt = ++poolSize;
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start();
workerStarted = true;
}
}
return t;
}
这里在介绍下Worker对象, 它实现了Runnable接口,你把它当成Runnable的一个代理类即可,最终也是执行它的run方法。只要注意一下Worker中的beforeExecute
和afterExecute
方法,这两个方法在ThreadPoolExecutor中没有具体实现,用户可以重写这个方法和后面的afterExecute方法来进行一些统计信息,比如某个任务的执行时间等,而afterExecute方法还有一个Throwable t
参数,用户可以用来记录一些异常信息,因为新线程中的异常时捕获不到的,需要在afterExecute中记录。
看起来这个是不是和spring 切面有点像,可以看到 知识都是相通的。
看一下它的run方法:
public void run() {
try {
hasRun = true;
Runnable task = firstTask;
firstTask = null;
while (task != null || (task = getTask()) != null) { //1
runTask(task);
task = null;
}
} finally {
workerDone(this);
}
}
注意代码块1,可以看到这边在循环获取任务,并执行,直到任务全部执行完毕。除了第一个任务,其他任务都是通过getTask()
方法去取,这个方法是ThreadPoolExecutor中的一个方法。我们猜一下,整个类中只有任务缓存队列中保存了任务,应该就是去缓存队列中取了。
Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll(); //取任务
else if (poolSize > corePoolSize || allowCoreThreadTimeOut) //如果线程数大于核心池大小或者允许为核心池线程设置空闲时间,
//则通过poll取任务,若等待一定的时间取不到任务,则返回null
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
else
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) { //如果没取到任务,即r为null,则判断当前的worker是否可以退出
if (runState >= SHUTDOWN) // Wake up others
interruptIdleWorkers(); //中断处于空闲状态的worker
return null;
}
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
}
}
}
这里有一个非常巧妙的设计方式,假如我们来设计线程池,可能会有一个任务分派线程,当发现有线程空闲时,就从任务缓存队列中取一个任务交给 空闲线程执行。但是在这里,并没有采用这样的方式,因为这样会要额外地对任务分派线程进行管理,无形地会增加难度和复杂度,这里直接让执行完任务的线程Worker去任务缓存队列里面取任务来执行,因为每一个Worker里面都包含了一个线程thread。
2. addIfUnderMaximumPoolSize
这个方法的实现思想和 addIfUnderCorePoolSize方法的实现思想非常相似,唯一的区别在于addIfUnderMaximumPoolSize方法是在线程 池中的线程数达到了核心池大小并且往任务队列中添加任务失败的情况下执行的:
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
到这里,大部分朋友应该对任务提交给线程池之后到被执行的整个过程有了一个基本的了解,下面总结一下:
- 首先,要清楚corePoolSize和maximumPoolSize的含义;
- 其次,要知道Worker是用来起到什么作用的;
- 要知道任务提交给线程池之后的处理策略,这里总结一下主要有4点:
- 如果当前线程池中的线程数目小于corePoolSize,则每来一个任务,就会创建一个线程去执行这个任务;
- 如果当前线程池中的线程数目>=corePoolSize,则每来一个任务,会尝试将其添加到任务缓存队列当中,若添加成功,则该任务会等待空闲线程将其取出去执行;若添加失败(一般来说是任务缓存队列已满),则会尝试创建新的线程去执行这个任务;
- 如果当前线程池中的线程数目达到maximumPoolSize,则会采取任务拒绝策略进行处理;
- 如果线程池中的线程数量大于 corePoolSize时,如果某线程空闲时间超过keepAliveTime,线程将被终止,直至线程池中的线程数目不大于 corePoolSize;如果允许为核心池中的线程设置存活时间,那么核心池中的线程空闲时间超过keepAliveTime,线程也会被终止。
这篇写完了,后面会介绍一下任务缓存队列的种类已经缓存的策略以及任务拒绝策略等。如果文章有什么问题,欢迎大家指正,大家互相沟通,互相学习。