java线程池源码分析

我们在关闭线程池的时候会使用shutdown()和shutdownNow(),那么问题来了:

  1. 这两个方法又什么区别呢?
  2. 他们背后的原理是什么呢?
  3. 线程池中线程超过了coresize后会怎么操作呢?

为了解决这些疑问我们需要分析java线程池的原理。

1 基本使用

1.1 继承关系

平常我们在创建线程池经常使用的方式如下:

ExecutorService executorService = Executors.newFixedThreadPool(5);

看下newFixedThreadPool源码, 其实Executors是个工厂类,内部是new了一个ThreadPoolExecuto:

public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
}

参数的意义就不介绍了,网上有很多内容,看源码注释也可以明白。

线程池中类的继承关系如下:

《java线程池源码分析》

2 源码分析

2.1 入口

将一个Runnable放到线程池执行有两种方式,一个是调用ThreadPoolExecutor#submit,一个是调用ThreadPoolExecutor#execute。其实submit是将Runnable封装成了一个RunnableFuture,然后再调用execute,最终调用的还是execute,所以我们这里就只从ThreadPoolExecutor#execute开始分析。

2.2 ctl和线程池状态

ThreadPoolExecutor中有个重要的属性是ctl

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0)); // 高3位表示状态,低29位表示线程池中线程的多少
    private static final int COUNT_BITS = Integer.SIZE - 3; // 32-3 = 29
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1; // 左移29为减1,即最终得到为高3位为0,低29位为1的数字,作为掩码,是二进制运算中常用的方法

    private static final int RUNNING    = -1 << COUNT_BITS; // 高三位111
    private static final int SHUTDOWN   =  0 << COUNT_BITS; // 高三位000
    private static final int STOP       =  1 << COUNT_BITS; // 高三位001
    private static final int TIDYING    =  2 << COUNT_BITS; // 高三位010
    private static final int TERMINATED =  3 << COUNT_BITS; // 高三位011

    // Packing and unpacking ctl
    private static int runStateOf(int c)     { return c & ~CAPACITY; } // 保留高3位,即计算线程池状态
    private static int workerCountOf(int c)  { return c & CAPACITY; } // 保留低29位, 即计算线程数量
    private static int ctlOf(int rs, int wc) { return rs | wc; } // 求ctl

ThreadPoolExecutor中使用32位Integer来表示线程池的状态和线程的数量,其中高3位表示状态,低29位表示数量。如果对二进制运行不熟悉可以参考:二进制运算。从上也可以看出线程池有五种状态,我们关心前3中状态

  1. RUNNING 接收task和处理queue中的task
  2. SHUTDOWN 不再接收新的task,但是会处理完正在运行的task和queue中的task,不会interrupt正在执行的task,其实调用shutdown后线程池处于该状态
  3. STOP 不再接收新的task,也不处理queue中的task,同时正在运行的线程会被interrupt。调用shutdownNow后线程池会处于该状态。

2.3 execute

明白了ctl和线程池的状态后我们来具体看下execute的处理逻辑

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // 线程数量小于coresize,那么就调用addWorker
            if (addWorker(command, true)) // 这里知道,返回true就不往下走了
                return;
            c = ctl.get();
        }

        // 不满足上述条件,即线程数量 >= coreSize,或者addWorker返回fasle,那么走下面的逻辑
        if (isRunning(c) && workQueue.offer(command)) { // 可以看到是往blockingqueue中放task
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        // 如果不满足上述条件,即blockingqueue也放不进去,那么就走下面的逻辑
        else if (!addWorker(command, false))
            reject(command);
    }

从上面的代码我们可以看到线程池处理线程的基本思路是: 如果线程数量小于coresize那么就执行task,否则就放到queue中,如果queue也放不下就走下面addWorker,如果也失败了,那么就调用reject策略。当然还涉及一些细节,需要进一步分析。

2.4 addWorker

execute中反复调用的是addWorker

private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) { 
            int c = ctl.get();
            int rs = runStateOf(c); // 计算线程池状态

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&  // 先忽略
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c); // 线程数量
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize)) // 可见如果超过了运行的最大线程数量则返回false
                    return false;
                if (compareAndIncrementWorkerCount(c)) // 如果成功,线程数量肯定加1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask); // 将task封装成了Worker
            final Thread t = w.thread; // 来获取worker的thread
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w); // 将worker添加到hashset中报存,关闭的时候要使用
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) { // 经过一些检查, 启动了work的thread
                    t.start();
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w); // 如果线程启动失败,则将线程数减1
        }
        return workerStarted;
    }

上面的代码看起来比较复杂,但是如果我们忽略具体的细节,从大致思路上看,其实也比较简单。上面代码的主要思路就是:除了一些状态检查外,首先将线程数量加1,然后将runnable分装成一个worker,去启动worker线程,如果启动失败则再将线程数量减1。返回false的原因可能是线程数量大于允许的数量。所以addWorker调用成功,则会启动一个work线程,且线程池中线程数量加1

