Java多线程 -- JUC包源码分析14 -- ScheduledThreadPoolExecutor与DelayQueue源码分析

在前面的篇章中,我们分析了ThreadPoolExecutor,知道了execute和submit的内部实现原理,知道了Runnable/Callable的内在关系。

周期/非周期 AtFixedRate/WithFixedDelay

而ScheduledThreadPoolExecutor,正像其名字所反映的,实现了时间调度相关的功能,具体说来,有2个方面:
(1) 非周期性的

 //把一个Runnable/Callable 延迟到未来执行,延迟时间是delay,单位是TimeUnit
 public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit);
 public <V> ScheduledFuture<V> schedule(Callable<V> callable,
                                           long delay,
                                           TimeUnit unit);

(2) 周期性的

//把一个Runable延迟initialDelay执行,同时以后周期性执行。
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) ;

       public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) ;

那上面2个函数使用上面有什么区别呢?

AtFixedRate:顾名思义,按固定频率执行,与任务本身执行时间没有关系。(但这有个前提条件:任务执行时间必须小于间隔时间,比如间隔时间是5s,每5s执行一次任务,那任务的执行时间,必须小于5s。关于这1点,后面源码会详细分析)

WithFixedDelay: 按固定间隔执行,与任务本身执行时间有关。比如任务本身执行时间是10s,间隔2s,那下1次开始执行时间就是12s。

知道了ScheduledThreadPoolExecutor的用法,下面分析其源码实现

DelayQueue – Scheduled的基石

我们的分析,首先从其构造函数开始。看构造函数,我们会发现,其使用了一个我们之前没用到过的队列,即DelayedWorkQueue。

    public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, TimeUnit.NANOSECONDS,
              new DelayedWorkQueue());
    }

看其源码,我们会发现,它同样实现了BlockingQueue接口。同时,它内部包装了一个DelayedQueue,其所有方法,都是代理给DelayedQueue来实现的。

//ScheduledThreadPoolExecutor的内部类
    private static class DelayedWorkQueue extends AbstractCollection<Runnable> implements BlockingQueue<Runnable> {

        private final DelayQueue<RunnableScheduledFuture> dq = new DelayQueue<RunnableScheduledFuture>();
        public Runnable poll() { return dq.poll(); }
        public Runnable peek() { return dq.peek(); }
        public Runnable take() throws InterruptedException { return dq.take(); }
        public Runnable poll(long timeout, TimeUnit unit) throws InterruptedException {
            return dq.poll(timeout, unit);
        }

那这个DelayQueue,又是何方神圣呢?

public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {

    private transient final ReentrantLock lock = new ReentrantLock();
    private transient final Condition available = lock.newCondition();
    private final PriorityQueue<E> q = new PriorityQueue<E>();

    ...

看到这,我们知道了,DelayQueue其实就是一个线程安全的PriorityQueue(其内部数据结构就是2叉堆),跟我们前面所讲的PriorityBlockingQueue原理几乎一样。

既然是PriorityQueue,其放进去的元素,就需要比较大小,因为就必须实现Comparable接口。

那比较的什么东西呢?就是delay的时间,所以称之为DelayQueue。

public interface Delayed extends Comparable<Delayed> {

    long getDelay(TimeUnit unit);
}

所以,所有放进DelayQueue的元素,都要实现getDelay函数,实现Comparable接口。

DelayQueue的一个重要特性

DelayQueue有一个重要特性:往外take元素的时候,如果时间还未到,也就是delay > 0,调用者会阻塞,直到直接到了,元素才拿的出去。

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            for (;;) {
                E first = q.peek();
                if (first == null) {
                    available.await();  //队列为空,阻塞
                } else {
                    long delay =  first.getDelay(TimeUnit.NANOSECONDS);
                    if (delay > 0) {
                        long tl = available.awaitNanos(delay);  //任务的时间未到,阻塞
                    } else {
                        E x = q.poll();  //时间到了,任务拿出来
                        assert x != null;
                        if (q.size() != 0)
                            available.signalAll(); //唤醒其他的消费者
                        return x;

                    }
                }
            }
        } finally {
            lock.unlock();
        }
    }

schedule内部实现

schedule

