线程池更多知识点请阅读另一篇博客。
在实际的开发中,会将各种不同的异步任务提交到线程池执行,它们有轻重缓急。
如果任务量少,一来就有空闲线程处理,哦那没事了。
如果任务量多,我们希望队列根据任务的优先级有序存储,即优先级高的将会被优先消费。
实现的话有两个关键点:
- 线程池的任务队列具备排序功能。
- 提交的任务具备可比性。
第1点,ThreadPoolExecutor的构造函数有一个BlockingQueue<Runnable> workQueue参数,这个接口有一个实现类PriorityBlockingQueue<E>,它的构造函数可以传入一个比较器Comparator,能够满足要求。
第2点,ThreadPoolExecutor的submit、invokeXxx、execute方法入参都是Runnable、Callable,均不具备可排序的属性。我们可以弄一个实现类,加一些额外的属性,这样就可以让它们具备可比较性了。
除此之外,还有一些难点:
- PriorityBlockingQueue是无界的,它的offer方法永远返回true。有什么问题吗?第一,OOM风险;第二,设置的最大线程数和拒绝执行策略没有意义(没有机会触发,说shutdown的就不讲武德了)。
- 别人在使用ThreadPoolExecutor时,只要是提交Runnable、Callable及其子类都可以,我们就无法保证传进来的任务具备可比较性。有时候,把选择权交给用户不一定是好事。
怎么解决呢?
第1点,我们需要重写一下这个类的offer方法,如果元素超过指定数量直接返回false,否则调用原来逻辑。
第2点,扩展线程池的submit、invokeXxx、execute方法,可以选择继承或组合的方式,类似装饰者模式/静态代理模式。这里我们选择组合,原因:更灵活(开放自定义的方法、屏蔽原有的方法),模仿AQS。
呃,直接上代码吧!!!
package thread_pool;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.logging.Level;
import java.util.logging.Logger;
/**
* 优先级线程池执行器
*
* @author Zhou huanghua
*/
@SuppressWarnings("all")
public class PriorityThreadPoolExecutor {
private static final Logger LOGGER = Logger.getAnonymousLogger();
private final PriorityBlockingQueue<Runnable> priorityBlockingQueue;
private final ThreadPoolExecutor threadPoolExecutor;
// 自定义默认拒绝执行处理器:让调用线程执行最先提交到队列的任务,然后提交当前任务
private static final RejectedExecutionHandler defaultHandler = new RejectedExecutionHandler() {
@Override
public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
if (e.isShutdown()) {
return;
}
Optional.ofNullable(e.getQueue().poll()).ifPresent(Runnable::run);
e.execute(r);
}
};
/**
* 默认构造器
*/
public PriorityThreadPoolExecutor() {
this(2, Runtime.getRuntime().availableProcessors() * 2 + 1, 60, TimeUnit.SECONDS, 1000, defaultHandler);
}
/**
* 构造器
*
* @param corePoolSize 核心线程数
* @param maximumPoolSize 最大线程数
* @param keepAliveTime 保留时间
* @param unit 保留时间单位
* @param workQueueSize 任务队列容量
* @param handler 拒绝执行处理器
*/
public PriorityThreadPoolExecutor(int corePoolSize,
int maximumPoolSize,
long keepAliveTime,
TimeUnit unit,
int workQueueSize,
RejectedExecutionHandler handler) {
LOGGER.log(Level.INFO, "创建优先级线程池执行器:corePoolSize={0},maximumPoolSize={1},keepAliveTime={2},unit={3},workQueueSize={4},handler={5}",
new Object[]{corePoolSize, maximumPoolSize, keepAliveTime, unit.name(), workQueueSize, handler.getClass().getSimpleName()});
/*
* 优先级阻塞队列
* 第一优先级priority,第二优先级taskNumber
* 重写offer方法,改为有界
*/
priorityBlockingQueue = new PriorityBlockingQueue(10, (Comparator<PriorityTask>) (t1, t2) ->
t1.priority != t2.priority ? Integer.compare(t1.priority, t2.priority) : Integer.compare(t1.taskNumber, t2.taskNumber)
) {
@Override
public boolean offer(Object o) {
return super.size() < workQueueSize ? super.offer(o) : false;
}
};
// 线程池执行器,使用默认线程工厂
threadPoolExecutor = new ThreadPoolExecutor(
corePoolSize,
maximumPoolSize,
keepAliveTime,
unit,
priorityBlockingQueue,
handler);
}
/**
* 异步执行,默认最低优先级
*
* @param command 任务
*/
public void asyncExecute(Runnable command) {
asyncExecute(command, TaskPriority.LOW);
}
/**
* 异步执行
*
* @param command 任务
* @param priority 优先级
*/
public void asyncExecute(Runnable command, TaskPriority priority) {
threadPoolExecutor.execute(new PriorityTask(command, priority.value));
LOGGER.log(Level.INFO, "活跃线程数量:{0},队列任务数量:{1}", new Object[]{threadPoolExecutor.getActiveCount(), priorityBlockingQueue.size()});
}
/**
* 优先级任务
*/
private static class PriorityTask implements Runnable {
private static final AtomicInteger NUMBER_GENERATOR = new AtomicInteger(1);
private final Runnable command;
private final int priority;
private final int taskNumber;
public PriorityTask(Runnable command, Integer priority) {
this.command = command;
this.priority = priority;
this.taskNumber = NUMBER_GENERATOR.getAndIncrement();
}
@Override
public void run() {
command.run();
}
}
/**
* 任务优先级枚举:HIGH > MEDIUM > LOW
*/
public enum TaskPriority {
HIGH(1),
MEDIUM(2),
LOW(3);
private Integer value;
TaskPriority(Integer value) {
this.value = value;
}
}
}
补充说明一下:
- 优先级任务类除了优先级属性外,额外增加一个递增的全局序号属性,提供优先级相等时FIFO。
- 开放两个构造函数和两个对外提交任务方法。默认的构造函数设置CPU核心*2+1的最大线程数和结合了CallerRunsPolicy、DiscardOldestPolicy的拒绝执行处理器。默认的提交任务方法设置最低优先级。任务优先级提供一个高中低级别的枚举。
- 每次提交任务后记个日志,那个LOG不要太在意,没有引入其它依赖,就暂时用JDK自带的顶一下。
又到了最令人兴奋的测试环节。
关注下几个参数:核心线程数2,最大线程数4,任务队列容量4,拒绝执行处理器采用调用者执行策略。
任务名称采用优先级+同等优先级的递增序号组成,如31表示第3优先级的第1个任务;任务主体在输出打印之前等待2秒模仿实际情况和测试效果。别问我为啥不用JUnit
public static void main(String[] args) {
test1();
System.out.println("主线程运行完毕!");
}
private static void test1() {
PriorityThreadPoolExecutor executor = new PriorityThreadPoolExecutor(2, 4, 60, TimeUnit.SECONDS, 4, new ThreadPoolExecutor.CallerRunsPolicy());
executor.asyncExecute(new Task(31), PriorityThreadPoolExecutor.TaskPriority.LOW);
executor.asyncExecute(new Task(32), PriorityThreadPoolExecutor.TaskPriority.LOW);
executor.asyncExecute(new Task(21), PriorityThreadPoolExecutor.TaskPriority.MEDIUM);
executor.asyncExecute(new Task(33));
executor.asyncExecute(new Task(22), PriorityThreadPoolExecutor.TaskPriority.MEDIUM);
executor.asyncExecute(new Task(34));
executor.asyncExecute(new Task(11), PriorityThreadPoolExecutor.TaskPriority.HIGH);
executor.asyncExecute(new Task(12), PriorityThreadPoolExecutor.TaskPriority.HIGH);
executor.asyncExecute(new Task(13), PriorityThreadPoolExecutor.TaskPriority.HIGH);
}
private static class Task implements Runnable {
private final int i;
public Task(int i) {
this.i = i;
}
@Override
public void run() {
LockSupport.parkNanos(TimeUnit.SECONDS.toNanos(2));
System.out.println("run-task: " + i + ", thread-name: " + Thread.currentThread().getName());
}
}
控制台输出如下
分析一下,首先两个任务31、32直接创建核心线程执行(线程1和2),然后21、33、22、34四个任务被加入到队列(顺序为21、22、33、34,参考输出顺序),接着两个任务11、12被新增的线程执行(线程3和4),最后任务13被主线程执行。
总而言之,测试通过。至于性能相关,因为需要排序,总会消耗一些的,后面可以做下试验。