学习笔记 08 --- JUC线程池

学习笔记 08 — JUC线程池

线程池的优点:

1)避免线程的创建和销毁带来的性能开销。

2)避免大量的线程间因互相抢占系统资源导致的阻塞现象。

3}能够对线程进行简单的管理并提供定时执行、间隔执行等功能。

ThreadPoolExecutor

ThreadPoolExecutor是线程池类。对于线程池,可以通俗的将它理解为”存放一定数量线程的一个线程集合。线程池允许多个线程同时运行,允许同时运行的线程数量就是线程池的容量;当添加到线程池中的线程超过它的容量时,会有一部分线程阻塞等待。线程池会通过相应的调度策略和拒绝策略,对添加到线程池中的线程进行管理。”

ThreadPoolExecutor的数据结构如下图所示:

1. workers

workers是HashSet<Work>类型,即它是一个Worker集合。而一个Worker对应一个线程,也就是说线程池通过workers包含了”一个线程集合”。当Worker对应的线程池启动时,它会执行线程池中的任务;当执行完一个任务后,它会从线程池的阻塞队列中取出一个阻塞的任务来继续运行。

wokers的作用是,线程池通过它实现了”允许多个线程同时运行”。

2. workQueue

workQueue是BlockingQueue类型,即它是一个阻塞队列。当线程池中的线程数超过它的容量的时候,线程会进入阻塞队列进行阻塞等待。

通过workQueue,线程池实现了阻塞功能。

–SynchronousQueue(默认):

直接将任务移交给线程而不是入队,如果已经没有线程立即来处理提交到pool中的任务时,会创建一个新的线程来处理该任务;

这种策略需要maximumPoolSizes无界来确保新提交的任务不会被rejection;

这种方式的最大缺点:当任务到来的速度大于任务被处理的速度时,线程数会疯长。

–无界队列LinkedBlockingQueue:

由于队列无界,当运行的线程等于corePoolSize时,新到来的任务会入队而不会创建新的线程来执行(即pool中的线程数永远不会大于corePoolSize);

这种方式的缺点:当任务到来的速度大于任务被处理的速度时,队列长度会疯长。

–有界队列ArrayBlockingQueue:

这种方式是非常难处理好的一种方式,要考虑好ArrayBlockingQueue的大小和maximumPoolSize的大小;

当ArrayBlockingQueue较大而maximumPoolSize较小时,会降低CPU使用率、减少OS资源、减少上下文切换,但是吞吐量会降低。–>线程较少的特点就是这样;

如果任务频繁的被阻塞(例如,they are I/O bound),就需要更多的线程了;

当ArrayBlockingQueue较小而maximumPoolSize较大时,会使CPU使用繁忙但也会遇到一些不可接受的scheduling,吞吐量也会降低。

3. mainLock

mainLock是互斥锁,通过mainLock实现了对线程池的互斥访问。

4. corePoolSize和maximumPoolSize

corePoolSize是”核心池大小”,maximumPoolSize是”最大池大小”。它们的作用是调整”线程池中实际运行的线程的数量”。

例如,当新任务提交给线程池时(通过execute方法)。

–池中线程数小于corePoolSize,新任务都不排队而是直接添加新线程。

— 池中线程数大于等于corePoolSize,workQueue未满,首选将新任务假如workQueue而不是添加新线程。

–池中线程数大于等于corePoolSize,workQueue已满,但是线程数小于maximumPoolSize,添加新的线程来处理被添加的任务。

–池中线程数大于大于corePoolSize,workQueue已满,并且线程数大于等于maximumPoolSize,新任务被拒绝,使用handler处理被拒绝的任务。

如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。在大多数情况下,核心池大小和最大池大小的值是在创建线程池设置的;但是,也可以使用 setCorePoolSize(int) 和 setMaximumPoolSize(int) 进行动态更改。

5. poolSize

poolSize是当前线程池的实际大小,即线程池中任务的数量。

