java thread reuse(good)


I have always read that creating threads is expensive. I also know that you cannot rerun a thread.

I see in the doc of Executors class: Creates a thread pool that creates new threads as needed, but will reuse previously constructed threads when they are available.

Mind the word ‘reuse’.

How do thread pools ‘reuse’ threads?


The thread pool consists of a number of fixed worker threads that can take tasks from an internal task queue.
So if one task ends, the thread does not end but waits for the next task. If you abort a thread, it is automatically replaced.

Look at the documentation for more details.

From Thread.start() Javadoc:

 * Causes this thread to begin execution; the Java Virtual Machine 
 * calls the <code>run</code> method of this thread.

BUT then inside each Thread’s run() method Runnable shall be dequeued and the run() method of each Runnable is going to be called. So each thread can process several Runnable. That’s what they refer to by “thread reuse”.

One way to do your own thread pool is to use a blocking queue on to which you enqueue runnables and have each of your thread, once it’s done processing the run() method of a Runnable, dequeue the next Runnable (or block) and run its run() method, then rinse and repeat.

I guess part of the confusion (and it is a bit confusing) comes from the fact that a Thread takes a Runnable and upon calling start() the Runnable ‘s run() method is called while the default thread pools also take Runnable.



Runnable getTask() {
for (;;) {
try {
int state = runState;
if (state > SHUTDOWN)
return null;
Runnable r;
if (state == SHUTDOWN) // Help drain queue
r = workQueue.poll();
else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
r = workQueue.take();
if (r != null)
return r;
if (workerCanExit()) {
if (runState >= SHUTDOWN) // Wake up others
return null;
// Else retry
} catch (InterruptedException ie) {
// On interruption, re-check runState
回到execute方法 ,execute 方法部分实现:

if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated


private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
boolean reject = false;
Thread t = null;
try {
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
t = addThread(null);
} finally {
if (reject)
else if (t != null)
ensureQueuedTaskHandled方法判断线程池运行,如果状态不为运行状态,从workQueue中删除, 并调用reject做拒绝处理。
void reject(Runnable command) {
handler.rejectedExecution(command, this);


if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
如线程池workQueue offer失败或不处于运行状态,调用addIfUnderMaximumPoolSize,addIfUnderMaximumPoolSize方法基本和addIfUnderCorePoolSize实现类似,不同点在于根据最大线程数(maximumPoolSize)进行比较,如果超过最大线程数,返回false,调用reject方法,下面是addIfUnderMaximumPoolSize方法实现:

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
if (t == null)
return false;
return true;

3. 添加任务处理流程
如果当前线程池中的数量等于corePoolSize,并线程池处于Running状态,缓冲队列 workQueue未满,那么任务被放入缓冲队列、等待任务调度执行。





  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. ……….
  3. private void ensureQueuedTaskHandled(Runnable command) {
  4. final ReentrantLock mainLock = this.mainLock;
  5. mainLock.lock();
  6. boolean reject = false;
  7. Thread t = null;
  8. try {
  9. int state = runState;
  10. if (state != RUNNING && workQueue.remove(command))
  11. reject = true;
  12. else if (state < STOP &&
  13. poolSize < Math.max(corePoolSize, 1) &&
  14. !workQueue.isEmpty())
  15. t = addThread(null);
  16. } finally {
  17. mainLock.unlock();
  18. }
  19. if (reject)
  20. reject(command);
  21. else if (t != null)
  22. t.start();
  23. }
  24. ……….
  25. }



  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. ……….
  3. private Thread addThread(Runnable firstTask) {
  4. Worker w = new Worker(firstTask);
  5. Thread t = threadFactory.newThread(w);
  6. if (t != null) {
  7. w.thread = t;
  8. workers.add(w);
  9. int nt = ++poolSize;
  10. if (nt > largestPoolSize)
  11. largestPoolSize = nt;
  12. }
  13. return t;
  14. }
  15. ……….
  16. }

1、Worker w = new Worker(firstTask)
2、Thread t = threadFactory.newThread(w)


  1. public class ThreadPoolExecutor extends AbstractExecutorService {
  2. ……….
  3. private final class Worker implements Runnable {
  4. ……….
  5. Worker(Runnable firstTask) {
  6. this.firstTask = firstTask;
  7. }
  8. private Runnable firstTask;
  9. ……….
  10. public void run() {
  11. try {
  12. Runnable task = firstTask;
  13. firstTask = null;
  14. while (task != null || (task = getTask()) != null) {
  15. runTask(task);
  16. task = null;
  17. }
  18. } finally {
  19. workerDone(this);
  20. }
  21. }
  22. }
  23. Runnable getTask() {
  24. for (;;) {
  25. try {
  26. int state = runState;
  27. if (state > SHUTDOWN)
  28. return null;
  29. Runnable r;
  30. if (state == SHUTDOWN) // Help drain queue
  31. r = workQueue.poll();
  32. else if (poolSize > corePoolSize || allowCoreThreadTimeOut)
  33. r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);
  34. else
  35. r = workQueue.take();
  36. if (r != null)
  37. return r;
  38. if (workerCanExit()) {
  39. if (runState >= SHUTDOWN) // Wake up others
  40. interruptIdleWorkers();
  41. return null;
  42. }
  43. // Else retry
  44. } catch (InterruptedException ie) {
  45. // On interruption, re-check runState
  46. }
  47. }
  48. }
  49. }
  50. ……….
  51. }



  1. public class Executors {
  2. ……….
  3. static class DefaultThreadFactory implements ThreadFactory {
  4. ……….
  5. public Thread newThread(Runnable r) {
  6. Thread t = new Thread(group, r,
  7. namePrefix + threadNumber.getAndIncrement(),
  8. 0);
  9. if (t.isDaemon())
  10. t.setDaemon(false);
  11. if (t.getPriority() != Thread.NORM_PRIORITY)
  12. t.setPriority(Thread.NORM_PRIORITY);
  13. return t;
  14. }
  15. ……….
  16. }
  17. ……….
  18. }


之前我们提到了t.start()这个方法执行了线程。那么现在从头顺一下,看看到底是执行了谁的run方法。首先知道,t=addThread(null),而addThread内部执行了下面三步,Worker w = new Worker(null);Thread t = threadFactory.newThread(w);return t;这里两个t是一致的。




  1. private boolean addIfUnderCorePoolSize(Runnable firstTask) {  
  2.        Thread t = null;  
  3.        final ReentrantLock mainLock = this.mainLock;  
  4.        mainLock.lock();  
  5.        try {  
  6.         //poolSize < corePoolSize 即当前工作线程的数量一定要小于你设置的线程最大数量  
  7.         //CachedThreadPool永远也不会进入该方法,因为它的corePoolSize初始为0  
  8.            if (poolSize < corePoolSize && runState == RUNNING)  
  9.                t = addThread(firstTask);  
  10.        } finally {  
  11.            mainLock.unlock();  
  12.        }  
  13.        if (t == null)  
  14.            return false;  
  15.        t.start();   //线程执行了  
  16.        return true;  
  17.    }  

    看’t.start()’,这表示工作线程启动了,工作线程t启动的前提条件是’t = addThread(firstTask); ‘返回值t必须不为null。好了,现在想看看java线程池中工作线程是怎么样的吗?请看addThread方法: 


  1. private Thread addThread(Runnable firstTask) {  
  2.     //Worker就是典型的工作线程,所以的核心线程都在工作线程中执行  
  3.        Worker w = new Worker(firstTask);  
  4.        //采用默认的线程工厂生产出一线程。注意就是设置一些线程的默认属性,如优先级、是否为后台线程等  
  5.        Thread t = threadFactory.newThread(w);   
  6.        if (t != null) {  
  7.            w.thread = t;  
  8.            workers.add(w);  
  9.          //没生成一个工作线程 poolSize加1,但poolSize等于最大线程数corePoolSize时,则不能再生成工作线程  
  10.            int nt = ++poolSize;    
  11.            if (nt > largestPoolSize)  
  12.                largestPoolSize = nt;  
  13.        }  
  14.        return t;  
  15.    }  




  1. public void run() {  
  2.             try {  
  3.                 Runnable task = firstTask;  
  4.                 firstTask = null;  
  5.                 /** 
  6.                  * 注意这段while循环的执行逻辑,没执行完一个核心线程后,就会去线程池 
  7.                  * 队列中取下一个核心线程,如取出的核心线程为null,则当前工作线程终止 
  8.                  */  
  9.                 while (task != null || (task = getTask()) != null) {  
  10.                     runTask(task);  //你所提交的核心线程(任务)的运行逻辑  
  11.                     task = null;  
  12.                 }  
  13.             } finally {  
  14.                 workerDone(this); // 当前工作线程退出  
  15.             }  
  16.         }  
  17.     }  




  1. Runnable getTask() {  
  2.         for (;;) {  
  3.             try {  
  4.                 int state = runState;  
  5.                 if (state > SHUTDOWN)    
  6.                     return null;  
  7.                 Runnable r;  
  8.                 if (state == SHUTDOWN)  //帮助清空队列  
  9.                     r = workQueue.poll();  
  10.                /* 
  11.                 * 对于条件1,如果可以超时,则在等待keepAliveTime时间后,则返回一null对象,这时就 
  12.                 *  销毁该工作线程,这就是CachedThreadPool为什么能回收空闲线程的原因了。 
  13.                 * 注意以下几点:1.这种功能情况一般不可能在fixedThreadPool中出现 
  14.                 *            2.在使用CachedThreadPool时,条件1一般总是成立,因为CachedThreadPool的corePoolSize 
  15.                 *              初始为0 
  16.                 */  
  17.                 else if (poolSize > corePoolSize || allowCoreThreadTimeOut)  //——————条件1  
  18.                     r = workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS);    
  19.                 else  
  20.                     r = workQueue.take();       //如果队列不存在任何元素 则一直等待。 FiexedThreadPool典型模式———-条件2  
  21.                 if (r != null)  
  22.                     return r;  
  23.                 if (workerCanExit()) {       //————————–条件3  
  24.                     if (runState >= SHUTDOWN) // Wake up others  
  25.                         interruptIdleWorkers();  
  26.                     return null;  
  27.                 }  
  28.                 // Else retry  
  29.             } catch (InterruptedException ie) {  
  30.                 // On interruption, re-check runState  
  31.             }  
  32.         }  
  33.     }  





  1. /** 
  2.     * 工作线程退出要处理的逻辑 
  3.     * @param w 
  4.     */  
  5.    void workerDone(Worker w) {  
  6.        final ReentrantLock mainLock = this.mainLock;  
  7.        mainLock.lock();  
  8.        try {  
  9.            completedTaskCount += w.completedTasks;   
  10.            workers.remove(w);  //从工作线程缓存中删除  
  11.            if (–poolSize == 0) //poolSize减一,这时其实又可以创建工作线程了  
  12.                tryTerminate(); //尝试终止  
  13.        } finally {  
  14.            mainLock.unlock();  
  15.        }  
  16.    }  




    1. private void tryTerminate() {  
    2.     //终止的前提条件就是线程池里已经没有工作线程(Worker)了  
    3.        if (poolSize == 0) {  
    4.            int state = runState;  
    5.            /** 
    6.             * 如果当前已经没有了工作线程(Worker),但是线程队列里还有等待的线程任务,则创建一个 
    7.             * 工作线程来执行线程队列中等待的任务 
    8.             */  
    9.            if (state < STOP && !workQueue.isEmpty()) {      
    10.                state = RUNNING; // disable termination check below  
    11.                Thread t = addThread(null);  
    12.                if (t != null)  
    13.                    t.start();  
    14.            }  
    15.            //设置池状态为终止状态  
    16.            if (state == STOP || state == SHUTDOWN) {  
    17.                runState = TERMINATED;  
    18.                termination.signalAll();   
    19.                terminated();   
    20.            }  
    21.        }  
    22.    }



