在前面的篇章中,我们详细分析了AQS,并提到了里面一个关键数据结构:所有阻塞线程组成的一个等待队列,这个队列是用单向无锁链表实现的。
今天所讲的ConcurrentLinkedQueue,其实现和AQS中的无锁队列基本一样。所以,如果你深刻理解了AQS,ConcurrentLinkedQueue也就知道了。出于内容的完整性,在此还是列一下其源码:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E> implements Queue<E>, java.io.Serializable {
//单向链表的Node
private static class Node<E> {
private volatile E item;
private volatile Node<E> next;```
。。。
}
//整个队列记录1头1尾2个结点
private transient volatile Node<E> head = new Node<E>(null);
private transient volatile Node<E> tail = head;
//入队,也即cas tail (乐观锁)
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
Node<E> n = new Node<E>(e);
retry:
for (;;) {
Node<E> t = tail;
Node<E> p = t;
for (int hops = 0; ; hops++) {
Node<E> next = succ(p);
if (next != null) {
if (hops > HOPS && t != tail)
continue retry;
p = next;
} else if (p.casNext(null, n)) {
if (hops >= HOPS)
casTail(t, n); // Failure is OK.
return true;
} else {
p = succ(p);
}
}
}
}
//出队,cas head,乐观锁
public E poll() {
Node<E> h = head;
Node<E> p = h;
for (int hops = 0; ; hops++) {
E item = p.getItem();
if (item != null && p.casItem(item, null)) {
if (hops >= HOPS) {
Node<E> q = p.getNext();
updateHead(h, (q != null) ? q : p);
}
return item;
}
Node<E> next = succ(p);
if (next == null) {
updateHead(h, p);
break;
}
p = next;
}
return null;
}
。。。
}