学习笔记 08 — JUC线程池
线程池的优点:
1)避免线程的创建和销毁带来的性能开销。
2)避免大量的线程间因互相抢占系统资源导致的阻塞现象。
3}能够对线程进行简单的管理并提供定时执行、间隔执行等功能。
ThreadPoolExecutor
ThreadPoolExecutor是线程池类。对于线程池,可以通俗的将它理解为”存放一定数量线程的一个线程集合。线程池允许多个线程同时运行,允许同时运行的线程数量就是线程池的容量;当添加到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。”
ThreadPoolExecutor的数据结构如下图所示:
1. workers
workers是HashSet<Work>类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了”一个线程集合”。当Worker对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。
wokers的作用是,线程池通过它实现了”允许多个线程同时运行”。
2. workQueue
workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。
通过workQueue,线程池实现了阻塞功能。
–SynchronousQueue(默认):
直接将任务移交给线程而不是入队,如果已经没有线程立即来处理提交到pool中的任务时,会创建一个新的线程来处理该任务;
这种策略需要maximumPoolSizes无界来确保新提交的任务不会被rejection;
这种方式的最大缺点:当任务到来的速度大于任务被处理的速度时,线程数会疯长。
–无界队列LinkedBlockingQueue:
由于队列无界,当运行的线程等于corePoolSize时,新到来的任务会入队而不会创建新的线程来执行(即pool中的线程数永远不会大于corePoolSize);
这种方式的缺点:当任务到来的速度大于任务被处理的速度时,队列长度会疯长。
–有界队列ArrayBlockingQueue:
这种方式是非常难处理好的一种方式,要考虑好ArrayBlockingQueue的大小和maximumPoolSize的大小;
当ArrayBlockingQueue较大而maximumPoolSize较小时,会降低CPU使用率、减少OS资源、减少上下文切换,但是吞吐量会降低。–>线程较少的特点就是这样;
如果任务频繁的被阻塞(例如,they are I/O bound),就需要更多的线程了;
当ArrayBlockingQueue较小而maximumPoolSize较大时,会使CPU使用繁忙但也会遇到一些不可接受的scheduling,吞吐量也会降低。
3. mainLock
mainLock是互斥锁,通过mainLock实现了对线程池的互斥访问。
4. corePoolSize和maximumPoolSize
corePoolSize是”核心池大小”,maximumPoolSize是”最大池大小”。它们的作用是调整”线程池中实际运行的线程的数量”。
例如,当新任务提交给线程池时(通过execute方法)。
–池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程。
— 池中线程数大于等于corePoolSize,workQueue未满,首选将新任务假如workQueue而不是添加新线程。
–池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务。
–池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务。
如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。
5. poolSize
poolSize是当前线程池的实际大小,即线程池中任务的数量。
6. allowCoreThreadTimeOut和keepAliveTime
allowCoreThreadTimeOut表示是否允许”线程在空闲状态时,仍然能够存活”;而keepAliveTime是当线程池处于空闲状态的时候,超过keepAliveTime时间之后,空闲的线程会被终止。
7. threadFactory
threadFactory是ThreadFactory对象。它是一个线程工厂类,”线程池通过ThreadFactory创建线程”。
通过使用java.util.concurrent.ThreadFactory可以创建新的线程
如果不额外指定ThreadFactory,则使用默认的Executors#defaultThreadFactory;
通过该默认的线程工厂,所有创建的线程都会被加入到同一个ThreadGroup中去,并且这些线程都会有相同的优先级(NORM_PRIORITY),并且都是non-daemon线程
8. handler
handler是RejectedExecutionHandler类型。它是”线程池拒绝策略”的句柄,也就是说”当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler进行相应的处理”。当任务添加到线程池被拒绝的理由可能为:一是线程池异常关闭。二是任务数量超过线程池的最大限制。
–AbortPolicy
直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略
–CallerRunsPolicy
尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了
–DiscardOldestPolicy
移除最晚的那个没有被处理的任务,然后执行被拒绝的任务。同样,如果线程池已经被关闭了,任务就被丢弃了
–DiscardPolicy
不能执行的任务将被删除
1.ThreadFactory的创建:
public static ThreadFactory defaultThreadFactory() {
return new DefaultThreadFactory();
}
/**
* 默认的线程工厂
*/
static class DefaultThreadFactory implements ThreadFactory {
static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量
final ThreadGroup group;//线程组
final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量
final String namePrefix;
/*
* 创建默认的线程工厂
*/
DefaultThreadFactory() {
SecurityManager s = System.getSecurityManager();
group = (s != null)? s.getThreadGroup() :
Thread.currentThread().getThreadGroup();
namePrefix = “pool-” +
poolNumber.getAndIncrement() +
“-thread-“;
}
/*
* 创建一个新的线程
*/
public Thread newThread(Runnable r) {
Thread t = new Thread(group, r,
namePrefix + threadNumber.getAndIncrement(),//新线程的名字
0);
/*
* 将后台线程设置为应用线程
*/
if (t.isDaemon())
t.setDaemon(false);
/*
* 将线程的优先级全部设置为NORM_PRIORITY
*/
if (t.getPriority() != Thread.NORM_PRIORITY)
t.setPriority(Thread.NORM_PRIORITY);
return t;
}
}
2.execute(Runnable command):
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
/**
* 这一块儿就是整个工作机理的部分(代码比较精致)
* 1、addIfUnderCorePoolSize
* 1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING,
* 1.1)先获取锁
* 1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run()
* 说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释
* 1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合,
* 然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑
*
* 2、如果poolSize>=corePoolSize或者上边的执行失败了
* 1)如果pool的状态处于RUNNING,将该任务入队(offer(command))
* 如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释
* 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize)
* 如果增加线程也不成功,则回绝任务。
*
*/
if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {
if (runState == RUNNING && workQueue.offer(command)) {
if (runState != RUNNING || poolSize == 0)
ensureQueuedTaskHandled(command);
}
else if (!addIfUnderMaximumPoolSize(command))
reject(command); // is shutdown or saturated
}
}
2.1addIFUnderCorePoolSize(Runnable firstTask):
/**
* 创建并且启动一个新的线程来处理任务
* 1、其第一个任务就是传入的firstTask参数
* 2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候
*/
private boolean addIfUnderCorePoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();//获取锁
try {
if (poolSize < corePoolSize && runState == RUNNING)
t = addThread(firstTask);//创建新线程
} finally {
mainLock.unlock();//释放锁
}
return t != null;
}
private Thread addThread(Runnable firstTask) {
Worker w = new Worker(firstTask);//构造一个work
Thread t = threadFactory.newThread(w);//创建线程
boolean workerStarted = false;
if (t != null) {//
if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡
throw new IllegalThreadStateException();
w.thread = t;
workers.add(w);//将w工作线程加入workers线程池
int nt = ++poolSize;//当前的池数量+1
if (nt > largestPoolSize)
largestPoolSize = nt;
try {
t.start();//启动线程
workerStarted = true;
}
finally {
if (!workerStarted)//启动线程没有成功
workers.remove(w);//将w从workers集合中删除
}
}
return t;
}
2.2ensureQueuedTaskHandled(Runnable command):
/**
* 在一个task入队之后重新检查state。
* 当一个task入队后,pool的state发生了变化,该方法就会被调用。
* 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝
* 否则该方法会保证至少有一个线程来处理入队的task
*/
private void ensureQueuedTaskHandled(Runnable command) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
boolean reject = false;
Thread t = null;
try {
int state = runState;
if (state != RUNNING && workQueue.remove(command))
reject = true;
else if (state < STOP &&
poolSize < Math.max(corePoolSize, 1) &&
!workQueue.isEmpty())
t = addThread(null);
} finally {
mainLock.unlock();
}
if (reject)
reject(command);
}
2.3.addIfUnderMaximumPoolSize(Runnable firstTask):
private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {
Thread t = null;
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
if (poolSize < maximumPoolSize && runState == RUNNING)
t = addThread(firstTask);
} finally {
mainLock.unlock();
}
return t != null;
}
Executors
Executors是个静态工厂类,提供了一些列工厂方法用于创建线程池。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。
Executors提供的线程池:
newFixedThreadPool(int corePoolSize):
/**
* 1、创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池,
* 2、核心线程会一直运行
* 3、无界队列LinkedBlockingQueue
*/
public static ExecutorService newFixedThreadPool(int nThreads) {
return new ThreadPoolExecutor(nThreads, nThreads,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>());
}
(1)、创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池
(2)、核心线程会一直运行
(3)、无界队列LinkedBlockingQueue
newSingleThreadExecutor:
/**
* 1、创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池
* 2、核心线程会一直运行
* 3、无界队列LinkedBlockingQueue
* 注意:所有task都是串行执行的
*/
public static ExecutorService newSingleThreadExecutor() {
return new FinalizableDelegatedExecutorService
(new ThreadPoolExecutor(1, 1,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<Runnable>()));
}
(1)、创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池
(2)、核心线程会一直运行
(3)、无界队列LinkedBlockingQueue
(4)、所有task都是串行执行的(即同一时刻只有一个任务在执行)
newCachedThreadPool:
/**
* 1、创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务
* 2、这种池将会在执行许多耗时短的异步任务的时候提高程序的性能。
* 3、6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源
* 4、队列:SynchronousQueue
* 5、maximumPoolSize为Integer.MAX_VALUE
*/
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
(1)、corePoolSize==0
(2)、maximumPoolSize==Integer.MAX_VALUE
(3)、队列:SynchronousQueue
(4)、创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务
(5)、这种池将会在执行许多耗时短的异步任务的时候提高程序的性能
(6)、6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源
newScheduledThreadPool(int corePoolSize):
/**
* 创建一个线程池:该线程池可以用于执行延时任务或者定时任务
*/
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
return new ScheduledThreadPoolExecutor(corePoolSize);
}
/**
* 创建一个线程池:
* corePoolSize==我们指定
* maximumPoolSize==Integer.MAX_VALUE
* keepAliveTime==0纳秒(即不回收闲置线程)
* 队列: DelayedWorkQueue
*/
public ScheduledThreadPoolExecutor(int corePoolSize) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue());
}
(1)、用于执行定时或延迟执行的任务,最典型的:异步操作时的超时回调
总结:
四种线程池最常用的就是newCachedThreadPool和newFixedThreadPool(int corePoolSize)。
对于newScheduledThreadPool(int corePoolSize)使用比较少,因为在现代开发中,如果用于去开发定时任务程序的话,用spring定时器会非常简单。
Executors与ThreadPoolExecutor对比:
ThreadPoolExecutor:
可以灵活的自定义的创建线程池,可定制性很高
想创建好一个合适的线程池比较难
使用稍微麻烦一些
实际中很少使用
Executors:
可以创建4种线程池,这四种线程池基本上已经包含了所有需求,将来根据业务特点选用就好
使用非常简单
实际中很常用
线程池的生命周期
从上图我们可以看到线程池有5种状态:Running,SHUTDOWN, STOP, TIDYING, TERMINATED
1. RUNNING
(01) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态!道理很简单,在ctl的初始化代码中(如下),就将它初始化为RUNNING状态,并且”任务数量”初始化为0。
2. SHUTDOWN
(01) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。(02) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。
3. STOP
(01) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。(02) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。
4. TIDYING(01) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。(02) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。
5. TERMINATED(01) 状态说明:线程池彻底终止,就变成TERMINATED状态。(02) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。
线程池使用总结:
1、选用的两个角度
高性能:将提交到线程池中的任务直接交给线程去处理(前提:线程数小于最大线程数),不入队
缓冲执行:希望提交到线程池的任务尽量被核心线程(corePoolSize)执行掉
2、高性能
队列:SynchronousQueue
最大线程数:一般设为Integer.MAX_VALUE(整数最大值),防止回绝任务
典型案例:newCachedThreadPool
尤其适合于执行耗时短的任务
注意:
设置好闲置失效时间,keepAliveTime,用于避免资源大量耗费
对于出现大量耗时长的任务,容易造成线程数迅速增加,这种情况要衡量使用该类线程池是否合适
3、缓冲执行
队列:LinkedBlockingQueue和ArrayBlockingQueue
典型案例:newFixedThreadPool(int threadSize)
注意:
使用该类线程池,最好使用LinkedBlockingQueue(无界队列),但是当大量并发任务的涌入,导致核心线程处理不过来,队列元素会大量增加,可能会报内存溢出
当然,对于上边这种情况的话,如果是ArrayBlockingQueue的话,如果设置得当,可以回绝一些任务,而不报内存溢出
4、线程数的确定
公式:启动线程数=[任务执行时间/(任务执行时间-IO等待时间)]*CPU核数
注意:
如果任务大都是CPU计算型任务,启动线程数=CPU核数
如果任务大多需要等待磁盘操作,网络响应,启动线程数可以参照公式估算,当然>CPU核数
总结:
一般使用线程池,按照如下顺序依次考虑(只有前者不满足场景需求,才考虑后者):
newCachedThreadPool–>newFixedThreadPool(int threadSize)–>ThreadPoolExecutor
newCachedThreadPool不需要指定任何参数
newFixedThreadPool需要指定线程池数(核心线程数==最大线程数)
ThreadPoolExecutor需要指定核心线程数、最大线程数、闲置超时时间、队列、队列容量,甚至还有回绝策略和线程工厂
线程池监控
利用线程池提供的参数进行监控,参数如下:taskCount:线程池需要执行的任务数量。completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。largestPoolSize:线程池曾经创建过的最大线程数量,通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。getActiveCount:获取活动的线程数。
通过扩展线程池进行监控:继承线程池并重写线程池的beforeExecute(),afterExecute()和terminated()方法,可以在任务执行前、后和线程池关闭前自定义行为。如监控任务的平均执行时间,最大执行时间和最小执行时间等。
**********************************************未完待续*********************************************
该文为本人学习的笔记,方便以后自己查阅。参考网上各大帖子,取其精华整合自己的理解而成。如有不对的地方,请多多指正!自勉!共勉!
**************************************************************************************************