自定义线程池ThreadPoolExecutor

优雅使用线程池ThreadPoolExecutor实现自定义

一.引言
线程池想必大家也都用过,JDK的Executors 也自带一些线程池。但是不知道大家有没有想过,如何才是最优雅的方式去使用过线程池吗? 生产环境要怎么去配置自己的线程池才是合理的呢?为什么在阿里的开发手册中明确指出在使用线程池时一定要使用ThreadPoolExecutor?

二.为什么要使用线程
线程是稀缺资源,如果被无限制的创建,不 仅会消耗系统资源,还会降低系统的稳定性,因此Java中提供线程池对线程进行统一分配、 调优和监控的框架。

既然线程池有这么多优势,在什么场景下适合使用线程呢?如果并发的请求数量非常多,但每个线程执行的时间很短,这样就会频繁的创建和销毁 线程,大部分时间都浪费在创建和销毁线程上了如此一来会大大降低系统的效率。可能出现服务器在为每个请求创建新线程和销毁线 程上花费的时间和消耗的系统资源要比处理实际的用户请求的时间和资源更多,在这种场景呢下使用线程池非常适合。
适合场景:1.需要处理的任务数量大;2.单任务处理时间比较短;

三.线程池相关参数配置
1.在讨论线程池相关参数配置之前,我们先来看一下线程池的构造方法

    public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) { 
        if (corePoolSize < 0 ||
            maximumPoolSize <= 0 ||
            maximumPoolSize < corePoolSize ||
            keepAliveTime < 0)
            throw new IllegalArgumentException();
        if (workQueue == null || threadFactory == null || handler == null)
            throw new NullPointerException();
        this.corePoolSize = corePoolSize;
        this.maximumPoolSize = maximumPoolSize;
        this.workQueue = workQueue;
        this.keepAliveTime = unit.toNanos(keepAliveTime);
        this.threadFactory = threadFactory;
        this.handler = handler;
    }

2.线程池中主要涉及到一下几个参数:

int corePoolSize, 核心线程数;
int maximumPoolSize,最大线程数;
long keepAliveTime,线程执行任务的超时时间,根据业务处理时间进行设置;
TimeUnit unit,时间单位
BlockingQueue workQueue,任务阻塞队列,默认是一个无界队列。如果自定义线程池需要根据具体业务情况选取一个阻塞队列的大小;
ThreadFactory threadFactory,线程工厂
RejectedExecutionHandler handler,线程负载时的拒绝策略

3.线程池大小的设置
线程中线程数的确定时面试中经常被问到的一个问题,在明确这个问题之前我们需要先了解两个概念:
(1)cpu计算密集型
顾名思义就是应用需要非常多的CPU计算资源,在多核CPU时代,我们要让每一个CPU核心都参与计算,将CPU的性能充分利用起来,这样才算是没有浪费服务器配置,如果在非常好的服务器配置上还运行着单线程程序那将是多么重大的浪费。对于计算密集型的应用,完全是靠CPU的核数来工作,所以为了让它的优势完全发挥出来,避免过多的线程上下文切换,比较理想方案是:
线程数 = CPU核数+1,也可以设置成CPU核数*2,但还要看JDK的版本以及CPU配置(服务器的CPU有超线程)。

一般设置CPU * 2即可。
(2)cpuIO密集型
我们现在做的开发大部分都是WEB应用,涉及到大量的网络传输,不仅如此,与数据库,与缓存间的交互也涉及到IO,一旦发生IO,线程就会处于等待状态,当IO结束,数据准备好后,线程才会继续执行。因此从这里可以发现,对于IO密集型的应用,我们可以多设置一些线程池中线程的数量,这样就能让在等待IO的这段时间内,线程可以去做其它事,提高并发处理效率。那么这个线程池的数据量是不是可以随便设置呢?当然不是的,请一定要记得,线程上下文切换是有代价的。目前总结了一套公式,对于IO密集型应用:
线程数 = ((线程等待时间+线程CPU时间)/线程CPU时间 )* CPU数目
注:没有绝对的方法确定线程数,这种方法只是一个估值,在实际开发过程中还需要根据自己的硬件配置和业务执行时间来进行优化;