2.5 worker

woker是线程池中真正的线程实体。线程池中的线程不是自定义的Runnable实现的线程,而是woker线程,worker在run方法里调用了自定义的Runnable的run方法

Worker继承了AQS,并实现了runnable接口:

private final class Worker extends AbstractQueuedSynchronizer implements Runnable {
    Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this); // 这个时候回头看看addWorker中t.start(), 就明白了启动的实际是一个Woker线程,而不是用户定义的Runnable
    }

    public void run() {
        runWorker(this);
    }
}

Worker中firstTask存储了用户定义的Runnable,thread是以他自身为参数的Thread对象。getThreadFactory()默认返回是Executors#DefaultThreadFactory,用来新建线程,并定义了线程名称的前缀等:

static class DefaultThreadFactory implements ThreadFactory {
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                                  Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                          poolNumber.getAndIncrement() +
                         "-thread-"; //
        }

        public Thread newThread(Runnable r) { //  调用后新建一个线程
            Thread t = new Thread(group, r,
                                  namePrefix + threadNumber.getAndIncrement(),
                                  0);
            if (t.isDaemon())
                t.setDaemon(false);
            if (t.getPriority() != Thread.NORM_PRIORITY)
                t.setPriority(Thread.NORM_PRIORITY);
            return t;
        }
    }

2.6 runWoker

Worker的run方法调用了runWorker,并将自身作为参数传了进去,下面看看问题的关键:runWorker:

final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) { // 注意这里的while循环,这里很关键。这里注意,如果两个条件都满足了,那么线程就结束了
                w.lock(); // 注意worker继承了AQS,相当于自己实现了锁,这个在关闭线程的时候有用
            
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try {
                        task.run(); // 仅仅是回调了Runnable的run方法
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);
                    }
                } finally {
                    task = null; // 重点,task执行完后就被置位null
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly); // 注意while循环结束后worker线程就结束了
        }
    }

runWorker中有个while循环,while中判断条件为(task != null || (task = getTask()) != null)。假设我们按照正常的逻辑,即task != null,则会调用task.run方法,执行完run方法后然后在finally中task被置为null;接着又进入while循环判断,这次task == null,所以不符合第一个判断条件,则会继续判断 task == getTask()) != null。我们来看下getTask做了什么。

2.7 getTask

private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) { // 死循环
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) { // 通过死循环设置状态
                int wc = workerCountOf(c);

                // 设置允许core线程timeout或者线程数量大于coresize,则允许线程超时
                timed = allowCoreThreadTimeOut || wc > corePoolSize;

                // 如果线程数量 <= 最大线程数 且 没有超时和允许超时 则跳出死循环
                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {

                // 这里是关键,如果允许超时则调用poll从queue中取出task,否则就调用take可阻塞的获取task
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null) // 获取到task则返回,然后runWorker的while循环就继续执行,并调用task的run方法
                    return r;
                timedOut = true; // 否则设置为timeOut,继续循环,但是下次循环会走到if (compareAndDecrementWorkerCount(c)) 处,并返回null。
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

忽略掉具体细节,getTask的整体思路是: 从blockqueue中拿去task,如果queue中没有task则分两种情况:

  1. 如果允许超时则调用poll(keepAliveTime, TimeUnit.NANOSECONDS),在规定时间没有返回了则getTask返回null,runWorker结束while循环,work线程结束。当线程数量大于coresize且blockqueue满的时候且小于maxsize的时候,新创建的线程便是走这个逻辑;或者允许core线程超时的时候也是走这个逻辑
  2. 如果不允许超时,则会一直阻塞直到blockqueue中有了新的task。take方法阻塞则表示worker线程也阻塞,也就是在没有task执行的情况下,worker线程便会阻塞等待。core线程走的就是这个逻辑。

这个时候回头再看下runWorker,如果task != null,那么就会执行task的run方法,执行完后task就会为被置为null,再次进入while循环执行getTask阻塞在这里了。通过这种方式保留住了线程。如果while循环结束了,那么worker线程也就结束了。

2.8 再看addWorker


分析到这里我们再来看下addWoker。addWorker可以将第一个参数设置为null。例如ThreadPoolExecutor#prestartAllCoreThreads:

public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true)) // addWorker第一个参数是null
            ++n;
        return n;
    }

经过前面的分析,我们知道addWoker用来启动一个worker线程,worker线程调用runWorker来执行,而runWorker中有个while循环,判断条件是(task != null || (task = getTask()) != null)。因为我们传入的task为null,所以就会判断task = getTask()) != null,而getTask就是去blockqueue中拿去数据,如果没有任务就会阻塞住。这个时候就是一个阻塞的线程在等待task的到来了。所以传入参数为null表示创建一个空的线程,什么都不执行。

2.9 再看execute

