Jdk1.6 JUC源码解析(22)-LinkedBlockingDeque
作者:大飞
功能简介:
- LinkedBlockingDeque是一种基于双向链表实现的有界的(可选的,不指定默认int最大值)阻塞双端队列。
双端队列一般适用于工作密取模式,即每个消费者都拥有自己的双端队列,如果某个消费者完成了自己队列的全部任务,可以到其他消费者双端队列尾部秘密获取任务来处理。
源码分析:
- LinkedBlockingDeque实现了BlockingDeque接口,简单看下这个接口:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
/**
* 将元素插入队头,如果队列满了,抛出IllegalStateException。
*/
void addFirst(E e);
/**
* 将元素插入队尾,如果队列满了,抛出IllegalStateException。
*/
void addLast(E e);
/**
* 将元素插入队头,如果成功,返回true;如果队列满了,返回false。
*/
boolean offerFirst(E e);
/**
* 将元素插入队尾,如果成功,返回true;如果队列满了,返回false。
*/
boolean offerLast(E e);
/**
* 将元素插入队头。如果队列满了,阻塞等待,直到队列有可用空间。
*/
void putFirst(E e) throws InterruptedException;
/**
* 将元素插入队尾。如果队列满了,阻塞等待,直到队列有可用空间。
*/
void putLast(E e) throws InterruptedException;
/**
* 将元素插入队头。如果队列满了,阻塞等待。
* 如果插入成功,返回true;如果等待超时,返回false。
*/
boolean offerFirst(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 将元素插入队尾。如果队列满了,阻塞等待。
* 如果插入成功,返回true;如果等待超时,返回false。
*/
boolean offerLast(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
*/
E takeFirst() throws InterruptedException;
/**
* 获取并移除队尾元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
*/
E takeLast() throws InterruptedException;
/**
* 获取并移除队头元素,如果队头没有元素,阻塞等待。
* 如果过了给定的时间还不能获取元素,返回null。
*/
E pollFirst(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 获取并移除队尾元素,如果队头没有元素,阻塞等待。
* 如果过了给定的时间还不能获取元素,返回null。
*/
E pollLast(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 从队列中删除第一个出现的指定元素。
* 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
*/
boolean removeFirstOccurrence(Object o);
/**
* 从队列中删除最后一个出现的指定元素。
* 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
*/
boolean removeLastOccurrence(Object o);
// *** BlockingQueue methods ***
/**
* 添加一个元素到队尾,如果成功,返回true。如果队列满了,抛出IllegalStateException。
*/
boolean add(E e);
/**
* 添加一个元素到队尾。添加成功,返回treue;如果队列满了,返回false。
*/
boolean offer(E e);
/**
* 将元素插入队尾。如果队列满了,阻塞等待。
*/
void put(E e) throws InterruptedException;
/**
* 将元素插入队尾。如果队列满了,阻塞等待。
* 如果插入成功,返回true;如果等待超时,返回false。
*/
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 获取并移除队头元素,如果队头没有元素,抛出NoSuchElementException异常。
*/
E remove();
/**
* 获取并移除队头元素,如果队头没有元素,返回null。
*/
E poll();
/**
* 获取并移除队头元素,如果队头没有元素,阻塞等待,直到有元素可以获取。
*/
E take() throws InterruptedException;
/**
* 获取并移除队头元素,如果队头没有元素,阻塞等待。
* 如果过了给定的时间还不能获取元素,返回null。
*/
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
/**
* 查看队头元素,如果队头没有元素,抛出NoSuchElementException。
*/
E element();
/**
* 查看队头元素,如果队头没有元素,返回null。
*/
E peek();
/**
* 从队列中删除第一个出现的指定元素。
* 如果有这个元素并删除,返回true;如果没有指定元素,返回false。
*/
boolean remove(Object o);
/**
* 查看队列是否包含给定的元素,包含返回true;不包含返回false。
*/
public boolean contains(Object o);
/**
* 返回队列中元素数量。
*/
public int size();
/**
* 返回一个从头到尾排序的迭代器。
*/
Iterator<E> iterator();
// *** Stack methods ***
/**
* 相当于addFirst(Object)
*/
void push(E e);
}
- 接下来看下LinkedBlockingDeque内部数据结构:
public class LinkedBlockingDeque<E>
extends AbstractQueue<E>
implements BlockingDeque<E>, java.io.Serializable {
private static final long serialVersionUID = -387911632671998426L;
/** Doubly-linked list node class */
static final class Node<E> {
/**
* 保存元素的域,如果为nul说明当前节点已被删除。
*/
E item;
/**
* - 指向其前驱节点。
* - 如果指向自身,说明前面是队尾节点。
* - 如果为null,说明没有前驱节点。
*/
Node<E> prev;
/**
* - 指向其后继节点。
* - 如果指向自身,说明后面是队头节点。
* - 如果为null,说明没有后继节点。
*/
Node<E> next;
Node(E x, Node<E> p, Node<E> n) {
item = x;
prev = p;
next = n;
}
}
/** 指向队头节点 */
transient Node<E> first;
/** 指向队尾节点 */
transient Node<E> last;
/** 队列中元素数量 */
private transient int count;
/** 队列最大容量 */
private final int capacity;
/** 队列中保护访问使用的锁 */
final ReentrantLock lock = new ReentrantLock();
/** 获取元素的等待条件(队列非空) */
private final Condition notEmpty = lock.newCondition();
/** 插入元素的等待条件(队列非满) */
private final Condition notFull = lock.newCondition();
/**
* 不指定容量,默认为Integer.MAX_VALUE(可认为无限)。
*/
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE);
}
public LinkedBlockingDeque(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
}
内部结构:一个双向链表、一把锁和两个锁条件。
- 再看下内部的一些基础操作,这些操作必须在持有锁的前提下调用:
/**
* Links e as first element, or returns false if full.
*/
private boolean linkFirst(E e) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false; //如果队列已满,返回false;
Node<E> f = first;
//新建节点x,用来存放数据e,将e插入到队头节点前面。
Node<E> x = new Node<E>(e, null, f);
//然后将e设置为队头节点。
first = x;
if (last == null)
last = x; //如果没有队尾节点,那么将x设置为队尾节点。
else
f.prev = x; //如果有队尾节点,那么将f的prev指向x,完成节点拼接。
++count; // 累加当前元素计数
notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。
return true;
}
linkFirst就是将一个元素插入到队头,并成为新的队头元素。
/**
* Links e as last element, or returns false if full.
*/
private boolean linkLast(E e) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false; //如果队列已满,返回false;
Node<E> l = last;
//新建节点x,用来存放数据e,将e插入到队尾节点后面。
Node<E> x = new Node<E>(e, l, null);
last = x;
if (first == null)
first = x; //如果没有队头节点,那么将x设置为队头节点。
else
l.next = x; //如果有队头节点,那么将l的next指向x,完成节点拼接。
++count; // 累加当前元素计数
notEmpty.signal(); //有元素入队了,唤醒在notEmpty上等待的获取元素的线程。
return true;
}
linkLast就是将一个元素插入到队尾,并成为新的队尾元素。
/**
* Removes and returns first element, or null if empty.
*/
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first; //获取队头节点f。
if (f == null)
return null; //如果没有队头节点,返回null。
Node<E> n = f.next; //获取f的后继节点。
E item = f.item; //获取f的元素item。
f.item = null; //将f的item域置空。
f.next = f; // 将f的next域指向自身,帮助GC。
first = n; //将f的后继节点n设置为新的队头节点。
if (n == null)
last = null; //如果n为空,说明队列为空了,把队尾节点也置空一下。
else
n.prev = null; //如果n不为空,现在n是队头节点,需要将其prev域置空。
--count; //递减当前元素计数
notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
return item; //返回元素item。
}
unlinkFirst就是将现有的队头节点移除,并将其后继节点设置为新的队头节点,并返回移除的队头节点中保存的元素。
/**
* Removes and returns last element, or null if empty.
*/
private E unlinkLast() {
// assert lock.isHeldByCurrentThread();
Node<E> l = last; //获取队尾节点l。
if (l == null)
return null; //如果没有队尾节点,返回null。
Node<E> p = l.prev; //获取l的前驱节点。
E item = l.item; //获取l的元素item。
l.item = null; //将l的item域置空。
l.prev = l; // 将f的prev域指向自身,帮助GC。
last = p; //将l的前驱节点p设置为新的队尾节点。
if (p == null)
first = null; //如果p为空,说明队列为空了,把队头节点也置空一下。
else
p.next = null; //如果p不为空,现在p是队尾节点,需要将其next域置空。
--count; //递减当前元素计数
notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
return item; //返回元素item。
}
unlinkLast就是将现有的队尾节点移除,并将其前驱节点设置为新的队尾节点,并返回移除的队尾节点中保存的元素。
/**
* Unlinks x
*/
void unlink(Node<E> x) {
// assert lock.isHeldByCurrentThread();
Node<E> p = x.prev;
Node<E> n = x.next;
if (p == null) {
unlinkFirst(); //如果x没有前驱节点,那么x就是队头节点,所以调用一下unlinkFirst就可以了。
} else if (n == null) {
unlinkLast(); //如果x没有后继节点,那么x就是队尾节点,所以调用一下unlinkLast就可以了。
} else {
p.next = n; //将x的前驱节点p的next指向x的后继节点n。
n.prev = p; //将x的后继节点n的prev指向x的前驱节点n。
x.item = null; //置空x的item域。
// 注意上面并没有清理x本身的prev和next域,因为它们可能正在被某个迭代器使用中。
--count; //递减当前元素计数
notFull.signal(); //有元素出队了,唤醒在notFull上等待的插入元素的线程。
}
}
- LinkedBlockingDeque中利用上述基础方法来实现BlockingDeque接口定义的方法,随便看几个:
public void putFirst(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkFirst(e))
notFull.await(); //如果插入元素到队头失败,在notFull条件上等待。
} finally {
lock.unlock();
}
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(e))
notFull.await(); //如果插入元素到队尾失败,在notFull条件上等待。
} finally {
lock.unlock();
}
}
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await(); //如果从队头获取并删除元素失败,在notEmpty条件上等待。
return x;
} finally {
lock.unlock();
}
}
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkLast()) == null)
notEmpty.await(); //如果从队尾获取并删除元素失败,在notEmpty条件上等待。
return x;
} finally {
lock.unlock();
}
}
- 其他的方法也很容易看懂了,这里就不啰嗦了。最后注意LinkedBlockingDeque的迭代器是弱一致的,而且支持双向迭代器。
LinkedBlockingDeque的代码解析完毕! 参见:
Jdk1.6 JUC源码解析(7)-locks-ReentrantLock
Jdk1.6 Collections Framework源码解析(3)-ArrayDeque