优雅的使用Java线程池

从线程池使用谈起

创建并使用一个线程池

线程池这一概念,想必所有开发者都不陌生。它的应用场景十分广泛,可以被广泛的用于高并发的处理场景。Java 在 juc 包内提供了许多线程池相关的类,可以帮我们快速的构建一个线程池。目前 juc 提供的 Executors 工厂类,可以方便的创建线程池,其提供了创建无限大的线程池、指定大小线程池、定时调度线程池以及单个线程池等等,我们可以通过以下代码简单的创建一个线程池。

//创建一个不限制线程个数的线程池 ExecutorService executor = Executors.newCachedThreadPool();
//创建一个固定线程个数的线程池 ExecutorService executor = Executors.newFixedThreadPool(10);

同样,Executors提供的工厂方法中,我们也可以通过传入一个 ThreadFactory 来自定义线程创建时的一些属性,如下。

private static ExecutorService exec = Executors.newCachedThreadPool(new ThreadFactory() {
    @Override
    public Thread newThread(Runnable r) {
        Thread t = new Thread(r);
        t.setName("worker-thread-" + UUID.randomUUID().toString());
        return t;
    }
});

//使用lambda可替换为 private static ExecutorService exec = Executors.newCachedThreadPool(r -> {
    Thread t = new Thread(r);
    t.setName("worker-thread-" + UUID.randomUUID().toString());
    return t;
});

以上代码可以看出,通过Executors提供的工厂方法,我们可以很简单的创建一个线程池来使用。通过 ExecutorService 提供的 submit(Runnable) 接口,即可简单的向线程池提交任务。

CachedThreadPool 与 FixedThreadPool

作为最常用的两种线程池,CachedThreadPool 和 FixedThreadPool 在不同场景,有着不同的应用。 – CachedThreadPool 主要被应用在响应时间要求高、数据量可控的场景,由于其不限制创建线程的个数,故若数据量不可控,会造成程序 OOM – FixedThreadPool 主要被应用在线程资源有限,数据量较小或不可控场景,由于其线程数量有限,针对于过多的数据量,默认将会进行丢弃,但是不会造成程序 OOM 我们可以通过自己的实际场景需求,选择不同的线程池。

ThreadPoolExecutor

Executors 工厂的实现

Executors仅是一个工厂类,查看它的实现我们便可以看到,其不同线程池(暂时不考虑调度线程池)的底层均为 ThreadPoolExecutor 实例,通过不同的初始化参数形成了不同的特性,CachedThreadPool 和 FixedThreadPool 的创建部分源码如下。

//创建CachedThreadPool public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}
//创建FixedThreadPool public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

ThreadPoolExecutor 的构建

以上便是工厂 Executors 中创建线程池的具体实现。从实现代码中,可以看出,不同特性的线程池本质都是构建 ThreadPoolExecutor 对象。查看 ThreadPoolExecutor 类的源码可以看到,其构造方法定义如下。

public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
    if (corePoolSize < 0 ||
        maximumPoolSize <= 0 ||
        maximumPoolSize < corePoolSize ||
        keepAliveTime < 0)
        throw new IllegalArgumentException();
    if (workQueue == null || threadFactory == null || handler == null)
        throw new NullPointerException();
    this.acc = System.getSecurityManager() == null ?
            null :
            AccessController.getContext();
    this.corePoolSize = corePoolSize;
    this.maximumPoolSize = maximumPoolSize;
    this.workQueue = workQueue;
    this.keepAliveTime = unit.toNanos(keepAliveTime);
    this.threadFactory = threadFactory;
    this.handler = handler;
}

构造参数简介如下: – corePoolSize:线程池中一直有的线程个数,默认情况下即使空闲也不会被回收(可以通过设置allowCoreThreadTimeOut参数来改变默认) – maximumPoolSize:线程池中可以持有的最多线程数 – keepAliveTime:超过corePoolSize数的空闲线程在被销毁之前等待新任务到达的最长时间 – unit:keepAliveTime参数的单位 – workQueue:线程池的等待队列,被execute方法提交的任务将进入这一队列,默认无限大 – threadFactory:线程工厂,可以自定义线程的创建过程 – handler:拒绝处理器,负责在workQueue满的时候处理新提交的任务

反观Executors工厂的实现,可以看出,针对于 FixedThreadPool 的创建,其实就是创建一个核心线程和最大线程均为固定值的线程池,以保证只有固定个线程提供服务;针对于 CachedThreadPool 的创建,则是创建一个核心线程数为0、最大线程数为整型最大值的线程池。

Executors存在的问题

由于篇幅有限,此处仅针对日常使用最多的 CachedThreadPool 和 FixedThreadPool 两种线程池进行问题分析。

FixedThreadPool

再次回顾FixedThreadPool的定义,代码如下。

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}

