基本介绍
- JUC里面提供的有界阻塞队列,基于链表方式实现
- 默认队列大小为Integer.MAX_VALUE,给人的感觉是近乎无界,在构造的时候可以自行指定队列的容量
- 内部维护两把锁(putLock与takeLock),用以优化存取的并发性能;put/offer只需要获取putLock,而take/poll只需要获取takeLock,细化了锁粒度,使得存取操作可以并发进行,相比ArrayBlockingQueue存取元素都使用同一个lock来说具有更高的吞吐量
- 只有在remove,contains, clear,循环迭代器的操作中,才需要同时获取takeLock和putLock(这些操作的程使用场景相对比较少)
推荐
LinkedBlockingQueue
在构造的时候传入具体的容量大小,否则若生产者(put)的生产速度远远大于消费者(take)的消费速度的情况下,很可能会把堆内存撑爆
使用场景与示例
最常用的使用场景也还是用于实现生产者消费者模式,可参考JUC ArrayBlockingQueue 分析里面的使用场景和使用示例
源码分析
看看重要的成员变量:
// 队列容量
private final int capacity;
// 指定当前队列元素个数,用原子变量解决多线程并发访问
private final AtomicInteger count = new AtomicInteger();
// 队列head,哨兵元素,null值,方便链表insert, delete
transient Node<E> head;
// 指向队列尾元素
private transient Node<E> last;
// take锁,用于保证并发下take, poll操作的线程安全
private final ReentrantLock takeLock = new ReentrantLock();
// take操作阻塞等待条件
private final Condition notEmpty = takeLock.newCondition();
// put锁,用于维护并发下的put, offer操作
private final ReentrantLock putLock = new ReentrantLock();
// put操作阻塞等待条件
private final Condition notFull = putLock.newCondition();
入队和出队操作,比较简单,方法会被put, offer, take , poll等方法调用,多线程并发安全保证使用putLock和takeLock来保证(外层调用方法里面):
// 入队
private void enqueue(Node<E> node) {
// assert putLock.isHeldByCurrentThread();
// assert last.next == null;
last = last.next = node;
}
// 出队
private E dequeue() {
// assert takeLock.isHeldByCurrentThread();
// assert head.item == null;
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
分析put / offer操作:
public void put(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 达到容量限制,阻塞等待
while (count.get() == capacity) {
notFull.await();
}
enqueue(node); // 将元素添加到队列尾部
c = count.getAndIncrement();
// 此处若判断容量还没满,则唤醒其他可能等待阻塞的生产者线程?这里会产生很多无谓的唤醒动作?因为很多时候容量都没达到上限?
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c = count.getAndIncrement, 若为0,则证明有元素经过该方法保存到队列里面,且之前是0,代表可能有线程阻塞等待take操作(notEmpty condition),因此这个条件下,需要进行唤醒
if (c == 0)
signalNotEmpty();
}
// 唤醒阻塞的take线程,该方法会被put/offer操作调用
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
// 注意此处应该尝试获取的是takeLock
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
// 与put的逻辑大致相同,只是offer操作当容量达到上限的时候,不阻塞等待添加,而是直接返回添加失败
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 容量达到上限,无法添加,返回添加失败
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
if (count.get() < capacity) {
enqueue(node);
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
if (c == 0)
signalNotEmpty();
return c >= 0;
}
分析take / poll 方法:
// 移除队头元素,若队列为空,则阻塞等待
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 队列为空,阻塞等待
while (count.get() == 0) {
notEmpty.await();
}
x = dequeue(); // 取队头元素
c = count.getAndDecrement();
// 若发现队列还有元素,尝试唤醒其他等待的消费者线程
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// c = count.getAndDecrement,若c原先为容量上限,证明此次take之后,队列不满,需要唤醒可能在阻塞等待的生产者线程(调用put操作的线程)
if (c == capacity)
signalNotFull();
return x;
}
// 对等待put(notFull condition)的生产者线程进行唤醒
private void signalNotFull() {
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
notFull.signal();
} finally {
putLock.unlock();
}
}
// poll在队列为空的情况下不阻塞等待,直接返回null,其他逻辑与
public E poll() {
final AtomicInteger count = this.count;
if (count.get() == 0)
return null;
E x = null;
int c = -1;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
if (c == capacity)
signalNotFull();
return x;
}
超时的offer,poll方法就不展开了,重点在于使用condition的超时阻塞等待,然后加以判断:
// 超时offer实现的关键
while (count.get() == capacity) {
if (nanos <= 0)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 超时poll的实现关键
while (count.get() == 0) {
if (nanos <= 0)
return null;
nanos = notEmpty.awaitNanos(nanos);
}