实现优先级队列的线程池

线程池更多知识点请阅读另一篇博客

在实际的开发中,会将各种不同的异步任务提交到线程池执行,它们有轻重缓急。

如果任务量少,一来就有空闲线程处理,哦那没事了。

如果任务量多,我们希望队列根据任务的优先级有序存储,即优先级高的将会被优先消费。

实现的话有两个关键点

  1. 线程池的任务队列具备排序功能。
  2. 提交的任务具备可比性。

第1点,ThreadPoolExecutor的构造函数有一个BlockingQueue<Runnable> workQueue参数,这个接口有一个实现类PriorityBlockingQueue<E>,它的构造函数可以传入一个比较器Comparator,能够满足要求。

第2点,ThreadPoolExecutor的submit、invokeXxx、execute方法入参都是Runnable、Callable,均不具备可排序的属性。我们可以弄一个实现类,加一些额外的属性,这样就可以让它们具备可比较性了。

除此之外,还有一些难点

  1. PriorityBlockingQueue是无界的,它的offer方法永远返回true。有什么问题吗?第一,OOM风险;第二,设置的最大线程数和拒绝执行策略没有意义(没有机会触发,说shutdown的就不讲武德了)。
  2. 别人在使用ThreadPoolExecutor时,只要是提交Runnable、Callable及其子类都可以,我们就无法保证传进来的任务具备可比较性。有时候,把选择权交给用户不一定是好事。

怎么解决呢?

第1点,我们需要重写一下这个类的offer方法,如果元素超过指定数量直接返回false,否则调用原来逻辑。

第2点,扩展线程池的submit、invokeXxx、execute方法,可以选择继承或组合的方式,类似装饰者模式/静态代理模式。这里我们选择组合,原因:更灵活(开放自定义的方法、屏蔽原有的方法),模仿AQS。

呃,直接上代码吧!!!

package thread_pool;

import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
 * 优先级线程池执行器
 *
 * @author Zhou huanghua
 */
@SuppressWarnings("all")
public class PriorityThreadPoolExecutor {

    private static final Logger LOGGER = Logger.getAnonymousLogger();

    private final PriorityBlockingQueue<Runnable> priorityBlockingQueue;

    private final ThreadPoolExecutor threadPoolExecutor;

    // 自定义默认拒绝执行处理器:让调用线程执行最先提交到队列的任务,然后提交当前任务
    private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
        @Override
        public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
            if (e.isShutdown()) {
                return;
            }
            Optional.ofNullable(e.getQueue().poll()).ifPresent(Runnable::run);
            e.execute(r);
        }
    };

    /**
     * 默认构造器
     */
    public PriorityThreadPoolExecutor() {
        this(2, Runtime.getRuntime().availableProcessors() * 2 + 1, 60, TimeUnit.SECONDS, 1000, defaultHandler);
    }

    /**
     * 构造器
     *
     * @param corePoolSize    核心线程数
     * @param maximumPoolSize 最大线程数
     * @param keepAliveTime   保留时间
     * @param unit            保留时间单位
     * @param workQueueSize   任务队列容量
     * @param handler         拒绝执行处理器
     */
    public PriorityThreadPoolExecutor(int corePoolSize,
                                      int maximumPoolSize,
                                      long keepAliveTime,
                                      TimeUnit unit,
                                      int workQueueSize,
                                      RejectedExecutionHandler handler) {
        LOGGER.log(Level.INFO, "创建优先级线程池执行器:corePoolSize={0},maximumPoolSize={1},keepAliveTime={2},unit={3},workQueueSize={4},handler={5}",
                new Object[]{corePoolSize, maximumPoolSize, keepAliveTime, unit.name(), workQueueSize, handler.getClass().getSimpleName()});

        /*
         * 优先级阻塞队列
         * 第一优先级priority,第二优先级taskNumber
         * 重写offer方法,改为有界
         */
        priorityBlockingQueue = new PriorityBlockingQueue(10, (Comparator<PriorityTask>) (t1, t2) ->
                t1.priority != t2.priority ? Integer.compare(t1.priority, t2.priority) : Integer.compare(t1.taskNumber, t2.taskNumber)
        ) {
            @Override
            public boolean offer(Object o) {
                return super.size() < workQueueSize ? super.offer(o) : false;
            }
        };

        // 线程池执行器,使用默认线程工厂
        threadPoolExecutor = new ThreadPoolExecutor(
                corePoolSize,
                maximumPoolSize,
                keepAliveTime,
                unit,
                priorityBlockingQueue,
                handler);
    }

    /**
     * 异步执行,默认最低优先级
     *
     * @param command 任务
     */
    public void asyncExecute(Runnable command) {
        asyncExecute(command, TaskPriority.LOW);
    }

    /**
     * 异步执行
     *
     * @param command  任务
     * @param priority 优先级
     */
    public void asyncExecute(Runnable command, TaskPriority priority) {
        threadPoolExecutor.execute(new PriorityTask(command, priority.value));
        LOGGER.log(Level.INFO, "活跃线程数量:{0},队列任务数量:{1}", new Object[]{threadPoolExecutor.getActiveCount(), priorityBlockingQueue.size()});
    }

    /**
     * 优先级任务
     */
    private static class PriorityTask implements Runnable {
        private static final AtomicInteger NUMBER_GENERATOR = new AtomicInteger(1);
        private final Runnable command;
        private final int priority;
        private final int taskNumber;

        public PriorityTask(Runnable command, Integer priority) {
            this.command = command;
            this.priority = priority;
            this.taskNumber = NUMBER_GENERATOR.getAndIncrement();
        }

        @Override
        public void run() {
            command.run();
        }
    }

    /**
     * 任务优先级枚举:HIGH > MEDIUM > LOW
     */
    public enum TaskPriority {
        HIGH(1),
        MEDIUM(2),
        LOW(3);

        private Integer value;

        TaskPriority(Integer value) {
            this.value = value;
        }
    }
}

