基本介绍
ThreadPoolExecutor
,JUC提供的线程池实现,用于高效执行应用中的多种任务,通过使用多线程并发执行任务,来提高效率;另外相比于自己new Thread去跑任务,使用线程池具有更好的性能,因为线程的创建与销毁需要一定的时间,线程池通过将线程管理起来,new完之后一般不立刻销毁,而是不断地等待处理任务。另外通过线程池来规范线程的使用,可以有效避免滥用线程
使用示例
对于简单的线程池使用,可以采用JUC包中提供的Executors
工具类创建一个具有固定线程数的线程池,来跑任务:
execute方式
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
for (int i = 0; i < 10; i++) {
executor.execute(new Task(i));
}
System.out.println("will shutdown thread pool ...");
executor.shutdown();
try {
while (!executor.isTerminated()) {
executor.awaitTermination(3, TimeUnit.SECONDS);
}
System.out.println("thread pool is terminated !");
} catch (InterruptedException e) {
// ignore ...
}
}
private static class Task implements Runnable {
private int order;
public Task(int order) {
this.order = order;
}
@Override
public void run() {
System.out.println("execute task " + order);
}
}
}
上面的demo执行了10个任务,然后关闭线程池,可以观察到console输出(Tips: 输出顺序随机,但是能确保任务都执行完)
execute task 1
will shutdown thread pool ...
execute task 3
execute task 4
execute task 5
execute task 6
execute task 7
execute task 8
execute task 9
execute task 0
execute task 2
thread pool is terminated !
submit方式
使用execute执行的任务,无法返回其结果,若想执行带有返回结果的task,可使task类实现callable接口,并使用线程池的submit方法对task进行提交。submit方法提交后返回Future
结果,Future
代表了异步task的返回结果,调用get方法将阻塞等待task执行结束,demo代码如下所示:
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3);
List<Future<String>> futures = new ArrayList<Future<String>>();
for (int i = 0; i < 10; i++) {
Future<String> f = executor.submit(new CallableTask(i));
futures.add(f);
}
for (Future<String> f : futures) {
try {
System.out.println(f.get());
} catch (ExecutionException e) {
// ignore
} catch (InterruptedException e) {
// ignore
}
}
System.out.println("will shutdown thread pool ...");
executor.shutdown();
try {
while (!executor.isTerminated()) {
executor.awaitTermination(3, TimeUnit.SECONDS);
}
System.out.println("thread pool is terminated !");
} catch (InterruptedException e) {
// ignore ...
}
}
private static class CallableTask implements Callable<String> {
private int order;
public CallableTask(int order) {
this.order = order;
}
@Override
public String call() throws Exception {
return "execute callable task " + order;
}
}
上面使用Future的get阻塞返回,在异步编程里,可能会使用回调的思想进一步强化为真正的异步,但是JDK Future 编程框架中,并没有提供此技术实现,若需要实现回调,可利用Google提供的guava类库来完成,有兴趣可以看前面写过的一篇博客: Guava 对JDK Future异步编程的扩展支持
源码分析
大部分分析采用在源码上面+注释的方式进行解释,因此可重点关注下展示代码中的注释
类继承关系
ThreadPoolExecutor -> AbstractExecutorService(abstract class) -> ExecutorService(interface) -> Executor(interface)
Executor与ExecutorService
Executor
只提供了一个execute方法:
public interface Executor {
void execute(Runnable command);
}
ExecutorService
为线程池实现提供了一些生命周期方法,并提供了异步执行的接口(submit)
public interface ExecutorService extends Executor {
void shutdown();
List<Runnable> shutdownNow();
boolean isShutdown();
boolean isTerminated();
boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException;
<T> Future<T> submit(Callable<T> task);
<T> Future<T> submit(Runnable task, T result);
Future<?> submit(Runnable task);
// ... 省略其他一些方法 invokeAll, invokeAny ...
}
AbstractExecutorService
AbstractExecutorService
主要提供submit抽象实现,其实submit最终还是调用了execute方法,此外还实现了newTaskFor方法,用于将Runnable、Callable等包装为FutureTask进而进行统一处理:
public abstract class AbstractExecutorService implements ExecutorService {
protected <T> RunnableFuture<T> newTaskFor(Runnable runnable, T value) {
return new FutureTask<T>(runnable, value);
}
protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
return new FutureTask<T>(callable);
}
public Future<?> submit(Runnable task) {
if (task == null) throw new NullPointerException();
RunnableFuture<Void> ftask = newTaskFor(task, null);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Runnable task, T result) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task, result);
execute(ftask);
return ftask;
}
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
// .. 省略其他实现方法 invokeAll, invokeAny
}
类成员属性介绍
重要的类属性包括构造方法中需要传递的属性,当然也包括一些内部维护的属性:
构造方法属性介绍
// 线程池中核心线程上限
private volatile int corePoolSize;
// 线程池中最大允许的线程数(核心线程和非核心线程)上限
private volatile int maximumPoolSize;
// 线程存活时间(配合构造函数传入的TimeUnit转换而来,这里的单位为纳秒级别)
private volatile long keepAliveTime;
// 工作队列,用于保留待执行的task
private final BlockingQueue<Runnable> workQueue;
// 线程工厂,用于构造worker工作线程的工厂定义,可以通过实现ThreadFactory来定制线程名字
private volatile ThreadFactory threadFactory;
// 当线程池中任务需要拒绝处理,使用该handler的实现类来进行处理,有不同的策略实现,默认使用AbortPolicy实现
// JDK自带提供的策略实现类有:
// AbortPolicy: 默认的策略,会抛RejectedExecutionException异常
// CallerRunsPolicy: 让调用者线程(调用exeuctor.execute的那个线程,一般可能是main线程)负责跑该reject掉的task
// DiscardOldestPolicy: 抛弃掉队头元素(oldest element),然后执行该task
// DiscardPolicy: 忽略掉该task的执行,没有抛异常,也没有任何异常日志输出
private volatile RejectedExecutionHandler handler;
// 最详细的构造方法,其他构造方法最终也会调用该方法
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
if (corePoolSize < 0 ||
maximumPoolSize <= 0 ||
maximumPoolSize < corePoolSize ||
keepAliveTime < 0)
throw new IllegalArgumentException();
if (workQueue == null || threadFactory == null || handler == null)
throw new NullPointerException();
this.corePoolSize = corePoolSize;
this.maximumPoolSize = maximumPoolSize;
this.workQueue = workQueue;
this.keepAliveTime = unit.toNanos(keepAliveTime);
this.threadFactory = threadFactory;
this.handler = handler;
}
内部维护属性介绍
// ctl,一个int保留两个属性,高3位用于保留线程池状态(后续章节会详细说明),低29位用于保留当前工作线程个数
// 因为这么维护,所以ThredPoolExecutor中有很多静态方法用于获取state以及workerCount
private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
// 线程池中维护的锁,只有在一些特定场景下使用,如新建thread,等待线程池关闭
private final ReentrantLock mainLock = new ReentrantLock();
// 存储工作线程引用
private final HashSet<Worker> workers = new HashSet<Worker>();
// 用于实现等待线程终止的condition,需配合mainLock使用
private final Condition termination = mainLock.newCondition();
// 线程池曾经达到的最大线程个数
private int largestPoolSize;
// 执行task的总数
private long completedTaskCount;
// 核心线程是否也需要进行idle销毁处理,默认为false,即不回收核心线程
private volatile boolean allowCoreThreadTimeOut;
可以发现,上述一些重要的成员属性,都是volatile的,配合ThreadPoolExecutor
提供了其set方法,支持了多线程下动态修改其属性,能让所有的线程立刻可见(主要针对线程池内部的工作线程可见),这里就不贴set方法的代码了,有兴趣的翻翻源码看看即可
线程池状态(生命周期)
状态简介
ThreadPoolExecutor内部通过定义一系列的状态,来规划其生命周期,从运行到最后的终结,这些状态如下:
private static final int RUNNING = -1 << COUNT_BITS;
private static final int SHUTDOWN = 0 << COUNT_BITS;
private static final int STOP = 1 << COUNT_BITS;
private static final int TIDYING = 2 << COUNT_BITS;
private static final int TERMINATED = 3 << COUNT_BITS;
- RUNNING: 该状态下,线程池能接受新的task,同时也会处理工作队列中的task
- SHUTDOWN: 该状态下,线程池不再接受新的task,但是仍会处理工作队列中的task
- STOP: 该状态下,线程池不再接受新的task,也不会处理队列中的task,同时对于正在运行工作线程,尝试中断回收
- TIDYING: 该状态下,所有的task已经被终止,线程池中的worker线程全部停止,准备执行terminated()方法,然后进入TERMINATED终止状态
- TERMINATED: 该状态表示线程池已经完全终止,terminated方法(该方法在ThreadPoolExecutor里面是空方法,子类可以重载这个方法,在线程池终止的时候进行一些逻辑处理,比如打印日志等)也调用结束,线程池生命周期正式终结
状态转移
状态之间存在一定的转移,路线如下:
- RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
- RUNNING -> STOP -> TIDYING -> TERMINATED
状态转移需要我们手动调用其指定的方法:
- 调用shutdown方法,将走第一条线路
- 调用shutdown方法,将走第二条线路
理解状态转移线路有助于更好地理解对整个线程池的代码分析,所以请谨记这些状态以及状态之间的转换流向,另外注意一下几个状态之间的大小关系,从RUNING到TERMINATED,值逐渐变大(有些方法用了其大小来做比较)
任务提交执行流程
通常我们使用线程池,大部分情况下是将封装好的任务对象丢给execute方法去执行,那么我们的分析以execute为主,任务提交到线程池执行的主要流程说明如下:
调用submit方法其实最后也会调用execute方法
- 如果线程池内线程数小于核心线程数(corePoolSize),则新建线程(核心线程),且将待执行的任务作为新建线程的第一个任务立即执行
- 否则(线程池内线程个数大于核心线程数corePoolSize),则尝试将任务丢到工作队列workQueue里面,若添加成功,还需要处理一些特殊场景(在代码里面进行注释说明)
- 否则(任务塞工作队列失败),尝试添加临时线程(线程池构造时候有个maximumPoolSize,当前worker线程数不能大于maximumPoolSize设置的值),添加成功则由临时线程跑任务,若添加失败,则由rejectHandler对任务进行reject
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) { // 当前线程池线程数 < corePoolSize
if (addWorker(command, true)) // 添加核心线程并由核心线程跑任务,成功则返回
return;
// 添加核心线程失败,重新获取ctl的值(期望addWorker成功但是最后失败,证明多线程操作线程池导致ctl发生变化 or 线程池状态发生变化,可能正在被关闭),走后续操作,尝试将任务丢工作队列
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) { // 当前状态为RUNNING且核心线程数达到上限,则尝试将任务丢工作队列
// 这里的操作是针对多线程下线程池被其他线程停止或者遇到异常,而不能正常提供服务的一些处理
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command)) // 线程池被关闭, 且任务还没被执行(仍在工作队列里面)则拒绝任务
reject(command);
else if (workerCountOf(recheck) == 0) // 没有工作线程(核心线程刚好都空闲被回收的情况)?创建临时线程
addWorker(null, false);
}
else if (!addWorker(command, false)) // 正常来说走到这里,表示队列也无法添加任务,则创建临时线程来跑任务,若失败则用rejectHandler来处理任务
reject(command);
}
工作线程的创建与运行
上面的代码可知,当需要添加线程来跑任务,是通过调用addWorker方法来实现的,因此下面对addWorker方法进行分析:
private boolean addWorker(Runnable firstTask, boolean core) {
// 外层for首先需要判断是否能够添加worker线程,判断的条件比较多
// 内层for需要尝试CAS使得workerCount+1,此步骤若成功,则证明可添加worker线程
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 不能添加worker线程的条件
if (rs >= SHUTDOWN &&
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
// 到这里表示可以尝试添加worker线程,有两种情况:
// 1. 线程池状态正常: RUNNING
// 2. rs == SHUTDOWN && firstTask == null && !workQueue.isEmpty()
// 内层for循环主要处理workCount的判断,判断是否能够添加worker线程
for (;;) {
int wc = workerCountOf(c);
// woker线程大于线程池线程上限(2的29次幂-1,太夸张,几乎不可能)
if (wc >= CAPACITY ||
wc >= (core ? corePoolSize : maximumPoolSize)) // 若入参core为true,判断是否大于核心线程个数,否则,判断是否大于允许的最大线程个数
return false; // 超过限制,不能添加,返回false
if (compareAndIncrementWorkerCount(c)) // CAS使得wokerCount+1成功,则跳出外层for,接着往下走添加worker线程的逻辑
break retry;
// CAS失败,这里再次判断线程池状态,与之前获取的不一致则继续外层for判断,否则,单纯的CAS workerCount失败,则继续内层for循环处理
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
// 走到这里,证明上面已经CAS workerCount + 1成功,走新建worker线程处理逻辑
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
// 需在mainLock获取的条件下进行,主要是处理workers的添加以及largestPoolSize的更新
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start(); // 启动worker线程,执行其run方法(Tips: run方法最后转调runWorker方法)
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w); // 处理worker添加失败的情况
}
return workerStarted;
}
工作线程Worker分析
ThreadPoolExecutor中的Worker内部类用于表示工作线程,并不是直接继承thread,而是通过内部组合的方式来实现,同时worker也使用了AQS实现了锁的语义:
为何要实现锁的语义,个人理解有以下几个作用:
- 可以控制一些变量的互斥更新
- 控制task执行与idle中断的互斥,因为在shutdown的时候,会关闭idle线程,为了与忙碌(执行task)做到互斥,因此需要锁的存在(执行task需要持有锁,idle线程的关闭也需要持有锁)
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
final Thread thread;
Runnable firstTask;
volatile long completedTasks;
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}
public void run() {
runWorker(this);
}
protected boolean isHeldExclusively() {
return getState() != 0;
}
protected boolean tryAcquire(int unused) {
if (compareAndSetState(0, 1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
protected boolean tryRelease(int unused) {
setExclusiveOwnerThread(null);
setState(0);
return true;
}
public void lock() { acquire(1); }
public boolean tryLock() { return tryAcquire(1); }
public void unlock() { release(1); }
public boolean isLocked() { return isHeldExclusively(); }
void interruptIfStarted() {
Thread t;
if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
try {
t.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
addWorker的时候会调用Worker
中持有thread的start方法使线程启动,会调用Worker
的run方法(Worker对于thread来说是一个task),而run方法又转调ThreadPoolExecutor
的runWorker方法:
private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
// ... 省略其他属性和方法
public void run() {
runWorker(this);
}
}
工作线程运行流程
其实我们可以猜想,在runWorker里面,必然以死循环方式不断地从workQueue队列(当然因为workQueue为阻塞队列,take没拿到则阻塞等待,因此不会出现CPU空转)里面取task,使得task在每一个Worker里面串行执行,基于这个猜想,我们看下runWorker这个方法:
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // Worker的AQS state初始设置为-1,调用unlock,目的是为了使得state被正常重置为0
boolean completedAbruptly = true;
try {
// 死循环方式,在getTask方法里面因take workQueue而阻塞等待(or 调用超时版本的阻塞等待)
while (task != null || (task = getTask()) != null) {
// 获取到task,获取到锁则尝试执行task(这个task指的是我们通过调用ThreadPoolExecutor execute方法传入的任务)
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task); // 空实现,子类可以重载此方法,在任务执行之前做一些操作,如进行打印日志、计数、记时等逻辑
Throwable thrown = null;
try {
task.run(); // 执行task主方法run
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown); // 与beforeExecute一样为空实现,子类可以重载此方法,在任务执行之后做一些操作
}
} finally {
task = null;
w.completedTasks++; // 计算执行了多少个task
w.unlock(); // 解锁
}
}
completedAbruptly = false;
} finally {
// 当循环退出,证明工作线程的生命周期将会被结束,退出可能是由于被中断or正常退出(没有task了: idle or 线程池正在被SHUTDOWN),由第二个参数completedAbruptly控制
processWorkerExit(w, completedAbruptly);
}
}
看下getTask方法验证猜想:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// 线程池正在被关闭时的一些处理
// SHUTDOWN且workQueue为empty
// or 状态至少为STOP
// 则返回null表示没有task需要工作线程去执行了
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
int wc = workerCountOf(c);
// 是否需要处理超时的情况判断: allowCoreThreadTimeOut == true 或者 workerCount > 核心线程数上限
boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
// worker线程 > maximumPoolSize ? or 超时获取task还没有获取到(线程idle)
if ((wc > maximumPoolSize || (timed && timedOut))
&& (wc > 1 || workQueue.isEmpty())) {
// 此时表示可以销毁调用此方法的worker线程,尝试减少worker线程数,成功则返回null,表示worker线程可以runWorker循环退出进行销毁回收
if (compareAndDecrementWorkerCount(c))
return null;
continue; // CAS失败则从头开始
}
// 阻塞队列取task操作,超时使用超时阻塞的poll,否则使用非超时阻塞的take
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
processWorkerExit这个方法不仅仅处理单个工作线程的回收,还会加以处理整个线程池的生命周期:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w); // 将当前worker线程移除
} finally {
mainLock.unlock();
}
// Tips: 尝试是否可以终止线程池生命周期,将在这个方法里面看到线程池最终趋于状态TERMINATED
tryTerminate();
int c = ctl.get();
// RUNNING or SHUTDOWN 状态,进入该逻辑
if (runStateLessThan(c, STOP)) {
if (!completedAbruptly) {
int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
if (min == 0 && ! workQueue.isEmpty())
min = 1;
if (workerCountOf(c) >= min)
return; // replacement not needed
}
// 防止之前对线程的中断回收,导致整个线程池内无工作线程继续处理workQueue工作队列中的task,因此添加临时线程来处理
addWorker(null, false);
}
}
生命周期终结关键
tryTerminate方法是推进整个线程池生命周期趋向于终结(TERMINATED)的关键点:
final void tryTerminate() {
for (;;) {
int c = ctl.get();
// 状态为RUNNING
// or 状态为TIDYING,TERMINATED
// or 状态为SHUTDOWN并且workQueue工作队列不为空
if (isRunning(c) ||
runStateAtLeast(c, TIDYING) ||
(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
return;
// 走到这里证明状态为SHUTDOWN且工作队列为空,或者状态为STOP
// 处于SHUDOWN or STOP状态的线程池,最终肯定是要趋于所有的工作线程被回收掉,这里考虑SHUTDOWN的时候有可能还有task没执行完,所以每次只尝试中断一个空闲线程
if (workerCountOf(c) != 0) { // Eligible to terminate
interruptIdleWorkers(ONLY_ONE);
return;
}
// 能走到这里证明线程池可以趋于TIDYING了,则设置为TIDYING,然后调用terminated方法,最终(finally块)将状态设置为TERMINATED
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
try {
terminated();
} finally {
ctl.set(ctlOf(TERMINATED, 0));
termination.signalAll();
}
return;
}
} finally {
mainLock.unlock();
}
// else retry on failed CAS
}
}
该方法主要会被以下方法调用以触发线程池生命周期趋于终结:
- shutdown
- shutdownNow
- addWorkerFailed
- processWorkerExist(重头戏)
状态(生命周期)转换分析
前面章节已经讲到有两条线路的状态转移:
- RUNNING -> SHUTDOWN -> TIDYING -> TERMINATED
- RUNNING -> STOP -> TIDYING -> TERMINATED
主要通过调用shutdown / shutdownNow方法来触发这个转移,下面我们分别分析这两个方法所触发的行为:
shutdown
调用shutdown方法后,线程池将不再接受新的task(也就是说调用execute方法执行的task都会被reject掉),但是对于工作队列里面的task还会将其拿出来执行
调用shutdown方法,将触发以下行为:
- 将使得线程池状态由RUNNING转换为SHUTDOWN
- 接着会关闭空闲的线程,然后由忙着的线程将工作队列中的任务都做完
- 最后将线程池的状态设置为TIDYING,调用terminated方法后,状态设置为TERMINATED
public void shutdown() {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置状态为SHUTDOWN
advanceRunState(SHUTDOWN);
// 中断并回收所有空闲工作线程
interruptIdleWorkers();
onShutdown(); // hook for ScheduledThreadPoolExecutor
} finally {
mainLock.unlock();
}
// 尝试使线程池状态趋于TERMINATED
tryTerminate();
}
shutdownNow
调用shutdownNow方法,将触发以下行为:
- 线程池状态由RUNNING转化为STOP
- 中断所有的工作线程,包括正在执行task的线程
- 将当前workQueue中的task全部弹出(会返回给调用者)
- 最后将线程池的状态设置为TIDYING,调用terminated方法后,状态设置为TERMINATED
public List<Runnable> shutdownNow() {
List<Runnable> tasks;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
checkShutdownAccess();
// 设置状态为STOP
advanceRunState(STOP);
// 中断所有工作线程
interruptWorkers();
tasks = drainQueue();
} finally {
mainLock.unlock();
}
// 尝试使线程池状态趋于TERMINATED
tryTerminate();
// 返回当前工作队列中的所有task
return tasks;
}
总结来说,shutdownNow比showDown方法更加暴力,shutdown方法处理比较圆滑,工作队列中的任务还会做完才进行终止,而shutdownNow就直接打烊赶人了
当中体现的线程模型
线程池体现了经典的生产者-消费者线程模型,生产者就是提交task的一方(调用方),可能有多个生产者,而消费者就是线程池中的Worker工作线程,消费者有多个(多个线程),而线程池中的工作队列workQueue,充当生产者和消费者之间数据传输的媒介
一些使用建议
- 简单的构造线程池对象可使用
Executors
工具类,但是,建议正式应用的代码里面,最好还是自己使用ThreadPoolExecutor
提供的完备构造方法进行构造,建议设置合适的线程数、合理的工作队列、线程工厂以及合适的rejectHandler - 构造线程池时最好提供ThreadFactory的实现,对线程进行合理命名,方便线上出问题快速定位排查,比如jstack中可以快速根据具体线程名定位问题
- 工作队列尽量不要采用无界队列,容易在任务很多,线程池处理不过来的场景下使得程序OOM(Tips:
Executors
的newFixedThreadPool提供的线程池采用的就是无界工作队列,正式环境谨慎使用) - 预估好线程池中线程的个数,并不是线程数越大越好,要综合考虑硬件CPU的能力与待执行任务的特性: 比如CPU才4核且任务都是计算型(耗CPU资源),开几十个线程,这种情况下肯定扛不住,并且因为线程之间征用CPU时间片比较激烈,其效率也许会不尽人意。对于网络上以及一些书上面常给的建议,还是具有一定的参考价值:(1)若任务都是CPU密集型的,线程数最好跟CPU核心数保持一致,尽量不要超过核心数(2)若任务都是IO密集型的,那么线程数可以稍微高一些,大概可以设置CPU核心数的2倍(3)当然,也许业务中的任务没有明显的界限,有些耗CPU,有些耗IO,那么一开始可以设置在CPU核心数附近,然后后期再根据需要适当去调整线程数,慢慢调整出一个合适的线程数
- 大多数的应用中,不同任务的执行耗时大不相同,为了避免耗时的任务把不耗时的任务给饿死(想象一下耗时任务都抢占了线程池中所有线程的场景…),可以考虑使用多个线程池来协调这些任务的执行,当然这个需要适当地降低每个线程池中线程的个数