Jdk1.6 JUC源码解析(14)-PriorityBlockingQueue
作者:大飞
功能简介:
- PriorityBlockingQueue是一种基于PriorityQueue实现的无界的阻塞队列。队列中的元素按照某种排序规则出队。插入队列的元素必须是可比较的。
源码分析:
- 首先看下PriorityBlockingQueue内部的数据结构:
public class PriorityBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
private static final long serialVersionUID = 5595510919245408276L;
private final PriorityQueue<E> q;
private final ReentrantLock lock = new ReentrantLock(true);
private final Condition notEmpty = lock.newCondition();
public PriorityBlockingQueue() {
q = new PriorityQueue<E>();
}
public PriorityBlockingQueue(int initialCapacity) {
q = new PriorityQueue<E>(initialCapacity, null);
}
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
q = new PriorityQueue<E>(initialCapacity, comparator);
}
public PriorityBlockingQueue(Collection<? extends E> c) {
q = new PriorityQueue<E>(c);
}
首先可见,内部是一个PriorityQueue(优先队列),然后有一把锁,一个锁条件。结构比较简单。
注:PriorityQueue内部数据结构是二叉堆,所谓二叉堆是一个完全二叉树,每个节点都比其左右两个子节点大(或者小),保证最大(小)的元素在堆顶,所以放入PriorityQueue的元素必须是可比较的。
- 还是从put和take实现看起,put实现如下:
public void put(E e) {
offer(e); // never need to block
}
public boolean offer(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
boolean ok = q.offer(e);
assert ok;
notEmpty.signal();
return true;
} finally {
lock.unlock();
}
}
由于PriorityBlockingQueue是一个无界队列,所以put方法不需要阻塞,直接往队列里面放。但要注意内存的限制。
再看下take方法实现:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
try {
while (q.size() == 0)
notEmpty.await();//队列中没有元素时,当前线程在notEmpty上等待。
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = q.poll();
assert x != null;
return x;
} finally {
lock.unlock();
}
}
- 其他方法实现都较简单,这里不做一一分析。最后说一下,PriorityBlockingQueue的迭代器返回的元素是不保证顺序的。
PriorityBlockingQueue的代码解析完毕!
参见:Jdk1.6 JUC源码解析(7)-locks-ReentrantLock
Jdk1.6 Collections Framework源码解析(9)-PriorityQueue