java.util.concurrent.LinkedBlockingQueue 是一个基于单向链表的、范围任意的(其实是有界的)、FIFO阻塞队列。访问与移除操作是在队头进行,添加操作是在队尾进行,并分别使用不同的锁进行保护,只有在可能涉及多个节点的操作才同时对两个锁进行加锁。
队列是否为空、是否已满仍然是通过元素数量的计数器(count)进行判断的,由于可以同时在队头、队尾并发地进行访问、添加操作,所以这个计数器必须是线程安全的,这里使用了一个原子类 AtomicInteger,这就决定了它的容量范围是: 1 –Integer.MAX_VALUE。
由于同时使用了两把锁,在需要同时使用两把锁时,加锁顺序与释放顺序是非常重要的:必须以固定的顺序进行加锁,再以与加锁顺序的相反的顺序释放锁。
头结点和尾结点一开始总是指向一个哨兵的结点,它不持有实际数据,当队列中有数据时,头结点仍然指向这个哨兵,尾结点指向有效数据的最后一个结点。这样做的好处在于,与计数器 count结合后,对队头、队尾的访问可以独立进行,而不需要判断头结点与尾结点的关系。
属性与链表节点类
//链表的结点类,单向链表,只有一个后继指针
static class Node<E> {
E item;
/*
* 后继指针。值为下列之一:
* 实际的后继结点。
* 自身,表示后继是 head.next(用于在遍历处理时判断)
* null,表示没有后继(这是尾结点)
*/
Node<E> next;
Node(E x) { item = x; }
}
//最大容量上限,默认是 Integer.MAX_VALUE
private final int capacity;
//当前元素数量,这是个原子类。因为读写分别使用不同的锁,但都会访问这个属性,所以它需要是线程安全的。
private final AtomicInteger count = new AtomicInteger(0);
//头结点
private transient Node<E> head;
//尾结点
private transient Node<E> last;
//队头访问锁
private final ReentrantLock takeLock = new ReentrantLock();
//队头访问等待条件、队列
private final Condition notEmpty = takeLock.newCondition();
//队尾访问锁
private final ReentrantLock putLock = new ReentrantLock();
//队尾访问等待条件、队列
private final Condition notFull = putLock.newCondition();
enqueue 操作
//在持有putLock锁下执行
private void enqueue(Node<E> node) {
// assertputLock.isHeldByCurrentThread();
// assert last.next ==null;
last = last.next = node;
}
dequeue 操作
返回队列里第一个有效元素。
//在持有takeLock锁下执行
private E dequeue() {
// asserttakeLock.isHeldByCurrentThread();
// assert head.item ==null;
Node<E> h = head;
Node<E> first =h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null; // 出队列后的结点作为新的哨兵结点
return x;
}
对两把锁的加锁与释放
在需要对两把锁同时加锁时,把加锁的顺序与释放的顺序封装成方法,确保所有地方都是一致的。而且获取锁时都是不响应中断的,一直获取直到加锁成功,这就避免了第一把锁加锁成功,而第二把锁加锁失败导致锁不释放的风险。
注意,锁的释放顺序与加锁顺序是相反的。
//把固定的加锁顺序封装在方法内,确保所有的对两把锁加锁的顺序都是一致的。
void fullyLock() {
putLock.lock();
takeLock.lock();
}
//把固定的释放锁顺序封装在方法内,确保所有的对两把锁的释放顺序都是一致的。
void fullyUnlock() {
takeLock.unlock();
putLock.unlock();
}
put 操作
put 操作把指定元素添加到队尾,如果没有空间则一直等待。
public void put(E e) throws InterruptedException{
if (e == null) throw newNullPointerException();
//在所有的 put/take/etc等操作中预设值本地变量 c为负数表示失败。成功会设置为 >= 0的值。
int c = -1;
Node<E> node = new Node(e);
//下面两行是访问优化
final ReentrantLock putLock =this.putLock;
final AtomicInteger count =this.count;
putLock.lockInterruptibly();
try {
/*
* 注意,count用于等待监视,即使它没有用锁保护。这个可行是因为
* count 只能在此刻(持有putLock)减小(其他put线程都被锁拒之门外),
* 当count对capacity发生变化时,当前线程(或其他put等待线程)将被通知。
* 在其他等待监视的使用中也类似。
*/
while (count.get() == capacity){
notFull.await();
}
enqueue(node);
c = count.getAndIncrement();
//还有可添加空间则唤醒put等待线程。
if (c + 1 <capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// c由 count.getAndIncrement()返回,如果等于0,
// 则 count应该是大于等于 1了,唤醒take线程。
if (c == 0)
signalNotEmpty();
}
take 操作
take 操作会一直阻塞直到有元素可返回。
public E take() throws InterruptedException{
E x;
int c = -1;
// 下面两行是访问优化
final AtomicInteger count =this.count;
final ReentrantLock takeLock =this.takeLock;
takeLock.lockInterruptibly();
try {
// 循环里等待直到有数据可获取
while (count.get() == 0){
notEmpty.await();
}
//获取第一个有效元素
x = dequeue();
//如果还有可获取元素,唤醒等待获取的线程。
c =count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
//注意,c是调用getAndDecrement返回的,如果 if成立,
// 表明当前的 count是capacity – 1,可以添加新元素,所以唤醒添加线程。
if (c == capacity)
signalNotFull();
return x;
}
remove 操作
//移除指定元素。由于移除元素涉及该结点前后两个结点的访问与修改,
// 对两把锁加锁简化了同步管理。
public boolean remove(Object o) {
if (o == null ) return false;
fullyLock();
try {
for (Node<E> trail = head,p = trail.next;
p != null ;
trail = p, p = p.next){
if (o.equals(p.item)){
unlink(p, trail);
return true ;
}
}
return false ;
} finally {
fullyUnlock();
}
}
转载自并发编程网 – ifeve.com本文链接地址: JUC LinkedBlockingQueue