ThreadPoolExecutor源码解析

《ThreadPoolExecutor源码解析》 ThreadPoolExecutor

阅读原文请访问我的博客
BrightLoong’s Blog

一. 线程池概述

在介绍线程池之前,先说一下为什么在开发中要使用线程池,个人认为主要有一下的原因:

  • 开发中使用线程的时候,大部分线程执行的时间很短,频繁的创建、启动、销毁线程带来不必要的资源消耗。
  • 线程的创建、挂起、唤醒等操作都需要依靠CPU调度,大量的线程会频繁进行上下文切换,特别是任务执行时间短、任务数量多的情况,大量时间将花在上下文切换上,又耗时,又耗资源。
  • 无法对任务做统一的管理、分配、和监控等。

线程池主要通过ThreadPoolExecutor来实现,在ThreadPoolExecutor类注释上有下面一段话(下面是翻译过来的):

线程池解决了两个不同的问题:由于减少了每个任务的调用开销,改进了执行大量异步任务的性能;并且它还提供了一种任务执行时限制和管理资源(包括线程)的方法。 同时每个ThreadPoolExecutor还维护一些基本统计信息,例如已完成任务的数量。

二. 线程池的构造方法参数解析

构造方法中各个参数的含义

先上一张图,有个大概的印象。

《ThreadPoolExecutor源码解析》 threadPool

  • 这里任务加入可以看做是放入了两个地方,一是线程池中,另一个是任务队列中。
  • 放入线程池的又分为两个部分,一是corePoolSize,暂且叫做核心线程(下图红色部分),另一部分是非核心线程(下图橙色部分),核心线程+非核心线程 <= maximumPoolSize。
参 数含 义
corePoolSize核心线程数,当向线程池中添加新任务的时候,如果此时线程数量小于corePoolSize,哪怕线程池中有空闲的线程,此时也会重新新建一个线程来处理这个任务请求。同时corePoolSize也是线程池中维持的线程数量,就算都是空闲线程的也会存在,除非设置了allowCoreThreadTimeOut。
maximumPoolSize线程池中最大的线程数量,当向线程池中添加新任务的时候如果此时线程数量大于corePoolSize,小于maximumPoolSize,并且队列已经满了,将会新建线程来处理这个任务请求。
keepAliveTime非核心线程在空闲后存活的是时间。
unitkeepAliveTime的单位
workQueue任务队列,常用的有三种:SynchronousQueue、LinkedBlockingQueue、ArrayBlockingQueue
threadFactory创建线程的工厂,默认使用DefaultThreadFactory
handler被拒绝后的处理,拒绝处理策略。
a. CallerRunsPolicy,由调用execute方法提交任务的线程来执行这个任务
b. AbortPolicy,抛出异常RejectedExecutionException拒绝提交任务
c. DiscardPolicy,直接抛弃任务,不做任何处理
d. DiscardOldestPolicy,去除任务队列中的第一个任务,重新提交

构造方法

//使用默认的ThreadFactory和拒绝处理策略
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), defaultHandler);
    }
    
//使用默认的拒绝处理策略    
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             threadFactory, defaultHandler);
    }

//使用默认的ThreadFactory
 public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              RejectedExecutionHandler handler) {
        this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,
             Executors.defaultThreadFactory(), handler);
    }

//传入所有参数,并且对参数进行校验   
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

三. 工作原理

其原理如下图所示:

《ThreadPoolExecutor源码解析》 theory

当一个新加一个任务的时候执行如下步骤:

  1. 首先尝试新建核心线程启动任务
  2. 核心线程已经满了,尝试加入到队列中
  3. 队列已经满了,尝试新建非核心线程启动任务
  4. 非核心线程已满,执行拒绝策略

同时,当线程池中的工作线程执行完一个Task后会从队列中take任务执行。

四. UML类图