6. allowCoreThreadTimeOut和keepAliveTime

allowCoreThreadTimeOut表示是否允许”线程在空闲状态时,仍然能够存活”;而keepAliveTime是当线程池处于空闲状态的时候,超过keepAliveTime时间之后,空闲的线程会被终止。

7. threadFactory

threadFactory是ThreadFactory对象。它是一个线程工厂类,”线程池通过ThreadFactory创建线程”。

通过使用java.util.concurrent.ThreadFactory可以创建新的线程

如果不额外指定ThreadFactory,则使用默认的Executors#defaultThreadFactory;

通过该默认的线程工厂,所有创建的线程都会被加入到同一个ThreadGroup中去,并且这些线程都会有相同的优先级(NORM_PRIORITY),并且都是non-daemon线程

8. handler

handler是RejectedExecutionHandler类型。它是”线程池拒绝策略”的句柄,也就是说”当某任务添加到线程池中,而线程池拒绝该任务时,线程池会通过handler进行相应的处理”。当任务添加到线程池被拒绝的理由可能为:一是线程池异常关闭。二是任务数量超过线程池的最大限制。

–AbortPolicy

直接抛出一个RejectedExecutionException,这也是JDK默认的拒绝策略

–CallerRunsPolicy

尝试直接运行被拒绝的任务,如果线程池已经被关闭了,任务就被丢弃了

–DiscardOldestPolicy

移除最晚的那个没有被处理的任务,然后执行被拒绝的任务。同样,如果线程池已经被关闭了,任务就被丢弃了

–DiscardPolicy

不能执行的任务将被删除

1.ThreadFactory的创建:

public static ThreadFactory defaultThreadFactory() {

return new DefaultThreadFactory();

}

/**

* 默认的线程工厂

*/

static class DefaultThreadFactory implements ThreadFactory {

static final AtomicInteger poolNumber = new AtomicInteger(1);//池数量

final ThreadGroup group;//线程组

final AtomicInteger threadNumber = new AtomicInteger(1);//线程数量

final String namePrefix;

/*

* 创建默认的线程工厂

*/

DefaultThreadFactory() {

SecurityManager s = System.getSecurityManager();

group = (s != null)? s.getThreadGroup() :

Thread.currentThread().getThreadGroup();

namePrefix = “pool-” +

poolNumber.getAndIncrement() +

“-thread-“;

}

/*

* 创建一个新的线程

*/

public Thread newThread(Runnable r) {

Thread t = new Thread(group, r,

namePrefix + threadNumber.getAndIncrement(),//新线程的名字

0);

/*

* 将后台线程设置为应用线程

*/

if (t.isDaemon())

t.setDaemon(false);

/*

* 将线程的优先级全部设置为NORM_PRIORITY

*/

if (t.getPriority() != Thread.NORM_PRIORITY)

t.setPriority(Thread.NORM_PRIORITY);

return t;

}

}

2.execute(Runnable command):

public void execute(Runnable command) {

if (command == null)

throw new NullPointerException();

/**

* 这一块儿就是整个工作机理的部分(代码比较精致)

* 1、addIfUnderCorePoolSize

* 1)如果当前线程数poolSize<核心线程数corePoolSize并且pool的状态为RUNNING,

* 1.1)先获取锁

* 1.2)根据传入的任务firstTask创建一个Work对象,在该对象中编写了run()方法,在该run()方法中会真正的去执行firstTask的run()

* 说明:关于Work对象run部分的内容,查看Work内部类的run()方法上边的注释以及与其相关方法的注释

* 1.3)通过线程工厂与上边创建出来的work对象w创建新的线程t,将w加入工作线程集合,

* 然后启动线程t,之后就会自动执行w中的run(),w中的run()又会调用firstTask的run(),即处理真正的业务逻辑

*

* 2、如果poolSize>=corePoolSize或者上边的执行失败了

* 1)如果pool的状态处于RUNNING,将该任务入队(offer(command))

* 如果入队后,pool的状态不是RUNNING了或者池中的线程数为0了,下边的逻辑具体去查看注释

* 2)addIfUnderMaximumPoolSize(同addIfUnderCorePoolSize)

* 如果增加线程也不成功,则回绝任务。

*

*/

if (poolSize >= corePoolSize || !addIfUnderCorePoolSize(command)) {

if (runState == RUNNING && workQueue.offer(command)) {

if (runState != RUNNING || poolSize == 0)

ensureQueuedTaskHandled(command);

}

else if (!addIfUnderMaximumPoolSize(command))

reject(command); // is shutdown or saturated

}

}