JDK自带的拒绝策略如下:
AbortPolicy:直接抛出异常,默认策略;
CallerRunsPolicy:用调用者所在的线程来执行任务;
DiscardOldestPolicy:丢弃阻塞队列中靠最前的任务, 并执行当前任务;
DiscardPolicy:直接丢弃任务;
注:其他核心参数会在后面的线程池的使用中具体讲解

4.Executors中自带的四种线程池
使用过JDK自带的Executors线程池工具的同学应该都知道,在Executors中为我们提供了四种基本的线程池,虽然这四种线程池在使用上非常方便,但是四种线程池也各自有一定的问题,下面就来为大家分析一下这四种线程池和相应的问题所在:
(1)固定线程数的线程池newFixedThreadPool(int nThreads)

   public static ExecutorService newFixedThreadPool(int nThreads) { 
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());
    }

public LinkedBlockingQueue() { 
        this(Integer.MAX_VALUE);
    }

问题:大家可以从线程池的初始化方法newFixedThreadPool(int nThreads)发现,这种线程池我们可以自定线程数,但是却不能指定阻塞队列的大小,而初始化方法给我们默认设置的阻塞队列的大小为 this(Integer.MAX_VALUE);这样就会造成我们的队列可以无限的进行膨胀,知道发生OOM,因此使用这种线程池需要谨慎,需要对系统中任务增长速率和处理速率有一个很好的评估;

(2)可缓存线程数的线程池 newCachedThreadPool()

   public static ExecutorService newCachedThreadPool() { 
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());
    }

  TransferQueue() { 
            QNode h = new QNode(null, false); // initialize to dummy node.
            head = h;
            tail = h;
        }

问题:从源码中我们可以看出newCachedThreadPool()线程池中核心线程数最大值为Integer.MAX_VALUE,而阻塞队列使用的是一个 TransferQueue()链表,理论上这个链表也是会无限增长的,但是由于核心线程数是Integer.MAX_VALUE;这样就会出现一个问题,当大量的任务提交到线程池后,并不会在阻塞队列中进行阻塞,而是会无限制的创建新的线程来处理任务,这时大量的线程会造成OOM;

(3)单线程的线程池newSingleThreadExecutor()

   public static ExecutorService newSingleThreadExecutor() { 
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));
    }
 public LinkedBlockingQueue() { 
        this(Integer.MAX_VALUE);
    }

问题:从单线程的线程池的初始化方法中可以很清晰的发现这种线程池只有一个核心线程和一个 new LinkedBlockingQueue()无界队列,首先使用这种线程池会串行化的去执行每个任务,这样效率会很低,而且单线程效率低会导致大量任务堆积在阻塞队列中,这个队列的最大值为Integer.MAX_VALUE,这时就会跟newFixedThreadPool()有一样的问题,阻塞队列任务堆积造成OOM;

(4)定时、周期执行线程池newSingleThreadScheduledExecutor()

   public static ScheduledExecutorService newSingleThreadScheduledExecutor() { 
        return new DelegatedScheduledExecutorService
            (new ScheduledThreadPoolExecutor(1));
    }

 public ScheduledThreadPoolExecutor(int corePoolSize) { 
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());
    }

分析:这种线程对于大部分业务开发来说可能使用的比较少,但是在一些框架源码中经常使用如:定时的向服务器发送心跳,客户端定时从注册中心上拉取可用服务列表,这种线程池本身并没有什么OOM的问题,因为我们一个系统中需要进行定时处理的任务并不是很多,但是使用用这种线程池需要对其执行流程和API有一定了解;
定时线程池执行流程:
《自定义线程池ThreadPoolExecutor》
注:在使用定时线程池一下三个方法一定要注意区别:

//方法一 :延迟执行且执行一次
schedule(Runnable command,long delay,TimeUnit unit)
//方法二 :定时周期执行,不考虑上一个周期是否执行完成
scheduleAtFixedRate(Runnable command, long initialDelay,long period,TimeUnit unit)
//方法三 :定时周期执行,但要等待上一个周期执行完成再顺延delay时长执行下一个周期
scheduleWithFixedDelay(Runnable command,long initialDelay,
long delay,TimeUnit unit)

四.ThreadPoolExecutor执行流程和源码分析
1.线程执行流程
《自定义线程池ThreadPoolExecutor》
2.ThreadPoolExecutor执行源码分析
本文主要以ThreadPoolExecutor提交Runnable任务为例经讲解源码执行流程,如果想了解线程池执行Callable的源码请移步:https://blog.csdn.net/weixin_41251135/article/details/110210300