有了DelayQueue这个大杀器,schedule就很容易实现了。如下:

    public ScheduledFuture<?> schedule(Runnable command,
                                       long delay,
                                       TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Void>(command, null,
                                          triggerTime(delay, unit)));

        delayedExecute(t);  //不是execute,自己实现的delayedExecute
        return t;
    }

    private void delayedExecute(Runnable command) {
        if (isShutdown()) {
            reject(command);
            return;
        }

        if (getPoolSize() < getCorePoolSize())
            prestartCoreThread();  //线程数 < corePoolSize,就把线程数开到等于corePoolSize

        super.getQueue().add(command); //新进来的请求,直接扔到DelayQueue里面
    }

ScheduledFutureTask

上面的schedule过程很简单,直接把任务扔进队列里就可以了。下面复杂一点的,是拿出来执行的过程,也就是ScheduledFutureTask。跟上一篇讲的FutureTask类似,它也要实现run函数:

        public void run() {
            if (isPeriodic())
                runPeriodic();  //如果是周期性的,调用runPeriodic
            else
                ScheduledFutureTask.super.run();  //不是周期性任务,直接执行。因为从DelayQueue拿出来的时候,就正好是执行时间
        }
        private void runPeriodic() {
            boolean ok = ScheduledFutureTask.super.runAndReset(); //先执行任务,并且复原AQS
            boolean down = isShutdown();

            if (ok && (!down ||
                       (getContinueExistingPeriodicTasksAfterShutdownPolicy() &&
                        !isStopped()))) {
                long p = period;
                if (p > 0)   //p > 0 代表AtFixedRate
                    time += p;   //从上1次任务开始时间算起,时间加上p
                else  // p < 0 代表WithFixedDelay
                    time = triggerTime(-p);  //从当前时间算起,时间加上-p

                ScheduledThreadPoolExecutor.super.getQueue().add(this); //关键点:任务执行完了,把任务的时间延期后,再次放入队列。从而实现了周期性。
//另外,因为是任务执行完了,才放进去的。因此任务的执行时间必须小于间隔时间,也就是前面所讲的AtFixedRate 
            }
            else if (down)
                interruptIdleWorkers();
        }

下面看一下其实现的Comparable接口:

        public long getDelay(TimeUnit unit) {
            return unit.convert(time - now(), TimeUnit.NANOSECONDS);
        }

        public int compareTo(Delayed other) {
            if (other == this) // compare zero ONLY if same object
                return 0;
            if (other instanceof ScheduledFutureTask) {
                ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
                long diff = time - x.time;
                if (diff < 0)
                    return -1;
                else if (diff > 0)
                    return 1;
                else if (sequenceNumber < x.sequenceNumber) //关键点:当两个任务的delay时间相等时,比较序列号,从而保证任务按顺序执行。序列号是在构造函数里面创建的,原子自增。
                    return -1;
                else
                    return 1;
            }
            long d = (getDelay(TimeUnit.NANOSECONDS) -
                      other.getDelay(TimeUnit.NANOSECONDS));
            return (d == 0)? 0 : ((d < 0)? -1 : 1);
        }

scheduleAtFixedRate/scheduleWithFixedDelay

//2者的差别很小,一个传进去的正的period,一个传进去的负的delay。这也印证了上面所分析的runPeriodic
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command,
                                                  long initialDelay,
                                                  long period,
                                                  TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (period <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Object>(command,
                                            null,
                                            triggerTime(initialDelay, unit),
                                            unit.toNanos(period)));
        delayedExecute(t);
        return t;
    }

    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command,
                                                     long initialDelay,
                                                     long delay,
                                                     TimeUnit unit) {
        if (command == null || unit == null)
            throw new NullPointerException();
        if (delay <= 0)
            throw new IllegalArgumentException();
        RunnableScheduledFuture<?> t = decorateTask(command,
            new ScheduledFutureTask<Boolean>(command,
                                             null,
                                             triggerTime(initialDelay, unit),
                                             unit.toNanos(-delay)));
        delayedExecute(t);
        return t;
    }

总结

(1)ScheduledThreadPoolExecutor的核心实现依靠DelayQueue,其内部是一个PriorityQueue

(2)周期性的实现原理:执行完一个Task之后,把Task的时间 加上一个周期,重新扔回到队列里面。

(3)AtFixedRate和WithFixedDelay的区别,就是计算任务的下1次开始时间不一样:一个是从上一次开始时间 + 一个周期,一个是从上一次结束时间(即当前时间) + 一个周期。

    原文作者:JUC
    原文地址: https://blog.csdn.net/chunlongyu/article/details/52496856
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