2.1addIFUnderCorePoolSize(Runnable firstTask):

/**

* 创建并且启动一个新的线程来处理任务

* 1、其第一个任务就是传入的firstTask参数

* 2、该方法仅仅用于当前线程数小于核心线程数并且pool没有被关掉的时候

*/

private boolean addIfUnderCorePoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();//获取锁

try {

if (poolSize < corePoolSize && runState == RUNNING)

t = addThread(firstTask);//创建新线程

} finally {

mainLock.unlock();//释放锁

}

return t != null;

}

private Thread addThread(Runnable firstTask) {

Worker w = new Worker(firstTask);//构造一个work

Thread t = threadFactory.newThread(w);//创建线程

boolean workerStarted = false;

if (t != null) {//

if (t.isAlive()) //如果t线程已经启动了,而且还没有死亡

throw new IllegalThreadStateException();

w.thread = t;

workers.add(w);//将w工作线程加入workers线程池

int nt = ++poolSize;//当前的池数量+1

if (nt > largestPoolSize)

largestPoolSize = nt;

try {

t.start();//启动线程

workerStarted = true;

}

finally {

if (!workerStarted)//启动线程没有成功

workers.remove(w);//将w从workers集合中删除

}

}

return t;

}

2.2ensureQueuedTaskHandled(Runnable command):

/**

* 在一个task入队之后重新检查state。

* 当一个task入队后,pool的state发生了变化,该方法就会被调用。

* 如果一个task入队的同时,shutdownNow方法发生了调用,该方法就必须从队列中移除并回绝

* 否则该方法会保证至少有一个线程来处理入队的task

*/

private void ensureQueuedTaskHandled(Runnable command) {

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

boolean reject = false;

Thread t = null;

try {

int state = runState;

if (state != RUNNING && workQueue.remove(command))

reject = true;

else if (state < STOP &&

poolSize < Math.max(corePoolSize, 1) &&

!workQueue.isEmpty())

t = addThread(null);

} finally {

mainLock.unlock();

}

if (reject)

reject(command);

}

2.3.addIfUnderMaximumPoolSize(Runnable firstTask):

private boolean addIfUnderMaximumPoolSize(Runnable firstTask) {

Thread t = null;

final ReentrantLock mainLock = this.mainLock;

mainLock.lock();

try {

if (poolSize < maximumPoolSize && runState == RUNNING)

t = addThread(firstTask);

} finally {

mainLock.unlock();

}

return t != null;

}

Executors

Executors是个静态工厂类,提供了一些列工厂方法用于创建线程池。它通过静态工厂方法返回ExecutorService、ScheduledExecutorService、ThreadFactory 和 Callable 等类的对象。

Executors提供的线程池:

newFixedThreadPool(int corePoolSize):

/**

* 1、创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池,

* 2、核心线程会一直运行

* 3、无界队列LinkedBlockingQueue

*/

public static ExecutorService newFixedThreadPool(int nThreads) {

return new ThreadPoolExecutor(nThreads, nThreads,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>());

}

(1)、创建一个线程数固定(corePoolSize==maximumPoolSize)的线程池

(2)、核心线程会一直运行

(3)、无界队列LinkedBlockingQueue

newSingleThreadExecutor:

/**

* 1、创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池

* 2、核心线程会一直运行

* 3、无界队列LinkedBlockingQueue

* 注意:所有task都是串行执行的

*/

