基本介绍
ScheduledThreadPoolExecutor
,JUC中提供的调度线程池实现,可以用来执行定时任务,实现与Timer
一样的功能特性,相比于Timer
,ScheduledThreadPoolExecutor
具有以下优势:
- 支持同时调度多种多样的调度任务,耗时的任务调度不会影响其他调度任务的调度(Timer为单线程执行,若存在多个调度任务,则可能任务之间的执行时间会影响到各自的调度时间)
ScheduledThreadPoolExecutor vs Timer 可参考StackOverflow: java-timer-vs-executorservice
使用示例
以最主要的两个方法展示其使用:(1)scheduleAtFixedRate (2)scheduleWithFixedDelay
public class ScheduledThreadPoolExecutorTest {
public static void main(String[] args) {
ThreadFactory tf = new ThreadFactory() {
AtomicLong counter = new AtomicLong(0);
@Override
public Thread newThread(Runnable r) {
return new Thread(r, "scheduler" + counter.getAndIncrement());
}
};
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(4, tf);
// fixed rate 方式, 每隔3s执行一次, 若任务执行耗时超过3s, 则下一次调度会立刻执行
scheduler.scheduleAtFixedRate(new FixedRateTask(), 0, 3, TimeUnit.SECONDS);
// fixed delay 方式, 能够保证每两次调度之间的时间间隔都一致, 在这里就是设置的3s
// scheduler.scheduleWithFixedDelay(new FixedDelayTask(), 0, 3, TimeUnit.SECONDS);
for (;;) {
try {
TimeUnit.SECONDS.sleep(5);
} catch (InterruptedException e) {
// ignore ..
}
}
}
static class FixedRateTask implements Runnable {
@Override
public void run() {
System.out.println("FixedRateTask execute : " + printTime());
try {
TimeUnit.SECONDS.sleep(10);
}
catch (InterruptedException e) {
// ignore
}
}
}
static class FixedDelayTask implements Runnable {
@Override
public void run() {
System.out.println("FixedDelayTask execute : " + printTime());
try {
TimeUnit.SECONDS.sleep(5);
}
catch (InterruptedException e) {
// ignore
}
}
}
static String printTime() {
Date now = new Date(System.currentTimeMillis());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
return sdf.format(now);
}
}
通过上面的例子的运行结果可以捋清楚scheduleAtFixedRate与scheduleWithFixedDelay的区别
源码分析
部分解释在代码上面+注释的形式进行,因此请注意一下代码上面的中文注释
类继承关系
ScheduledThreadPoolExecutor extends ThreadPoolExecutor
implements ScheduledExecutorService
ScheduledThreadPoolExecutor
继承了普通线程池ThreadPoolExecutor
,在其基础上扩展并实现ScheduledExecutorService
。在ScheduledExecutorService
中提供了主要的调度实现方法。
ScheduledExecutorService
扩展ExecutorService
提供一些调度池必须实现的方法:
public interface ScheduledExecutorService extends ExecutorService {
// 只调度一次,可设置延迟调度
public ScheduledFuture<?> schedule(Runnable command,
long delay, TimeUnit unit);
// 同上面方法,只是执行的是callable
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay, TimeUnit unit);
// 固定rate执行,若任务执行时间超过period,则下一次调度立刻就调度起来
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit);
// 真正实现固定间隔执行的方法,一次调度结束之后,间隔period之后再进行一次调度
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit);
}
重要组成
ScheduledFutureTask
在ThreadPoolExecutor
中submit实现里面,使用了FutureTask
来封装我们真实的任务对象(Runnable or Callable);而在ScheduledThreadPoolExecutor
中,为了实现不同的处理逻辑,在FutureTask
的基础上进行扩展,这就形成了ScheduledFutureTask
,主要的扩展点简述如下:
- 为了能够将task丢到内部实现的延迟工作队列,需要实现
Delayed
接口,实现getDelay和compareTo方法(RunnableScheduledFutur
是Delayed
的子接口) - 为了自行实现调度逻辑,比如一些周期任务实现,需要在run任务之后,重新计算下一次执行时间,再add到内部实现的延迟工作队列,因此需要重写run方法(该run方法最终会由线程池中的工作线程进行调用)
ScheduledFutureTask
扩展了FutureTask
并实现了RunnableScheduledFuture
(RunnableScheduledFuture
其实是RunnableFuture
和ScheduledFuture
的结合体[Runnable, Future, Delayed])
private class ScheduledFutureTask<V> extends FutureTask<V> implements RunnableScheduledFuture<V> {
// 用于协助进行compareTo
// 当两个ScheduledFutureTask的执行时间time一致,将比较其sequenceNumber值的大小来区分谁排在前面(先执行)
private final long sequenceNumber;
// 当前task下一次执行时间
private long time;
// 调度周期,有三种情况:
// 1. 若是一次性调度,则值为0
// 2. 若是固定调度时间调度(scheduleAtFixedRate),则为正数
// 3. 若是固定延迟时间调度(scheduleAtFixedDelay),则为负数(程序里面再次进行取反计算)
private final long period;
RunnableScheduledFuture<V> outerTask = this;
// 工作队列采用优先最小堆实现(数组形式),这里用heapIndex记录该task在堆中的具体位置
int heapIndex;
// schedule runnable方法采用此构造函数构造task
ScheduledFutureTask(Runnable r, V result, long ns) {
super(r, result);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// schedule callable方法采用此构造函数构造task
ScheduledFutureTask(Callable<V> callable, long ns) {
super(callable);
this.time = ns;
this.period = 0;
this.sequenceNumber = sequencer.getAndIncrement();
}
// scheduleAtFixedRate/scheduleWithFixedDelay采用此构造函数构造task
ScheduledFutureTask(Runnable r, V result, long ns, long period) {
super(r, result);
this.time = ns;
this.period = period;
this.sequenceNumber = sequencer.getAndIncrement();
}
// 该方法被延迟工作队列使用,满足延迟时间到达的元素才能从延迟队列中被取出
public long getDelay(TimeUnit unit) {
return unit.convert(time - now(), TimeUnit.NANOSECONDS);
}
// 用此方法来决定ScheduledFutureTask在工作队列中的位置(堆的构建调整需要用到)
public int compareTo(Delayed other) {
if (other == this) // compare zero ONLY if same object
return 0;
if (other instanceof ScheduledFutureTask) {
ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
long diff = time - x.time;
if (diff < 0)
return -1;
else if (diff > 0)
return 1;
else if (sequenceNumber < x.sequenceNumber) // 当执行时间都一致的情况下,使用序列号判断
return -1;
else
return 1;
}
long d = (getDelay(TimeUnit.NANOSECONDS) -
other.getDelay(TimeUnit.NANOSECONDS));
return (d == 0) ? 0 : ((d < 0) ? -1 : 1);
}
// 判断该task是否为周期调度task(fixedRate or fixedDelay)
public boolean isPeriodic() {
return period != 0;
}
// 用于在task执行结束后设置下一次执行的时间(前提是该task为周期调度task)
private void setNextRunTime() {
long p = period;
if (p > 0) // fixedRate方式,直接加上间隔
time += p;
else
time = triggerTime(-p); // fixedDelay方式,在当前时间(执行结束后)的基础上加上间隔,注意在这种情况下,period存储的是负数,因此这里进行取反
}
// 该run方法将被线程池中的工作线程调用
public void run() {
boolean periodic = isPeriodic(); // 是否为周期调度task
if (!canRunInCurrentRunState(periodic)) // 无法在当前state下进行,取消
cancel(false);
else if (!periodic) // 不是周期调度task,直接调用run
ScheduledFutureTask.super.run();
else if (ScheduledFutureTask.super.runAndReset()) { // 周期调度task,run后重置FutureTask的状态
setNextRunTime(); // 计算下一次调度时间
reExecutePeriodic(outerTask); // 将该task尝试重新add到延迟工作队列中
}
}
}
从该类的实现中,我们已经可以看出scheduleAtFixedRate 与 scheduleWithFixedDelay 的区别:前者是在上一次执行开始的时间基础上增加间隔来计算下一次调度时间(可能出现计算完后的时间已经是过去时,此时再次调度会立刻发起);后者是在任务已经跑完,根据当前时间再计算间隔,再跑起来(scheduleWithFixedDelay)
DelayedWorkQueue
我们知道,在普通的线程池ThreadPoolExecutor
实现中所采用的工作队列,若元素添加进去,立马就会被工作线程取出并执行。为了实现周期调度,需要采用延迟阻塞队列的思路来做,工作队列中的元素必须满足达到一定的时间(延迟)才能被取出执行。必须实现类似于延迟阻塞队列的功能,在JUC中有DelayQueue
,但是在ScheduledThreadPoolExecutor
中,选择自己实现了延迟阻塞队列,说的就是DelayedWorkQueue
,实现分析可参考DelayQueue
的实现分析: JUC DelayQueue 分析
核心方法实现分析
构造方法
ScheduledThreadPoolExecutor
继承自ThreadPoolExecutor
,其构造方法其实就是直接调用了父类的方法进行构造,最大线程数可理解为无限大(int的最大值)且工作线程控制永远不会超时,工作队列使用自己定制的DelayedWorkQueue
,其中一个构造方法如下所示:
public ScheduledThreadPoolExecutor(int corePoolSize,
ThreadFactory threadFactory,
RejectedExecutionHandler handler) {
super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
new DelayedWorkQueue(), threadFactory, handler);
}
schedule
调度任务的添加都是先offer到工作队列,延迟时间到了再取出来。即使是execute or submit,也是先将任务丢到工作队列
Tips: execute/submit最终调用的还是schedule方法
针对Runnable
和Callable
各有一个方法:
public ScheduledFuture<?> schedule(Runnable command,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<?> t = decorateTask(command,
new ScheduledFutureTask<Void>(command, null,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
public <V> ScheduledFuture<V> schedule(Callable<V> callable,
long delay,
TimeUnit unit) {
if (callable == null || unit == null)
throw new NullPointerException();
RunnableScheduledFuture<V> t = decorateTask(callable,
new ScheduledFutureTask<V>(callable,
triggerTime(delay, unit)));
delayedExecute(t);
return t;
}
triggerTime用于计算执行的具体时间(nanoTime)
delayExecute的核心作用就是将task丢到工作队列里面:
private void delayedExecute(RunnableScheduledFuture<?> task) {
if (isShutdown()) // 当前线程池执行了shutdown方法,新的task应该reject
reject(task);
else {
super.getQueue().add(task);
if (isShutdown() &&
!canRunInCurrentRunState(task.isPeriodic()) &&
remove(task)) // 添加完之后发生shutdown的处理
task.cancel(false); // 将task进行取消
else
ensurePrestart(); // 确保线程池的线程启动(小于corePoolSize则尝试添加一个工作线程)
}
}
canRunInCurrentRunState方法判断任务是否能在当前的状态下执行,允许的条件:
- RUNNING状态
- SHUTDOWN状态的前提下,若task为周期调度task,continueExistingPeriodicTasksAfterShutdown得为true,否则,executeExistingDelayedTasksAfterShutdown得为true
boolean canRunInCurrentRunState(boolean periodic) {
return isRunningOrShutdown(periodic ?
continueExistingPeriodicTasksAfterShutdown :
executeExistingDelayedTasksAfterShutdown);
}
scheduleAtFixedRate / scheduleWithFixedDelay
public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
long initialDelay,
long period,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (period <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(period));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
与scheduleAtFixedRate方法的区别只有构造ScheduledFutureTask的时候,delay设置为负数(不明白的请参考上面ScheduledFutreTask
的介绍)
public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
long initialDelay,
long delay,
TimeUnit unit) {
if (command == null || unit == null)
throw new NullPointerException();
if (delay <= 0)
throw new IllegalArgumentException();
ScheduledFutureTask<Void> sft =
new ScheduledFutureTask<Void>(command,
null,
triggerTime(initialDelay, unit),
unit.toNanos(-delay));
RunnableScheduledFuture<Void> t = decorateTask(command, sft);
sft.outerTask = t;
delayedExecute(t);
return t;
}
这两个核心方法的调用的逻辑跟上面的schedule也类似,重点还是在delayedExecutre方法上面,实际上就是将task先add到工作队列,等到延迟时间到达,自然就会由线程池中的工作线程取出并执行,执行的时候就会调用task(ScheduledFutureTask)的run方法,再根据task类型决定是否需要重新add到工作队列(当然这么做之前要重新计算下一次执行时间)
reExecutePeriodic
在ScheduledFutureTask
执行(在其run方法内部)后,若为周期任务,需要将此任务重新计算时间后重新添加到工作队列,调用的就是该方法:
void reExecutePeriodic(RunnableScheduledFuture<?> task) {
if (canRunInCurrentRunState(true)) {
super.getQueue().add(task); // 重新添加到工作队列中
if (!canRunInCurrentRunState(true) && remove(task))
task.cancel(false);
else
ensurePrestart();
}
}
onShutdown
经过前面对JUC ThreadPoolExecutor的分析我们知道,调用shutdown之后,会有一个onShutdown方法的调用,该方法在ThreadPoolExecutor
是个空实现,但是在ScheduledThreadPool
里面,有它的实现逻辑,主要是为了处理当发生shutdown的时候,怎么处理工作队列中的任务:
队列中的任务有两种:
– 普通delayed任务(不用周期调度的)
– 周期调度的任务(fixedRate or fixedDelayed)
对两种任务在shutdown调用之后是否需要继续执行
@Override void onShutdown() {
BlockingQueue<Runnable> q = super.getQueue();
boolean keepDelayed =
getExecuteExistingDelayedTasksAfterShutdownPolicy(); // executeExistingDelayedTasksAfterShutdown默认为true
boolean keepPeriodic =
getContinueExistingPeriodicTasksAfterShutdownPolicy(); // continueExistingPeriodicTasksAfterShutdown默认为false
// 下面的逻辑主要是为了处理工作队列中仍存在的延迟任务or周期任务是否需要继续执行还是移除cancel
// 工作队列中仍存在的延迟任务(keepDelayed)和周期任务(keepPeriodic)都不需要执行,取消任务,清空工作队列
if (!keepDelayed && !keepPeriodic) {
for (Object e : q.toArray())
if (e instanceof RunnableScheduledFuture<?>)
((RunnableScheduledFuture<?>) e).cancel(false);
q.clear();
}
else {
// else分支处理三种情况:
// 1. 延迟任务(keepDelayed) 不需要执行
// 2. 周期任务(keepPeriodic)不需要执行
// 3. 两种任务都需要执行
for (Object e : q.toArray()) {
if (e instanceof RunnableScheduledFuture) {
RunnableScheduledFuture<?> t =
(RunnableScheduledFuture<?>)e;
// 对不需要执行的任务从工作队列中移除并取消
if ((t.isPeriodic() ? !keepPeriodic : !keepDelayed) ||
t.isCancelled()) { // also remove if already cancelled
if (q.remove(t)) // 对不需要执行的任务进行移除取消(延迟任务 or 周期任务)
t.cancel(false);
}
}
}
}
tryTerminate();
}