已经知道了线程池内部的大概工作情况,我们再来看下如果所有core线程都创建好了且处于空置状态,这个时候新放入一个线程的执行流程。

public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { // core线程都创建好了,所以判断条件不满足
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }

        // 会走到这里,会通过offer往blockingqueue里放置一个task。这个时候阻塞的core线程会通过blockingqueue的take拿到task执行,类似一个生产者消费者的情况
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }

        // 如果blockingqueue添加失败,则创建线程直到maxsize
        else if (!addWorker(command, false))
            reject(command);
    }

可见,线程和execute通过blockingqueue来通信,而不是其他方式,execute往blockingqueue中放置task,线程通过take来获取。整体线程池的逻辑如下图

《java线程池源码分析》

2.10 shutdown

这个时候我们终于可以来看看shutdown和shutdownNow了

看下shutdown

public void shutdown() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(SHUTDOWN);
            interruptIdleWorkers(); // 重点
            onShutdown(); // 什么都没做
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
}

private void interruptIdleWorkers() {
        interruptIdleWorkers(false);
}

private void interruptIdleWorkers(boolean onlyOne) {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers) {
                Thread t = w.thread;

                // 线程没有中断 且 获取到worker的锁
                if (!t.isInterrupted() && w.tryLock()) { 
                    try {
                        t.interrupt(); // 调用interrup,中断线程
                    } catch (SecurityException ignore) {
                    } finally {
                        w.unlock();
                    }
                }
                if (onlyOne)
                    break;
            }
        } finally {
            mainLock.unlock();
        }
    }
  1. shutdown的核心方法在interruptIdleWorkers里,这里可以看到在t.interrupt的时候有个判断添加,一个是线程没有设置中断标记,第二个是获取到worker的锁,我们注意下第二个条件。回头看下runWorker,while中执行task的run方法的时候,会先获取到worker线程的锁,所以如果线程正在执行task的run方法,则shutdown的时候会获取锁失败,也就不会中断线程了。这里可以得出结论:shutdown不会中断正在执行的线程
  2. 如果blockingqueu中有task还没执行完呢? 这个时候while中的take并不会阻塞,也不会被中断,shutdown中也没有清空blockingqueue的操作。所以可以得出结论:shutdown会等blockingqueue中的task执行完成再关闭。可以说shutdown是一种比较温柔的关闭方式了。
  3. 如果core线程都阻塞在take方法上了,即没有正在执行的task了,那么这个时候 t.interrupt则会中断take方法,worker线程的while循环结束,worker线程结束。当所有的worker线程都结束后线程池就关闭了

总结下就是: shutdown会把它被调用前放到线程池中的task全部执行完。

2.11 shutdownNow

再来看下shutdownNow

public List<Runnable> shutdownNow() {
        List<Runnable> tasks;
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            checkShutdownAccess();
            advanceRunState(STOP);
            interruptWorkers(); // 重点
            tasks = drainQueue(); // 重点
        } finally {
            mainLock.unlock();
        }
        tryTerminate();
        return tasks;
}

private void interruptWorkers() {
        final ReentrantLock mainLock = this.mainLock;
        mainLock.lock();
        try {
            for (Worker w : workers)
                w.interruptIfStarted();
        } finally {
            mainLock.unlock();
        }
}

void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { // 没有去获取woker的锁
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
}

private List<Runnable> drainQueue() {
        BlockingQueue<Runnable> q = workQueue;
        List<Runnable> taskList = new ArrayList<Runnable>();
        q.drainTo(taskList); // 将blockingqueue中的task清空
        if (!q.isEmpty()) {
            for (Runnable r : q.toArray(new Runnable[0])) {
                if (q.remove(r))
                    taskList.add(r);
            }
        }
        return taskList;
    }

从上面的代码可以看出:

  1. shutdownNow不会去获取worker的锁,所以shutdownNow会导致正在运行的task也被中断
  2. shutdownNow会将blockingqueue中的task清空,所以在blockingqueue中的task也不会被执行

总结就是shutdownNow比较粗暴,调用他后,他会将所有之前提交的任务都interrupt,且将blockingqueue中的task清空

另外就是不论是shutdown还是shutdownNow都是调用Thread的interrupt()方法。如果task不响应中断或者忽略中断标记,那么这个线程就不会被终止。例如在run中执行以下逻辑

poolExecutor.execute(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    System.out.println("b");
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e) {
                        System.out.printf("不处理"); // 忽略中断
                    }
                }

            }
        });

运行结果是,即使调用了shutdownNow也终止不了线程运行

b
0
不处理b
b
b
b
b
....

3 总结

  1. 线程通过while循环不停的从blockingqueue中获取task来保留线程,避免重复重建线程

4 参考

  1. https://blog.csdn.net/cleverGump/article/details/50688008
  2. https://zhuanlan.zhihu.com/p/30108890
    原文作者:java源码分析
    原文地址: https://www.cnblogs.com/set-cookie/p/9723868.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