0.    ThreadPoolExecutor类的声明属性变量分析

1 public class ThreadPoolExecutor extends AbstractExecutorService



  • mainLock 对整个ThreadPoolExecutor对象的锁
  • workers  存储工作线程对应Worker对象的HashSet
  • termination 线程池ThreadPoolExecutor对象的生命周期终止条件,和mainLock相关
  • largestPoolSize 线程池跑过的最大线程数
  • completedTaskCount 完成任务数
  • ctl 执行器ThreadPoolExecutor的生命周期状态和活动状态的worker数封装

稍微需要说一下最后一个, ctl是一个AtomicInteger对象,以位运算的方式打包封装了当前线程池ThreadPoolExecutor对象的状态和活动线程数两个数据

1.    执行器状态



  • RUNNING 在ThreadPoolExecutor被实例化的时候就是这个状态
  • SHUTDOWN 通常是已经执行过shutdown()方法,不再接受新任务,等待线程池中和队列中任务完成
  • STOP 通常是已经执行过shutdownNow()方法,不接受新任务,队列中的任务也不再执行,并尝试终止线程池中的线程
  • TIDYING 线程池为空,就会到达这个状态,执行terminated()方法
  • TERMINATED terminated()执行完毕,就会到达这个状态,ThreadPoolExecutor终结

