今天来学习ArrayBlockingQueue与LinkedBlockingQueue。
ArrayBlockingQueue是一个基于数组的有界阻塞队列。“有界”表示数组容量是固定的。这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
属性
/** 队列中的数据 */
final Object[] items;
/** 下个要删除的项的索引(take, poll, peek ,remove方法使用) */
int takeIndex;
/** 下个插入的位置(put, offer, add方法使用) */
int putIndex;
/** 队列中元素的数量 */
int count;
以上是与队列相关的属性。下面是与并发控制相关的属性。
/** 锁 */
final ReentrantLock lock;
/** 不空的condition */
private final Condition notEmpty;
/** 不满的condition */
private final Condition notFull;
核心方法
offer(E e)
public boolean offer(E e) {
checkNotNull(e);
final ReentrantLock lock = this.lock;
//获取独占锁
lock.lock();
try {
// 如果队列已满,则返回false。
if (count == items.length)
return false;
else {
// 如果队列未满,则插入e,并返回true。
enqueue(e);
return true;
}
} finally {
//释放锁
lock.unlock();
}
}
enqueue(E x)方法源码如下:
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
//将x添加到队列中
items[putIndex] = x;
//如果队列已满,则将[下一个被添加元素的索引]置为0
if (++putIndex == items.length)
putIndex = 0;
count++;
//唤醒notEmpty上的等待线程
notEmpty.signal();
}
将元素添加到队列之前,必须先获得独占锁。加锁后,若发现队列已满,返回false。(为什么这里没有调用notFull.await()方法?)将元素插入到队列后,调用notEmpty.signal()唤醒notEmpty上的等待线程。
take()
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
//获取锁,若当前线程是中断状态,则抛出InterruptedException异常
lock.lockInterruptibly();
try {
//如果队列为空,则一直等待。
while (count == 0)
notEmpty.await();
//取出元素并返回
return dequeue();
} finally {
//释放锁
lock.unlock();
}
}
dequeue()方法如下
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒在notFull上等待的线程
notFull.signal();
return x;
}
将元素添从队列中移除之前,必须先获得独占锁。加锁后,若发现队列为空,调用notEmpty.await(),使线程在notEmpty上等待。如果队列不为空,将元素添从队列中移除,然后调用notFull.signal()唤醒notFull的等待线程。
LinkedBlockingQueue是一个基于单向链表的、可指定大小的阻塞队列。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于 Integer.MAX_VALUE。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
节点类
static class Node<E> {
E item;
/** * One of: * - the real successor Node * - this Node, meaning the successor is head.next * - null, meaning there is no successor (this is the last node) */
Node<E> next;
Node(E x) { item = x; }
}
属性
/** 容量。初始化LinkedBlockingQueue时需要指定,如果不指定则默认为Integer.MAX_VALUE if none */
private final int capacity;
/** 链表的实际大小 */
private final AtomicInteger count = new AtomicInteger();
/** * 链表的头结点 * 以下表达式一直成立: head.item == null */
transient Node<E> head;
/** * 链表的尾节点 * 以下表达式一直成立: last.next == null */
private transient Node<E> last;
/** 获取操作的锁 */
private final ReentrantLock takeLock = new ReentrantLock();
/** 获取操作的等待队列condition */
private final Condition notEmpty = takeLock.newCondition();
/** 插入操作的锁 */
private final ReentrantLock putLock = new ReentrantLock();
/** 插入操作的等待队列condition */
private final Condition notFull = putLock.newCondition();
以上属性透漏出一个重要信息:插入和获取操作使用了不同的锁。
offer(E e)
/** * 将指定元素插入到此队列的尾部(如果立即可行且不会超出此队列的容量) * 在成功时返回 true,如果此队列已满,则返回 false。 */
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
//如果当前没有空间可用,则返回 false。
if (count.get() == capacity)
return false;
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
//加锁
putLock.lock();
try {
//再次判断队列是否已满
if (count.get() < capacity) {
//插入元素到队列
enqueue(node);
c = count.getAndIncrement();
//如果插入元素后,元素仍未满,唤醒在notFull上的等待线程
if (c + 1 < capacity)
notFull.signal();
}
} finally {
//释放所
putLock.unlock();
}
//如果在插入节点前,队列为空;则插入节点后,唤醒notEmpty上的等待线程
if (c == 0)
signalNotEmpty();
return c >= 0;
}
enqueue(Node)源码如下
private void enqueue(Node<E> node) {
last = last.next = node;
}
last = last.next = node;
执行后last.next的值为null?
signalNotEmpty()源码如下
private void signalNotEmpty() {
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
notEmpty.signal();
} finally {
takeLock.unlock();
}
}
注意notEmpty是与takeLock相关联的,必须先获取takeLock锁,再调用notEmpty.signal()。
take()
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
//获取锁,若当前线程是中断状态,则抛出InterruptedException异常
takeLock.lockInterruptibly();
try {
//若队列为空,则一直等待。
while (count.get() == 0) {
notEmpty.await();
}
//取出元素
x = dequeue();
//取出元素之后,返回原始的节点数量,然后节点数量-1。
c = count.getAndDecrement();
//如果取出元素后,队列不为空,则唤醒在notEmpty上的等待线程
if (c > 1)
notEmpty.signal();
} finally {
//释放锁
takeLock.unlock();
}
//如果在获取节点之前队列为满;则取出节点后,唤醒notFull上的等待线程
if (c == capacity)
signalNotFull();
return x;
}
dequeue()源码如下:
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
本文就讲到这里,想了解Java并发编程更多内容请参考:
- Java并发编程札记-目录