补充说明一下:

  • 优先级任务类除了优先级属性外,额外增加一个递增的全局序号属性,提供优先级相等时FIFO。
  • 开放两个构造函数和两个对外提交任务方法。默认的构造函数设置CPU核心*2+1的最大线程数和结合了CallerRunsPolicy、DiscardOldestPolicy的拒绝执行处理器。默认的提交任务方法设置最低优先级。任务优先级提供一个高中低级别的枚举。
  • 每次提交任务后记个日志,那个LOG不要太在意,没有引入其它依赖,就暂时用JDK自带的顶一下。

又到了最令人兴奋的测试环节。

关注下几个参数:核心线程数2,最大线程数4,任务队列容量4,拒绝执行处理器采用调用者执行策略。

任务名称采用优先级+同等优先级的递增序号组成,如31表示第3优先级的第1个任务;任务主体在输出打印之前等待2秒模仿实际情况和测试效果。别问我为啥不用JUnit

    public static void main(String[] args) {

		test1();

		System.out.println("主线程运行完毕!");
	}

	private static void test1() {
		PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, 4, new ThreadPoolExecutor.CallerRunsPolicy());
		executor.asyncExecute(new Task(31), PriorityThreadPoolExecutor.TaskPriority.LOW);
		executor.asyncExecute(new Task(32), PriorityThreadPoolExecutor.TaskPriority.LOW);
		executor.asyncExecute(new Task(21), PriorityThreadPoolExecutor.TaskPriority.MEDIUM);
		executor.asyncExecute(new Task(33));
		executor.asyncExecute(new Task(22), PriorityThreadPoolExecutor.TaskPriority.MEDIUM);
		executor.asyncExecute(new Task(34));
		executor.asyncExecute(new Task(11), PriorityThreadPoolExecutor.TaskPriority.HIGH);
		executor.asyncExecute(new Task(12), PriorityThreadPoolExecutor.TaskPriority.HIGH);
		executor.asyncExecute(new Task(13), PriorityThreadPoolExecutor.TaskPriority.HIGH);
	}

	private static class Task implements Runnable {
    	private final int i;

		public Task(int i) {
			this.i = i;
		}

		@Override
		public void run() {
			LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
			System.out.println("run-task: " + i + ", thread-name: " + Thread.currentThread().getName());
		}
	}

控制台输出如下

《实现优先级队列的线程池》

分析一下,首先两个任务31、32直接创建核心线程执行(线程1和2),然后21、33、22、34四个任务被加入到队列(顺序为21、22、33、34,参考输出顺序),接着两个任务11、12被新增的线程执行(线程3和4),最后任务13被主线程执行。

总而言之,测试通过。至于性能相关,因为需要排序,总会消耗一些的,后面可以做下试验。

    原文作者:西红柿系番茄
    原文地址: https://blog.csdn.net/qq_31142553/article/details/113750449
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