public static ExecutorService newSingleThreadExecutor() {

return new FinalizableDelegatedExecutorService

(new ThreadPoolExecutor(1, 1,

0L, TimeUnit.MILLISECONDS,

new LinkedBlockingQueue<Runnable>()));

}

(1)、创建一个线程数固定(corePoolSize==maximumPoolSize==1)的线程池

(2)、核心线程会一直运行

(3)、无界队列LinkedBlockingQueue

(4)、所有task都是串行执行的(即同一时刻只有一个任务在执行)

newCachedThreadPool:

/**

* 1、创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务

* 2、这种池将会在执行许多耗时短的异步任务的时候提高程序的性能。

* 3、6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源

* 4、队列:SynchronousQueue

* 5、maximumPoolSize为Integer.MAX_VALUE

*/

public static ExecutorService newCachedThreadPool() {

return new ThreadPoolExecutor(0, Integer.MAX_VALUE,

60L, TimeUnit.SECONDS,

new SynchronousQueue<Runnable>());

}

(1)、corePoolSize==0

(2)、maximumPoolSize==Integer.MAX_VALUE

(3)、队列:SynchronousQueue

(4)、创建一个线程池:当池中的线程都处于忙碌状态时,会立即新建一个线程来处理新来的任务

(5)、这种池将会在执行许多耗时短的异步任务的时候提高程序的性能

(6)、6秒钟内没有使用的线程将会被中止,并且从线程池中移除,因此几乎不必担心耗费资源

newScheduledThreadPool(int corePoolSize):

/**

* 创建一个线程池:该线程池可以用于执行延时任务或者定时任务

*/

public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {

return new ScheduledThreadPoolExecutor(corePoolSize);

}

/**

* 创建一个线程池:

* corePoolSize==我们指定

* maximumPoolSize==Integer.MAX_VALUE

* keepAliveTime==0纳秒(即不回收闲置线程)

* 队列: DelayedWorkQueue

*/

public ScheduledThreadPoolExecutor(int corePoolSize) {

super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,

new DelayedWorkQueue());

}

(1)、用于执行定时或延迟执行的任务,最典型的:异步操作时的超时回调

总结:

四种线程池最常用的就是newCachedThreadPool和newFixedThreadPool(int corePoolSize)。

对于newScheduledThreadPool(int corePoolSize)使用比较少,因为在现代开发中,如果用于去开发定时任务程序的话,用spring定时器会非常简单。

Executors与ThreadPoolExecutor对比:

ThreadPoolExecutor:

可以灵活的自定义的创建线程池,可定制性很高

想创建好一个合适的线程池比较难

使用稍微麻烦一些

实际中很少使用

Executors:

可以创建4种线程池,这四种线程池基本上已经包含了所有需求,将来根据业务特点选用就好

使用非常简单

实际中很常用

线程池的生命周期

从上图我们可以看到线程池有5种状态:Running,SHUTDOWN, STOP, TIDYING, TERMINATED

1. RUNNING

(01) 状态说明:线程池处在RUNNING状态时,能够接收新任务,以及对已添加的任务进行处理。(02) 状态切换:线程池的初始化状态是RUNNING。换句话说,线程池被一旦被创建,就处于RUNNING状态!道理很简单,在ctl的初始化代码中(如下),就将它初始化为RUNNING状态,并且”任务数量”初始化为0。

2. SHUTDOWN

(01) 状态说明:线程池处在SHUTDOWN状态时,不接收新任务,但能处理已添加的任务。(02) 状态切换:调用线程池的shutdown()接口时,线程池由RUNNING -> SHUTDOWN。

3. STOP

(01) 状态说明:线程池处在STOP状态时,不接收新任务,不处理已添加的任务,并且会中断正在处理的任务。(02) 状态切换:调用线程池的shutdownNow()接口时,线程池由(RUNNING or SHUTDOWN ) -> STOP。

