Jdk1.6 JUC源码解析(18)-DelayQueue
作者:大飞
功能简介:
- DelayQueue是一种无界的阻塞队列,队列里只允许放入可以”延期”的元素,队列中列头的元素是最先”到期”的元素。如果队列中没有任何元素”到期”,尽管队列中有元素,也不能从队列头获取到任何元素。
源码分析:
- 首先还是看一下内部数据结构:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
implements BlockingQueue<E> {
private transient final ReentrantLock lock = new ReentrantLock();
private transient final Condition available = lock.newCondition();
private final PriorityQueue<E> q = new PriorityQueue<E>();
内部结构非常简单,一把锁,一个条件,一个优先队列。
DelayQueue要求放入其中的元素必须实现Delayed接口,看下这个接口:
/**
* A mix-in style interface for marking objects that should be
* acted upon after a given delay.
*
* <p>An implementation of this interface must define a
* <tt>compareTo</tt> method that provides an ordering consistent with
* its <tt>getDelay</tt> method.
*
* @since 1.5
* @author Doug Lea
*/
public interface Delayed extends Comparable<Delayed> {
/**
* Returns the remaining delay associated with this object, in the
* given time unit.
*
* @param unit the time unit
* @return the remaining delay; zero or negative values indicate
* that the delay has already elapsed
*/
long getDelay(TimeUnit unit);
}
这个接口定义了一个返回延时值的方法,而且扩展了Comparable接口,具体实现的排序方式会和延时值有关,延时值最小的会排在前面。再结合上面DelayQueue的内部数据结构,我们就可以大概脑补这个过程了。
- 既然是阻塞队列,还是从put和take方法开始入手,先看下put方法:
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取队头元素。
E first = q.peek();
//将元素放入内部队列。
q.offer(e);
if (first == null || e.compareTo(first) < 0)
available.signalAll(); //如果队头没有元素 或者 当前元素比队头元素的延时值小,那么唤醒available条件上的线程。
return true;
} finally {
lock.unlock();
}
}
/**
* 插入一个元素到延迟队列,由于队列是无界的,所以这个方法永远不会阻塞。
*
* @param e the element to add
* @throws NullPointerException {@inheritDoc}
*/
public void put(E e) {
offer(e);
}
再看下take方法:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
for (;;) {
//获取队头元素。
E first = q.peek();
if (first == null) {
available.await();//如果队头没有元素,那么当前线程在available条件上等待。
} else {
//如果队头有元素,获取元素的延时值。
long delay = first.getDelay(TimeUnit.NANOSECONDS);
if (delay > 0) {
//如果延时值大于0,那么等待一下。
long tl = available.awaitNanos(delay);
} else {
//否则获取并移除队列列头元素。
E x = q.poll();
assert x != null;
if (q.size() != 0)
available.signalAll(); // 如果内部队列中还有元素,那么唤醒其他在available条件上等待着的take线程。
return x;
}
}
}
} finally {
lock.unlock();
}
}
- 其他方法代码也很简单,这里不做分析了。最后注意下本类的迭代器是弱一致的,并且不保证元素的特定顺序。
DelayQueue的代码解析完毕! 参见:
Jdk1.6 JUC源码解析(7)-locks-ReentrantLock
Jdk1.6 Collections Framework源码解析(9)-PriorityQueue