2.    Worker内部类

它既实现了Runnable,同时也是一个AQS ( AbstractQueuedSynchronizer )。

1 2 3 private final class Worker extends AbstractQueuedSynchronizer implements Runnable


1 2 3 final Thread thread; Runnable firstTask; volatile long completedTasks;

这个类还提供了interruptIfStarted()这样一个方法,里面做了(getState()>= 0)的判断。与此呼应,Worker的构造方法里对state设置了-1,避免在线程执行前被停掉。

1 2 3 4 5 Worker(Runnable firstTask) {      setState(- 1 ); // inhibit interrupts until runWorker      this .firstTask = firstTask;      this .thread = getThreadFactory().newThread( this ); }

3. 提交任务

上篇文章已经提到了,提交新任务的时候,如果没达到核心线程数corePoolSize,则开辟新线程执行。如果达到核心线程数corePoolSize, 而队列未满,则放入队列,否则开新线程处理任务,直到maximumPoolSize,超出则丢弃处理。


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 public void execute(Runnable command) {      if (command == null )          throw new NullPointerException();        int c = ctl.get();      if (workerCountOf(c) < corePoolSize) {          if (addWorker(command, true ))              return ;          c = ctl.get();      }      if (isRunning(c) && workQueue.offer(command)) {          int recheck = ctl.get();          if (! isRunning(recheck) && remove(command))              reject(command);          else if (workerCountOf(recheck) == 0 )              addWorker( null , false );      }      else if (!addWorker(command, false ))          reject(command); }

4. addWorker()的实现


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 private boolean addWorker(Runnable firstTask, boolean core) {      retry:      for (;;) {          int c = ctl.get();          int rs = runStateOf(c);            // Check if queue empty only if necessary.          if (rs >= SHUTDOWN &&              ! (rs == SHUTDOWN &&                 firstTask == null &&                 ! workQueue.isEmpty()))              return false ;            for (;;) {              int wc = workerCountOf(c);              if (wc >= CAPACITY ||                  wc >= (core ? corePoolSize : maximumPoolSize))                  return false ;              if (compareAndIncrementWorkerCount(c))                  break retry;              c = ctl.get();  // Re-read ctl              if (runStateOf(c) != rs)                  continue retry;              // else CAS failed due to workerCount change; retry inner loop          }      }        boolean workerStarted = false ;      boolean workerAdded = false ;      Worker w = null ;      try {          final ReentrantLock mainLock = this .mainLock;          w = new Worker(firstTask);          final Thread t = w.thread;          if (t != null ) {              mainLock.lock();              try {                  // Recheck while holding lock.                  // Back out on ThreadFactory failure or if                  // shut down before lock acquired.                  int c = ctl.get();                  int rs = runStateOf(c);                    if (rs < SHUTDOWN ||                      (rs == SHUTDOWN && firstTask == null )) {                      if (t.isAlive()) // precheck that t is startable                          throw new IllegalThreadStateException();                      workers.add(w);                      int s = workers.size();                      if (s > largestPoolSize)                          largestPoolSize = s;                      workerAdded = true ;                  }              } finally {                  mainLock.unlock();              }              if (workerAdded) {                  t.start();                  workerStarted = true ;              }          }      } finally {          if (! workerStarted)              addWorkerFailed(w);      }      return workerStarted; }




5. 任务的执行runWorker()

1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 final void runWorker(Worker w) {          Thread wt = Thread.currentThread();          Runnable task = w.firstTask;          w.firstTask = null ;          w.unlock(); // allow interrupts          boolean completedAbruptly = true ;          try {              while (task != null || (task = getTask()) != null ) {                  w.lock();                  // If pool is stopping, ensure thread is interrupted;                  // if not, ensure thread is not interrupted.  This                  // requires a recheck in second case to deal with                  // shutdownNow race while clearing interrupt                  if ((runStateAtLeast(ctl.get(), STOP) ||                       (Thread.interrupted() &&                        runStateAtLeast(ctl.get(), STOP))) &&                      !wt.isInterrupted())                      wt.interrupt();                  try {                      beforeExecute(wt, task);                      Throwable thrown = null ;                      try {                ;                      } catch (RuntimeException x) {                          thrown = x; throw x;                      } catch (Error x) {                          thrown = x; throw x;                      } catch (Throwable x) {                          thrown = x; throw new Error(x);                      } finally {                          afterExecute(task, thrown);                      }                  } finally {                      task = null ;                      w.completedTasks++;                      w.unlock();                  }              }              completedAbruptly = false ;          } finally {              processWorkerExit(w, completedAbruptly);          }      }


  • 线程开始执行前,需要对worker加锁,完成一个任务后执行unlock()
  • 在任务执行前后,执行beforeExecute()和afterExecute()方法
  • 记录任务执行中的异常后,继续抛出
  • 每个任务完成后,会记录当前线程完成的任务数
  • 当worker执行完一个任务的时候,包括初始任务firstTask,会调用getTask()继续获取任务,这个方法调用是可以阻塞的
  • 线程退出,执行processWorkerExit(w, completedAbruptly)处理

5. Worker线程的复用和任务的获取getTask()


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 private Runnable getTask() {          boolean timedOut = false ; // Did the last poll() time out?            retry:          for (;;) {              int c = ctl.get();              int rs = runStateOf(c);                // Check if queue empty only if necessary.              if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {                  decrementWorkerCount();                  return null ;              }                boolean timed;      // Are workers subject to culling?                for (;;) {                  int wc = workerCountOf(c);                  timed = allowCoreThreadTimeOut || wc > corePoolSize;                    if (wc <= maximumPoolSize && ! (timedOut && timed))                       break ;                  if (compareAndDecrementWorkerCount(c))                       return null ;                  c = ctl.get();                  // Re-read ctl                  if (runStateOf(c) != rs)                       continue retry;                  // else CAS failed due to workerCount change; retry inner loop               }               try {                   Runnable r = timed ?                       workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :                       workQueue.take();                   if (r != null )                       return r;                   timedOut = true ;               } catch (InterruptedException retry) {                   timedOut = false ;               }           }       }

getTask()实际上是从工作队列(workQueue)中取提交进来的任务。这个workQueue是一个BlockingQueue,通常当队列中没有新任务的时候,则getTask()会阻塞。另外,还有定时阻塞这样一段逻辑:如果从队列中取任务是计时的,则用poll()方法,并设置等待时间为keepAlive,否则调用阻塞方法take()。当poll()超时,则获取到的任务为null,timeOut设置为 true。这段代码也是放在一个for(;;)循环中,前面有判断超时的语句,如果超时,则return null。这意味着runWorker()方法的while循环结束,线程将退出,执行processWorkerExit()方法。


1 2 int wc = workerCountOf(c); timed = allowCoreThreadTimeOut || wc &gt; corePoolSize;



6. 线程池线程数的维护和线程的退出处理


1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 private void processWorkerExit(Worker w, boolean completedAbruptly) {           if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted               decrementWorkerCount();           final ReentrantLock mainLock = this .mainLock;           mainLock.lock();           try {               completedTaskCount += w.completedTasks;               workers.remove(w);           } finally {               mainLock.unlock();           }           tryTerminate();           int c = ctl.get();           if (runStateLessThan(c, STOP)) {               if (!completedAbruptly) {                   int min = allowCoreThreadTimeOut ? 0 : corePoolSize;                   if (min == 0 && ! workQueue.isEmpty())                      min = 1 ;                   if (workerCountOf(c) >= min)                      return ; // replacement not needed              }              addWorker( null , false );          }      }

这个方法的第二个参数是判断是否在runWorker()中正常退出了循环向下执行,如果不是,说明在执行任务的过程中出现了异常,completedAbruptly为true,线程直接退出,需要直接对活动线程数减1 。