4. TIDYING(01) 状态说明:当所有的任务已终止,ctl记录的”任务数量”为0,线程池会变为TIDYING状态。当线程池变为TIDYING状态时,会执行钩子函数terminated()。terminated()在ThreadPoolExecutor类中是空的,若用户想在线程池变为TIDYING时,进行相应的处理;可以通过重载terminated()函数来实现。(02) 状态切换:当线程池在SHUTDOWN状态下,阻塞队列为空并且线程池中执行的任务也为空时,就会由 SHUTDOWN -> TIDYING。当线程池在STOP状态下,线程池中执行的任务为空时,就会由STOP -> TIDYING。

5. TERMINATED(01) 状态说明:线程池彻底终止,就变成TERMINATED状态。(02) 状态切换:线程池处在TIDYING状态时,执行完terminated()之后,就会由 TIDYING -> TERMINATED。

线程池使用总结:

1、选用的两个角度

高性能:将提交到线程池中的任务直接交给线程去处理(前提:线程数小于最大线程数),不入队

缓冲执行:希望提交到线程池的任务尽量被核心线程(corePoolSize)执行掉

2、高性能

队列:SynchronousQueue

最大线程数:一般设为Integer.MAX_VALUE(整数最大值),防止回绝任务

典型案例:newCachedThreadPool

尤其适合于执行耗时短的任务

注意:

设置好闲置失效时间,keepAliveTime,用于避免资源大量耗费

对于出现大量耗时长的任务,容易造成线程数迅速增加,这种情况要衡量使用该类线程池是否合适

3、缓冲执行

队列:LinkedBlockingQueue和ArrayBlockingQueue

典型案例:newFixedThreadPool(int threadSize)

注意:

使用该类线程池,最好使用LinkedBlockingQueue(无界队列),但是当大量并发任务的涌入,导致核心线程处理不过来,队列元素会大量增加,可能会报内存溢出

当然,对于上边这种情况的话,如果是ArrayBlockingQueue的话,如果设置得当,可以回绝一些任务,而不报内存溢出

4、线程数的确定

公式:启动线程数=[任务执行时间/(任务执行时间-IO等待时间)]*CPU核数

注意:

如果任务大都是CPU计算型任务,启动线程数=CPU核数

如果任务大多需要等待磁盘操作,网络响应,启动线程数可以参照公式估算,当然>CPU核数

总结:

一般使用线程池,按照如下顺序依次考虑(只有前者不满足场景需求,才考虑后者):

newCachedThreadPool–>newFixedThreadPool(int threadSize)–>ThreadPoolExecutor

newCachedThreadPool不需要指定任何参数

newFixedThreadPool需要指定线程池数(核心线程数==最大线程数)

ThreadPoolExecutor需要指定核心线程数、最大线程数、闲置超时时间、队列、队列容量,甚至还有回绝策略和线程工厂

线程池监控

利用线程池提供的参数进行监控,参数如下:taskCount:线程池需要执行的任务数量。completedTaskCount:线程池在运行过程中已完成的任务数量,小于或等于taskCount。largestPoolSize:线程池曾经创建过的最大线程数量,通过这个数据可以知道线程池是否满过。如等于线程池的最大大小,则表示线程池曾经满了。getPoolSize:线程池的线程数量。如果线程池不销毁的话,池里的线程不会自动销毁,所以这个大小只增不减。getActiveCount:获取活动的线程数。

通过扩展线程池进行监控:继承线程池并重写线程池的beforeExecute(),afterExecute()和terminated()方法,可以在任务执行前、后和线程池关闭前自定义行为。如监控任务的平均执行时间,最大执行时间和最小执行时间等。

**********************************************未完待续*********************************************

该文为本人学习的笔记,方便以后自己查阅。参考网上各大帖子,取其精华整合自己的理解而成。如有不对的地方,请多多指正!自勉!共勉!

**************************************************************************************************

    原文作者:JUC
    原文地址: https://blog.csdn.net/tianya3530/article/details/54599550
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