线程池应用达到的目的
1、降低资源消耗;可以重复利用已创建的线程从而降低线程创建和销毁所带来的消耗。
2、提高响应速度;当任务到达时,不需要等线程创建就可以立即执行。
3、提高线程的可管理性;使用线程池统一分配、调优和监控。
- 线程池实现原理
1、 最核心的ThreadPoolExecutor类,ThreadPoolExecutor、AbstractExecutorService、ExecutorService和Executor几个之间的关系
public
class
ThreadPoolExecutor
extends
AbstractExecutorService {
public
abstract
class
AbstractExecutorService
implements
ExecutorService {
public
interface
ExecutorService
extends
Executor {
public
interface
Executor {
void
execute(Runnable command);
}
以上继承关系; ExecutorService接口继承了Executor接口,并声明了一些方法:submit、invokeAll、invokeAny以及shutDown等;
2、ThreadPoolExecutor 实现 Executor接口的逻辑 源码分析:
a、当运行线程少于corePoolSize,则创建新线程来执行任务
b、当运行线程等于或大于corePoolSize ,则将任务加入BlockingQueue(任务缓存队列,用于存放等待执行任务)
c、BlockingQueue 队列已经满了而无法加入任务,必须获取全局锁从而创建新的线程来处理任务
d、创建新线程将使当前运行的线程超出maximumPoolSize,任务将被拒绝,并调用RejectedExecutionException()方法,抛出异常
public void execute(Runnable command) { if (command == null) throw new NullPointerException(); if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) { if (runState == RUNNING && workQueue.offer(command)) { if (runState != RUNNING || poolSize == 0) ensureQueuedTaskHandled(command); } else if (!addIfUnderMaximumPoolSize(command))
//抛出RejectedExecutionException异常 reject(command); // is shutdown or saturated } }
- 线程池的使用
new ThreadPoolExecutor(corePoolSize, maximumPoolSize,keepAliveTime,milliseconds,runnableTaskQueue,handler);
这里解释一下入参含义
corePoolSize 设置线程池的基本大小,
maximumPoolSize 设置线程池最大能创建的线程数多少
keepAliveTime 线程活动保存时间,也就是工作线程完成后还继续存活的时间,有必要时单个任务完成时间短,可把存活时间设大保持较高线程利用率;
milliseconds 毫秒-线程活动保持时间的单位 可以是days\hours\等等
runnableTaskQueue 任务队列,用于保存等待执行的任务的阻塞队列
runnableTaskQueue 的类型为BlockingQueue<Runnable>,通常可以取下面三种类型:
1)ArrayBlockingQueue:基于数组的先进先出队列,此队列创建时必须指定大小;
2)LinkedBlockingQueue:基于链表的先进先出队列,如果创建时没有指定此队列大小,则默认为Integer.MAX_VALUE;
3)synchronousQueue:这个队列比较特殊,它不会保存提交的任务,而是将直接新建一个线程来执行新来的任务。
4)PriorityBlockingQueue:一个具有优先级的无限阻塞队列
demo例子
public class Test { public static void main(String[] args) { ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<Runnable>(5)); for(int i=0;i<15;i++){ MyTask myTask = new MyTask(i); executor.execute(myTask); System.out.println("线程池中线程数目:"+executor.getPoolSize()+",队列中等待执行的任务数目:"+ executor.getQueue().size()+",已执行玩别的任务数目:"+executor.getCompletedTaskCount()); } executor.shutdown(); } } class MyTask implements Runnable { private int taskNum; public MyTask(int num) { this.taskNum = num; } @Override public void run() { System.out.println("正在执行task "+taskNum); try { Thread.currentThread().sleep(4000); } catch (InterruptedException e) { e.printStackTrace(); } System.out.println("task "+taskNum+"执行完毕"); } }