一、基本思想
基于数组的阻塞队列,它既有队列的特性–先进先出(first-in-first-out),又有数组的特性–有限大小(bounded-buffer),一旦被创建,就不会增加容量。
二、源码解析
2.1基本数据
/** The queued items */
private final E[] items; //数组
/** items index for next take, poll or remove */
private int takeIndex; //头部
/** items index for next put, offer, or add. */
private int putIndex; //尾部
/** Number of items in the queue */
private int count; //元素个数,不是容量
/*
* Concurrency control uses the classic two-condition algorithm
* found in any textbook.
*/
/** Main lock guarding all access */
private final ReentrantLock lock; //可重入锁,保证线程并发访问
/** Condition for waiting takes */
private final Condition notEmpty; //消费者线程可以获得资源的条件
/** Condition for waiting puts */
private final Condition notFull;//生产者线程可以继续生产的条件
2.2 get
public E poll() {//获取并删除队列的头元素,如果没有就返回空
final ReentrantLock lock = this.lock;
lock.lock(); //锁住
try {
if (count == 0)
return null;
E x = extract();
return x;
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException { //获取并删除队列的头元素,
//如果没有就阻塞一直到有,或者线程被打断
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();//获得锁,除非线程被打断
try {
try {
while (count == 0)
notEmpty.await();//循环等待,一直到有为止,为什么要循环等待呢,
//因为在多个线程等待的情况下,虽然会唤醒,当还是存在资源被其它等待的线程占用的情况
} catch (InterruptedException ie) {
notEmpty.signal(); // propagate to non-interrupted thread
throw ie;
}
E x = extract();
return x;
} finally {
lock.unlock();
}
}
private E extract() {
final E[] items = this.items;
E x = items[takeIndex];
items[takeIndex] = null; //获取最后一个之后,数组的这个坐标引用指向空
takeIndex = inc(takeIndex);//头部坐标到下一个
--count;//数据个数减少一个
notFull.signal();//唤醒一个因容量满而停止生产的线程
return x;
}
final int inc(int i) {
return (++i == items.length)? 0 : i;//利用头尾相接来模拟一个循环的队列
}
2.3 put
三、适用范围
来自源码注释的解释:
这个类有可选的公平策略对于等待的生产者或者消费者线程来说,默认情况下不启动公平策略。因为公平策略虽然减少可变性和可以避免线程饥饿的情况发生,但是通常会减少吞吐量(throughput)
个人理解:
对消费者来说,如果有很多消费者都在等待生产者生产产品,但是因为大家都没有顺序,特别是在资源比较紧张的情况,很有可能有的线程一直处于饥饿状态(即一直没有抢到产品),悲剧!
对生产者来说,可能有的线程忙的不可开交,另外一些线程闲的蛋疼。
(某些国有企业的人,应该深有感触)
四、测试