之前对线程池的理解很是主观,我的理解是在线程池初始化的时候就生成指定的数量的线程,然后将一些任务添加到一个阻塞队列中,然后多个线程同时从阻塞队列中取任务执行,当没有任务时线程阻塞,今天下午看了下大神的博客以及源码,发现我自己的理解有些偏颇,所以今天再总结一下。
我们创建线程池时一般都是通过使用Executors.newXXX来创建,所以我们从这个方法入手,无论是创建Fixed还是cached,还是single的线程池,都是调用的同一个方法——ThreadPoolExecutor的构造方法,只不过是参数不同。
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,//这两二个参数都是整数,用于限制线程的数量,在后面会用到
0L, TimeUnit.MILLISECONDS,//这两个参数都是用来指定从阻塞队列中获取任务的时候的超时时间,more on this later。
new LinkedBlockingQueue<Runnable>());//用来保存任务的阻塞队列
}
上面的构造方法最终调用的构造方法为:
public ThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
BlockingQueue<Runnable> workQueue,//上面这5个已经说过了。
ThreadFactory threadFactory,//这个用来产生thread,默认是 Executors.defaultThreadFactory()这个的返回值,他负责产生thread,设置thread的name,优先级,设置为daemon,没有别的作用。
RejectedExecutionHandler handler) {//这个用来处理当过多的任务添加到队列时,如果队列无法添加的时候的操作,默认的是AbortPolicy,当发生上述情况时,会抛一个异常。
。。。。//复制操作省略了
}
我们看一下提交任务的方法execute:
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {//如果当前的worker(也即是工作的线程数量)小于corePoolSize,进入if,这里的corePoolSize也就是我们在构造方法中穿入的第一个参数,很重要,他表示一个线程池只少要保存的线程的个数,这些线程是不会过期的,如果阻塞队列中没有任务,则会
在阻塞队列的poll方法中阻塞,而超过这个值的线程在没有任务的时候就要被回收掉。
if (addWorker(command, true))//添加一个worker,即一个线程,more on this later,
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {//如果上面的没有return,则进入这个if的判断,条件是正在运行,且这个任务能添加到队列中,则进入if
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))//重新检查,虽然上面应有个isRunning的判断,但是可能有时间差。进入这个if,如果不是running的话
reject(command);//拒绝这个任务,调用rejectPolicy,默认是抛个异常。
else if (workerCountOf(recheck) == 0)//如果当前的工作线程个数为0,则调用addWorker添加一个工作线程,如果不为0的话,那么就会有线程不停的从阻塞队列中获得任务,所以一定会有线程执行这个任务的。
addWorker(null, false);
}
else if (!addWorker(command, false))//如果添加到队列中失败,则判断是否可以添加工作线程,如果不可以,则拒绝这个任务。
reject(command);
}
经过上面,我们知道了corePoolSize,他是要保存的最小的工作的线程数(more on this later),但是还没有涉及到maximumPoolSize,我们趁热打铁看看addWorder方法
private boolean addWorker(Runnable firstTask, boolean core) {//第一个参数表示如果要创建工作线程的话,他要执行的第一个任务,第二个参数是用来做判断的,如果是true,表示和corePoolSize判断,false表示和maximumPoolSize判断。
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN &&//已经shutdown,不能继续接受任务。
! (rs == SHUTDOWN &&
firstTask == null &&
! workQueue.isEmpty()))
return false;
for (;;) {
int wc = workerCountOf(c);//工作线程的个数
if (wc >= CAPACITY ||//如果现在已经创建的线程的数量太多,
wc >= (core ? corePoolSize : maximumPoolSize))//这里显示了core的作用,如果是true,则根据第一个判断,
return false;//如果创建的worker太多,则返回false,表示不能继续创建。
if (compareAndIncrementWorkerCount(c))//使用cas增加c,退出循环,进行创建worker
break retry;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
}
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
final ReentrantLock mainLock = this.mainLock;
w = new Worker(firstTask);//创建worker
final Thread t = w.thread;
if (t != null) {
mainLock.lock();
try {
// Recheck while holding lock.
// Back out on ThreadFactory failure or if
// shut down before lock acquired.
int c = ctl.get();
int rs = runStateOf(c);
if (rs < SHUTDOWN ||
(rs == SHUTDOWN && firstTask == null)) {
if (t.isAlive()) // precheck that t is startable
throw new IllegalThreadStateException();
workers.add(w);//将创建的worker添加到集合中
int s = workers.size();
if (s > largestPoolSize)
largestPoolSize = s;
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();//调用创建的worker中的Thread的start方法,
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
下面我们看一下Worker这个类:
Worker(Runnable firstTask) {//这个参数表示当这个worker创建成功后执行的第一个任务。
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;//第一个任务
this.thread = getThreadFactory().newThread(this);//worker中封装了一个线程,并且这个线程执行的任务是this,因为Worker也是一个runnable,也就是说调用的worker类的run方法
}
同时我们发现Worker也是一个runnable,他的run方法是调用的ThreadPoolExecutor的runWorker方法,也就是当调用worker.thread.start方法的时候是执行的ThreadPoolExecutor的runWorker方法,参数是worker自己,我们看一下
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;//穿入的第一个任务
w.firstTask = null;
w.unlock(); // worker继承了aqs,实现了lock方法,他的目的是害怕并发执行造成不确定性的后果
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {//或者是第一个任务不为null或者是从队列中获得任务不是null,这个getTask方法就是从队列中获得任务,比较重要,等会看
w.lock();
// If pool is stopping, ensure thread is interrupted;
// if not, ensure thread is not interrupted. This
// requires a recheck in second case to deal with
// shutdownNow race while clearing interrupt
if ((runStateAtLeast(ctl.get(), STOP) ||
(Thread.interrupted() &&
runStateAtLeast(ctl.get(), STOP))) &&
!wt.isInterrupted())
wt.interrupt();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();//执行获取的任务,执行这个任务的是worker中封装的线程。从这里基本就可以总结出大概了:当线程池添加任务时,会判断worker的数量,如果过小,就会创建worker,添加到集合,用这个worker执行第一个任务,执行完了再从阻塞队列中获取任务。
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);//这个方法也很重要,它用来处理在所有的任务都处理完成之后的操作。
}
}
看到这里有两个方法比较重要,一个是getTask,一个是processWorderExit,
getTask方法:
private Runnable getTask() {
boolean timedOut = false; // Did the last poll() time out?
retry:
for (;;) {
int c = ctl.get();
int rs = runStateOf(c);
// Check if queue empty only if necessary.
if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
decrementWorkerCount();
return null;
}
boolean timed; // Are workers subject to culling?
for (;;) {
int wc = workerCountOf(c);
timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果现在工作的线程多于corePoolSize就会是true
if (wc <= maximumPoolSize && ! (timedOut && timed))//刚上来timed是false,跳出第二个for
break;
if (compareAndDecrementWorkerCount(c))
return null;
c = ctl.get(); // Re-read ctl
if (runStateOf(c) != rs)
continue retry;
// else CAS failed due to workerCount change; retry inner loop
}
try {
Runnable r = timed ?//如果当前的工作线程多余corePoolSize,则是true
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) ://如果是true,表示有超时时间的等待。从这里就能看出corePoolSize的用处了,如果有超过corePoolSize的线程,那么再从队列中获取任务时就会有时间的等待,而且我们之前传的keepAliveTime是0,也就是说
如果当前队列中没有任务的话,超过corePoolSize那些线程就立即返回,而如果不是超过corePoolSize的线程则会一直等待,
workQueue.take();//没有超时时间的等待,会一直阻塞。
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
在这个getTask方法中,我们看到了corePoolSize的重要作用。
processWorderExit方法:
private void processWorkerExit(Worker w, boolean completedAbruptly) {
if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
decrementWorkerCount();
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
completedTaskCount += w.completedTasks;
workers.remove(w);//这里可以发现,对于那些在获取任务时没有阻塞的线程,在没有任务后就会被移除集合,也就是被垃圾回收掉。但是对于那些阻塞的就不会被回收掉。
} finally {
mainLock.unlock();
}
。。。。。//下面的没看。
}
再看一下在Executors中的静态方法:
1、创建一个fixedThreadPool的话,corePoolSize和maximumPoolSize是一样的,因为创建的worker的数量不会超过maximumPoolSize,因此worker的数量最多是corePoolSize,即getTask方法中的timed一定是true 所以所有的线程在从阻塞队列中获取任务时都是会永远阻塞的,不会丢掉worker。并且传入的等待时间是0,所以在从阻塞队列中获得任务时不会阻塞,worker直接直接返回。
2、创建一个cachedThreadPool,
new ThreadPoolExecutor(0, //线程最小的数量是0,即getTask中所有的timed都是false,所以所有的worker在获取不到任务时都会被抛弃,再有任务时就会创建建立worker
Integer.MAX_VALUE,//最大值超级大,也就是说创建woker的个数几乎是不受限制的。
60L, TimeUnit.SECONDS,//每个worker在从阻塞队列中获得任务时可以阻塞60秒。
new SynchronousQueue<Runnable>() );//一个同步的queue,比较特殊,插入的线程会阻塞直到一个获取的线程的到来,同样获取的线程也会阻塞直到插入的线程到来。
可以看出,cachedThreadPool几乎可以被认为是不断创建worker的,worker从阻塞队列中获取任务时如果没有任务时就会阻塞60秒,然后被抛弃。
3、创建一个单线程的线程池:
new ThreadPoolExecutor(1, 1,//核心的数量和最大的数量都是1,也就是永远是1,
0L, TimeUnit.MILLISECONDS,//阻塞直到有任务
new LinkedBlockingQueue<Runnable>())
很好的纠正了对线程池的理解。