Executor框架集对线程调度进行了封装,将任务提交和任务执行解耦。
它提供了线程生命周期调度的所有方法,大大简化了线程调度和同步的门槛。
Executor框架集的核心类图如下:
从上往下,可以很清晰的看出框架集的各个类,以及它们之间的关系:
Executor,是一个可以提交可执行(Runnable)任务的Object,这个接口解耦了任务提交和执行细节(线程使用、调度等),Executor主要用来替代显示的创建和运行线程;
ExecutorService提供了异步的管理一个或多个线程终止、执行过程(Future)的方法;
AbstractExecutorService提供了ExecutorService的一个默认实现,这个类通过RunnableFuture(实现类FutureTask)实现了submit, invokeAny, invokeAll几个方法;
ThreadPoolExecutor是ExecutorService的一个可配置的线程池实现,它的两个重要的配置参数:corePoolSize(线程池的基本大小,即在没有任务需要执行的时候线程池的大小,并且只有在工作队列满了的情况下才会创建超出这个数量的线程。这里需要注意的是:在刚刚创建ThreadPoolExecutor的时候,线程并不会立即启动,而是要等到有任务提交时才会启动,除非调用了prestartCoreThread/prestartAllCoreThreads事先启动核心线程。再考虑到keepAliveTime和allowCoreThreadTimeOut超时参数的影响,所以没有任务需要执行的时候,线程池的大小不一定是corePoolSize。), maximumPoolSize(线程池中允许的最大线程数,线程池中的当前线程数目不会超过该值。如果队列中任务已满,并且当前线程个数小于maximumPoolSize,那么会创建新的线程来执行任务。这里值得一提的是largestPoolSize,该变量记录了线程池在整个生命周期中曾经出现的最大线程个数。为什么说是曾经呢?因为线程池创建之后,可以调用setMaximumPoolSize() 改变运行的最大线程的数目。);
ScheduledExecutorService 是添加了调度特性(延迟或者定时执行)的ExecutorService;
ScheduledThreadPoolExecutor是具有调度特性的ExecutorService的池化实现;
Executors是一个Executor, ExecutorService, ScheduledExecutorService, ThreadFactory, Callable的工具类,它能满足大部分的日常应用场景。使用它创建线程池:
接下来,分析下ThreadPoolExecutor的实现。
ThreadPoolExecutor的作者Doug Lea,他将workerCount(线程池当前有效线程数)和runState(线程池当前所处状态)放置到一个原子变量ctl(AtomicInteger)上,原子变量高三位保存runStatus,低29位保存workerCount。因此,ThreadPoolExecutor(JDK8)支持的最大线程数为2^29-1。线程池状态有以下五中:
RUNNING(正常运行,-1): Accept new tasks and process queued tasks
SHUTDOWN(关闭,0): Don’t accept new tasks, but process queued tasks
STOP(停止,1): Don’t accept new tasks, don’t process queued tasks, and interrupt in-progress tasks
TIDYING(整理中,2): All tasks have terminated, workerCount is zero, the thread transitioning to state TIDYING will run the terminated() hook method
TERMINATED(终结,3): terminated() has completed
线程池状态的变迁,并不严格按照数字增加变化:
RUNNING -> SHUTDOWN
On invocation of shutdown(), perhaps implicitly in finalize()
(RUNNING or SHUTDOWN) -> STOP
On invocation of shutdownNow()
SHUTDOWN -> TIDYING
When both queue and pool are empty
STOP -> TIDYING
When pool is empty
TIDYING -> TERMINATED
When the terminated() hook method has completed
Threads waiting in awaitTermination() will return when the
state reaches TERMINATED.
当前工作线程计数以及线程池的状态变迁,通过ctl原子变量的CAS操作完成。
ThreadPoolExecutor会将所有提交的任务放置到workQueue中,它是一个BlockingQueue.
所有的工作线程集(workers,HashSet<Worker>)的获取和预定,使用一个final的ReentrantLock(mainLock)控制,还有mainLock上的等待条件termination(Condition)。
largestPoolSize(最大池容量),completedTaskCount(已完成线程计数)等私有变量,通过mainLock控制访问。
threadFactory(volatile,线程工厂,工厂模式的典型运用),所有的线程通过addWorker方法,间接调用这个工厂创建,以下为Executors中的DefaultThreadFactory类的默认构造方法(namePrefix非常熟悉)。
DefaultThreadFactory() { SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); namePrefix = "pool-" + poolNumber.getAndIncrement() + "-thread-"; }
keepAliveTime,线程等待工作的空闲时间(当allowCoreThreadTimeOut设置或者工作线程workerCount大于corePoolSize时,会超时退出,否则线程讲一直运行)
allowCoreThreadTimeOut,允许核心线程超时退出(默认为false)
corePoolSize,核心线程数目(如果没有设置allowCoreThreadTimeOut,它将是线程池中,最少活跃的线程数)
类Worker主要维护线程执行任务时的状态打断和其它功能预定,它通过继承AbstractQueuedSynchronizer来简化任务执行时锁的获取和释放,Worker没有使用可重入锁,而是实现了一个互斥锁,因为我们不想工作线程访问线程池控制变量时再次获得锁(如setCorePoolSize)。
接下来,我们看看addWorker方法,通过指定参数,它允许以核心线程运行任务。addWorker会首先检查当前的线程池状态,当前运行的线程数是否允许(添加新worker),前两项检查通过后,会尝试设置ctl中的线程计数(因为活跃工作线程数存储在ctl的低位,因此,直接自增ctl便可)。线程池计数器设置后,剩下的就是添并启动Worker,Worker集合由mainLock控制,所有workers集的修改都是由mainLock控制的。只有当集合添加成功并且新添加的线程启动成功时,线程池计数器的设置生效,否则,计数器将回退(由addWorkerFailed方法执行)。
private boolean addWorker(Runnable firstTask, boolean core) { retry: for (;;) { int c = ctl.get(); int rs = runStateOf(c); // Check if queue empty only if necessary. if (rs >= SHUTDOWN && ! (rs == SHUTDOWN && firstTask == null && ! workQueue.isEmpty())) return false; for (;;) { int wc = workerCountOf(c); if (wc >= CAPACITY || wc >= (core ? corePoolSize : maximumPoolSize)) return false; if (compareAndIncrementWorkerCount(c)) break retry; c = ctl.get(); // Re-read ctl if (runStateOf(c) != rs) continue retry; // else CAS failed due to workerCount change; retry inner loop } } boolean workerStarted = false; boolean workerAdded = false; Worker w = null; try { w = new Worker(firstTask); final Thread t = w.thread; if (t != null) { final ReentrantLock mainLock = this.mainLock; mainLock.lock(); try { // Recheck while holding lock. // Back out on ThreadFactory failure or if // shut down before lock acquired. int rs = runStateOf(ctl.get()); if (rs < SHUTDOWN || (rs == SHUTDOWN && firstTask == null)) { if (t.isAlive()) // precheck that t is startable throw new IllegalThreadStateException(); workers.add(w); int s = workers.size(); if (s > largestPoolSize) largestPoolSize = s; workerAdded = true; } } finally { mainLock.unlock(); } if (workerAdded) { t.start(); workerStarted = true; } } } finally { if (! workerStarted) addWorkerFailed(w); } return workerStarted; }
只有当新添加的worker线程启动成功时,addWorker返回成功(此时worker线程启动start(),它的run方法中调用了runWorker方法),其它情况返回失败。
最后看一个方法,runWorker 方法:worker线程,不断从BlockQueu中取出任务,执行它并处理执行过程中的各种情况(如线程池的状态变化,已执行计数)。
final void runWorker(Worker w) { Thread wt = Thread.currentThread(); Runnable task = w.firstTask; w.firstTask = null; w.unlock(); // allow interrupts boolean completedAbruptly = true; try { while (task != null || (task = getTask()) != null) { w.lock(); // If pool is stopping, ensure thread is interrupted; // if not, ensure thread is not interrupted. This // requires a recheck in second case to deal with // shutdownNow race while clearing interrupt if ((runStateAtLeast(ctl.get(), STOP) || (Thread.interrupted() && runStateAtLeast(ctl.get(), STOP))) && !wt.isInterrupted()) wt.interrupt(); try { beforeExecute(wt, task); Throwable thrown = null; try { task.run(); } catch (RuntimeException x) { thrown = x; throw x; } catch (Error x) { thrown = x; throw x; } catch (Throwable x) { thrown = x; throw new Error(x); } finally { afterExecute(task, thrown); } } finally { task = null; w.completedTasks++; w.unlock(); } } completedAbruptly = false; } finally { processWorkerExit(w, completedAbruptly); } }
runWorker方法中,直接调用了task的run()方法,大致交互过程。