java线程池相关,Java并发编程:线程池的使用,Java并发编程:线程池的使用

PS:本文实非一时半会写得完,看见此ps代表本文还未完全更新完整

引言:

多线程的使用非常广泛,例如android中完成耗时操作,或者有些时候需要并发的做某些任务的时候,就会用到多线程

但是到达了一定任务量的情况下,线程不应该被无限的去创建销毁,因为创建销毁也会占用许多资源和时间,所以在需要长期使用多线程的情况下,我们应该考虑线程的复用问题

即创建一个线程池以管控这些线程

该博文将会模仿这一篇博文的大致形式来写

Java并发编程:线程池的使用

之所以不直接mark一下,是因为这篇博文里有些问题,一则现在java版本更新,代码已经有所不同了;二则有些地方的代码应该是贴错了,影响理解,可能是博主手打的时候打错了吧……

无论如何,感谢博主的这篇博文给了我一个对线程池的大概理解

那么进入正题,这里说明一下,我目前使用的java版本是1.8

目录就直接用引用博文的目录吧,其实大体想说的点都是一样的

目录:

  一.Java中的ThreadPoolExecutor类

  二.深入剖析线程池实现原理

  三.使用示例

  四.如何合理配置线程池的大小

  五.引用博文列表

一.Java中的ThreadPoolExecutor类

  java.uitl.concurrent.ThreadPoolExecutor类是线程池中最核心的一个类,因此如果要透彻地了解Java中的线程池,必须先了解这个类。

  首先我们从它的构造方法开始看


  public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {//5 this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, Executors.defaultThreadFactory(), defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,threadFactory, defaultHandler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,RejectedExecutionHandler handler) { this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), handler); } public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) { if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0) throw new IllegalArgumentException(); if (workQueue == null || threadFactory == null || handler == null) throw new NullPointerException(); this.corePoolSize = corePoolSize; this.maximumPoolSize = maximumPoolSize; this.workQueue = workQueue; this.keepAliveTime = unit.toNanos(keepAliveTime); this.threadFactory = threadFactory; this.handler = handler; }

  其实可以比较明显的看出来,虽说有四个构造方法,其实前面三个最终执行的都是第四个

  这里简单说一下这七个参数吧,后面的章节会详细说明

  corePoolSize:线程池中持续保持的线程的数量,也可以称为核心池的大小,以下我都称corePool为核心池

  maximumPoolSize:线程池中在核心池以外能够临时扩展的线程的数量,也可以成为max池的大小,以下我都称maximumPool为max池(当然好像没几个人这样称呼这个池….不过其实能理解它是个啥就行)

  keepAliveTime:保持存活的时间,有时线程池可能会没有任务,在长时间没任务的情况下还保持线程的存活显然不是明智之举,设置一个存活时间,可以控制长时间没事干的线程销毁

  unit:参数keepAliveTime的时间单位,里面有几个静态属性,这个可以不用太在意,用的时候一看就明白

  workQueue:工作缓存队列,用来保存等待执行的任务,具体使用workQueue的哪个实现类需要看情况而定,区别还是比较明显的

  threadFactory:线程工厂,它的作用和名字一样,用来创建线程

  handler:RejectedExecutionHandler,即拒绝策略,其实也很容易理解,线程既然被我们规定是有限的,那么假设说max池设置的是10个线程,那么我们线程最大也只有10个,现在假设他们都在执行任务,第11个任务过来肯定不能马上执行,就需要保存在工作缓存队列,即workQueue中,而workQueue很明显也是有大小限制的(就如同int,long这些类型都是有长度限制一样,这个很容易理解),假设是20个任务,那么10+20,也就是说在前面任务都没做完或者没做的情况下,第31个任务过来的时候,很明显没地方储存它了,那么我们就需要有一个策略,或是拒绝该任务且抛出异常,或是其他方式,这就是拒绝策略。

  然后如果我们“刨根问底”,可以查出ThreadPoolExecutor的身世

    public class ThreadPoolExecutor extends AbstractExecutorService
    public abstract class AbstractExecutorService implements ExecutorService
    public interface ExecutorService extends Executor
    public interface Executor

  ThreadPoolExecutor继承了这个抽象类AbstractExecutorService

  而抽象类AbstractExecutorService实现了ExecutorService接口

  ExecutorService接口又继承自Executor

  此外,在ThreadPoolExecutor类中有几个非常重要的方法:

    execute()
    submit()
    shutdown()
    shutdownNow()

  execute()方法实际上是Executor中声明的方法,在ThreadPoolExecutor进行了具体的实现,这个方法是ThreadPoolExecutor的核心方法,通过这个方法可以向线程池提交一个任务,交由线程池去执行。

  submit()方法是在ExecutorService中声明的方法,在AbstractExecutorService就已经有了具体的实现,在ThreadPoolExecutor中并没有对其进行重写,这个方法也是用来向线程池提交任务的,但是它和execute()方法不同,它能够返回任务执行的结果,去看submit()方法的实现,会发现它实际上还是调用的execute()方法,只不过它利用了Future来获取任务执行结果(Future相关内容将在下一篇讲述)。

  shutdown()和shutdownNow()是用来关闭线程池的。

 

二.深入剖析线程池实现原理

  将从以下几个方面分析

  1.线程池状态

  2.任务的执行

  3.线程池中的线程初始化

  4.任务缓存队列及排队策略

  5.任务拒绝策略

  6.线程池的关闭

  7.线程池容量的动态调整

1.线程池状态

  首先贴上代码

    private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
    private static final int COUNT_BITS = Integer.SIZE - 3;
    private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

    // runState is stored in the high-order bits
    private static final int RUNNING    = -1 << COUNT_BITS; 
    private static final int SHUTDOWN   =  0 << COUNT_BITS;
    private static final int STOP       =  1 << COUNT_BITS;
    private static final int TIDYING    =  2 << COUNT_BITS;
    private static final int TERMINATED =  3 << COUNT_BITS;

  可以看到ThreadPoolExecutor类中用一个AtomicInteger类型的变量ctl来标志线程池的状态,其中有两个参数

  1,运行状态(RUNNING,SHUTDOWN,STOP,TIDYING,TERMINATED)

  2,线程数量

  一个int值保存两个参数,所以其中3位来保存运行时状态,剩下的来表示线程数量

  线程数量不用多说,就是字面意思,这里解释一下运行状态

  运行状态有五种:

  (1)RUNNING:

  该状态下,线程池可以接受新任务并且处理进入队列中的任务,线程池一旦创建即为该状态,且线程数量为0

  (2)SHUTDOWN

  该状态下,线程不再接受新任务,但是已经在队列中的任务会依次执行完

  (3)STOP

  该状态下,线程不再接受新任务,且会尝试终止已经在执行和等待执行的任务

  (4)TIDYING

  该状态下,会自动执行terminated()方法。所有任务都已经被终止,队列为空时会进入该状态

  (5)TERMINATED

  执行完terminated()方法后会进入该状态

  其实前三个状态很好理解,第四第五个状态就可能会有点搞不清楚了,TIDYING和TERMINATED状态除了TIDYING会执行一个terminated()方法以外还有什么区别呢,首先我们要知道terminated()方法做的是什么事情,在阅读源码或者查阅资料后,会发现其实在terminated()方法中,是为了唤醒那些外部等待的线程,告诉他们该线程池已经被关闭。而最开始说的那篇博文中,是没有TIDYING状态的,博文使用的版本是1.6,我个人认为,之所以在后续的版本中要增加一个TIDYING态,主要是为了区分,在执行terminated()方法前后,如果状态是同一个,那么外部无法了解该线程池究竟是否已经被完全关闭,而执行完terminated()后变成TERMINATED态,外部可以很清楚的知道该线程已经完全关闭

2.任务的执行

  首先我们先了解一下ThreadPoolExecutor类中其他的一些比较重要成员变量

    //任务缓存队列,存放等待执行的任务
    private final BlockingQueue<Runnable> workQueue;
    //线程池的主要状态锁,对线程池状态(即运行态或线程数等)的改变需要用到这个锁
    private final ReentrantLock mainLock = new ReentrantLock();
    //用于存放工作的集合
    private final HashSet<Worker> workers = new HashSet<Worker>();
    //"终止条件",与mainLock绑定
    private final Condition termination = mainLock.newCondition();
    //用于记录线程池中最多运行的线程数量
    private int largestPoolSize;
    //用于记录已完成的任务数
    private long completedTaskCount;
    //线程工厂,用于创建线程
    private volatile ThreadFactory threadFactory;
    //任务拒绝策略
    private volatile RejectedExecutionHandler handler;
    //线程存活时间
    private volatile long keepAliveTime;
    //是否允许超时核心线程关闭
    private volatile boolean allowCoreThreadTimeOut;
    //核心池线程数量
    private volatile int corePoolSize;
    //线程池最大运行线程数量
    private volatile int maximumPoolSize;

  大部分一看就明白啥意思,值得说明的主要是核心线程池相关的东西

  举个例子,corePoolSize是10,代表线程池里一直都有10个线程等待着任务,一旦有任务发过来,就安排各个线程去做任务,碰到任务数量比较多的时候,10个线程不够了,那么就会创建新的线程以完成任务,在完成了任务之后,如果在keepAliveTime时间内这个线程还没有接到新的任务,就把这个线程关闭。当然了,可以创建的新的线程的数量肯定也是有限制的(想也知道,要是不限制那也就不需要线程池这个东西了),具体数量的多少,就是maximumPoolSize的值,当然他肯定大于等于corePoolSize的值。默认的情况是这样,但是有时候我们连核心池的线程也不想一直保留(视情况而定),那么我们就可以把allowCoreThreadTimeOut设置为true,那么这种情况下,核心池也是空的,当有任务来了之后核心池会创建线程,而被创建的线程在完成了这些任务之后也会经历keepAliveTime时间的等待,如果这个期间没有新的任务过来,那么核心线程池里的线程也会被关闭。

  这里首先还是研究提交方法execute()方法,其实通过submit()也可以提交任务,但是实际上submit()方法里面最终调用的还是execute()方法,主要区别是submit()会有一个返回值以返回结果,实现任务提交的最终还是execute(),这里值得一说的是,开篇引用的那篇博文里面的execute()代码已经过时,并且那位博主复制的时候应该有一点小错误,有点影响理解,所以我才会去翻阅源码,发现1.8的源码已经和当时不一样了,而那篇博文却还在广为流传(当然了,核心的东西几乎没变,而且博主解释的也非常通俗易懂),所以我才写一篇博文。

  话不多说,先贴上execute()代码及官方注释(PS:这里我将corePool称为核心池,maximumPool称为max池)

    public void execute(Runnable command) {
        //如果command为空,直接抛出空指针异常
        if (command == null)
            throw new NullPointerException();
        //获取当前线程状态
        int c = ctl.get();
        //如果当前线程数小于核心池规定数量
        if (workerCountOf(c) < corePoolSize) {
            //如果在核心池中添加command成功,返回null
            if (addWorker(command, true))
                return;
            //否则再次获取线程状态
            c = ctl.get();
        }
        //如果线程池在RUNNING态&&command成功进入工作队列
        if (isRunning(c) && workQueue.offer(command)) {
            //再次获取当前线程池状态
            int recheck = ctl.get();
            //如果线程池状态不为RUNNING&&移除command成功
            if (! isRunning(recheck) && remove(command))
                //拒绝command
                reject(command);
            //如果当前线程池中线程数为0
            else if (workerCountOf(recheck) == 0)
                //在max池内新建一个没有firstTask的线程
                addWorker(null, false);
        }
        //如果在max池中添加command失败
        else if (!addWorker(command, false))
            //拒绝command
            reject(command);
    }

 

  官方注释如下:

  过程分三步:
  1,如果正在运行的线程少于corePoolSize规定的数量,会尝试创建一个新的线程,并将command作为它的firstTask,调用addWorker原子方法检查线程池的runState和workerCount,通过返回false避免在不应该创建线程的时候造成异常。
  2。如果一个任务可以成功地进入队列,那么我们仍然需要二次检查我们是否应该添加一个线程(因为可能在上次检查的期间有个现有的线程被结束了),或是进入这个方法之后,线程池就关闭了。所以我们重新检查状态,如果线程池是STOP则判断是否有必要则回滚队列,如果没有的话则启动一个新的线程。
  3。如果我们不能将任务放进队列,那么我们尝试添加一个新线程。如果失败,我们就知道线程池被关闭或是饱和了,我们就拒绝这个任务。

  相信上面的内容已经足够说明这个方法所做的工作了,唯一搞不清楚的可能就是这个addWorker方法,他的两个参数分别是什么含义,又会影响什么呢

  这里就不贴全部源码了,这方法虽说也不算太复杂,但是我认为也不用太过深入,我们只要知道它干啥,它的参数起到了什么作用和意义就行了

private boolean addWorker(Runnable firstTask, boolean core)

  可以看到,第一个参数是firstTask,其实也就是我们execute里面的command,也就是前面说的将command作为它的firstTask,而后面的core又是什么意思呢,这里把翻译过后的官方注释结合其他博文和我的理解之后的看法附上

  该方法是用来创建,运行,清理Workers的。

  两个参数中,前者command自不用说,后者是一个布尔值,这里写的参数名是core,如果core的值为true,代表确实是core(其实这句也算是废话了….),那么会在核心池中去添加这个工作,如果是false,那么就会在max池中去添加,其实core也就是判断是否在core池,即核心池中执行方法的意思。
  该方法会检查是否一个新的Worker能否能够添加到当前状态以及给定的范围(包括corePoolSize以及maximumPoolSize)的线程池中。如果可以的话,那么Worker的总数会根据添加的Worker来进行调整,并且如果可能的话,一个新的Worker会被创建,并且启动firstTask作为这个Worker的第一个任务。
  当该方法返回false的时候,说明这个当前线程池处于不能接受任务的状态或者因为其他的原因创建线程失败,那么它会返回false。

  至此,任务的执行方面理解到这我认为就可以了。

3.线程池中的线程初始化

  默认情况下,创建了线程池之后线程池后不会自动启动线程,而是等到有任务需要处理的时候才会创建并启动线程。

  有些情况下需要在创建了线程之后即让一些或者全部核心池的线程先创建并启动,也可以说是线程池中线程初始化,这个时候可以用到如下两个方法:

  启动核心池中的一个线程: 

  prestartCoreThread()

  启动核心池中的全部线程:

  prestartAllCoreThreads()

    public boolean prestartCoreThread() {
        return workerCountOf(ctl.get()) < corePoolSize &&
            addWorker(null, true);
    }
    public int prestartAllCoreThreads() {
        int n = 0;
        while (addWorker(null, true))
            ++n;
        return n;
    }

  其实源码中官方的方法注释已经说得比较清楚了,这里直接翻译一下贴上来
  prestartCoreThread()

  启动一个核心线程,让它处于空闲状态等待工作。 这修改了默认的当有新任务需执行的时候才启动核心线程的策略。 如果所有的核心线程都已经被启动了,那么方法会返回false。如果一个核心线程被成功启动,那么方法会返回true。

  prestartAllCoreThreads()

  启动所有的核心线程,让它们处于空闲状态等待工作。 这修改了默认的当有新任务需执行的时候才启动核心线程的策略。 方法返回所启动线程的数量。

  以上即线程池中线程的初始化

4.任务缓存队列及排队策略

  之前提到的任务缓存队列,即workQueue,他的类型是BlockingQueue<Runnable>,比较常用的有以下几个实现类

1,LinkedBlockingQueue

  LinkedBlockingQueue是比较常见的BlockingQueue的实现,他是基于链表的阻塞队列。在创建该对象时如果不指定可存储对象个数大小时,默认为Integer.MAX_VALUE。当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时,才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。

2,ArrayBlockingQueue

  ArrayBlockingQueue是基于数组的阻塞队列,除了有一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue。
ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。

3,SynchronousQueue

  SynchronousQueue是一种没有缓冲的阻塞队列,在生产者put的同时必须要有一个消费者进行take,否则就会阻塞。声明一个SynchronousQueue有两种不同的方式。公平模式和非公平模式的区别:如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,而后一种模式,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

4,PriorityBlockingQueue和DelayQueue

  PriorityBlockingQueue是基于优先级的阻塞队列,该队列不会阻塞生产者,只会阻塞消费者。

  DelayQueue队列存储的对象只有指定的延迟时间到了才能被取出,该队列也不会阻塞生产者。

5.任务拒绝策略

  这里说一下之前提到的任务拒绝策略RejectedExecutionHandler

  当目前线程池中的线程数=max池规定的线程数,并且任务缓存队列已满的情况下,会采取任务拒绝策略以拒绝新添加的任务

  RejectedExecutionHandler有以下四个实现类

  AbortPolicy:拒绝该任务,抛弃该任务并且抛出异常

  CallerRunsPolicy:如果线程池已满则直接执行此任务(确实没太搞清楚啥意思,可能是指的直接让该任务插队,先执行该任务吧)

  DiscardOldestPolicy:当任务添加到线程池中被拒绝时,线程池会放弃等待队列中最旧的未处理任务(即等待队列中最前面的任务),然后将被拒绝的任务添加到等待队列中。

  DiscardPolicy:拒绝该任务,抛弃该任务但是不抛出异常

  RejectedExecutionHandler默认实现的是AbortPolicy

6.线程池的关闭

  ThreadPoolExecutor提供了两个方法,用于关闭线程池,分别是shutdown()和shutdownNow()

  shutdown():不会立即终止线程池,而是要等所有任务缓存队列中的任务都执行完后才终止,但再也不会接受新的任务

  shutdownNow():立即终止线程池,并尝试打断正在执行的任务,并且清空任务缓存队列,返回尚未执行的任务

7.线程池容量的动态调整

三.使用示例

四.如何合理配置线程池的大小

五.引用博文列表:

Java并发编程:线程池的使用

Java线程池—ThreadPoolExecutor中的ctl变量

java线程池的原理学习(三)

BlockingQueue

并发编程–线程池拒绝策略RejectedExecutionHandler(三)

    原文作者:Qunter
    原文地址: https://www.cnblogs.com/Qunter/p/7569245.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