ConcurrentLinkedQueue是一个基于链表的、无界的、线程安全的队列。此队列按照FIFO原则对元素进行排序。此队列不允许使用null元素,采用了有效的“无等待(wait-free)”算法(CAS算法)。
与大多数collection不同,size方法不是一个固定时间操作。由于这些队列的异步特性,确定当前元素的数量需要遍历这些元素。
节点类
private static class Node<E> {
volatile E item;
volatile Node<E> next;
Node(E item) {
UNSAFE.putObject(this, itemOffset, item);
}
boolean casItem(E cmp, E val) {
return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
void lazySetNext(Node<E> val) {
UNSAFE.putOrderedObject(this, nextOffset, val);
}
boolean casNext(Node<E> cmp, Node<E> val) {
return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = Node.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
可以看出,item与next都是volatile的,关于volatile,我们知道volatile支持可见性。同时,对节点的操作如casItem()、casNext()方法都是CAS操作。
属性
/** * 头节点 * 以下表达式一直成立: * - all live nodes are reachable from head via succ() * - head != null * - (tmp = head).next != tmp || tmp != head * 以下表达式不一定成立: * - head.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! */
private transient volatile Node<E> head;
/** * 尾节点 * 以下表达式一直成立: * - the last node is always reachable from tail via succ() * - tail != null * 以下表达式不一定成立: * - tail.item may or may not be null. * - it is permitted for tail to lag behind head, that is, for tail * to not be reachable from head! * - tail.next may or may not be self-pointing to tail. */
private transient volatile Node<E> tail;
ConcurrentLinkedQueue的属性成员与其他队列的相比,出奇地少,没有元素数量count、队列容量capacity、锁lock 、condition。
方法摘要
//构造方法摘要
ConcurrentLinkedQueue()
//创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue(Collection<? extends E> c)
//创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
//方法摘要
boolean add(E e)
//将指定元素插入此队列的尾部。
boolean contains(Object o)
//如果此队列包含指定元素,则返回 true。
boolean isEmpty()
//如果此队列不包含任何元素,则返回 true。
Iterator<E> iterator()
//返回在此队列元素上以恰当顺序进行迭代的迭代器。
boolean offer(E e)
//将指定元素插入此队列的尾部。
E peek()
//获取但不移除此队列的头;如果此队列为空,则返回 null。
E poll()
//获取并移除此队列的头,如果此队列为空,则返回 null。
boolean remove(Object o)
//从队列中移除指定元素的单个实例(如果存在)。
int size()
//返回此队列中的元素数量。
Object[] toArray()
//返回以恰当顺序包含此队列所有元素的数组。
<T> T[] toArray(T[] a)
//返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
offer(E)
//将指定元素插入此队列的尾部。
public boolean offer(E e) {
//不允许元素为null
checkNotNull(e);
//新建节点
final Node<E> newNode = new Node<E>(e);
//将节点插入队列
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
//case1:q == null,说明p是尾节点
if (q == null) {
//如果p的next为null,则将p的next置为newNode。
if (p.casNext(null, newNode)) {
//如果p != t,说明?
if (p != t) // hop two nodes at a time
//将newNode设为新的尾节点
casTail(t, newNode); // Failure is OK.
return true;
}
// Lost CAS race to another thread; re-read next
}
//case2:?
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
p = (t != (t = tail)) ? t : head;
//case3:?
else
// Check for tail updates after two hops.
p = (p != t && t != (t = tail)) ? t : q;
}
}
从以上代码中可以看出,offer(E e)是通过CAS来实现线程安全的,而不是锁。
poll()
//获取并移除此队列的头,如果此队列为空,则返回 null。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
//case1:如果表头节点item不为null,且p.casItem(item, null)操作成功
if (item != null && p.casItem(item, null)) {
//如果?,则更新表头,设置表头为p
if (p != h) // hop two nodes at a time
updateHead(h, ((q = p.next) != null) ? q : p);
//返回原表头的item值。
return item;
}
//case2:如果表头节点item为null,且只有表头一个节点
else if ((q = p.next) == null) {
//将表头更新为p?
updateHead(h, p);
//返回null
return null;
}
//case3:由于情况4导致此种情况发生
else if (p == q)
//跳转到restartFromHead标记重新操作
continue restartFromHead;
//case4:如果p.casItem(item, null)操作失败
else
//交由case3处理
p = q;
}
}
}
updateHead(Node, Node)源码为
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
size()
public int size() {
int count = 0;
for (Node<E> p = first(); p != null; p = succ(p))
if (p.item != null)
// Collection.size() spec says to max out
if (++count == Integer.MAX_VALUE)
break;
return count;
}
可以看出,与大多数collection不同,size方法不是一个固定时间操作。由于队列的异步特性,确定当前元素的数量需要遍历这些元素。
本文就讲到这里,想了解Java并发编程更多内容请参考:
- Java并发编程札记-目录