Jdk1.6 JUC源码解析(24)-ConcurrentLinkedQueue

Jdk1.6 JUC源码解析(24)-ConcurrentLinkedQueue

作者:大飞

 

功能简介:

  • ConcurrentLinkedQueue是一种基于单向链表实现的无界的线程安全队列。队列中的元素遵循先入先出(FIFO)的规则。新元素插入到队列的尾部,从队列头部取出元素。
  • ConcurrentLinkedQueue内部采用一种wait-free(无等待)算法来实现。

 
源码分析:

  • 首先看下内部结构:
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    private static class Node<E> {
        private volatile E item;
        private volatile Node<E> next;
        Node(E item) {
            // Piggyback on imminent casNext()
            lazySetItem(item);
         } 
        E getItem() {
            return item;
        }
        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }
        void setItem(E val) {
             item = val;
        }
        void lazySetItem(E val) {
            UNSAFE.putOrderedObject(this, itemOffset, val);
        }
        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        } 
        Node<E> getNext() {
            return next;
        }
        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }
        // Unsafe mechanics
        private static final sun.misc.Unsafe UNSAFE =
        sun.misc.Unsafe.getUnsafe();
        private static final long nextOffset =
        objectFieldOffset(UNSAFE, "next", Node.class);
        private static final long itemOffset =
        objectFieldOffset(UNSAFE, "item", Node.class);
    }
    /**
    * 表示第一个存活的节点(头节点),访问此节点时间复杂度为O(1)。
    * Invariants:
    * - 所有存活的节点都能通过头结点到达。
    * Non-invariants:
    * - 头节点的元素可以为null。
    * - 可以允许尾节点比头节点滞后,也就是说,可能从头节点无法到达尾节点。
    */
    private transient volatile Node<E> head = new Node<E>(null);
    /**
    * 队列的尾节点(唯一的next为null的节点),访问此节点时间复杂度为O(1)。
    * Invariants:
    * - 最后的节点可以通过对tail调用succ()方法访问到。
    * Non-invariants:
    * - 尾节点的元素可以为null。
    * - 可以允许尾节点比头节点滞后,也就是说,可能从头节点无法到达尾节点。
    * - 尾节点的next可能指向自身。
    */ 
    private transient volatile Node<E> tail = head;

    public ConcurrentLinkedQueue() {}

       
可见,内部并没有锁,只有一个头节点和一个尾节点。初始时头节点和尾节点都指向同一节点。  

  • 接下来看一下添加元素到队列的方法:
    public boolean add(E e) {
        return offer(e);
    }
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> n = new Node<E>(e);
        retry:
        for (;;) {
            Node<E> t = tail;
            Node<E> p = t;
            for (int hops = 0; ; hops++) {
                Node<E> next = succ(p); // 1.获取p的后继节点。(如果p的next指向自身,返回head节点)
                if (next != null) { // 2.如果next不为null
                    if (hops > HOPS && t != tail) 
                        continue retry; // 3.如果自旋次数大于HOPS,且t不是尾节点,跳出2层循环重试。
                    p = next; // 4.如果自旋字数小于HOPS或者t是尾节点,将p指向next。
                } else if (p.casNext(null, n)) { // 5.如果next为null,尝试将p的next节点设置为n,然后自旋。
                    if (hops >= HOPS)
                        casTail(t, n); // 6.如果设置成功且自旋次数大于HOPS,尝试将n设置为尾节点,失败也没关系。 
                    return true; // 7.添加成功。
                } else {
                    p = succ(p); // 8。如果第5步尝试将p的next节点设置为n失败,那么将p指向p的后继节点,然后自旋。
                }
            }
        }
    }
     final Node<E> succ(Node<E> p) {
         Node<E> next = p.getNext();
         //如果p节点的next节点指向自身,那么返回head节点;否则返回p的next节点。
         return (p == next) ? head : next;
     }
    /**
     * 允许头尾节点的指针滞后,所以当头尾节点离"实际位置"的距离 
     * (按节点)小于HOPS时,不会去更新头尾指针。这里是假设volatile写代价比volatile读高。
     */
     private static final int HOPS = 1;

 

       
