Java Multithreading Series, JUC Collections 10: ConcurrentLinkedQueue

Reference: http://www.cnblogs.com/skywang12345/p/java_threads_category.html

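Overview

This chapter gives a detailed introduction to the ConcurrentLinkedQueue class in the java.util.concurrent package. It covers:
Introduction to ConcurrentLinkedQueue
ConcurrentLinkedQueue principles and data structure
ConcurrentLinkedQueue method list
ConcurrentLinkedQueue source code analysis (JDK 1.7.0_40)
ConcurrentLinkedQueue example

When reposting, please credit the source: http://www.cnblogs.com/skywang12345/p/3498995.html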

 

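Introduction to ConcurrentLinkedQueue

ConcurrentLinkedQueue is a thread-safe queue suited to highly concurrent scenarios.
It is an unbounded thread-safe queue based on linked nodes that orders its elements FIFO (first in, first out). null elements cannot be placed in the queue (only special nodes used by the internal implementation hold a null item).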
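
The FIFO ordering and the null restriction can be checked with a few lines of code. This is a minimal sketch, not part of the original series; the class name ClqIntroDemo and the helpers drain and rejectsNull are made up for illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class: shows FIFO order and null rejection.
class ClqIntroDemo {

    /** Polls until the queue is empty and joins the elements in removal order. */
    static String drain(ConcurrentLinkedQueue<String> queue) {
        StringBuilder sb = new StringBuilder();
        String s;
        while ((s = queue.poll()) != null) {   // poll() returns null once the queue is empty
            if (sb.length() > 0) sb.append(',');
            sb.append(s);
        }
        return sb.toString();
    }

    /** Returns true if offering null is rejected with a NullPointerException. */
    static boolean rejectsNull(ConcurrentLinkedQueue<String> queue) {
        try {
            queue.offer(null);
            return false;
        } catch (NullPointerException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<>();
        queue.offer("a");
        queue.offer("b");
        queue.offer("c");
        System.out.println(drain(queue));        // elements leave in insertion order
        System.out.println(rejectsNull(queue));  // null is not a legal element
    }
}
```

Because poll() uses null to signal an empty queue, allowing null elements would make that return value ambiguous, which is one practical reason for the restriction.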

 

ConcurrentLinkedQueue principles and data structure

The data structure of ConcurrentLinkedQueue is shown in the figure below:

[Figure: ConcurrentLinkedQueue data structure]

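Notes
1. ConcurrentLinkedQueue extends AbstractQueue.
2. ConcurrentLinkedQueue is implemented internally as a linked list. It keeps both the head node (head) and the tail node (tail) of the list. ConcurrentLinkedQueue orders elements FIFO (first in, first out): elements are inserted at the tail of the list and returned from the head.
3. In the linked-list Node of ConcurrentLinkedQueue, the next field is declared volatile, and so is the item field that holds the data. The semantics of volatile include: "a read of a volatile variable always sees the last write to that variable (by any thread)". ConcurrentLinkedQueue does not use locks; it combines volatile fields with CAS (compare-and-swap) updates so that multiple threads can safely access the shared nodes.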
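
To make the "volatile plus CAS" idea concrete, here is a deliberately simplified lock-free queue. It is only a sketch of the general Michael and Scott technique, not the JDK implementation: the class SimpleCasQueue is hypothetical, it uses AtomicReference instead of Unsafe, and it omits the optimizations of the real class (lagging head/tail with two-node hops, self-linked dequeued nodes, interior removal).

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical teaching class: a bare-bones CAS-based FIFO queue.
class SimpleCasQueue<E> {

    private static final class Node<E> {
        final E item;
        // AtomicReference gives volatile read/write plus compareAndSet
        final AtomicReference<Node<E>> next = new AtomicReference<>(null);
        Node(E item) { this.item = item; }
    }

    private final AtomicReference<Node<E>> head;
    private final AtomicReference<Node<E>> tail;

    SimpleCasQueue() {
        Node<E> dummy = new Node<>(null);   // head always points at a dummy node
        head = new AtomicReference<>(dummy);
        tail = new AtomicReference<>(dummy);
    }

    void offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> newNode = new Node<>(e);
        for (;;) {
            Node<E> t = tail.get();
            Node<E> next = t.next.get();
            if (next != null) {
                // tail is lagging behind the last node: help advance it, then retry
                tail.compareAndSet(t, next);
            } else if (t.next.compareAndSet(null, newNode)) {
                // the successful CAS on next links the node into the list;
                // moving tail may fail if another thread already helped
                tail.compareAndSet(t, newNode);
                return;
            }
        }
    }

    E poll() {
        for (;;) {
            Node<E> h = head.get();
            Node<E> t = tail.get();
            Node<E> first = h.next.get();
            if (first == null)
                return null;                 // only the dummy node is left: empty
            if (h == t) {
                tail.compareAndSet(t, first); // tail is lagging: help, then retry
                continue;
            }
            if (head.compareAndSet(h, first)) // first becomes the new dummy node
                return first.item;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleCasQueue<Integer> q = new SimpleCasQueue<>();
        Thread[] producers = new Thread[4];
        for (int i = 0; i < producers.length; i++) {
            producers[i] = new Thread(() -> {
                for (int j = 0; j < 1000; j++) q.offer(j);
            });
            producers[i].start();
        }
        for (Thread p : producers) p.join();
        int count = 0;
        while (q.poll() != null) count++;
        System.out.println(count);           // every offered element is polled exactly once
    }
}
```

No thread ever blocks here: a thread that loses a CAS race simply re-reads the current state and retries, which is the same retry structure used by offer() and poll() in the JDK source.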

 

ConcurrentLinkedQueue method list


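// Creates a ConcurrentLinkedQueue that is initially empty.
ConcurrentLinkedQueue()
// Creates a ConcurrentLinkedQueue initially containing the elements of the given collection, added in the traversal order of the collection's iterator.
ConcurrentLinkedQueue(Collection<? extends E> c)

// Inserts the specified element at the tail of this queue.
boolean add(E e)
// Returns true if this queue contains the specified element.
boolean contains(Object o)
// Returns true if this queue contains no elements.
boolean isEmpty()
// Returns an iterator over the elements in this queue in proper sequence.
Iterator<E> iterator()
// Inserts the specified element at the tail of this queue.
boolean offer(E e)
// Retrieves, but does not remove, the head of this queue; returns null if this queue is empty.
E peek()
// Retrieves and removes the head of this queue; returns null if this queue is empty.
E poll()
// Removes a single instance of the specified element from this queue, if it is present.
boolean remove(Object o)
// Returns the number of elements in this queue.
int size()
// Returns an array containing all of the elements in this queue, in proper sequence.
Object[] toArray()
// Returns an array containing all of the elements in this queue, in proper sequence; the runtime type of the returned array is that of the specified array.
<T> T[] toArray(T[] a)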
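
The methods listed above can be exercised in a single thread to see what each one returns. This is a small sketch under stated assumptions: the class ClqApiDemo and its run method are invented for illustration, and the log strings are just a convenient way to collect results.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class: calls each listed method once and records the result.
class ClqApiDemo {

    static List<String> run() {
        List<String> log = new ArrayList<>();

        // Collection constructor: elements are added in iteration order
        ConcurrentLinkedQueue<Integer> queue =
            new ConcurrentLinkedQueue<>(Arrays.asList(1, 2, 3));
        queue.add(4);      // add() simply delegates to offer()
        queue.offer(5);    // queue is now 1, 2, 3, 4, 5

        log.add("peek=" + queue.peek());                          // head stays in the queue
        log.add("poll=" + queue.poll());                          // head is removed
        log.add("contains(3)=" + queue.contains(3));
        log.add("remove(3)=" + queue.remove(Integer.valueOf(3))); // removes an interior element
        log.add("size=" + queue.size());                          // O(n) traversal, not O(1)
        log.add("toArray=" + Arrays.toString(queue.toArray(new Integer[0])));

        StringBuilder sb = new StringBuilder();
        for (Integer i : queue)                                   // uses iterator()
            sb.append(i);
        log.add("iterator=" + sb);
        log.add("isEmpty=" + queue.isEmpty());
        return log;
    }

    public static void main(String[] args) {
        for (String line : run())
            System.out.println(line);
    }
}
```

In concurrent code, prefer isEmpty() over size() == 0: size() walks the whole list, and its result may already be stale when it returns.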


 

ConcurrentLinkedQueue source code analysis (JDK 1.7.0_40)

The complete source code of ConcurrentLinkedQueue is as follows:


/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea and Martin Buchholz with assistance from members of
 * JCP JSR-166 Expert Group and released to the public domain, as explained
 * at http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.AbstractQueue;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.Queue;

/**
 * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * A {@code ConcurrentLinkedQueue} is an appropriate choice when
 * many threads will share access to a common collection.
 * Like most other concurrent collection implementations, this class
 * does not permit the use of {@code null} elements.
 *
 * <p>This implementation employs an efficient &quot;wait-free&quot;
 * algorithm based on one described in <a
 * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 *
 * <p>Iterators are <i>weakly consistent</i>, returning elements
 * reflecting the state of the queue at some point at or since the
 * creation of the iterator.  They do <em>not</em> throw {@link
 * java.util.ConcurrentModificationException}, and may proceed concurrently
 * with other operations.  Elements contained in the queue since the creation
 * of the iterator will be returned exactly once.
 *
 * <p>Beware that, unlike in most collections, the {@code size} method
 * is <em>NOT</em> a constant-time operation. Because of the
 * asynchronous nature of these queues, determining the current number
 * of elements requires a traversal of the elements, and so may report
 * inaccurate results if this collection is modified during traversal.
 * Additionally, the bulk operations {@code addAll},
 * {@code removeAll}, {@code retainAll}, {@code containsAll},
 * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
 * to be performed atomically. For example, an iterator operating
 * concurrently with an {@code addAll} operation might view only some
 * of the added elements.
 *
 * <p>This class and its iterator implement all of the <em>optional</em>
 * methods of the {@link Queue} and {@link Iterator} interfaces.
 *
 * <p>Memory consistency effects: As with other concurrent
 * collections, actions in a thread prior to placing an object into a
 * {@code ConcurrentLinkedQueue}
 * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 * actions subsequent to the access or removal of that element from
 * the {@code ConcurrentLinkedQueue} in another thread.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
        implements Queue<E>, java.io.Serializable {
    private static final long serialVersionUID = 196745693267521676L;

    /*
     * This is a modification of the Michael & Scott algorithm,
     * adapted for a garbage-collected environment, with support for
     * interior node deletion (to support remove(Object)).  For
     * explanation, read the paper.
     *
     * Note that like most non-blocking algorithms in this package,
     * this implementation relies on the fact that in garbage
     * collected systems, there is no possibility of ABA problems due
     * to recycled nodes, so there is no need to use "counted
     * pointers" or related techniques seen in versions used in
     * non-GC'ed settings.
     *
     * The fundamental invariants are:
     * - There is exactly one (last) Node with a null next reference,
     *   which is CASed when enqueueing.  This last Node can be
     *   reached in O(1) time from tail, but tail is merely an
     *   optimization - it can always be reached in O(N) time from
     *   head as well.
     * - The elements contained in the queue are the non-null items in
     *   Nodes that are reachable from head.  CASing the item
     *   reference of a Node to null atomically removes it from the
     *   queue.  Reachability of all elements from head must remain
     *   true even in the case of concurrent modifications that cause
     *   head to advance.  A dequeued Node may remain in use
     *   indefinitely due to creation of an Iterator or simply a
     *   poll() that has lost its time slice.
     *
     * The above might appear to imply that all Nodes are GC-reachable
     * from a predecessor dequeued Node.  That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.
     *
     * Both head and tail are permitted to lag.  In fact, failing to
     * update them every time one could is a significant optimization
     * (fewer CASes). As with LinkedTransferQueue (see the internal
     * documentation for that class), we use a slack threshold of two;
     * that is, we update head/tail when the current pointer appears
     * to be two or more steps away from the first/last node.
     *
     * Since head and tail are updated concurrently and independently,
     * it is possible for tail to lag behind head (why not)?
     *
     * CASing a Node's item reference to null atomically removes the
     * element from the queue.  Iterators skip over Nodes with null
     * items.  Prior implementations of this class had a race between
     * poll() and remove(Object) where the same element would appear
     * to be successfully removed by two concurrent operations.  The
     * method remove(Object) also lazily unlinks deleted Nodes, but
     * this is merely an optimization.
     *
     * When constructing a Node (before enqueuing it) we avoid paying
     * for a volatile write to item by using Unsafe.putObject instead
     * of a normal write.  This allows the cost of enqueue to be
     * "one-and-a-half" CASes.
     *
     * Both head and tail may or may not point to a Node with a
     * non-null item.  If the queue is empty, all items must of course
     * be null.  Upon creation, both head and tail refer to a dummy
     * Node with null item.  Both head and tail are only updated using
     * CAS, so they never regress, although again this is merely an
     * optimization.
     */

    private static class Node<E> {
        volatile E item;
        volatile Node<E> next;

        /**
         * Constructs a new node.  Uses relaxed write because item can
         * only be seen after publication via casNext.
         */
        Node(E item) {
            UNSAFE.putObject(this, itemOffset, item);
        }

        boolean casItem(E cmp, E val) {
            return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
        }

        void lazySetNext(Node<E> val) {
            UNSAFE.putOrderedObject(this, nextOffset, val);
        }

        boolean casNext(Node<E> cmp, Node<E> val) {
            return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
        }

        // Unsafe mechanics

        private static final sun.misc.Unsafe UNSAFE;
        private static final long itemOffset;
        private static final long nextOffset;

        static {
            try {
                UNSAFE = sun.misc.Unsafe.getUnsafe();
                Class k = Node.class;
                itemOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("item"));
                nextOffset = UNSAFE.objectFieldOffset
                    (k.getDeclaredField("next"));
            } catch (Exception e) {
                throw new Error(e);
            }
        }
    }

    /**
     * A node from which the first live (non-deleted) node (if any)
     * can be reached in O(1) time.
     * Invariants:
     * - all live nodes are reachable from head via succ()
     * - head != null
     * - (tmp = head).next != tmp || tmp != head
     * Non-invariants:
     * - head.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     */
    private transient volatile Node<E> head;

    /**
     * A node from which the last node on list (that is, the unique
     * node with node.next == null) can be reached in O(1) time.
     * Invariants:
     * - the last node is always reachable from tail via succ()
     * - tail != null
     * Non-invariants:
     * - tail.item may or may not be null.
     * - it is permitted for tail to lag behind head, that is, for tail
     *   to not be reachable from head!
     * - tail.next may or may not be self-pointing to tail.
     */
    private transient volatile Node<E> tail;


    /**
     * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
     */
    public ConcurrentLinkedQueue() {
        head = tail = new Node<E>(null);
    }

    /**
     * Creates a {@code ConcurrentLinkedQueue}
     * initially containing the elements of the given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public ConcurrentLinkedQueue(Collection<? extends E> c) {
        Node<E> h = null, t = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

    // Have to override just to update the javadoc

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never throw
     * {@link IllegalStateException} or return {@code false}.
     *
     * @return {@code true} (as specified by {@link Collection#add})
     * @throws NullPointerException if the specified element is null
     */
    public boolean add(E e) {
        return offer(e);
    }

    /**
     * Try to CAS head to p. If successful, repoint old head to itself
     * as sentinel for succ(), below.
     */
    final void updateHead(Node<E> h, Node<E> p) {
        if (h != p && casHead(h, p))
            h.lazySetNext(h);
    }

    /**
     * Returns the successor of p, or the head node if p.next has been
     * linked to self, which will only be true if traversing with a
     * stale pointer that is now off the list.
     */
    final Node<E> succ(Node<E> p) {
        Node<E> next = p.next;
        return (p == next) ? head : next;
    }

    /**
     * Inserts the specified element at the tail of this queue.
     * As the queue is unbounded, this method will never return {@code false}.
     *
     * @return {@code true} (as specified by {@link Queue#offer})
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        checkNotNull(e);
        final Node<E> newNode = new Node<E>(e);

        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, newNode)) {
                    // Successful CAS is the linearization point
                    // for e to become an element of this queue,
                    // and for newNode to become "live".
                    if (p != t) // hop two nodes at a time
                        casTail(t, newNode);  // Failure is OK.
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    public E poll() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;

                if (item != null && p.casItem(item, null)) {
                    // Successful CAS is the linearization point
                    // for item to be removed from this queue.
                    if (p != h) // hop two nodes at a time
                        updateHead(h, ((q = p.next) != null) ? q : p);
                    return item;
                }
                else if ((q = p.next) == null) {
                    updateHead(h, p);
                    return null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

    public E peek() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                E item = p.item;
                if (item != null || (q = p.next) == null) {
                    updateHead(h, p);
                    return item;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

    /**
     * Returns the first live (non-deleted) node on list, or null if none.
     * This is yet another variant of poll/peek; here returning the
     * first node, not element.  We could make peek() a wrapper around
     * first(), but that would cost an extra volatile read of item,
     * and the need to add a retry loop to deal with the possibility
     * of losing a race to a concurrent poll().
     */
    Node<E> first() {
        restartFromHead:
        for (;;) {
            for (Node<E> h = head, p = h, q;;) {
                boolean hasItem = (p.item != null);
                if (hasItem || (q = p.next) == null) {
                    updateHead(h, p);
                    return hasItem ? p : null;
                }
                else if (p == q)
                    continue restartFromHead;
                else
                    p = q;
            }
        }
    }

    /**
     * Returns {@code true} if this queue contains no elements.
     *
     * @return {@code true} if this queue contains no elements
     */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     * Additionally, if elements are added or removed during execution
     * of this method, the returned result may be inaccurate.  Thus,
     * this method is typically not very useful in concurrent
     * applications.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
                Node<E> next = succ(p);
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

    /**
     * Appends all of the elements in the specified collection to the end of
     * this queue, in the order that they are returned by the specified
     * collection's iterator.  Attempts to {@code addAll} of a queue to
     * itself result in {@code IllegalArgumentException}.
     *
     * @param c the elements to be inserted into this queue
     * @return {@code true} if this queue changed as a result of the call
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     * @throws IllegalArgumentException if the collection is this queue
     */
    public boolean addAll(Collection<? extends E> c) {
        if (c == this)
            // As historically specified in AbstractQueue#addAll
            throw new IllegalArgumentException();

        // Copy c into a private chain of Nodes
        Node<E> beginningOfTheEnd = null, last = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (beginningOfTheEnd == null)
                beginningOfTheEnd = last = newNode;
            else {
                last.lazySetNext(newNode);
                last = newNode;
            }
        }
        if (beginningOfTheEnd == null)
            return false;

        // Atomically append the chain at the tail of this collection
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, beginningOfTheEnd)) {
                    // Successful CAS is the linearization point
                    // for all elements to be added to this queue.
                    if (!casTail(t, last)) {
                        // Try a little harder to update tail,
                        // since we may be adding many elements.
                        t = tail;
                        if (last.next == null)
                            casTail(t, last);
                    }
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = succ(p)) {
            E item = p.item;
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            if (k < a.length)
                a[k] = null;
            return a;
        }

        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = succ(q)) {
            E item = q.item;
            if (item != null)
                al.add(item);
        }
        return al.toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> pred, p;
            if (nextNode == null) {
                p = first();
                pred = null;
            } else {
                pred = nextNode;
                p = succ(nextNode);
            }

            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.item;
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // skip over nulls
                    Node<E> next = succ(p);
                    if (pred != null && next != null)
                        pred.casNext(p, next);
                    p = next;
                }
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.item = null;
            lastRet = null;
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     * @param s the stream
     */
756     private void writeObject(java.io.ObjectOutputStream s)
757         throws java.io.IOException {
758 
759         // Write out any hidden stuff
760         s.defaultWriteObject();
761 
762         // Write out all elements in the proper order.
763         for (Node<E> p = first(); p != null; p = succ(p)) {
764             Object item = p.item;
765             if (item != null)
766                 s.writeObject(item);
767         }
768 
769         // Use trailing null as sentinel
770         s.writeObject(null);
771     }
772 
773     /**
774      * Reconstitutes the instance from a stream (that is, deserializes it).
775      * @param s the stream
776      */
777     private void readObject(java.io.ObjectInputStream s)
778         throws java.io.IOException, ClassNotFoundException {
779         s.defaultReadObject();
780 
781         // Read in elements until trailing null sentinel found
782         Node<E> h = null, t = null;
783         Object item;
784         while ((item = s.readObject()) != null) {
785             @SuppressWarnings("unchecked")
786             Node<E> newNode = new Node<E>((E) item);
787             if (h == null)
788                 h = t = newNode;
789             else {
790                 t.lazySetNext(newNode);
791                 t = newNode;
792             }
793         }
794         if (h == null)
795             h = t = new Node<E>(null);
796         head = h;
797         tail = t;
798     }
799 
800     /**
801      * Throws NullPointerException if argument is null.
802      *
803      * @param v the element
804      */
805     private static void checkNotNull(Object v) {
806         if (v == null)
807             throw new NullPointerException();
808     }
809 
810     private boolean casTail(Node<E> cmp, Node<E> val) {
811         return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
812     }
813 
814     private boolean casHead(Node<E> cmp, Node<E> val) {
815         return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
816     }
817 
818     // Unsafe mechanics
819 
820     private static final sun.misc.Unsafe UNSAFE;
821     private static final long headOffset;
822     private static final long tailOffset;
823     static {
824         try {
825             UNSAFE = sun.misc.Unsafe.getUnsafe();
826             Class k = ConcurrentLinkedQueue.class;
827             headOffset = UNSAFE.objectFieldOffset
828                 (k.getDeclaredField("head"));
829             tailOffset = UNSAFE.objectFieldOffset
830                 (k.getDeclaredField("tail"));
831         } catch (Exception e) {
832             throw new Error(e);
833         }
834     }
835 }
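
The writeObject()/readObject() pair at the end of the listing writes the elements in order followed by a null sentinel, then rebuilds the linked list on the way back in. A minimal round-trip sketch through the public API may make this concrete; the class name SerializationRoundTrip and its helper are invented for this illustration and are not part of the JDK.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class SerializationRoundTrip {
    // Serializes a queue built from items, deserializes it, and returns the copy's contents.
    static List<String> roundTrip(List<String> items) {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>(items);
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            ObjectOutputStream out = new ObjectOutputStream(bytes);
            out.writeObject(queue);   // ends up in the queue's private writeObject()
            out.close();

            ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
            @SuppressWarnings("unchecked")
            ConcurrentLinkedQueue<String> copy =
                (ConcurrentLinkedQueue<String>) in.readObject();   // private readObject()
            in.close();
            return new ArrayList<String>(copy);
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(Arrays.asList("a", "b", "c"))); // prints [a, b, c]
    }
}
```

The copy keeps FIFO order because the elements are written and read back in traversal order.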


 

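The analysis below looks at ConcurrentLinkedQueue from three angles: creation, insertion, and removal.

1. Creation

The no-argument constructor ConcurrentLinkedQueue() serves as the example.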

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

Note: the constructor creates a node whose item is null (a dummy node) and points both head and tail at it.
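
From the caller's side the dummy node is invisible. The sketch below uses only the public API to show what a freshly constructed queue reports; the class name EmptyQueueCheck and its helper are made up for this illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class EmptyQueueCheck {
    // Summarizes what a brand-new queue reports through its public API.
    static String describe() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        // The internal dummy node has a null item, so it is never counted or returned.
        return queue.isEmpty() + "," + queue.size() + "," + queue.peek() + "," + queue.poll();
    }

    public static void main(String[] args) {
        System.out.println(describe()); // prints "true,0,null,null"
    }
}
```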

head and tail are declared as follows:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

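Both head and tail are volatile, so they carry the visibility guarantee that volatile provides: a read of a volatile variable always sees the most recent write to that variable by any thread.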
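
The visibility guarantee can be observed in isolation. In the sketch below (class and field names are assumptions chosen for the illustration), a reader thread spins on a volatile field until it sees the value written by another thread.

```java
// Hypothetical demo class; "published" stands in for a field such as head or tail.
class VolatileVisibility {
    // Without volatile, the spinning reader would not be guaranteed to ever see the write.
    private static volatile String published;

    // Starts a reader that waits for a value, publishes one, and returns what the reader saw.
    static String awaitPublished() {
        published = null;
        final String[] seen = new String[1];
        Thread reader = new Thread(() -> {
            String value;
            while ((value = published) == null)
                Thread.yield();          // spin until the volatile write becomes visible
            seen[0] = value;
        });
        reader.start();
        published = "ready";             // volatile write
        try {
            reader.join();               // join() makes seen[0] safely readable here
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(awaitPublished()); // prints "ready"
    }
}
```

Note that volatile only provides visibility and ordering. The atomic updates of head and tail come from the CAS helpers casHead() and casTail().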

Node is declared as follows:

private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

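Notes
Node is a singly linked list node: next points to the following Node and item holds the data. All of Node's mutators go through Unsafe. casItem() and casNext() are CAS operations (casNext(), for example, compares and then sets the node's successor), the constructor uses a plain putObject() write, and lazySetNext() uses an ordered putOrderedObject() write.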
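
sun.misc.Unsafe is not meant for application code, so the sketch below swaps it for java.util.concurrent.atomic.AtomicReference to show the same three operations. SketchNode is a hypothetical stand-in, not the JDK's Node class, and the extra AtomicReference objects make it heavier than the real implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for the JDK's private Node class, built on AtomicReference.
class SketchNode<E> {
    final AtomicReference<E> item;
    final AtomicReference<SketchNode<E>> next = new AtomicReference<SketchNode<E>>();

    SketchNode(E item) {
        this.item = new AtomicReference<E>(item);
    }

    // Sets item to val only if it currently is cmp (reference comparison).
    boolean casItem(E cmp, E val) {
        return item.compareAndSet(cmp, val);
    }

    // Ordered write: cheaper than a volatile write, may become visible with a delay.
    void lazySetNext(SketchNode<E> val) {
        next.lazySet(val);
    }

    // Sets next to val only if it currently is cmp.
    boolean casNext(SketchNode<E> cmp, SketchNode<E> val) {
        return next.compareAndSet(cmp, val);
    }

    public static void main(String[] args) {
        SketchNode<String> a = new SketchNode<String>("a");
        SketchNode<String> b = new SketchNode<String>("b");
        System.out.println(a.casNext(null, b));   // prints true: next was null
        System.out.println(a.casNext(null, b));   // prints false: next is already b
        System.out.println(a.casItem("a", null)); // prints true: item was "a"
    }
}
```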

 

2. Insertion

add(E e) serves as the example of insertion into ConcurrentLinkedQueue.

public boolean add(E e) {
    return offer(e);
}

Note: add() simply delegates to offer().

The source of offer() is as follows:


public boolean offer(E e) {
    // Reject null: throws NullPointerException if e is null.
    checkNotNull(e);
    // Create the new node.
    final Node<E> newNode = new Node<E>(e);

    // Append the new node to the end of the list.
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // Case 1: q is null, so p is the last node.
        if (q == null) {
            // CAS: if p.next is still null, set it to newNode.
            // On success, compare p and t (if p != t, try to make newNode the new tail), then return true.
            // On failure, another thread has linked a node after p first, so loop again.
            if (p.casNext(null, newNode)) {
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        // Case 2: p == q, i.e. p is self-linked and has fallen off the list.
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // Case 3: otherwise, p is not the last node, so advance.
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}

Notes: offer(E e) appends element e to the end of the list. The hard part of offer() is the for loop, which is analyzed below in three cases.

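Case 1: q is null. Since q is p's successor, this means p is the last node. p.casNext(null, newNode) then tries to set p's successor to newNode. If that succeeds, p is compared with t (when p != t, casTail(t, newNode) tries to make newNode the new tail) and the method returns true. If it fails, another thread has already linked a node after p, so nothing is done and the for loop runs again.
p.casNext(null, newNode) is a CAS on p's next field: if p.next equals null, it is set to newNode. The call returns true on success and false on failure.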
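
The CAS-and-retry step of case 1 can be isolated in a much smaller structure. The sketch below is a teaching aid under stated assumptions: CasAppendSketch is a hypothetical class, it keeps no tail pointer (every append walks from head), and it has no removal, so cases 2 and 3 never arise.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical teaching class; it omits the tail pointer and self-link handling of the real offer().
class CasAppendSketch {
    static final class Node {
        final String item;
        final AtomicReference<Node> next = new AtomicReference<Node>();
        Node(String item) { this.item = item; }
    }

    // Dummy node with a null item, like the one the real constructor creates.
    private final Node head = new Node(null);

    // Walks to the last node and links newNode with a CAS, retrying if another thread wins.
    void append(String item) {
        Node newNode = new Node(item);
        Node p = head;
        for (;;) {
            Node q = p.next.get();
            if (q == null) {
                if (p.next.compareAndSet(null, newNode))
                    return;   // p was still the last node; newNode is now linked
                // CAS failed: another thread linked a node after p, so re-read p.next
            } else {
                p = q;        // p is not the last node, keep walking
            }
        }
    }

    // Counts the linked elements, skipping the dummy node.
    int size() {
        int n = 0;
        for (Node p = head.next.get(); p != null; p = p.next.get())
            n++;
        return n;
    }

    // Appends from several threads at once and returns the final element count.
    static int appendConcurrently(int threads, int perThread) {
        final CasAppendSketch list = new CasAppendSketch();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++)
                    list.append(Thread.currentThread().getName() + "-" + j);
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers)
                w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return list.size();
    }

    public static void main(String[] args) {
        System.out.println(appendConcurrently(4, 1000)); // prints 4000
    }
}
```

No append is lost because a node is only linked when the CAS sees that p.next is still null. A thread that loses the race simply continues from the node the winner linked.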

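Case 2: p equals q. When does this happen? q is p.next, so p == q means that p points to itself. updateHead() (shown in the removal section) self-links a node once head has moved past it, so a self-linked p has already been dequeued and is no longer on the list.
If tail has changed in the meantime (t != tail), p is set to the new tail. If tail is unchanged, it is off the list as well, so p is set to head, from which every live node is reachable, and the traversal starts over.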

Case 3: everything else.
The statement p = (p != t && t != (t = tail)) ? t : q; can be expanded into the following code.


if (p==t) {
    p = q;
} else {
    Node<E> tmp=t;
    t = tail;
    if (tmp==t) {
        p=q;
    } else {
        p=t;
    }
}


If p equals t, p is set to q. Otherwise, check whether tail has changed: if it has not, p is set to q; if it has, p is set to the new tail.
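
The expansion relies on Java evaluating the left operand of != before the assignment on the right. A minimal sketch of the idiom follows; the class RereadIdiom and its static tail field are stand-ins invented for the illustration.

```java
// Hypothetical demo of the "t != (t = tail)" idiom used in offer().
class RereadIdiom {
    // Stands in for the queue's volatile tail field.
    static Object tail = "new";

    static String demo() {
        Object t = "old";
        // The left operand is read first (the old t); then t is reassigned from tail;
        // finally the old value is compared with the new one.
        boolean changed = (t != (t = tail));
        return changed + "," + t;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,new"
    }
}
```

In a single expression the idiom both refreshes the local copy t and reports whether tail moved since t was last read.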

The source of checkNotNull() is as follows:

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
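
Seen from the public API, the null check behaves as sketched below; the class name NullRejection is made up for the illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class NullRejection {
    // Returns true if offer(null) throws NullPointerException and leaves the queue empty.
    static boolean rejectsNull() {
        ConcurrentLinkedQueue<String> queue = new ConcurrentLinkedQueue<String>();
        try {
            queue.offer(null);
            return false;             // not reached: null elements are not permitted
        } catch (NullPointerException expected) {
            return queue.isEmpty();   // nothing was inserted
        }
    }

    public static void main(String[] args) {
        System.out.println(rejectsNull()); // prints true
    }
}
```

Rejecting null is what lets poll() and peek() use a null return value to mean that the queue is empty.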

 

3. Removal

poll() serves as the example of removal from ConcurrentLinkedQueue.


public E poll() {
    // Label used to restart the traversal from head.
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            // Case 1
            // p's item is non-null and the CAS that sets it to null succeeds.
            // If p != h (the traversal has moved past the stale head h), update head; then return the removed item.
            if (item != null && p.casItem(item, null)) {
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // Case 2
            // p has no successor, so p is the last node and the queue is empty. Update head to p and return null.
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // Case 3
            // p == q: p is self-linked, meaning another thread has already taken it off the list. Restart from head.
            else if (p == q)
                continue restartFromHead;
            // Case 4
            // Advance p to q.
            else
                p = q;
        }
    }
}

Notes: poll() removes the element at the head of the list and returns its value. Its implementation resembles that of offer(); the for loop is analyzed below in four cases.

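Case 1: the item of node p is non-null, and the CAS that sets it to null succeeds.
p.casItem(item, null) is a CAS call: it checks whether p's item still equals item and, if so, sets it to null.
When case 1 applies, p is compared with h. If p != h, the traversal has moved past h, which means head lags behind the first live node, so updateHead() is called to advance head. The removed item is then returned.
The source of updateHead() is as follows: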

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

Note: updateHead() sets head to p and then points h's next at h itself.
casHead(h, p) sets head through CAS: if head equals h, head is set to p.
The source of lazySetNext() is as follows:

void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

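putOrderedObject() was covered in an earlier chapter, "TODO". h.lazySetNext(h) sets h's next to h itself. It is not a CAS but an ordered (lazy) write, so the new value may become visible to other threads with some delay.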
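
How the self-link interacts with traversal can be sketched with AtomicReference in place of Unsafe. SelfLinkSketch below is a hypothetical class whose updateHead() and succ() mirror the shape of the JDK methods of the same name from the full listing; it is an illustration, not the real implementation.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of the self-link trick, using AtomicReference instead of Unsafe.
class SelfLinkSketch {
    static final class Node {
        final String item;
        final AtomicReference<Node> next = new AtomicReference<Node>();
        Node(String item) { this.item = item; }
    }

    final AtomicReference<Node> head = new AtomicReference<Node>();

    // Mirrors updateHead(): swing head from h to p, then point the old head at itself.
    void updateHead(Node h, Node p) {
        if (h != p && head.compareAndSet(h, p))
            h.next.lazySet(h);   // ordered write, not a CAS
    }

    // Mirrors succ(): a self-linked node means "off the list, continue from head".
    Node succ(Node p) {
        Node next = p.next.get();
        return (p == next) ? head.get() : next;
    }

    static String demo() {
        SelfLinkSketch s = new SelfLinkSketch();
        Node a = new Node("a");
        Node b = new Node("b");
        a.next.set(b);
        s.head.set(a);
        s.updateHead(a, b);      // a is now off the list and self-linked
        return (a.next.get() == a) + "," + s.succ(a).item;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints "true,b"
    }
}
```

A thread still holding the stale node a is redirected to the current head instead of following a chain of dequeued nodes.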

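Case 2: p has no successor (p.next is null), so p is the last node and the queue has no element to return, for example when the list consists only of the dummy node with a null item.
updateHead(h, p) is called to set head to p, and null is returned.

Case 3: p == q.
q is p.next, so p == q means that p is self-linked: another thread has taken p off the list through updateHead(). The loop jumps to the restartFromHead label and starts over from the current head.

Case 4: everything else.
p is set to q.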
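
Because an element is removed by a successful casItem(item, null), each element is handed to exactly one caller even when several threads poll at once. The sketch below checks this through the public API; the class name PollOnceDemo and its helper are assumptions made for the illustration.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo class, not part of the JDK.
class PollOnceDemo {
    // Fills a queue, drains it from several threads, and returns the number of successful polls.
    static int drainWith(int threads, int elements) {
        final ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
        for (int i = 0; i < elements; i++)
            queue.offer(i);

        final AtomicInteger removed = new AtomicInteger();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                // poll() returns null once the queue is empty.
                while (queue.poll() != null)
                    removed.incrementAndGet();
            });
            workers[i].start();
        }
        try {
            for (Thread w : workers)
                w.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return removed.get();
    }

    public static void main(String[] args) {
        System.out.println(drainWith(4, 10000)); // prints 10000
    }
}
```

If two threads could remove the same element, the count would exceed the number of elements inserted.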

 

ConcurrentLinkedQueue Example


import java.util.*;
import java.util.concurrent.*;

/*
 *   ConcurrentLinkedQueue is a thread-safe queue, whereas LinkedList is not thread-safe.
 *
 *   The example below has several threads modify and traverse queue at the same time.
 *   (01) When queue is a ConcurrentLinkedQueue, the program runs normally.
 *   (02) When queue is a LinkedList, the program may throw ConcurrentModificationException.
 *
 * @author skywang
 */
public class ConcurrentLinkedQueueDemo1 {

    // TODO: the program fails when queue is a LinkedList.
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new ConcurrentLinkedQueue<String>();

    public static void main(String[] args) {

        // Start two threads that operate on queue at the same time.
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    // Traverses queue with an Iterator and prints every element.
    private static void printAll() {
        String value;
        Iterator<String> iter = queue.iterator();
        while(iter.hasNext()) {
            value = iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            int i = 0;
            while (i++ < 6) {
                // The value is the thread name followed by a sequence number.
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // Traverse queue through its Iterator.
                printAll();
            }
        }
    }
}

Output of one run:


ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6, 

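
Notes on the result: the two threads print concurrently, so their output is interleaved. If queue in the source is changed to a LinkedList, the program may throw ConcurrentModificationException, because LinkedList is not thread-safe and its iterator is fail-fast. The iterator of ConcurrentLinkedQueue is weakly consistent and never throws this exception.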
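
The difference between the two iterators can be reproduced deterministically in a single thread by modifying the queue during iteration. The sketch below does so; the class name IteratorContrast and its helper are invented for this illustration.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class, not part of the JDK.
class IteratorContrast {
    // Adds an element while iterating; returns true if the iteration finishes without an exception.
    static boolean survivesAddDuringIteration(Queue<String> queue) {
        queue.add("a");
        queue.add("b");
        try {
            for (String s : queue) {
                if (s.equals("a"))
                    queue.add("c");   // structural modification in the middle of the traversal
            }
            return true;
        } catch (ConcurrentModificationException e) {
            return false;             // fail-fast iterator detected the modification
        }
    }

    public static void main(String[] args) {
        System.out.println(survivesAddDuringIteration(new ConcurrentLinkedQueue<String>())); // prints true
        System.out.println(survivesAddDuringIteration(new LinkedList<String>()));            // prints false
    }
}
```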

 

 

概要

本章对Java.util.concurrent包中的ConcurrentHashMap类进行详细的介绍。内容包括:
ConcurrentLinkedQueue介绍
ConcurrentLinkedQueue原理和数据结构

ConcurrentLinkedQueue函数列表
ConcurrentLinkedQueue源码分析(JDK1.7.0_40版本)
ConcurrentLinkedQueue示例

转载请注明出处:http://www.cnblogs.com/skywang12345/p/3498995.html

 

ConcurrentLinkedQueue介绍

ConcurrentLinkedQueue是线程安全的队列,它适用于“高并发”的场景。
它是一个基于链接节点的无界线程安全队列,按照 FIFO(先进先出)原则对元素进行排序。队列元素中不可以放置null元素(内部实现的特殊节点除外)。

 

ConcurrentLinkedQueue原理和数据结构

ConcurrentLinkedQueue的数据结构,如下图所示:

《Java多线程系列--【JUC集合10】- ConcurrentLinkedQueue》

说明
1. ConcurrentLinkedQueue继承于AbstractQueue。
2. ConcurrentLinkedQueue内部是通过链表来实现的。它同时包含链表的头节点head和尾节点tail。ConcurrentLinkedQueue按照 FIFO(先进先出)原则对元素进行排序。元素都是从尾部插入到链表,从头部开始返回。
3. ConcurrentLinkedQueue的链表Node中的next的类型是volatile,而且链表数据item的类型也是volatile。关于volatile,我们知道它的语义包含:“即对一个volatile变量的读,总是能看到(任意线程)对这个volatile变量最后的写入”。ConcurrentLinkedQueue就是通过volatile来实现多线程对竞争资源的互斥访问的。

 

ConcurrentLinkedQueue函数列表

《Java多线程系列--【JUC集合10】- ConcurrentLinkedQueue》

// 创建一个最初为空的 ConcurrentLinkedQueue。
ConcurrentLinkedQueue()
// 创建一个最初包含给定 collection 元素的 ConcurrentLinkedQueue,按照此 collection 迭代器的遍历顺序来添加元素。
ConcurrentLinkedQueue(Collection<? extends E> c)

// 将指定元素插入此队列的尾部。
boolean add(E e)
// 如果此队列包含指定元素,则返回 true。
boolean contains(Object o)
// 如果此队列不包含任何元素,则返回 true。
boolean isEmpty()
// 返回在此队列元素上以恰当顺序进行迭代的迭代器。
Iterator<E> iterator()
// 将指定元素插入此队列的尾部。
boolean offer(E e)
// 获取但不移除此队列的头;如果此队列为空,则返回 null。
E peek()
// 获取并移除此队列的头,如果此队列为空,则返回 null。
E poll()
// 从队列中移除指定元素的单个实例(如果存在)。
boolean remove(Object o)
// 返回此队列中的元素数量。
int size()
// 返回以恰当顺序包含此队列所有元素的数组。
Object[] toArray()
// 返回以恰当顺序包含此队列所有元素的数组;返回数组的运行时类型是指定数组的运行时类型。
<T> T[] toArray(T[] a)

《Java多线程系列--【JUC集合10】- ConcurrentLinkedQueue》

 

ConcurrentLinkedQueue源码分析(JDK1.7.0_40版本)

ConcurrentLinkedQueue的完整源码如下:

《Java多线程系列--【JUC集合10】- ConcurrentLinkedQueue》
《Java多线程系列--【JUC集合10】- ConcurrentLinkedQueue》

  1 /*
  2  * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
  3  *
  4  *
  5  *
  6  *
  7  *
  8  *
  9  *
 10  *
 11  *
 12  *
 13  *
 14  *
 15  *
 16  *
 17  *
 18  *
 19  *
 20  *
 21  *
 22  *
 23  */
 24 
 25 /*
 26  *
 27  *
 28  *
 29  *
 30  *
 31  * Written by Doug Lea and Martin Buchholz with assistance from members of
 32  * JCP JSR-166 Expert Group and released to the public domain, as explained
 33  * at http://creativecommons.org/publicdomain/zero/1.0/
 34  */
 35 
 36 package java.util.concurrent;
 37 
 38 import java.util.AbstractQueue;
 39 import java.util.ArrayList;
 40 import java.util.Collection;
 41 import java.util.Iterator;
 42 import java.util.NoSuchElementException;
 43 import java.util.Queue;
 44 
 45 /**
 46  * An unbounded thread-safe {@linkplain Queue queue} based on linked nodes.
 47  * This queue orders elements FIFO (first-in-first-out).
 48  * The <em>head</em> of the queue is that element that has been on the
 49  * queue the longest time.
 50  * The <em>tail</em> of the queue is that element that has been on the
 51  * queue the shortest time. New elements
 52  * are inserted at the tail of the queue, and the queue retrieval
 53  * operations obtain elements at the head of the queue.
 54  * A {@code ConcurrentLinkedQueue} is an appropriate choice when
 55  * many threads will share access to a common collection.
 56  * Like most other concurrent collection implementations, this class
 57  * does not permit the use of {@code null} elements.
 58  *
 59  * <p>This implementation employs an efficient &quot;wait-free&quot;
 60  * algorithm based on one described in <a
 61  * href="http://www.cs.rochester.edu/u/michael/PODC96.html"> Simple,
 62  * Fast, and Practical Non-Blocking and Blocking Concurrent Queue
 63  * Algorithms</a> by Maged M. Michael and Michael L. Scott.
 64  *
 65  * <p>Iterators are <i>weakly consistent</i>, returning elements
 66  * reflecting the state of the queue at some point at or since the
 67  * creation of the iterator.  They do <em>not</em> throw {@link
 68  * java.util.ConcurrentModificationException}, and may proceed concurrently
 69  * with other operations.  Elements contained in the queue since the creation
 70  * of the iterator will be returned exactly once.
 71  *
 72  * <p>Beware that, unlike in most collections, the {@code size} method
 73  * is <em>NOT</em> a constant-time operation. Because of the
 74  * asynchronous nature of these queues, determining the current number
 75  * of elements requires a traversal of the elements, and so may report
 76  * inaccurate results if this collection is modified during traversal.
 77  * Additionally, the bulk operations {@code addAll},
 78  * {@code removeAll}, {@code retainAll}, {@code containsAll},
 79  * {@code equals}, and {@code toArray} are <em>not</em> guaranteed
 80  * to be performed atomically. For example, an iterator operating
 81  * concurrently with an {@code addAll} operation might view only some
 82  * of the added elements.
 83  *
 84  * <p>This class and its iterator implement all of the <em>optional</em>
 85  * methods of the {@link Queue} and {@link Iterator} interfaces.
 86  *
 87  * <p>Memory consistency effects: As with other concurrent
 88  * collections, actions in a thread prior to placing an object into a
 89  * {@code ConcurrentLinkedQueue}
 90  * <a href="package-summary.html#MemoryVisibility"><i>happen-before</i></a>
 91  * actions subsequent to the access or removal of that element from
 92  * the {@code ConcurrentLinkedQueue} in another thread.
 93  *
 94  * <p>This class is a member of the
 95  * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 96  * Java Collections Framework</a>.
 97  *
 98  * @since 1.5
 99  * @author Doug Lea
100  * @param <E> the type of elements held in this collection
101  *
102  */
103 public class ConcurrentLinkedQueue<E> extends AbstractQueue<E>
104         implements Queue<E>, java.io.Serializable {
105     private static final long serialVersionUID = 196745693267521676L;
106 
107     /*
108      * This is a modification of the Michael & Scott algorithm,
109      * adapted for a garbage-collected environment, with support for
110      * interior node deletion (to support remove(Object)).  For
111      * explanation, read the paper.
112      *
113      * Note that like most non-blocking algorithms in this package,
114      * this implementation relies on the fact that in garbage
115      * collected systems, there is no possibility of ABA problems due
116      * to recycled nodes, so there is no need to use "counted
117      * pointers" or related techniques seen in versions used in
118      * non-GC'ed settings.
119      *
120      * The fundamental invariants are:
121      * - There is exactly one (last) Node with a null next reference,
122      *   which is CASed when enqueueing.  This last Node can be
123      *   reached in O(1) time from tail, but tail is merely an
124      *   optimization - it can always be reached in O(N) time from
125      *   head as well.
126      * - The elements contained in the queue are the non-null items in
127      *   Nodes that are reachable from head.  CASing the item
128      *   reference of a Node to null atomically removes it from the
129      *   queue.  Reachability of all elements from head must remain
130      *   true even in the case of concurrent modifications that cause
131      *   head to advance.  A dequeued Node may remain in use
132      *   indefinitely due to creation of an Iterator or simply a
133      *   poll() that has lost its time slice.
134      *
135      * The above might appear to imply that all Nodes are GC-reachable
136      * from a predecessor dequeued Node.  That would cause two problems:
137      * - allow a rogue Iterator to cause unbounded memory retention
138      * - cause cross-generational linking of old Nodes to new Nodes if
139      *   a Node was tenured while live, which generational GCs have a
140      *   hard time dealing with, causing repeated major collections.
141      * However, only non-deleted Nodes need to be reachable from
142      * dequeued Nodes, and reachability does not necessarily have to
143      * be of the kind understood by the GC.  We use the trick of
144      * linking a Node that has just been dequeued to itself.  Such a
145      * self-link implicitly means to advance to head.
146      *
147      * Both head and tail are permitted to lag.  In fact, failing to
148      * update them every time one could is a significant optimization
149      * (fewer CASes). As with LinkedTransferQueue (see the internal
150      * documentation for that class), we use a slack threshold of two;
151      * that is, we update head/tail when the current pointer appears
152      * to be two or more steps away from the first/last node.
153      *
154      * Since head and tail are updated concurrently and independently,
155      * it is possible for tail to lag behind head (why not)?
156      *
157      * CASing a Node's item reference to null atomically removes the
158      * element from the queue.  Iterators skip over Nodes with null
159      * items.  Prior implementations of this class had a race between
160      * poll() and remove(Object) where the same element would appear
161      * to be successfully removed by two concurrent operations.  The
162      * method remove(Object) also lazily unlinks deleted Nodes, but
163      * this is merely an optimization.
164      *
165      * When constructing a Node (before enqueuing it) we avoid paying
166      * for a volatile write to item by using Unsafe.putObject instead
167      * of a normal write.  This allows the cost of enqueue to be
168      * "one-and-a-half" CASes.
169      *
170      * Both head and tail may or may not point to a Node with a
171      * non-null item.  If the queue is empty, all items must of course
172      * be null.  Upon creation, both head and tail refer to a dummy
173      * Node with null item.  Both head and tail are only updated using
174      * CAS, so they never regress, although again this is merely an
175      * optimization.
176      */
177 
178     private static class Node<E> {
179         volatile E item;
180         volatile Node<E> next;
181 
182         /**
183          * Constructs a new node.  Uses relaxed write because item can
184          * only be seen after publication via casNext.
185          */
186         Node(E item) {
187             UNSAFE.putObject(this, itemOffset, item);
188         }
189 
190         boolean casItem(E cmp, E val) {
191             return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
192         }
193 
194         void lazySetNext(Node<E> val) {
195             UNSAFE.putOrderedObject(this, nextOffset, val);
196         }
197 
198         boolean casNext(Node<E> cmp, Node<E> val) {
199             return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
200         }
201 
202         // Unsafe mechanics
203 
204         private static final sun.misc.Unsafe UNSAFE;
205         private static final long itemOffset;
206         private static final long nextOffset;
207 
208         static {
209             try {
210                 UNSAFE = sun.misc.Unsafe.getUnsafe();
211                 Class k = Node.class;
212                 itemOffset = UNSAFE.objectFieldOffset
213                     (k.getDeclaredField("item"));
214                 nextOffset = UNSAFE.objectFieldOffset
215                     (k.getDeclaredField("next"));
216             } catch (Exception e) {
217                 throw new Error(e);
218             }
219         }
220     }
221 
222     /**
223      * A node from which the first live (non-deleted) node (if any)
224      * can be reached in O(1) time.
225      * Invariants:
226      * - all live nodes are reachable from head via succ()
227      * - head != null
228      * - (tmp = head).next != tmp || tmp != head
229      * Non-invariants:
230      * - head.item may or may not be null.
231      * - it is permitted for tail to lag behind head, that is, for tail
232      *   to not be reachable from head!
233      */
234     private transient volatile Node<E> head;
235 
236     /**
237      * A node from which the last node on list (that is, the unique
238      * node with node.next == null) can be reached in O(1) time.
239      * Invariants:
240      * - the last node is always reachable from tail via succ()
241      * - tail != null
242      * Non-invariants:
243      * - tail.item may or may not be null.
244      * - it is permitted for tail to lag behind head, that is, for tail
245      *   to not be reachable from head!
246      * - tail.next may or may not be self-pointing to tail.
247      */
248     private transient volatile Node<E> tail;
249 
250 
251     /**
252      * Creates a {@code ConcurrentLinkedQueue} that is initially empty.
253      */
254     public ConcurrentLinkedQueue() {
255         head = tail = new Node<E>(null);
256     }
257 
258     /**
259      * Creates a {@code ConcurrentLinkedQueue}
260      * initially containing the elements of the given collection,
261      * added in traversal order of the collection's iterator.
262      *
263      * @param c the collection of elements to initially contain
264      * @throws NullPointerException if the specified collection or any
265      *         of its elements are null
266      */
267     public ConcurrentLinkedQueue(Collection<? extends E> c) {
268         Node<E> h = null, t = null;
269         for (E e : c) {
270             checkNotNull(e);
271             Node<E> newNode = new Node<E>(e);
272             if (h == null)
273                 h = t = newNode;
274             else {
275                 t.lazySetNext(newNode);
276                 t = newNode;
277             }
278         }
279         if (h == null)
280             h = t = new Node<E>(null);
281         head = h;
282         tail = t;
283     }
284 
285     // Have to override just to update the javadoc
286 
287     /**
288      * Inserts the specified element at the tail of this queue.
289      * As the queue is unbounded, this method will never throw
290      * {@link IllegalStateException} or return {@code false}.
291      *
292      * @return {@code true} (as specified by {@link Collection#add})
293      * @throws NullPointerException if the specified element is null
294      */
295     public boolean add(E e) {
296         return offer(e);
297     }
298 
299     /**
300      * Try to CAS head to p. If successful, repoint old head to itself
301      * as sentinel for succ(), below.
302      */
303     final void updateHead(Node<E> h, Node<E> p) {
304         if (h != p && casHead(h, p))
305             h.lazySetNext(h);
306     }
307 
308     /**
309      * Returns the successor of p, or the head node if p.next has been
310      * linked to self, which will only be true if traversing with a
311      * stale pointer that is now off the list.
312      */
313     final Node<E> succ(Node<E> p) {
314         Node<E> next = p.next;
315         return (p == next) ? head : next;
316     }
317 
318     /**
319      * Inserts the specified element at the tail of this queue.
320      * As the queue is unbounded, this method will never return {@code false}.
321      *
322      * @return {@code true} (as specified by {@link Queue#offer})
323      * @throws NullPointerException if the specified element is null
324      */
325     public boolean offer(E e) {
326         checkNotNull(e);
327         final Node<E> newNode = new Node<E>(e);
328 
329         for (Node<E> t = tail, p = t;;) {
330             Node<E> q = p.next;
331             if (q == null) {
332                 // p is last node
333                 if (p.casNext(null, newNode)) {
334                     // Successful CAS is the linearization point
335                     // for e to become an element of this queue,
336                     // and for newNode to become "live".
337                     if (p != t) // hop two nodes at a time
338                         casTail(t, newNode);  // Failure is OK.
339                     return true;
340                 }
341                 // Lost CAS race to another thread; re-read next
342             }
343             else if (p == q)
344                 // We have fallen off list.  If tail is unchanged, it
345                 // will also be off-list, in which case we need to
346                 // jump to head, from which all live nodes are always
347                 // reachable.  Else the new tail is a better bet.
348                 p = (t != (t = tail)) ? t : head;
349             else
350                 // Check for tail updates after two hops.
351                 p = (p != t && t != (t = tail)) ? t : q;
352         }
353     }
354 
355     public E poll() {
356         restartFromHead:
357         for (;;) {
358             for (Node<E> h = head, p = h, q;;) {
359                 E item = p.item;
360 
361                 if (item != null && p.casItem(item, null)) {
362                     // Successful CAS is the linearization point
363                     // for item to be removed from this queue.
364                     if (p != h) // hop two nodes at a time
365                         updateHead(h, ((q = p.next) != null) ? q : p);
366                     return item;
367                 }
368                 else if ((q = p.next) == null) {
369                     updateHead(h, p);
370                     return null;
371                 }
372                 else if (p == q)
373                     continue restartFromHead;
374                 else
375                     p = q;
376             }
377         }
378     }
379 
380     public E peek() {
381         restartFromHead:
382         for (;;) {
383             for (Node<E> h = head, p = h, q;;) {
384                 E item = p.item;
385                 if (item != null || (q = p.next) == null) {
386                     updateHead(h, p);
387                     return item;
388                 }
389                 else if (p == q)
390                     continue restartFromHead;
391                 else
392                     p = q;
393             }
394         }
395     }
396 
397     /**
398      * Returns the first live (non-deleted) node on list, or null if none.
399      * This is yet another variant of poll/peek; here returning the
400      * first node, not element.  We could make peek() a wrapper around
401      * first(), but that would cost an extra volatile read of item,
402      * and the need to add a retry loop to deal with the possibility
403      * of losing a race to a concurrent poll().
404      */
405     Node<E> first() {
406         restartFromHead:
407         for (;;) {
408             for (Node<E> h = head, p = h, q;;) {
409                 boolean hasItem = (p.item != null);
410                 if (hasItem || (q = p.next) == null) {
411                     updateHead(h, p);
412                     return hasItem ? p : null;
413                 }
414                 else if (p == q)
415                     continue restartFromHead;
416                 else
417                     p = q;
418             }
419         }
420     }
421 
422     /**
423      * Returns {@code true} if this queue contains no elements.
424      *
425      * @return {@code true} if this queue contains no elements
426      */
    public boolean isEmpty() {
        return first() == null;
    }

    /**
     * Returns the number of elements in this queue.  If this queue
     * contains more than {@code Integer.MAX_VALUE} elements, returns
     * {@code Integer.MAX_VALUE}.
     *
     * <p>Beware that, unlike in most collections, this method is
     * <em>NOT</em> a constant-time operation. Because of the
     * asynchronous nature of these queues, determining the current
     * number of elements requires an O(n) traversal.
     * Additionally, if elements are added or removed during execution
     * of this method, the returned result may be inaccurate.  Thus,
     * this method is typically not very useful in concurrent
     * applications.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        int count = 0;
        for (Node<E> p = first(); p != null; p = succ(p))
            if (p.item != null)
                // Collection.size() spec says to max out
                if (++count == Integer.MAX_VALUE)
                    break;
        return count;
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null && o.equals(item))
                return true;
        }
        return false;
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        Node<E> pred = null;
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null &&
                o.equals(item) &&
                p.casItem(item, null)) {
                Node<E> next = succ(p);
                if (pred != null && next != null)
                    pred.casNext(p, next);
                return true;
            }
            pred = p;
        }
        return false;
    }

    /**
     * Appends all of the elements in the specified collection to the end of
     * this queue, in the order that they are returned by the specified
     * collection's iterator.  Attempts to {@code addAll} of a queue to
     * itself result in {@code IllegalArgumentException}.
     *
     * @param c the elements to be inserted into this queue
     * @return {@code true} if this queue changed as a result of the call
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     * @throws IllegalArgumentException if the collection is this queue
     */
    public boolean addAll(Collection<? extends E> c) {
        if (c == this)
            // As historically specified in AbstractQueue#addAll
            throw new IllegalArgumentException();

        // Copy c into a private chain of Nodes
        Node<E> beginningOfTheEnd = null, last = null;
        for (E e : c) {
            checkNotNull(e);
            Node<E> newNode = new Node<E>(e);
            if (beginningOfTheEnd == null)
                beginningOfTheEnd = last = newNode;
            else {
                last.lazySetNext(newNode);
                last = newNode;
            }
        }
        if (beginningOfTheEnd == null)
            return false;

        // Atomically append the chain at the tail of this collection
        for (Node<E> t = tail, p = t;;) {
            Node<E> q = p.next;
            if (q == null) {
                // p is last node
                if (p.casNext(null, beginningOfTheEnd)) {
                    // Successful CAS is the linearization point
                    // for all elements to be added to this queue.
                    if (!casTail(t, last)) {
                        // Try a little harder to update tail,
                        // since we may be adding many elements.
                        t = tail;
                        if (last.next == null)
                            casTail(t, last);
                    }
                    return true;
                }
                // Lost CAS race to another thread; re-read next
            }
            else if (p == q)
                // We have fallen off list.  If tail is unchanged, it
                // will also be off-list, in which case we need to
                // jump to head, from which all live nodes are always
                // reachable.  Else the new tail is a better bet.
                p = (t != (t = tail)) ? t : head;
            else
                // Check for tail updates after two hops.
                p = (p != t && t != (t = tail)) ? t : q;
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        // Use ArrayList to deal with resizing.
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> p = first(); p != null; p = succ(p)) {
            E item = p.item;
            if (item != null)
                al.add(item);
        }
        return al.toArray();
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        // try to use sent-in array
        int k = 0;
        Node<E> p;
        for (p = first(); p != null && k < a.length; p = succ(p)) {
            E item = p.item;
            if (item != null)
                a[k++] = (T)item;
        }
        if (p == null) {
            if (k < a.length)
                a[k] = null;
            return a;
        }

        // If won't fit, use ArrayList version
        ArrayList<E> al = new ArrayList<E>();
        for (Node<E> q = first(); q != null; q = succ(q)) {
            E item = q.item;
            if (item != null)
                al.add(item);
        }
        return al.toArray(a);
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
        return new Itr();
    }

    private class Itr implements Iterator<E> {
        /**
         * Next node to return item for.
         */
        private Node<E> nextNode;

        /**
         * nextItem holds on to item fields because once we claim
         * that an element exists in hasNext(), we must return it in
         * the following next() call even if it was in the process of
         * being removed when hasNext() was called.
         */
        private E nextItem;

        /**
         * Node of the last returned item, to support remove.
         */
        private Node<E> lastRet;

        Itr() {
            advance();
        }

        /**
         * Moves to next valid node and returns item to return for
         * next(), or null if no such.
         */
        private E advance() {
            lastRet = nextNode;
            E x = nextItem;

            Node<E> pred, p;
            if (nextNode == null) {
                p = first();
                pred = null;
            } else {
                pred = nextNode;
                p = succ(nextNode);
            }

            for (;;) {
                if (p == null) {
                    nextNode = null;
                    nextItem = null;
                    return x;
                }
                E item = p.item;
                if (item != null) {
                    nextNode = p;
                    nextItem = item;
                    return x;
                } else {
                    // skip over nulls
                    Node<E> next = succ(p);
                    if (pred != null && next != null)
                        pred.casNext(p, next);
                    p = next;
                }
            }
        }

        public boolean hasNext() {
            return nextNode != null;
        }

        public E next() {
            if (nextNode == null) throw new NoSuchElementException();
            return advance();
        }

        public void remove() {
            Node<E> l = lastRet;
            if (l == null) throw new IllegalStateException();
            // rely on a future traversal to relink.
            l.item = null;
            lastRet = null;
        }
    }

    /**
     * Saves the state to a stream (that is, serializes it).
     *
     * @serialData All of the elements (each an {@code E}) in
     * the proper order, followed by a null
     * @param s the stream
     */
    private void writeObject(java.io.ObjectOutputStream s)
        throws java.io.IOException {

        // Write out any hidden stuff
        s.defaultWriteObject();

        // Write out all elements in the proper order.
        for (Node<E> p = first(); p != null; p = succ(p)) {
            Object item = p.item;
            if (item != null)
                s.writeObject(item);
        }

        // Use trailing null as sentinel
        s.writeObject(null);
    }

    /**
     * Reconstitutes the instance from a stream (that is, deserializes it).
     * @param s the stream
     */
    private void readObject(java.io.ObjectInputStream s)
        throws java.io.IOException, ClassNotFoundException {
        s.defaultReadObject();

        // Read in elements until trailing null sentinel found
        Node<E> h = null, t = null;
        Object item;
        while ((item = s.readObject()) != null) {
            @SuppressWarnings("unchecked")
            Node<E> newNode = new Node<E>((E) item);
            if (h == null)
                h = t = newNode;
            else {
                t.lazySetNext(newNode);
                t = newNode;
            }
        }
        if (h == null)
            h = t = new Node<E>(null);
        head = h;
        tail = t;
    }

    /**
     * Throws NullPointerException if argument is null.
     *
     * @param v the element
     */
    private static void checkNotNull(Object v) {
        if (v == null)
            throw new NullPointerException();
    }

    private boolean casTail(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, tailOffset, cmp, val);
    }

    private boolean casHead(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, headOffset, cmp, val);
    }

    // Unsafe mechanics

    private static final sun.misc.Unsafe UNSAFE;
    private static final long headOffset;
    private static final long tailOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = ConcurrentLinkedQueue.class;
            headOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("head"));
            tailOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("tail"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}


 

The following analyzes ConcurrentLinkedQueue from three angles: creation, insertion, and removal.

1. Creation

The no-argument constructor ConcurrentLinkedQueue() serves as the example.

public ConcurrentLinkedQueue() {
    head = tail = new Node<E>(null);
}

Note: the constructor creates a node whose item is null and points both head and tail at that node.
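
As a rough illustration of what a freshly constructed queue looks like from the outside, the sketch below uses only the public API from the function list. The class name CreationSketch and the sample elements are made up for this example.

```java
import java.util.Arrays;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical demo class; only the ConcurrentLinkedQueue calls come from the JDK.
class CreationSketch {
    // Returns a short summary so the behaviour can be checked programmatically.
    static String demo() {
        // No-arg constructor: the queue starts empty (internally just the null-item node).
        Queue<String> empty = new ConcurrentLinkedQueue<>();
        String emptyState = empty.isEmpty() + "," + empty.peek() + "," + empty.poll();

        // Collection constructor: elements are added in the collection's iteration order.
        Queue<String> copy = new ConcurrentLinkedQueue<>(Arrays.asList("a", "b", "c"));
        String firstOut = copy.poll();

        // null elements are rejected with NullPointerException.
        boolean rejectedNull;
        try {
            empty.offer(null);
            rejectedNull = false;
        } catch (NullPointerException expected) {
            rejectedNull = true;
        }
        return emptyState + " " + firstOut + " " + rejectedNull;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true,null,null a true
    }
}
```

Because peek() and poll() report an empty queue by returning null, null cannot also be a legal element; that is why offer(null) fails.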

head and tail are defined as follows:

private transient volatile Node<E> head;
private transient volatile Node<E> tail;

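Both head and tail are volatile, so they carry the visibility guarantee of volatile: a read of a volatile variable always sees the last write to that variable by any thread.

Node is declared as follows: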


private static class Node<E> {
    volatile E item;
    volatile Node<E> next;

    Node(E item) {
        UNSAFE.putObject(this, itemOffset, item);
    }

    boolean casItem(E cmp, E val) {
        return UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    void lazySetNext(Node<E> val) {
        UNSAFE.putOrderedObject(this, nextOffset, val);
    }

    boolean casNext(Node<E> cmp, Node<E> val) {
        return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class k = Node.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}


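Note
Node is a singly linked list node: next points to the following Node and item holds the data. Node's field operations all go through Unsafe. casItem() and casNext() are CAS operations (casNext(), for example, compares and sets the node's successor), while lazySetNext() is an ordered write via putOrderedObject() rather than a CAS.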
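
For readers without access to sun.misc.Unsafe, the same three operations can be approximated with the public AtomicReferenceFieldUpdater class. This is only a sketch under that substitution, not how the JDK implements Node; the class name CasNodeSketch and the demo values are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;

// Hypothetical stand-in for the JDK's private Node, built on public atomics instead of Unsafe.
class CasNodeSketch {
    static final class Node<E> {
        volatile E item;
        volatile Node<E> next;

        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<Node, Node> NEXT =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Node.class, "next");
        @SuppressWarnings("rawtypes")
        private static final AtomicReferenceFieldUpdater<Node, Object> ITEM =
            AtomicReferenceFieldUpdater.newUpdater(Node.class, Object.class, "item");

        Node(E item) { this.item = item; }

        // CAS on item: succeeds only if the current item is the same reference as cmp.
        boolean casItem(E cmp, E val) { return ITEM.compareAndSet(this, cmp, val); }
        // CAS on next: succeeds only if the current successor is cmp.
        boolean casNext(Node<E> cmp, Node<E> val) { return NEXT.compareAndSet(this, cmp, val); }
        // Ordered ("lazy") write: not a CAS, it just stores the value.
        void lazySetNext(Node<E> val) { NEXT.lazySet(this, val); }
    }

    static String demo() {
        Node<String> a = new Node<>("a");
        Node<String> b = new Node<>("b");
        Node<String> c = new Node<>("c");

        boolean linkB = a.casNext(null, b);    // succeeds: a.next was null
        boolean linkC = a.casNext(null, c);    // fails: a.next is already b
        String seen = a.item;
        boolean take1 = a.casItem(seen, null); // succeeds: item was still the value we read
        boolean take2 = a.casItem(seen, null); // fails: item is already null
        a.lazySetNext(a);                      // self-link, the same write updateHead() performs
        boolean selfLinked = (a.next == a);
        return linkB + " " + linkC + " " + take1 + " " + take2 + " " + selfLinked;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false true false true
    }
}
```

The failed second CAS in each pair is what lets two racing threads agree on a single winner without a lock.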

 

2. Insertion

add(E e) serves as the example for insertion into ConcurrentLinkedQueue.

public boolean add(E e) {
    return offer(e);
}

Note: add() simply calls offer() to perform the insertion.

The source of offer() is as follows:


public boolean offer(E e) {
    // Throw NullPointerException if e is null.
    checkNotNull(e);
    // Create the new node
    final Node<E> newNode = new Node<E>(e);

    // Append the new node to the end of the list.
    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        // Case 1: q is null
        if (q == null) {
            // CAS: if p's next node is null (i.e. p is the last node), set p's next node to newNode.
            // If the CAS succeeds, compare p with t (if p != t, set newNode as the new tail), then return true.
            // If the CAS fails, another thread has already appended a node after p, so loop again.
            if (p.casNext(null, newNode)) {
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
        }
        // Case 2: p equals q
        else if (p == q)
            p = (t != (t = tail)) ? t : head;
        // Case 3: everything else
        else
            p = (p != t && t != (t = tail)) ? t : q;
    }
}


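Note: offer(E e) appends element e to the end of the list. The tricky part of offer() is the for loop, which is analyzed below in three cases.

Case 1: q is null. This means p is the last node, since q is p.next and nothing follows p. p.casNext(null, newNode) then tries to set p's next node to newNode. If it succeeds, p is compared with t (if p is not equal to t, newNode is set as the new tail via casTail()), and true is returned. Otherwise another thread has already appended a node after p, so nothing is done and the for loop continues.
p.casNext(null, newNode) is a CAS on p: if p's next node is null, it is set to newNode; the call returns true on success and false on failure.

Case 2: p equals q. When does this happen? q is p.next, so p == q means p is linked to itself. That is the mark updateHead() leaves on a node that poll() has taken off the list (see h.lazySetNext(h) in section 3), so p is no longer on the list.
In that case, if tail has not changed, tail is off the list as well, so p is set to head and the list is traversed from there; otherwise (tail has changed), p is set to the new tail.

Case 3: everything else.
The statement p = (p != t && t != (t = tail)) ? t : q; can be rewritten as the following code.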


if (p==t) {
    p = q;
} else {
    Node<E> tmp=t;
    t = tail;
    if (tmp==t) {
        p=q;
    } else {
        p=t;
    }
}


If p equals t, p is set to q. Otherwise, check whether tail has changed: if it has not, p is set to q; if it has, p is set to the new tail.
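
The rewrite above relies on Java evaluating the left operand of != before the assignment on the right. A tiny sketch of just that idiom, with plain local variables standing in for the queue's fields (OperandOrderSketch and the string values are invented for the illustration):

```java
// Hypothetical demo of the "t != (t = tail)" idiom, outside any queue.
class OperandOrderSketch {
    static String demo() {
        Object tail = "T2"; // stands in for the shared tail field
        Object t = "T1";    // local snapshot taken earlier

        // The left operand (old t) is read first, then t is reassigned from tail.
        boolean changed = (t != (t = tail));
        String first = changed + ":" + t;

        // Nothing changed since the last reload, so the comparison is now false.
        boolean changedAgain = (t != (t = tail));
        return first + " " + changedAgain + ":" + t;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true:T2 false:T2
    }
}
```

So the single expression both answers "did tail move since my snapshot?" and refreshes the snapshot, which is why the expanded form needs the temporary variable tmp.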

The source of checkNotNull() is as follows:

private static void checkNotNull(Object v) {
    if (v == null)
        throw new NullPointerException();
}
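
Setting aside the "hop two nodes at a time" optimization and the off-list handling, the heart of offer() is a retry loop around one CAS on the last node's next field. The sketch below shows that core in the classic textbook form, where tail is advanced on every append. It is a simplified illustration, not the JDK algorithm; SimpleLockFreeAppend and its members are hypothetical, and it supports appends only.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical append-only queue: the basic CAS retry loop without the JDK's tail-lag optimization.
class SimpleLockFreeAppend<E> {
    private static final class Node<E> {
        final E item;
        final AtomicReference<Node<E>> next = new AtomicReference<>();
        Node(E item) { this.item = item; }
    }

    private final Node<E> head = new Node<>(null); // null-item first node, as in the JDK constructor
    private final AtomicReference<Node<E>> tail = new AtomicReference<>(head);

    boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        Node<E> newNode = new Node<>(e);
        for (;;) {
            Node<E> t = tail.get();
            Node<E> q = t.next.get();
            if (q == null) {
                // t looks like the last node: try to link the new node after it.
                if (t.next.compareAndSet(null, newNode)) {
                    tail.compareAndSet(t, newNode); // failure is fine: another thread advanced tail
                    return true;
                }
                // Lost the race to another appender: loop and re-read.
            } else {
                // tail is lagging behind the real last node: help advance it, then retry.
                tail.compareAndSet(t, q);
            }
        }
    }

    int size() {
        int n = 0;
        for (Node<E> p = head.next.get(); p != null; p = p.next.get()) n++;
        return n;
    }

    // Four threads append 1000 elements each; returns the number of linked nodes.
    static int demo() {
        SimpleLockFreeAppend<Integer> q = new SimpleLockFreeAppend<>();
        Thread[] workers = new Thread[4];
        for (int w = 0; w < workers.length; w++) {
            workers[w] = new Thread(() -> {
                for (int i = 0; i < 1000; i++) q.offer(i);
            });
            workers[w].start();
        }
        try {
            for (Thread th : workers) th.join();
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(ex);
        }
        return q.size();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 4000, since no append is lost
    }
}
```

The JDK version differs mainly in that it lets tail lag by a node and only calls casTail() when p != t, which saves a CAS on roughly every other append.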

 

3. Removal

poll() serves as the example for removal from ConcurrentLinkedQueue.


public E poll() {
    // Label used to restart the traversal from head
    restartFromHead:
    for (;;) {
        for (Node<E> h = head, p = h, q;;) {
            E item = p.item;

            // Case 1
            // p holds a non-null item and the CAS that sets it to null succeeds.
            // If p != h (p has advanced past the head snapshot h), update head; then return p's item.
            if (item != null && p.casItem(item, null)) {
                if (p != h) // hop two nodes at a time
                    updateHead(h, ((q = p.next) != null) ? q : p);
                return item;
            }
            // Case 2
            // p yielded no item and has no successor, so the queue is empty.
            // Update head to p and return null.
            else if ((q = p.next) == null) {
                updateHead(h, p);
                return null;
            }
            // Case 3
            // p is self-linked (p.next == p): another thread's updateHead() has taken p off the list.
            // Jump to the restartFromHead label and start over.
            else if (p == q)
                continue restartFromHead;
            // Case 4
            // Advance p to q
            else
                p = q;
        }
    }
}


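Note: poll() removes the head element of the list and returns its value. Its implementation is similar to offer(); the for loop is analyzed below in four cases.

Case 1: p's item is not null, and the operation that sets it to null succeeds.
p.casItem(item, null) is a CAS: it compares node p's item with item and, if they are the same, sets p's item to null.
When case 1 applies, p is first compared with h. If p != h, that is, p has moved past the head snapshot h, updateHead() is called to update head; then the removed node's item is returned.
The source of updateHead() is as follows: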

final void updateHead(Node<E> h, Node<E> p) {
    if (h != p && casHead(h, p))
        h.lazySetNext(h);
}

Note: updateHead() sets head to p and then sets h's next node to h itself.
casHead(h, p) sets head with a CAS: if head equals h, head is set to p.
The source of lazySetNext() is as follows:

void lazySetNext(Node<E> val) {
    UNSAFE.putOrderedObject(this, nextOffset, val);
}

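The putOrderedObject() function was introduced in the earlier chapter "TODO". h.lazySetNext(h) sets h's next node to h itself. It is an ordered (lazy) write rather than a CAS, so other threads may not see the new value immediately.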
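
To make the purpose of the self-link concrete, here is a single-threaded walk-through of what a slow traverser observes after another caller has moved head. It replays the writes by hand with plain field assignments instead of CAS, so it is an illustration of the states only; SelfLinkSketch and the node names are invented.

```java
// Hypothetical, single-threaded replay of the states produced by poll() + updateHead().
class SelfLinkSketch {
    static final class Node {
        volatile String item;
        volatile Node next;
        Node(String item) { this.item = item; }
    }

    static String demo() {
        // List: first(null item) -> a -> b, with head pointing at the first node.
        Node first = new Node(null), a = new Node("a"), b = new Node("b");
        first.next = a;
        a.next = b;
        Node head = first;

        // A slow traverser takes its snapshot of head ...
        Node p = head;

        // ... and meanwhile another caller polls "a":
        a.item = null;          // casItem(item, null) succeeded on node a
        Node oldHead = head;
        head = b;               // casHead(h, p) inside updateHead()
        oldHead.next = oldHead; // h.lazySetNext(h): the old head now points at itself

        // The traverser resumes from its stale snapshot.
        Node q = p.next;
        boolean offList = (p == q); // true: the self-link says "this node left the list"
        if (offList) p = head;      // the equivalent of "continue restartFromHead"
        return offList + " " + p.item;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true b
    }
}
```

Without the self-link, the stale snapshot would still lead to live nodes through its old next pointer, and removed nodes could keep each other reachable for the garbage collector.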

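Case 2: p has no successor (p.next is null). p yielded no item and is the last node, so the queue is empty; right after construction, for example, the list holds only the head node whose item is null.
updateHead(h, p) is called to update head to p, and null is returned.

Case 3: p == q
q is p.next, so this means p is self-linked: another thread's updateHead() has already taken p off the list. The code jumps to the restartFromHead label and starts over from the current head.

Case 4: everything else.
p is set to q.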
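
One observable consequence of the casItem() step in case 1 is that each element is handed to exactly one caller, even when several threads poll at once. The following sketch checks that with the real ConcurrentLinkedQueue; the class name PollDrainSketch, the element count, and the thread count are arbitrary choices for the example.

```java
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical demo: several consumers drain one pre-filled queue.
class PollDrainSketch {
    static String demo() {
        final int total = 10_000;
        Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        for (int i = 0; i < total; i++) queue.offer(i);

        Set<Integer> seen = ConcurrentHashMap.newKeySet();
        AtomicInteger duplicates = new AtomicInteger();

        Thread[] consumers = new Thread[4];
        for (int c = 0; c < consumers.length; c++) {
            consumers[c] = new Thread(() -> {
                Integer v;
                // No producer is running, so a null from poll() means the queue is drained.
                while ((v = queue.poll()) != null) {
                    if (!seen.add(v)) duplicates.incrementAndGet();
                }
            });
            consumers[c].start();
        }
        try {
            for (Thread t : consumers) t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen.size() + " " + duplicates.get() + " " + queue.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // 10000 0 true
    }
}
```

With producers still active, a null return would only mean "empty at that moment", so a real consumer loop would need a separate shutdown signal.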

 

ConcurrentLinkedQueue example


import java.util.*;
import java.util.concurrent.*;

/*
 *   ConcurrentLinkedQueue is a thread-safe queue, whereas LinkedList is not thread-safe.
 *
 *   This example has several threads modify and iterate over queue at the same time.
 *   (01) When queue is a ConcurrentLinkedQueue, the program runs normally.
 *   (02) When queue is a LinkedList, the program typically fails with ConcurrentModificationException.
 *
 * @author skywang
 */
public class ConcurrentLinkedQueueDemo1 {

    // TODO: the program fails when queue is a LinkedList.
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new ConcurrentLinkedQueue<String>();
    public static void main(String[] args) {

        // Start two threads that operate on queue at the same time!
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    private static void printAll() {
        String value;
        Iterator<String> iter = queue.iterator();
        while(iter.hasNext()) {
            value = iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            int i = 0;
            while (i++ < 6) {
                // thread name + sequence number
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // Traverse queue with an Iterator.
                printAll();
            }
        }
    }
}


Output (from one run)


ta1, ta1, tb1, tb1,

ta1, ta1, tb1, tb1, ta2, ta2, tb2, 
tb2, 
ta1, ta1, tb1, tb1, ta2, ta2, tb2, tb2, ta3, tb3, 
ta3, ta1, tb3, tb1, ta4, 
ta2, ta1, tb2, tb1, ta3, ta2, tb3, tb2, ta4, ta3, tb4, 
tb3, ta1, ta4, tb1, tb4, ta2, ta5, 
tb2, ta1, ta3, tb1, tb3, ta2, ta4, tb2, tb4, ta3, ta5, tb3, tb5, 
ta4, ta1, tb4, tb1, ta5, ta2, tb5, tb2, ta6, 
ta3, ta1, tb3, tb1, ta4, ta2, tb4, tb2, ta5, ta3, tb5, tb3, ta6, ta4, tb6, 
tb4, ta5, tb5, ta6, tb6, 


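About the result: the two threads print concurrently, so their output lines are interleaved. If queue in the source is changed to a LinkedList, the program typically throws ConcurrentModificationException. LinkedList iterators detect concurrent modification only on a best-effort basis, so the exception is likely but not guaranteed.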
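
The difference between the two iterators can also be reproduced deterministically in a single thread, by modifying the queue in the middle of a for-each loop. The sketch below is a separate, simplified check rather than a variant of the demo above; IteratorContrastSketch and throwsCme() are hypothetical names.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical single-threaded contrast of fail-fast vs. weakly consistent iteration.
class IteratorContrastSketch {
    // Adds an element while iterating and reports whether the iterator objected.
    static boolean throwsCme(Queue<String> q) {
        q.add("a");
        q.add("b");
        try {
            for (String s : q) {
                if (s.equals("a")) q.add("c"); // structural modification mid-iteration
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    static String demo() {
        boolean linkedList = throwsCme(new LinkedList<>());
        boolean concurrentQueue = throwsCme(new ConcurrentLinkedQueue<>());
        return linkedList + " " + concurrentQueue;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false
    }
}
```

LinkedList's iterator notices the change on its next call to next(), while the ConcurrentLinkedQueue iterator simply keeps walking the live list.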

 

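    Original author: JUC
    Original URL: https://blog.csdn.net/m0_37955444/article/details/79179715
    This article is reposted from the web for the purpose of sharing knowledge; in case of infringement, please contact the blogger to have it removed.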