JUC ConcurrentLinkedQueue

Today's topic is a high-performance non-blocking queue, again from the java.util.concurrent package and again the work of the great Doug Lea (I kneel in admiration).

An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
This queue orders elements FIFO (first-in-first-out).
The <em>head</em> of the queue is that element that has been on the
queue the longest time.
The <em>tail</em> of the queue is that element that has been on the
queue the shortest time. New elements
are inserted at the tail of the queue, and the queue retrieval
operations obtain elements at the head of the queue.
A {@code ConcurrentLinkedQueue} is an appropriate choice when
many threads will share access to a common collection.
Like most other concurrent collection implementations, this class
does not permit the use of {@code null} elements.

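The passage above is the class comment from the source. In plain terms:

ConcurrentLinkedQueue is an unbounded, thread-safe queue built on linked nodes. It orders elements FIFO (first in, first out). The head of the queue is the element that has been in the queue the longest, and the tail is the element that has been in the queue the shortest time. New elements are inserted at the tail, and retrieval operations take elements from the head. ConcurrentLinkedQueue is a good choice when many threads share access to a common collection. Like most other concurrent collection implementations, this class does not permit null elements.

This paraphrase is an improvement over my earlier attempts, and it gives a brief overview of the characteristics of ConcurrentLinkedQueue.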
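The properties listed above (FIFO order, no null elements, safe sharing between threads) can be checked with a few lines against the public API. This is only a small sketch; the class name ClqBasicsDemo and its helper methods are made up for illustration.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

class ClqBasicsDemo {
    /** Offers the elements in order, then polls until empty and returns the drain order. */
    static String drainOrder(String... elements) {
        Queue<String> queue = new ConcurrentLinkedQueue<>();
        for (String e : elements) queue.offer(e);
        StringBuilder sb = new StringBuilder();
        for (String e; (e = queue.poll()) != null; ) sb.append(e);
        return sb.toString();
    }

    /** Returns true if offering null throws NullPointerException. */
    static boolean rejectsNull() {
        try {
            new ConcurrentLinkedQueue<String>().offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    /** Several threads offer concurrently; returns the final element count. */
    static int concurrentOfferCount(int threads, int perThread) {
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) queue.offer(j);
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers) w.join();
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ie);
        }
        return queue.size(); // size() walks the list, so it is O(n)
    }

    public static void main(String[] args) {
        System.out.println(drainOrder("a", "b", "c"));
        System.out.println(rejectsNull());
        System.out.println(concurrentOfferCount(4, 1000));
    }
}
```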

Now let's look at the inner class Node:

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    /**
     * Constructs a new node.  Uses relaxed write because item can
     * only be seen after publication via casNext.
     */
    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

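Looks familiar, doesn't it? Node is the basic data structure behind ConcurrentLinkedQueue. It holds two fields: item, which carries the data, and next, which points to the following node and forms the linked structure. The rest is CAS plumbing (casItem and casNext, plus lazySetNext, which is an ordered write rather than a CAS). If you have read the earlier posts this should be easy to follow, so I will not go over it again.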
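For readers who want to experiment with the same operations outside the JDK, where sun.misc.Unsafe.getUnsafe() is not usable from ordinary application code, here is a rough equivalent built on AtomicReferenceFieldUpdater. This is my own sketch and not the JDK implementation: the class name SketchNode is invented, and the constructor uses a plain volatile write instead of the relaxed write in the original.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

class SketchNode<E> {
    volatile E item;
    volatile SketchNode<E> next;

    // Field updaters stand in for the Unsafe offsets used by the JDK source.
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<SketchNode, Object> ITEM =
        AtomicReferenceFieldUpdater.newUpdater(SketchNode.class, Object.class, "item");
    @SuppressWarnings("rawtypes")
    private static final AtomicReferenceFieldUpdater<SketchNode, SketchNode> NEXT =
        AtomicReferenceFieldUpdater.newUpdater(SketchNode.class, SketchNode.class, "next");

    SketchNode(E item) {
        this.item = item; // volatile write; the JDK uses a cheaper relaxed write here
    }

    /** Atomically replaces item with val if it is currently cmp (reference comparison). */
    boolean casItem(E cmp, E val) {
        return ITEM.compareAndSet(this, cmp, val);
    }

    /** Ordered write of next, without the full cost of a volatile store. */
    void lazySetNext(SketchNode<E> val) {
        NEXT.lazySet(this, val);
    }

    /** Atomically replaces next with val if it is currently cmp. */
    boolean casNext(SketchNode<E> cmp, SketchNode<E> val) {
        return NEXT.compareAndSet(this, cmp, val);
    }
}
```

Only one of two competing casNext(null, ...) calls on the same node can succeed, and that is the property offer relies on when linking a new node.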

Next come the two important "pointers" in ConcurrentLinkedQueue, the head node and the tail node. Here is the source:

/**
 * A node from which the first live (non-deleted) node (if any)
 * can be reached in O(1) time.
 * Invariants:
 * - all live nodes are reachable from head via succ()
 * - head != null
 * - (tmp = head).next != tmp || tmp != head
 * Non-invariants:
 * - head.item may or may not be null.
 * - it is permitted for tail to lag behind head, that is, for tail
 *   to not be reachable from head!
 */
private transient volatile Node<E> head;

/**
 * A node from which the last node on list (that is, the unique
 * node with node.next == null) can be reached in O(1) time.
 * Invariants:
 * - the last node is always reachable from tail via succ()
 * - tail != null
 * Non-invariants:
 * - tail.item may or may not be null.
 * - it is permitted for tail to lag behind head, that is, for tail
 *   to not be reachable from head!
 * - tail.next may or may not be self-pointing to tail.
 */
private transient volatile Node<E> tail;

/**
 * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
 */
public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

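head is a node from which the first live (not yet deleted) node can be reached in O(1) time. It is not necessarily the first live node itself, since head.item may be null.

tail is a node from which the last node in the list (the only node whose next is null) can be reached in O(1) time. Likewise, tail is not necessarily the last node itself.

The constructor assigns head and tail directly, both pointing to a dummy node whose item is null, so neither head nor tail is ever null.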

The other constructor should not be hard to follow. Here is the source:

/**
 * Creates a {@code ConcurrentLinkedQueue}
 * initially containing the elements of the given collection,
 * added in traversal order of the collection's iterator.
 *
 * @param c the collection of elements to initially contain
 * @throws NullPointerException if the specified collection or any
 *         of its elements are null
 */
public ConcurrentLinkedQueue(Collection<? extends E> c) {
    Node<E> h = null, t = null;
    for (E e : c) {
        checkNotNull(e);
        Node<E> newNode = new Node<E>(e);
        if (h == null)
            h = t = newNode;
        else {
            t.lazySetNext(newNode);
            t = newNode;
        }
    }
    if (h == null)
        h = t = new Node<E>(null);
    head = h;
    tail = t;
}
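A quick check of what this constructor promises, namely iterator order preserved and null elements rejected, might look like the following. The class and method names are hypothetical and exist only for this example.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

class ClqCopyConstructorDemo {
    /** Builds a queue from the list and returns the order in which poll() hands elements back. */
    static String pollAll(List<String> source) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>(source);
        StringBuilder sb = new StringBuilder();
        for (String e; (e = queue.poll()) != null; ) sb.append(e);
        return sb.toString();
    }

    /** Returns true if a null element in the source collection causes NullPointerException. */
    static boolean rejectsNullElement() {
        try {
            new ConcurrentLinkedQueue<String>(Arrays.asList("a", null));
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(pollAll(Arrays.asList("x", "y", "z")));
        System.out.println(rejectsNullElement());
    }
}
```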

Next is a fairly important method:

/**
 * Tries to CAS head to p. If successful, repoint old head to itself
 * as sentinel for succ(), below.
 */
final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

updateHead tries to CAS the head node to p. If h != p and the CAS succeeds, the old head node's next is pointed at itself, turning it into a "sentinel node" that succ() recognizes.

succ returns the successor of node p:

/**
 * Returns the successor of p, or the head node if p.next has been
 * linked to self, which will only be true if traversing with a
 * stale pointer that is now off the list.
 */
final Node<E> succ(Node<E> p) {
    Node<E> next = p.next;
    return (p == next) ? head : next;
}

If p's next points to p itself, p has already been unlinked from the list, so succ returns the head node; otherwise it returns p.next.
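To see updateHead and succ working together, here is a stripped-down model. It is an illustration under my own assumptions rather than the JDK code: SentinelSketch and its Node class are invented names, and AtomicReference replaces the Unsafe-based CAS.

```java
import java.util.concurrent.atomic.AtomicReference;

class SentinelSketch {
    static final class Node {
        final String item;
        final AtomicReference<Node> next = new AtomicReference<>();
        Node(String item) { this.item = item; }
    }

    final AtomicReference<Node> head = new AtomicReference<>();

    SentinelSketch(Node first) {
        head.set(first);
    }

    /** CAS head from h to p; on success, self-link the old head so succ() can detect it. */
    void updateHead(Node h, Node p) {
        if (h != p && head.compareAndSet(h, p))
            h.next.lazySet(h);
    }

    /** Successor of p, or the current head if p has been self-linked (p is off the list). */
    Node succ(Node p) {
        Node next = p.next.get();
        return (p == next) ? head.get() : next;
    }

    public static void main(String[] args) {
        Node a = new Node("a"), b = new Node("b"), c = new Node("c");
        a.next.set(b);
        b.next.set(c);
        SentinelSketch list = new SentinelSketch(a);

        Node stale = a;                 // a traversal still holding the old head
        list.updateHead(a, b);          // head moves to b, a becomes a sentinel
        System.out.println(list.succ(stale).item); // the stale traversal is sent back to head
    }
}
```

A thread that still holds a reference to the old head does not wander through unlinked nodes: succ notices the self-link and restarts from the current head.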

Now for the real heavyweight, offer:

/**
 * Inserts the specified element at the tail of this queue.
 * As the queue is unbounded, this method will never return {@code false}.
 *
 * @return {@code true} (as specified by {@link Queue#offer})
 * @throws NullPointerException if the specified element is null
 */
public boolean offer(E e) {
    checkNotNull(e);                                          // (1)
    final Node<E> newNode = new Node<E>(e);                   // (2)

    for (Node<E> t = tail, p = t;;) {                         // (3)
        Node<E> q = p.next;                                   // (4)
        if (q == null) {                                      // (5)
            // p is last node
            if (p.casNext(null, newNode)) {                   // (6)
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

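This is a classic showcase of lock-free programming. Below is my own reading of it; if I got something wrong, please bear with me and point it out.

1> Check the element being offered; if it is null, throw a NullPointerException right away.

2> Build a new Node from the element.

3> Start a loop, initializing both t and p to the tail node.

4> Assign p's next node to q.

5> Check whether q is null. q is p's successor, so if q is null, p is the last node.

6> Having established that p is the last node, use CAS to set p's next to the newly built node. If the CAS succeeds, the method returns true and leaves the loop; if it fails, another thread linked its node first, and the loop re-reads p.next. Before returning there is one more check: if p != t, the method tries to CAS tail to the new node. On the first offer p == t, so the check fails and tail is not updated. The source comment "hop two nodes at a time" tells us that tail is updated lazily, so tail may not point to the actual last node. The comment "Failure is OK" after casTail means that a failed CAS on tail is acceptable: it fails only because another thread has already moved tail.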

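Take the second offer as an example. The first offer did not update tail, so with p = t = tail, p does not point to the last node. p.next is the last node and is assigned to q, so q == null is false; p == q is false as well, so we land in the final else branch:

p = (p != t && t != (t = tail)) ? t : q;

This expression deserves a careful look.

On the second offer, p != t is false the first time through the loop, so the assignment reduces to p = q: p moves to q, that is, to p's next node, which is the last node in the queue (assuming no other thread interferes). The loop then runs again and proceeds much like step 6, except that p != t is now true, so the method attempts to CAS tail. Success and failure are both fine, because a failure means some other thread has already updated tail.

As for how p == q, the else if branch, can come about: this is the "sentinel node", which will be explained in a later post. On hitting a sentinel node the code executes p = (t != (t = tail)) ? t : head. This can happen when several threads operate concurrently. If another thread has changed tail in the meantime, the condition is true and p is set to the freshly read tail; otherwise traversal restarts from head.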
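The idiom t != (t = tail) works because Java evaluates the left operand of a binary operator before the right one: the old value of t is read first, then t is overwritten with the current tail, and only then are the two compared. A tiny experiment with plain ints shows the effect. The class ReReadIdiomDemo is made up for this purpose, and the int array is just a way to hand the updated t back to the caller.

```java
class ReReadIdiomDemo {
    /**
     * Mimics `t != (t = tail)`: tHolder[0] plays the role of t.
     * Returns whether tail differed from the old t, and stores the re-read value back.
     */
    static boolean changedAfterReRead(int[] tHolder, int tail) {
        int t = tHolder[0];
        boolean changed = t != (t = tail); // left operand (old t) is evaluated first
        tHolder[0] = t;                    // t now holds the current tail either way
        return changed;
    }

    public static void main(String[] args) {
        int[] t = {1};
        System.out.println(changedAfterReRead(t, 2)); // tail moved from 1 to 2
        System.out.println(changedAfterReRead(t, 2)); // tail unchanged since the last read
    }
}
```

So a single expression both refreshes the local snapshot of tail and reports whether another thread moved it.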

The loop repeats in this way, so in the uncontended case tail is updated once every two offers.
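The lagging tail can be observed directly in a small model of offer. What follows is a single-file sketch written for this walkthrough, not the JDK class: TailLagSketch, tailLag and lagTrace are invented names, and AtomicReference stands in for the Unsafe-based CAS. The offer loop mirrors the structure of the source shown earlier.

```java
import java.util.concurrent.atomic.AtomicReference;

class TailLagSketch<E> {
    static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    TailLagSketch() {
        Node<E> dummy = new Node<>(null); // head and tail start on a dummy node
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> newNode = new Node<>(e);
        for (Node<E> t = tail.get(), p = t;;) {
            Node<E> q = p.next.get();
            if (q == null) {
                if (p.next.compareAndSet(null, newNode)) {
                    if (p != t)                          // only after hopping past tail
                        tail.compareAndSet(t, newNode);  // failure is OK
                    return true;
                }
                // lost the race; loop and re-read p.next
            } else if (p == q) {
                // self-linked node; never reached here because this sketch has no poll()
                p = (t != (t = tail.get())) ? t : head.get();
            } else {
                p = (p != t && t != (t = tail.get())) ? t : q;
            }
        }
    }

    /** Number of hops from tail to the real last node (0 means tail is up to date). */
    int tailLag() {
        int hops = 0;
        for (Node<E> n = tail.get(); n.next.get() != null; n = n.next.get()) hops++;
        return hops;
    }

    /** Offers `offers` elements from one thread and records tailLag() after each one. */
    static String lagTrace(int offers) {
        TailLagSketch<Integer> queue = new TailLagSketch<>();
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < offers; i++) {
            queue.offer(i);
            sb.append(queue.tailLag());
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(lagTrace(4)); // prints 1010
    }
}
```

With a single thread the trace alternates: after the first offer tail is one node behind, after the second it has caught up, and so on.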

To be continued… the poll() dequeue operation will be covered next time.

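    Original author: JUC
    Original URL: https://blog.csdn.net/liuhongze123/article/details/80135031
    This article is reposted from the web for the purpose of sharing knowledge. If it infringes on your rights, please contact the blogger to have it removed.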