简单分析一下offer方法里的过程:
              1.假设没有竞争的情况下,初始状态时head和tail都指向一个节点,这时来了一个新节点n1,代码走向上面注释第1行,得到的next为null,然后走向第5行,然后将p(也就是tail节点)的next节点设置为n1,成功返回。注意这里并没有调整tail指针,head和tail还是指向之前的节点。然后再来一个新节点n2,还是走向第1行,得到的next是n1,然后走向第8行,将p指向n1,自旋一次,又来到第1行,得到的next为null,然后走向第5行,将p(也就是n1)的next节点设置为n2。由于已经自旋一次,所以这时还会走向第6行,将tail指向n2。
             2.有竞争的情况也很好分析,假设在注释第5行竞争失败,那么会走向第8行,将p向队尾推进一步,然后重试;如果重试了多次后(超过一次),在注释第一行获取的next仍然不为空,且同时尾节点也被其他线程推进了,那说明当前节点离尾节点太远了,跳出循环,重新定位尾节点然后再试,这样可以减少自旋次数。  

  • 再看一下从队列中获取元素的方法:
    public E poll() {
        Node<E> h = head;
        Node<E> p = h;
        for (int hops = 0; ; hops++) {
            E item = p.getItem(); // 1.获取p节点上的元素item。
            if (item != null && p.casItem(item, null)) { // 2.如果item不为null,尝试将p的item设置为null。
                if (hops >= HOPS) { 
                    Node<E> q = p.getNext();
                    updateHead(h, (q != null) ? q : p); // 3.如果自旋次数大于HOPS,尝试更新头节点。
                }
                return item; // 4.获取元素成功。
            }
            Node<E> next = succ(p); // 5.获取p的后继节点。(如果p的next指向自身,那么返回head节点)
            if (next == null) {
                updateHead(h, p); // 6.如果p的后继节点为null,尝试将p设置为头节点,然后跳出循环。
                break;
            }
            p = next;
        }
        return null; // 7.从第6步过来,没有成功获取元素,返回null。
    }

     final void updateHead(Node<E> h, Node<E> p) {
         if (h != p && casHead(h, p))
             h.lazySetNext(h); //将h的next指向自身,帮助GC。
     }

       和offer方法一样的思路。

       再看下peek方法,只获取元素,不移除节点: 

    public E peek() {
        Node<E> h = head;
        Node<E> p = h;
        E item; 
        for (;;) {
            item = p.getItem();
            if (item != null)
                break;
            Node<E> next = succ(p);
            if (next == null) {
                break;
            }
            p = next;
        }
        updateHead(h, p);
        return item;
    }

 

  • 继续看一下其他方法:

         看其他方法之前,先看一个内部支持方法first:

     Node<E> first() {
         Node<E> h = head;
         Node<E> p = h;
         Node<E> result;
         for (;;) { 
             E item = p.getItem();
             if (item != null) {
                 result = p;
                 break;
             }
             Node<E> next = succ(p);
             if (next == null) {
                 result = null;
                 break;
             }
             p = next;
         }
         updateHead(h, p); // 这里会帮助推进一下头节点。
         return result;
     }

       可见,和peek里面的逻辑基本一致。但之所以没有利用first来实现peek,而是单独写peek,是因为可以减少一次volatile读。   

    public boolean isEmpty() {
        return first() == null;
    }
    /**
     * 注意这个方法内部使用遍历实现,时间复杂度为O(n)。
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            if (p.getItem() != null) {
                // Collections.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
            }
        }
        return count;
    }

    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item))
                return true;
        }
        return false;
    }

    public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.getItem();
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)){
                Node<E> next = succ(p);
                if(pred != null && next != null)
                   pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = succ(p)) {
            E item = p.getItem();
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            if (k < a.length)
                a[k] = null;
            return a;
        }
        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = succ(q)) {
            E item = q.getItem();
            if (item != null)
                al.add(item);
        }
        return al.toArray(a);
    }

        这些方法都是基于first方法实现,代码很容易看懂,这里就不啰嗦了。  

  • 最后,ConcurrentLinkedQueue的迭代器也是弱一致的。

         
小总结一下:
              ConcurrentLinkedQueue内部在插入或者移除元素时,不会即时设置头尾节点,而是有一个缓冲期(一个节点长的距离),这样能减少一些CAS操作;其次在设置头尾节点的时候,也不会CAS-Loop直到成功,只尝试一次,失败也没关系,下一次操作或者其他线程在操作时会帮忙推进头尾节点(将头尾指针指向正确位置)。          ConcurrentLinkedQueue的代码解析完毕!

 

 

    原文作者:JUC
    原文地址: https://blog.csdn.net/iteye_11160/article/details/82642748
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