(1)ThreadPoolExecutor. execute(Runnable command)任务提交入口

public void execute(Runnable command) { 
        if (command == null)
            throw new NullPointerException();
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) { //当前线程数小于核心线程,添加Worker,创建核心线程
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        if (isRunning(c) && workQueue.offer(command)) { //将任务添加到阻塞队列
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);//创建非核心线程
        }
        else if (!addWorker(command, false))
            reject(command);//执行拒绝策略
    }

(2)核心方法addWorker(Runnable firstTask, boolean core)创建核心/非核心线程

private boolean addWorker(Runnable firstTask, boolean core) { 
        retry:
        for (;;) { //这里使用了一个for循环判断线程的状态,大家不必过多关心,后文会详细讲解线程池状态;
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) { 
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try { 
            w = new Worker(firstTask);//这行是整个线程池最核心的一行代码,创建了一个Worker对象worker对象中包括 this.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
            this.thread = getThreadFactory().newThread(this);
            final Thread t = w.thread;//当前的thread就是Worker
            if (t != null) { 
                final ReentrantLock mainLock = this.mainLock;
                mainLock.lock();//独占锁
                try { 
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int rs = runStateOf(ctl.get());

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) { 
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally { 
                    mainLock.unlock();//解锁
                }
                if (workerAdded) { 
                    t.start();//调用执行Worker.run(),这里如果不明白继续向下看
                    workerStarted = true;
                }
            }
        } finally { 
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

Worker对象分析

 Worker(Runnable firstTask) { 
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);//Worker类本身就实现了Runnable接口,所以此时的thread 就是当前Worker本身,这一点很重要,这样封装可以在addWorker()方法中调用 t.start()方法时首先执行Worker类的run()方法;
        }

Worker.run()

 public void run() { 
            runWorker(this);
        }

(3)runWorker(Worker w)这个是Runnable任务执行的方法

final void runWorker(Worker w) { 
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try { 
            while (task != null || (task = getTask()) != null) { //这里使用了循环的执行任务,当前任务执行完了,还可以getTask()从阻塞队列中获取任务执行,这就是为什么线程池中的一个线程可以反复的执行多个任务的原因;
                w.lock();
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try { 
                    beforeExecute(wt, task);
                    Throwable thrown = null;
                    try { 
                        task.run();//执行具体的Runnable.run()的业务逻辑,这里其实是一个多态还可以执行FutureTask包装的Callable任务
                    } catch (RuntimeException x) { 
                        thrown = x; throw x;
                    } catch (Error x) { 
                        thrown = x; throw x;
                    } catch (Throwable x) { 
                        thrown = x; throw new Error(x);
                    } finally { 
                        afterExecute(task, thrown);
                    }
                } finally { 
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally { 
            processWorkerExit(w, completedAbruptly);
        }
    }

(4) getTask()从阻塞队列中获取任务执行
这个方法的作用一个是workQueue获取等待任务,二是根据当前workQueue是否为空动态的回收多余的线程

 private Runnable getTask() { 
        boolean timedOut = false; // Did the last poll() time out?

        for (;;) { 
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) { 
                decrementWorkerCount();
                return null;
            }

            int wc = workerCountOf(c);

            // Are workers subject to culling?
            boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

            if ((wc > maximumPoolSize || (timed && timedOut))
                && (wc > 1 || workQueue.isEmpty())) { 
                if (compareAndDecrementWorkerCount(c))
                    return null;
                continue;
            }

            try { 
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) { 
                timedOut = false;
            }
        }
    }

五.线程池的状态转换过程
1.线程池的五种状态
RUNNING = ­1 << COUNT_BITS; //高3位为111
SHUTDOWN = 0 << COUNT_BITS; //高3位为000
STOP = 1 << COUNT_BITS; //高3位为001
TIDYING = 2 << COUNT_BITS; //高3位为010
TERMINATED = 3 << COUNT_BITS; //高3位为011
2.五种状态流转过程
《自定义线程池ThreadPoolExecutor》

    原文作者:小小书童_9527
    原文地址: https://blog.csdn.net/weixin_41251135/article/details/109590380
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