《ThreadPoolExecutor源码解析》 UML

  • AbstractExecutorService,提供了ExecutorService的默认实现。
  • ExecutorService,定义了管理终止任务的一些方法以及让任务返回Future的方法。
  • Executor,只定义了一个execute()方法,任务的提交执行。
  • AbortPolicy、DiscardOldestPolicy、DiscardPolicy、CallerRunsPolicy,ThreadPoolExecutor内部定义的拒绝策略类。
  • Worker,是实现线程池重要的内部类,其UML类图如下:

《ThreadPoolExecutor源码解析》 Worker

Worker类主要是运行任务的以及维护线程的中断控制状态,以及其他状态的记录。Worker类继承AbstractQueuedSynchronizer以简化每个任务执行时候的锁的获取和释放。

五. 源码分析

注意区分Worker和Taks,Worker是指工作线程,也就是用来执行任务的线程,Task是线程执行的任务(添加到队列中的就是任务)。

ThreadPoolExecutor关键变量

ThreadPoolExecutor使用一个原子integer(AtomicInteger)变量ctl(32位),来表示线程池的控制状态,这个状态值实际上由两部分组成:

  1. workerCount:有效的线程数,最大(2^29)-1,为ctl的低29位
  2. runState:线程池运行状态,保存在ctl高位3位。
//ctl初始化的时候状态为RUNNING,workerCount为0
 private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
 
    //workerCount位数
    private static final int COUNT_BITS = Integer.SIZE - 3;
    //workerCount容量,(2^29)-1
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState 的状态
    private static final int RUNNING    = -1 << COUNT_BITS;
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;


    // 获取runState
    private static int runStateOf(int c)     { return c & ~CAPACITY; }
    //获取workerCount
    private static int workerCountOf(int c)  { return c & CAPACITY; }
    private static int ctlOf(int rs, int wc) { return rs | wc; }

说一下上面线程池的几种状态:

  • RUNNING:可以接受新的任务,并且处理在队列中的任务。
  • SHUTDOWN:不再接受新的任务,但是会把队列中的任务处理完成。
  • STOP:不再接受新的任务,也不处理队列中的任务,并且正在处理的任务会被中断。
  • TIDYING:所有任务都终止了,线程要转换到TIDYING状态,需要运行terminated()钩子方法。
  • TERMINATED:terminated()执行完成,到达这个状态时候awaitTermination()方法返回。

在ThreadPoolExecutor类注释中提到状态之间的转换情况:

  • RUNNING -> SHUTDOWN:调用shutdown(),或者隐式调用finalize()。
  • (RUNNING or SHUTDOWN) -> STOP:调用shutdownNow()。
  • SHUTDOWN -> TIDYING:当线程池和队列都为空的时候。
  • STOP -> TIDYING:线程池为空的时候。
  • TIDYING -> TERMINATED:钩子方法terminated()执行完成。

execute(Runnable command)

execute(Runnable command),提交任务。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
       
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果线程池状态为Running(其他状态拒绝添加新任务到队列)将任务添加到队列中。
        if (isRunning(c) && workQueue.offer(command)) {
            //添加成功后进行recheck
            int recheck = ctl.get();
            //如果当前状态不是Running,从队列中移除任务。
            if (! isRunning(recheck) && remove(command))
                reject(command);
            //刚刚在队列中加入了任务,保证线程池中至少有一个工作线程可以处理任务。
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //
        else if (!addWorker(command, false))
            reject(command);
    }

分为3步来执行:

  1. 如果workerCount小于corePoolSize,将任务作为first Task新建线程来执行任务。
  2. 将任务添加到队列中,如果添加成功依然需要检查,在进入任务之前,如果线程池被关闭,那么将任务从队列中移除;如果当前线程池中没有工作线程,而刚刚在队列中加入了任务,要保证线程池中至少有一个工作线程可以处理任务。
  3. 如果不能将任务加入队列中,尝试新加一个线程来执行任务,但是并不定会成功,可能是线程池被shut down或者线程池达到了饱和(maximumPoolSize),如果失败了执行拒绝策略。

关于Worker的构造函数

在介绍addWorker(Runnable firstTask, boolean core)之前我们来看看Worker的构造函数。

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

