JUC队列-ConcurrentLinkedQueue(四)

ConcurrentLinkedQueue介绍

ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。

它是一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。

ConcurrentLinkedQueue的UML图:

《JUC队列-ConcurrentLinkedQueue(四)》

说明:

  1. ConcurrentLinkedQueue继承于AbstractQueue。
  2. ConcurrentLinkedQueue内部是通过链表来实现的。它同时包含链表的头节点head和尾节点tail。ConcurrentLinkedQueue按照 FIFO(先进先出)原则对元素进行排序。元素都是从尾部插入到链表,从头部开始返回。
  3. ConcurrentLinkedQueue的链表Node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。ConcurrentLinkedQueue就是通过volatile来实现多线程对竞争资源的互斥访问的。

LinkedBlockingQueue在实现“多线程对竞争资源的互斥访问”时,对于“插入”和“取出(删除)”操作分别使用了不同的锁。对于插入操作,通过“插入锁putLock”进行同步;对于取出操作,通过“取出锁takeLock”进行同步。

此外,插入锁putLock和“非满条件notFull”相关联,取出锁takeLock和“非空条件notEmpty”相关联。通过notFull和notEmpty更细腻的控制锁。

ConcurrentLinkedQueue的源码分析

构造方法

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

head和tail的定义如下:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

所以他们都是线程可见的。

Node的声明如下:

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

Node是个单向链表节点,next用于指向下一个Node,item用于存储数据。它们都是可见的,所以Node中操作节点数据的API,都是可以通过Unsafe机制的CAS函数实现的;例如casNext()是通过CAS函数“比较并设置节点的下一个节点”

添加元素

public boolean add(E e) {
    return offer(e);
}

offer()方法

public boolean offer(E e) {
    // 检查e是不是null,是的话抛出NullPointerException异常。
    checkNotNull(e);
    // 创建新的节点
    final Node<E> newNode = new Node<E>(e);

    // 将“新的节点”添加到链表的末尾。
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // 情况1:q为空
        if (q == null) {
            // CAS操作:如果“p的下一个节点为null”(即p为尾节点),则设置p的下一个节点为newNode。
            // 如果该CAS操作成功的话,则比较“p和t”(若p不等于t,则设置newNode为新的尾节点),然后返回true。
            // 如果该CAS操作失败,这意味着“其它线程对尾节点进行了修改”,则重新循环。
            if (p.casNext(null, newNode)) {
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        // 情况2:p和q相等
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // 情况3:其它
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

取出元素

public E poll() {
    // 设置“标记”
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            // 情况1
            // 表头的数据不为null,并且“设置表头的数据为null”这个操作成功的话;
            // 则比较“p和h”(若p!=h,即表头发生了变化,则更新表头,即设置表头为p),然后返回原表头的item值。
            if (item != null && p.casItem(item, null)) {
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // 情况2
            // 表头的下一个节点为null,即链表只有一个“内容为null的表头节点”。则更新表头为p,并返回null。
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // 情况3
            // 这可能到由于“情况4”的发生导致p=q,在该情况下跳转到restartFromHead标记重新操作。
            else if (p == q)
                continue restartFromHead;
            // 情况4
            // 设置p为q
            else
                p = q;
        }
    }

}

总结:ConcurrentLinkedQueue为什么支持高并发,它与其他队列有什么不同之处?

我们知道,常用的多线程同步机制有下面三种:

volatile 变量:轻量级多线程同步机制,不会引起上下文切换和线程调度。仅提供内存可见性保证,不提供原子性。

CAS 原子指令:轻量级多线程同步机制,不会引起上下文切换和线程调度。它同时提供内存可见性和原子化更新保证。

互斥锁:重量级多线程同步机制,可能会引起上下文切换和线程调度,它同时提供内存可见性和原子性。

ConcurrentLinkedQueue用了volatile和CAS原子指令操作队列,比起其他队列的互斥锁,更加轻量,更加适合高并发。

    原文作者:JUC
    原文地址: https://blog.csdn.net/qq_33394088/article/details/79058887
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