1、个人总结及想法:
(1)ThreadPoolExecutor的继承关系?
ThreadPoolExecutor继承AbstractExectorService,AbstractExecutorService 实现 ExcutorService接口,ExcutorService继承Executor
AbstractExecutorService实现了Executor的默认方法,实现了一些基本操作。
(2)线程池的状态说明?
RUNNING:可以接受新的任务,也可以处理阻塞队列里的任务
SHUNTDOWN:不接受新的任务,但可以处理阻塞队列里面的任务
STOP:不再接受新的任务,不再处理阻塞队列的任务,并中断正在处理的任务
TIDYING:中间状态,线程池中没有有效的线程,调用terminated 进入TERMINATED状态
TERMINATED:终止状态
(3)线程池的几种拒绝策略?
AbortPolicy:直接抛出异常
CallerRunPolicy:使用调用者的线程来处理这个任务
DiscardOldestPolicy:该策略是丢弃最老的一个请求,也就是任务队列第一个节点,会再次提交任务。
DiscardPolicy:直接丢弃任务,不做任何处理
(4)让我产生深深费解的疑问:
为什么execute()方法中存在:
1 else if (workerCountOf(recheck) == 0)
2 addWorker(null, false);
我不懂为什么这里要添加一个空任务?
我说一下我们分析的两点可能:
1、可能是我们之前添加了一个任务
workQueue.offer(command)
我们需要一个worker去消化它?
2、可能是防止线程池关闭,需要防止一个空worker?
2、源码解析:
要分析线程池的源码,首先就要先弄清楚底层是什么数据结构所支撑。
从源码中我们可以看到有一个内部类Worker类
1 private final class Worker 2 extends AbstractQueuedSynchronizer 3 implements Runnable 4 { 5 6 //序列号 7 private static final long serialVersionUID = 6138294804551838833L; 8 9 //线程 10 final Thread thread; 11 //任务 12 Runnable firstTask; 13 //完成的任务 14 volatile long completedTasks; 15 16 //构造方法 17 Worker(Runnable firstTask) { 18 setState(-1); // 初始化 19 this.firstTask = firstTask; 20 this.thread = getThreadFactory().newThread(this); 21 } 22 23 //重写了run方法 24 public void run() { 25 runWorker(this); 26 } 27 28 //重写了AQS 29 30 protected boolean isHeldExclusively() { 31 return getState() != 0; 32 } 33 34 //重写了AQS的tryAcquire 35 protected boolean tryAcquire(int unused) { 36 if (compareAndSetState(0, 1)) { 37 setExclusiveOwnerThread(Thread.currentThread()); 38 return true; 39 } 40 return false; 41 } 42 //也是重写了 43 protected boolean tryRelease(int unused) { 44 setExclusiveOwnerThread(null); 45 setState(0); 46 return true; 47 } 48 49 public void lock() { acquire(1); } 50 public boolean tryLock() { return tryAcquire(1); } 51 public void unlock() { release(1); } 52 public boolean isLocked() { return isHeldExclusively(); } 53 54 void interruptIfStarted() { 55 Thread t; 56 if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) { 57 try { 58 t.interrupt(); 59 } catch (SecurityException ignore) { 60 } 61 } 62 } 63 }
总结:首先看到worker类实现了Runnable接口,也继承了AQS并且重写了AQS的方法。
ThreadPoolExector的重要属性分析:
1 //工作队列 2 private final BlockingQueue<Runnable> workQueue; 3 4 //需要用到的可重入锁 5 private final ReentrantLock mainLock = new ReentrantLock(); 6 //存放worker的集合 7 private final HashSet<Worker> workers = new HashSet<Worker>(); 8 9 10 11 //线程池最大的size 12 private int largestPoolSize; 13 14 //完成任务数 15 private long completedTaskCount; 16 17 18 //线程工厂 19 private volatile ThreadFactory threadFactory; 20 21 拒绝策略 22 private volatile RejectedExecutionHandler handler; 23 24 //非核心线程空闲时最大的存活时间 25 private volatile long keepAliveTime; 26 27 //允许核心线程数空闲时存活时间 28 private volatile boolean allowCoreThreadTimeOut; 29 30 //核心线程数 31 private volatile int corePoolSize; 32 33 //最大线程数 34 private volatile int maximumPoolSize; 35 36
单独说一下可以指定的几种workQueue:用来保存等待被执行的阻塞队列,且任务必须实现Runnable接口,有如下几种队列:
ArrayBlockingQueue:底层数据结构是数组,有界,先进先出。
LinkBlockingQueue:底层数据结构是链表,也是先进先出。
SynchronousQueue:一个不存储元素的阻塞队列,每次插入操作都必须等到另一个线程的移除操作,吞吐量高于上面两种。
priorityBlockingQueue:具有优先级的误解阻塞队列
构造函数:
指定核心线程和最大线程数,非核心线程超时时间和单位,任务队列
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue) { 6 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 7 Executors.defaultThreadFactory(), defaultHandler); 8 }
指定核心线程和最大线程数,非核心线程超时时间和单位,任务队列,线程工厂
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 ThreadFactory threadFactory) { 7 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 8 threadFactory, defaultHandler); 9 }
指定核心线程和最大线程数,非核心线程超时时间和单位,任务队列,线程工厂,拒绝策略
1 public ThreadPoolExecutor(int corePoolSize, 2 int maximumPoolSize, 3 long keepAliveTime, 4 TimeUnit unit, 5 BlockingQueue<Runnable> workQueue, 6 RejectedExecutionHandler handler) { 7 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, 8 Executors.defaultThreadFactory(), handler); 9 }
正式开始源码分析:
按照我的逻辑来分析:
execute()源码分析:
1 public void execute(Runnable command) { 2 if (command == null) 3 throw new NullPointerException(); 4 5 //获取线程池的状态位 6 int c = ctl.get(); 7 //计算线程池的数量 8 if (workerCountOf(c) < corePoolSize) { 9 //小于核心线程就封装成一个worker 10 if (addWorker(command, true)) 11 return; 12 //失败了再次获取线程池的状态 13 c = ctl.get(); 14 } 15 //判断线程池是否正常运行 正常运行的话就把线程添加到工作队列 16 if (isRunning(c) && workQueue.offer(command)) { 17 //再次获取线程池的状态位 18 int recheck = ctl.get(); 19 //如果没有运行了 就把刚才添加的线程移除 20 if (! isRunning(recheck) && remove(command)) 21 //成功后使用拒绝策略 22 reject(command); 23 //如果线程池的工序哦线程为0 24 else if (workerCountOf(recheck) == 0) 25 //就添加一个空任务进去 保证可以接受任务可以继续运行 26 addWorker(null, false); 27 } 28 //核心线程满了 工作队列也满了 如果开启非核心线程也失败了就拒绝 此时可能已经到了最大的线程数了 29 else if (!addWorker(command, false)) 30 reject(command); 31 }
总结:其实线程池添加任务历程是如下:
1、开启线程执行任务,直到达到核心线程数。
2、当达到核心线程数时,接受的任务放进工作队列。
3、当工作队列也放满了过后,就开启线程来执行任务,直到达到最大线程数。
4、当以上几个条件都不满足的时候,就执行指定或者默认的拒绝策略。
addWorker()源码分析:
1 private boolean addWorker(Runnable firstTask, boolean core) { 2 retry: 3 for (;;) { 4 //获取状态位 5 int c = ctl.get(); 6 //计算线程池的状态 7 int rs = runStateOf(c); 8 9 //线程池不正常运行 就返回false 10 if (rs >= SHUTDOWN && 11 ! (rs == SHUTDOWN && 12 firstTask == null && 13 ! workQueue.isEmpty())) 14 return false; 15 16 //死循环 17 for (;;) { 18 //计算线程数 19 int wc = workerCountOf(c); 20 //如果超过容量就返回false 21 if (wc >= CAPACITY || 22 wc >= (core ? corePoolSize : maximumPoolSize)) 23 return false; 24 //首先增加worker的数量 25 if (compareAndIncrementWorkerCount(c)) 26 break retry; 27 //再次获取状态 28 c = ctl.get(); // Re-read ctl 29 //如果状态发生了改变 30 if (runStateOf(c) != rs) 31 //回退 32 continue retry; 33 // else CAS failed due to workerCount change; retry inner loop 34 } 35 } 36 37 //是否开始运行 38 boolean workerStarted = false; 39 //是否添加成功 40 boolean workerAdded = false; 41 Worker w = null; 42 try { 43 //封装成worker 44 w = new Worker(firstTask); 45 final Thread t = w.thread; 46 if (t != null) { 47 final ReentrantLock mainLock = this.mainLock; 48 //加锁 49 mainLock.lock(); 50 try { 51 //计算线程池的状态 52 int rs = runStateOf(ctl.get()); 53 54 if (rs < SHUTDOWN || 55 (rs == SHUTDOWN && firstTask == null)) { 56 if (t.isAlive()) // precheck that t is startable 57 throw new IllegalThreadStateException(); 58 workers.add(w); 59 int s = workers.size(); 60 if (s > largestPoolSize) 61 //线程池的大小 62 largestPoolSize = s; 63 //添加成功了 64 workerAdded = true; 65 } 66 } finally { 67 mainLock.unlock(); 68 } 69 if (workerAdded) { 70 //启动刚添加的任务 71 t.start(); 72 //添加了worker过后就立即启动线程 73 workerStarted = true; 74 } 75 } 76 } finally { 77 if (! workerStarted) 78 //执行添加失败后的操作 79 addWorkerFailed(w); 80 } 81 return workerStarted; 82 }
这里还要贴一个源码一起分析 ,之前给我留过坑。。。。。
worker的构造函数:
1 Worker(Runnable firstTask) { 2 setState(-1); // inhibit interrupts until runWorker 3 this.firstTask = firstTask; 4 //注意这里传进去了一个this ,就是worker本身 5 this.thread = getThreadFactory().newThread(this); 6 }
所以
44 w = new Worker(firstTask);
45 final Thread t = w.thread;
t.start();//表面看起的是线程,但是实际上启动的是worker自身的run方法,因为在创建worker的时候创建线程传的是worker本身this,因为worker也是继承了Runnable接口的,但是我们之前谈过worker重写了run方法
之前没有注意看这里,让我很是疑惑,卡壳了很久,看源码不求一定要读懂每一部分,但是真的要仔细啊,尤其是关键逻辑部分。
worker 的run():
1 public void run() { 2 runWorker(this); 3 }
runWorker()源码分析:
1 final void runWorker(Worker w) { 2 //获取当前线程 3 Thread wt = Thread.currentThread(); 4 //从worker中取出任务 5 Runnable task = w.firstTask; 6 //释放引用 7 w.firstTask = null; 8 w.unlock(); // 允许中断 9 boolean completedAbruptly = true; 10 try { 11 //如果任务不为null或者从队列中取出的任务不为空 12 while (task != null || (task = getTask()) != null) { 13 //加锁 14 w.lock(); 15 //查看并比较线程池运行状态 16 if ((runStateAtLeast(ctl.get(), STOP) || 17 (Thread.interrupted() && 18 runStateAtLeast(ctl.get(), STOP))) && 19 !wt.isInterrupted()) 20 wt.interrupt(); 21 try { 22 //自定义逻辑 23 beforeExecute(wt, task); 24 Throwable thrown = null; 25 try { 26 //这里才是将任务线程启动 27 task.run(); 28 } catch (RuntimeException x) { 29 thrown = x; throw x; 30 } catch (Error x) { 31 thrown = x; throw x; 32 } catch (Throwable x) { 33 thrown = x; throw new Error(x); 34 } finally { 35 //自定义逻辑 36 afterExecute(task, thrown); 37 } 38 } finally { 39 //帮助垃圾回收 40 task = null; 41 //worker已完成的任务+1 42 w.completedTasks++; 43 //释放锁 44 w.unlock(); 45 } 46 } 47 completedAbruptly = false; 48 } finally { 49 processWorkerExit(w, completedAbruptly); 50 } 51 }
主要完成的逻辑: 1、线程启动之后,通过unlock方法释放锁,表示允许中断
2、获取第一个任务firstTask,加锁执行任务的run方法,完成后释放 3、可以重写afterExecute()和beforeExecute()完成自定义的逻辑
4、firstTask执行完成之后,通过getTask方法从阻塞队列中获取等待的任务,如果队列中没有任务,getTask方法会被阻塞并挂起,不会占用cpu资源;
getTask()源码:
1 private Runnable getTask() { 2 boolean timedOut = false; // Did the last poll() time out? 3 4 for (;;) { 5 int c = ctl.get(); 6 //计算线程池的状态 7 int rs = runStateOf(c); 8 9 //根据线程池的状态和工作队列 减少worker的数量 10 if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 11 decrementWorkerCount(); 12 return null; 13 } 14 15 //计算worker数量 16 int wc = workerCountOf(c); 17 18 //是否开启了空闲超时 关闭核心线程 19 boolean timed = allowCoreThreadTimeOut || wc > corePoolSize; 20 //worker数量当大于最大线程数的时候 或者worker存在但是工作队列里面没 21 //有线程处理 那就也减少worker数量 22 if ((wc > maximumPoolSize || (timed && timedOut)) 23 && (wc > 1 || workQueue.isEmpty())) { 24 if (compareAndDecrementWorkerCount(c)) 25 //返回空 26 return null; 27 continue; 28 } 29 30 try { 31 Runnable r = timed ? 32 workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) : 33 workQueue.take();//这个为没有任务的时候会阻塞 34 if (r != null) 35 return r; 36 timedOut = true; 37 } catch (InterruptedException retry) { 38 timedOut = false; 39 } 40 } 41 }
这部分的代码是从工作队列中不断的获取任务执行。
有一些方法没有进行分析,我也存在一些困惑,如果大佬们,觉得我的分析有错误,希望能对我的错误指正,当然如果能解答我之前的疑问就更好了