Worker实现了Runnable,这里将其构造为Thread赋值给thread。第一步中的setState(-1),这一步很有意思,将AQS中的同步状态设置为-1,到后面我们会看到,线程池使用AQS中的同步状态来判断该工作线程是否可以被中断。-1:初始化值,此时工作线程还没有启动,也没有中断的必要;0:表示接受中断,此时工作线程为空闲状态;1:表示此时工作线程正在执行任务。

addWorker(Runnable firstTask, boolean core)

检查根据当前线程池的工作状态和给定的界限限制(corePoolSize 和maximumPoolSize)是否可以添加新的工作线程。 如果添加了新的工作线程,workerCount会相应调整,并且如果可能的话将firstTask作为其第一个任务运行。 如果线程池停止(Stop)或关闭(ShutDown),此方法返回false。如果线程工厂未能创建线程,它也会返回false。如果线程创建失败,无论是由于线程工厂返回null还是由于异常(通常是Thread.start()中的OutOfMemoryError),都会进行回滚。


/**
 *@Param core:如果为true把corePoolSize作为解析,否则把maximumPoolSize作为界限。
 *
 */
private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            //将下面的判断拆开来看。
            //rs >= SHUTDOWN,在这个前提下,需要满足:
            //! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())
            //上面的可以这样理解,只有在同时满足三个条件的情况下才会返回false,所以话句话说,只要有一个不满足,就会返回true
            //将上面的判断修改为 (rs != SHUTDOWN || firstTask != null ||  workQueue.isEmpty())
            //rs != SHUTDOWN:基于rs >= SHUTDOWN的前提下,当rs != SHUTDOWN,不再创建新的线程。
            //firstTask != null:基于rs >= SHUTDOWN的前提下,不再接受新的任务。
            //workQueue.isEmpty():任务队列为空,不用再创建新的工作线程来处理任务。
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                //检查工作线程数是否超过了界限,超过了返回false
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //使用CAS修改workerCount,加1,成功跳出循环
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                //+1操作失败,其他任务添加修改了workerCount值,继续循环。
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
            }
        }
        
        //工作线程是否启动
        boolean workerStarted = false;
        
        //工作线程是否添加成功
        boolean workerAdded = false;
        Worker w = null;
        try {
            //新加一个工作线程,Worker本身也实现了Runabble
            w = new Worker(firstTask);
            
            //这路获取到的其实是使用Worker生成的Thread
            final Thread t = w.thread;
            if (t != null) {
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();
                try {
                    
                    //在获取到锁之后再次检查状态
                    int rs = runStateOf(ctl.get());
                    
                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        
                        //添加到工作线程组中
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            //记录当前工作线程总数
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                //添加工作线程成功。
                if (workerAdded) {
                    //启动工作线程,其实执行的是Worker中的run()
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker中的run()

Worker的run()方法实际调用的是runWorker(this),启动工作线程(注意这里不是直接启动的任务),在工作线程中执行任务。

工作线程在这循环,反复的从队列中获取任务并执行它们。

  1. 启动了初始的任务,也就是最开始创建工作线程的时候的firstTask。否则,只要线程池还在运行中,使用getTask()从队列中获取任务来执行。如果改变了线程池的工作状态,或者工作参数,getTask()返回null,此时工作线程将退出。或者工作线程因为异常而退出,此时异常退出标记completedAbruptly为true,之后会通过processWorkerExit新建一个工作线程来替换它。
  2. 在运行所有任务之前,需要获取锁,来保证当任务在运行的时候不会被中断,除非线程池正在停止(Stop)
public void run() {
            runWorker(this);
        }
        
final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        //获取任务
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // 将AQS同步状态设置为0(初始值为-1),表示当前工作线程为闲置状态,可以被中断了。
        //异常退出标记。
        boolean completedAbruptly = true;
        try {
            //getTask()从队列中获取任务
            while (task != null || (task = getTask()) != null) {
                w.lock();
                //再次检查工作线程状态
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    //执行前的钩子方法
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        //注意区分和start(),调用run()并不会另启线程,而是在当前线程中执行,所以任务其实是在工作线程中执行的。
                        task.run();
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        //执行后的钩子方法
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            //工作线程退出后的操作。
            processWorkerExit(w, completedAbruptly);
        }
    }

getTask()

从队列中获取Task,大概分为以下几个步骤。

  1. 判断线程池以及队列的状态,如果线程池状态在STOP以上,此时线程池不处理队列中的任务;或者线程池处于SHUTDOWN但是队列为空(SHUTDOWN不再接受新的任务),workerCount减1,返回null,注意此时只是将变量减1,其实工作线程并没有终止真正的终止在 processWorkerExit(w, completedAbruptly);中。
  2. 如果通过了状态检查,判断是否要进行线程回收,如果需要workerCount数量减1,成功后返回null。
  3. 根据timed(timed表示需要进行超时闲置线程回收),选择是限时等待还是阻塞的方式从队列中获取任务。
private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // 如果线程池已经关闭(STOP状态不再处理队列中的任务),或者队列为空,workerCount减1,返回空。
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // allowCoreThreadTimeOut,是否允许回收核心线程。
            // timed表示需要进行超时闲置线程回收。
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;
            
            //1. (wc > maximumPoolSize || (timed && timedOut):如果工作线程数量大于maximumPoolSize,或者闲置线程超时。
            //2. (wc > 1 || workQueue.isEmpty()):队列不为空时,至少需要保留一个工作线程来处理队列中的任务
            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) {
                //workerCount数量减1,CAS操作,失败了会不断循环。
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try {
                //根据timed,选择是限时等待还是阻塞的方式从队列中获取任务。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                //workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) 一段时间拿不到返回null,表示超时。
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

说到getTask()顺便介绍几种队列:

  • SynchronousQueue:Executors.newCachedThreadPool()中使用的队列。与其他BlockingQueue不同,SynchronousQueue是一个不存储元素的BlockingQueue,每一个put操作必须要等待一个take操作,否则不能继续添加元素,反之亦然。所以使用这个队列,任务会直接提交给线程处理,所以一般会把maximumPoolSize指定成Integer.MAX_VALUE。
  • ArrayBlockingQueue:有界队列,队列可以限定长度,当添加新任务的时候,如果当前工作线程数量小于corePoolSize,则新建线程执行任务,否则如队列,以为队列是有限的,如果队列已经满了就会新建非核心线程执行任务,如果非核心线程也饱和了就会被拒绝,执行拒绝策略。
  • LinkedBlockingQueue(不设置预定值):无界队列,因为是无界队列,所以maximumPoolSize这个属性的设定失效,线程池中的工作线程数用于不会大于corePoolSize。

processWorkerExit(Worker w, boolean completedAbruptly)

工作线程退出后的操作,在这里进行线程池的终止以及工作线程的回收。

private void processWorkerExit(Worker w, boolean completedAbruptly) {
        //如果是异常退出,workerCount数量减1
        if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
            decrementWorkerCount();

        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            completedTaskCount += w.completedTasks;
            //从工作线程组中移除工作线程
            workers.remove(w);
        } finally {
            mainLock.unlock();
        }
        
        //尝试终止线程池
        tryTerminate();

        int c = ctl.get();
        if (runStateLessThan(c, STOP)) {
            //如果是异常退出,直接新加一个工作线程。
            if (!completedAbruptly) {
                //如何设置了allowCoreThreadTimeOut,最小工作线程数为0,否则为corePoolSize
                int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
                //如果最小工作线程数为0,而队列又不为空,说明至少要保留一个工作线程
                if (min == 0 && ! workQueue.isEmpty())
                    min = 1;
                //如果当前工作线程数还大于最小工作线程数,直接返回,不添加新的工作线程
                if (workerCountOf(c) >= min)
                    return; // replacement not needed
            }
            addWorker(null, false);
        }
    }

tryTerminate()

尝试终止线程池。

 final void tryTerminate() {
        for (;;) {
            int c = ctl.get();
            //以下三种情况表示不需要终止线程池。
            //1. isRunning(c):处于Running状态。
            //2. runStateAtLeast(c, TIDYING):TIDYING或者TERMINATED,已经在关闭了,不用重复关闭。
            //3. (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()):线程池处于SHUTDOWN状态,但是队列中还有任务没处理完。
            if (isRunning(c) ||
                runStateAtLeast(c, TIDYING) ||
                (runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))
                return;
            
            //如果还有工作线程,终止一个空闲的工作线程后退回,这个时候表示并不想终止线程池。
            if (workerCountOf(c) != 0) { // Eligible to terminate
                //只终止一个工作线程
                interruptIdleWorkers(ONLY_ONE);
                return;
            }
            
            //下面的操作是在终止线程池,修改线程池状态。
            final ReentrantLock mainLock = this.mainLock;
            mainLock.lock();
            try {
                //设置为TIDYING状态,调用terminated()钩子方法
                if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {
                    try {
                        terminated();
                    } finally {
                        //终止操作完成,设置状态为TERMINATED
                        ctl.set(ctlOf(TERMINATED, 0));
                        // 释放在termination条件上等待的所有线程
                        termination.signalAll();
                    }
                    return;
                }
            } finally {
                mainLock.unlock();
            }
            // CAS操作失败后不断的循环。
        }
    }

interruptIdleWorkers(boolean onlyOne)

中断闲置工作线程。重点是如何判断哪些是闲置的工作线程,并且可以被中断。满足 !t.isInterrupted() && w.tryLock() :

  1. !t.isInterrupted()表示没有被中断。
  2. w.tryLock()表示能获取到锁,获取到锁AQS的同步状态为0,表示工作线程闲置。
//tru表示只中断一个线程
private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;
                //如果没有被中断,并且能成功获取到锁(获取到锁AQS的同步状态为0,表示工作线程闲置),那么就进行中断,不能获取到锁表示队列中有任务正在执行。
                if (!t.isInterrupted() && w.tryLock()) {
                    try {
                        //进行中断操作。
                        t.interrupt();
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                //如果只中断一个,中断一个跳出。
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }

shutdown()

调用shutdown(),拒绝接受新的任务加入,会将正在执行的任务以及等待执行的任务执行完成,终止所有的闲置工作线程。

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查是否有Shutdown权限
            checkShutdownAccess();
            //设置线程池状态为SHUTDOWN
            advanceRunState(SHUTDOWN);
            //关闭闲置的工作线程,最后调用的是interruptIdleWorkers(false)
            interruptIdleWorkers();
            //钩子方法
            onShutdown(); // hook for ScheduledThreadPoolExecutor
        } finally {
            mainLock.unlock();
        }
        //终止
        tryTerminate();
    }

shutdownNow()

调用shutdownNow()会尝试停止所有执行的任务,也不会对队列中的等待任务进行处理,并返回等待执行的任务列表,它会终止所有的工作线程,与shutdown()不同,shutdown()只会终止所有的空闲工作线程。

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            //检查是否有Shutdown权限
            checkShutdownAccess();
            //设置线程池状态为STOP
            advanceRunState(STOP);
            //终止所有工作线程
            interruptWorkers();
            //获取等待执行的任务列表
            tasks = drainQueue();
        } finally {
            mainLock.unlock();
        }
        //尝试终止线程池。
        tryTerminate();
        return tasks;
    }

interruptWorkers() 以及 interruptIfStarted()

终止所有的工作线程。主要看看interruptIfStarted()中是如何判断哪些工作线程是可以被终止的,满足 getState() >= 0 && (t = thread) != null && !t.isInterrupted() :

  1. getState() >= 0 获取CAS同步状态,上面说到同步状态初始化为-1,-1表示不可以被中断(可以看做当前工作线程没有启动),0表示闲置工作线程,1表示正在执行任务的工作线程。
  2. (t = thread) != null 这个不用说了,为空还终止个啥。
  3. !t.isInterrupted(),没有终止,避免重复调用。
private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
    }
    
        void interruptIfStarted() {
            Thread t;
            //getState()用于判断线程是否启动,具体看上面的解释。
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }
    原文作者:BrightLoong
    原文地址: https://www.jianshu.com/p/d6634246e1b1
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