由代码可以看到,除却核心线程和最大线程数都设置为固定值,FixedThreadPool还使用了一个无长度限制的等待队列。 在使用上,通常都会认为FixedThreadPool是不会占过多资源的。但是使用中,FixedThreadPool仍然会可能出现 OOM 的风险。这是因为,由于FixedThreadPool采用无界的等待队列,一旦空闲线程被用尽,就会向队列中加入任务,这时一旦任务进入速度远高于线程处理能力,就有出现 OOM 的可能。 阿里巴巴编码规范中,也有关于线程池的使用说明。其建议通过直接定义 ThreadPoolExecutor 来代替使用 Executors 提供的工厂方法。在我们处理的数据量较大或者并发量很大时,应避免直接使用 Executors 提供的 FixedThreadPool。

CachedThreadPool

CachedThreadPool的定义在前文中也已经介绍过,具体如下。

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

由代码可以看到,CachedThreadPool将空闲线程销毁前的等待时间设置成了60s,同时采用SynchronousQueue,不进行等待队列的设置。 CachedThreadPool 在一定程度上能够应对不间断突增的并发量,但是一旦对总量把控不好,就容易引发OOM。

线程池提交任务实现阻塞等待

由于 FixedThreadPool 因为等待队列无限大可能会导致OOM,所以我们可以通过直接创建 ThreadPoolExecutor 来替代使用 Executors.newFixedThreadPool,在构建过程中通过指定等待队列大小,来避免出现OOM。但是由于 ThreadPoolExecutor 在等待队列满时,会拒绝任务插入并直接丢弃,所以针对于不可以丢弃的任务,就不能简单的采用这种方式。 例如,一个 Consumer 在不断的消费 MQ,并希望通过不同的 Worker 线程来并发处理。如果采用上述方案,那么在消费速率快,Worker 线程池等待队列慢的情况下,就会发生丢数据,这显然是我们不想看到的。在更多时候,我们需要的都是一个可以阻塞等待的线程池。

变更拒绝策略

说到让线程池提交任务阻塞等待,最简单的方式就是通过增加一个拒绝策略,该策略中做的便是对等待队列进行阻塞写入,也就实现了线程池提交任务的阻塞等待,具体如下。

/**  * @author jayden  */
public class ExecutorsDemo {

    private static ExecutorService exec = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(100), r -> {
        Thread t = new Thread(r);
        t.setName("worker-thread-" + UUID.randomUUID().toString());
        return t;
    }, (r, executor) -> {
        if (!executor.isShutdown()) {
            try {
                //阻塞等待put操作                 System.err.println("waiting queue is full, putting...");
                executor.getQueue().put(r);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    });

    private static AtomicInteger at = new AtomicInteger(0);

    public static void main(String[] args) {
        while (true) {
            exec.submit(() -> {
                System.err.println("Worker" + at.getAndIncrement() + " start.");
                try {
                    Thread.sleep(2000);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.err.println("Worker end.");
            });
        }
    }
}

以上程序中,通过直接初始化 ThreadPoolExecutor 并指定拒绝策略的方式,来实现线程池任务的阻塞提交。

使用信号量

为每个需要阻塞的线程池增加拒绝策略的这种方式虽然可行,但是每次初始化都要添加重复代码明显感觉不太优雅。在思考如何能优雅的实现时,想到可以添加一层代理。代理类持有真正的线程池,同时持有信号量。通过信号量来控制线程池任务的提交,不改变原有线程池的定义。具体如下。

/**  * @author jayden  */
public class BlockedThreadPool {

    private ExecutorService executor;

    private Semaphore semaphore;

    /**  * 接收两个参数,最大允许线程数,自定义线程名  *  * @param nThreads  * @param name  */
    private BlockedThreadPool(int nThreads, String name) {
        if (nThreads <= 0) {
            throw new IllegalArgumentException();
        }
        semaphore = new Semaphore(nThreads);
        executor = Executors.newCachedThreadPool(r -> {
            Thread t = new Thread(r);
            t.setName(name + UUID.randomUUID().toString());
            return t;
        });
    }

    /**  * 提供工厂方法  *  * @param nThread  * @param name  */
    public static BlockedThreadPool createBlockedThreadPool(int nThread, String name) {
        return new BlockedThreadPool(nThread, name);
    }

    /**  * 向线程池提交任务  * @param r  */
    public void submit(Runnable r) {
        executor.submit(() -> {
            try {
                semaphore.acquire();
                r.run();
                semaphore.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        });
    }
}

在该示例中,使用了 Executors.newCachedThreadPool 来构造真正线程池实例,由于这种实现是通过信号量来控制并发及阻塞的,故不需要在线程池本身层面进行限制设置。 上文代码仅是一个示例demo,根据不同业务场景,还需要进一步的抽象及扩展。

总结

本文中所考虑的问题最初来自于Kafka消费服务在生产环境的一次OOM。在最初使用 Executors.newFixedCachePool 方法构建线程池中,虽然指定了线程池大小但是还是出现了OOM,经调查才搞明白具体的原因。本文意在抛砖引玉,仅是针对于单一场景的一个总结。JUC 博大精深,还需要继续深入探究。本文中示例代码,请参考java-demo/threadpool

    原文作者:Jayden
    原文地址: https://zhuanlan.zhihu.com/p/60986630
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