Java Multithreading Series - [JUC Collections 08] - LinkedBlockingQueue

Reference: http://www.cnblogs.com/skywang12345/p/java_threads_category.html

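Overview

This chapter introduces LinkedBlockingQueue from the JUC (java.util.concurrent) package. It covers:
Introduction to LinkedBlockingQueue
LinkedBlockingQueue principles and data structure
LinkedBlockingQueue method list
LinkedBlockingQueue source code analysis (JDK 1.7.0_40)
LinkedBlockingQueue example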

Please credit the source when reposting: http://www.cnblogs.com/skywang12345/p/3503458.html

 

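Introduction to LinkedBlockingQueue

LinkedBlockingQueue is a blocking queue backed by a singly linked list. It orders elements FIFO (first in, first out): new elements are inserted at the tail of the queue, and retrieval operations obtain the element at the head. Linked queues typically have higher throughput than array-based queues, but less predictable performance in most concurrent applications.

In addition, LinkedBlockingQueue is optionally bounded: a capacity can be specified at construction to prevent excessive growth. If no capacity is specified, it defaults to Integer.MAX_VALUE.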
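
The effect of the optional capacity can be seen in a minimal sketch. The class name CapacityDemo and the helper acceptedByBoundedQueue are hypothetical names chosen for this illustration; only the LinkedBlockingQueue constructors, offer() and remainingCapacity() come from the JDK.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the original article.
public class CapacityDemo {
    /** Offers n + 1 elements to a queue bounded at n and returns how many were accepted. */
    static int acceptedByBoundedQueue(int n) {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<Integer>(n);
        int accepted = 0;
        for (int i = 0; i <= n; i++) {
            // offer() does not block: it returns false once the queue is full
            if (q.offer(i)) {
                accepted++;
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        // No capacity given: the bound is Integer.MAX_VALUE
        LinkedBlockingQueue<Integer> unbounded = new LinkedBlockingQueue<Integer>();
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE); // true

        // Capacity 3: the fourth offer is rejected
        System.out.println(acceptedByBoundedQueue(3)); // 3
    }
}
```

In practice, an explicit capacity is usually the safer choice when producers can outpace consumers, since the default bound is effectively unlimited.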

 

LinkedBlockingQueue principles and data structure

The data structure of LinkedBlockingQueue is shown in the figure below:

[Figure: LinkedBlockingQueue data structure]

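Notes
1. LinkedBlockingQueue extends AbstractQueue; it is essentially a FIFO (first in, first out) queue.
2. LinkedBlockingQueue implements the BlockingQueue interface and supports concurrent access by multiple threads. When several threads contend for the same resource, one thread acquires it and the others block until it is released.
3. LinkedBlockingQueue is implemented as a singly linked list.
(01) head is the head of the list. Elements are always removed at the head. head itself is a sentinel node whose item is null; the first element is head.next.
(02) last is the tail of the list. Elements are always inserted at the tail.
(03) count is the actual size of the list, that is, the number of elements it currently holds.
(04) capacity is the capacity of the queue; it is fixed when the queue is created.
(05) putLock is the insertion lock and takeLock is the removal lock; notEmpty is the "not empty" condition and notFull is the "not full" condition. Together they control concurrent access to the list.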
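       To give threads mutually exclusive access to the shared list, LinkedBlockingQueue uses separate locks for insertion and for removal: insertions synchronize on putLock, and removals synchronize on takeLock.
       In addition, putLock is associated with the notFull condition and takeLock with the notEmpty condition. notFull and notEmpty allow finer-grained control than the locks alone.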

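     -- If a thread (thread A) tries to take an element while the queue is empty, it waits by calling notEmpty.await(). When another thread (thread B) inserts an element into the empty queue, it calls notEmpty.signal() to wake a thread waiting on notEmpty, so thread A is woken and continues. Thread A acquires takeLock before the take operation and releases it when the operation completes.
     -- If a thread (thread H) tries to insert an element while the queue is full, it waits by calling notFull.await(). When another thread (thread I) takes an element from the full queue, it calls notFull.signal() to wake a thread waiting on notFull, so thread H is woken and continues. Thread H acquires putLock before the insert operation and releases it only when the operation completes.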
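
The interplay of the two locks and two conditions described above can be condensed into a short sketch. TwoLockQueueSketch and its demo helper are hypothetical teaching names, not JDK classes; the sketch keeps only put() and take() and omits everything else (offer, poll, iterators, serialization, the self-linking of dequeued nodes), so it should be read as an illustration of the idea rather than as the JDK implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical teaching class: a stripped-down two-lock bounded queue.
public class TwoLockQueueSketch<E> {
    private static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    private final int capacity;
    private final AtomicInteger count = new AtomicInteger();
    private Node<E> head = new Node<E>(null);   // sentinel: head.item is always null
    private Node<E> last = head;

    private final ReentrantLock takeLock = new ReentrantLock();
    private final Condition notEmpty = takeLock.newCondition();
    private final ReentrantLock putLock = new ReentrantLock();
    private final Condition notFull = putLock.newCondition();

    public TwoLockQueueSketch(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
    }

    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        int c = -1;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity)
                notFull.await();                 // queue full: wait for a take
            last = last.next = new Node<E>(e);   // link at the tail
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();                // still room: wake another producer
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signal(takeLock, notEmpty);          // queue was empty: wake a consumer
    }

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0)
                notEmpty.await();                // queue empty: wait for a put
            Node<E> first = head.next;
            head = first;                        // first becomes the new sentinel
            x = first.item;
            first.item = null;
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();               // more elements: wake another consumer
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signal(putLock, notFull);            // queue was full: wake a producer
        return x;
    }

    // A condition may only be signalled while holding its own lock.
    private static void signal(ReentrantLock lock, Condition cond) {
        lock.lock();
        try { cond.signal(); } finally { lock.unlock(); }
    }

    /** Runs one producer and one consumer over a capacity-1 queue; returns the taken values. */
    static List<Integer> demo(final int n) {
        final TwoLockQueueSketch<Integer> q = new TwoLockQueueSketch<Integer>(1);
        Thread producer = new Thread(new Runnable() {
            public void run() {
                try {
                    for (int i = 0; i < n; i++) q.put(i);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        producer.start();
        List<Integer> taken = new ArrayList<Integer>();
        try {
            for (int i = 0; i < n; i++) taken.add(q.take());
            producer.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return taken;
    }
}
```

With a single producer and a single consumer, demo(5) returns the values 0 through 4 in insertion order. Because the capacity is 1, the producer blocks on notFull after each put until the consumer has taken the previous element.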

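For more on ReentrantLock and Condition, see:
    (01) Java Multithreading Series - "JUC Locks" 02: The mutual exclusion lock ReentrantLock
    (02) Java Multithreading Series - "JUC Locks" 03: Fair lock (part 1)
    (03) Java Multithreading Series - "JUC Locks" 04: Fair lock (part 2)
    (04) Java Multithreading Series - "JUC Locks" 05: Non-fair lock
    (05) Java Multithreading Series - "JUC Locks" 06: Condition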

 

LinkedBlockingQueue method list


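// Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE.
LinkedBlockingQueue()
// Creates a LinkedBlockingQueue with a capacity of Integer.MAX_VALUE, initially containing the elements of the given collection, added in the traversal order of the collection's iterator.
LinkedBlockingQueue(Collection<? extends E> c)
// Creates a LinkedBlockingQueue with the given (fixed) capacity.
LinkedBlockingQueue(int capacity)

// Atomically removes all of the elements from this queue.
void clear()
// Removes all available elements from this queue and adds them to the given collection.
int drainTo(Collection<? super E> c)
// Removes at most the given number of available elements from this queue and adds them to the given collection.
int drainTo(Collection<? super E> c, int maxElements)
// Returns an iterator over the elements in this queue in proper sequence.
Iterator<E> iterator()
// Inserts the specified element at the tail of this queue if it is possible to do so immediately without exceeding the queue's capacity, returning true upon success and false if this queue is full.
boolean offer(E e)
// Inserts the specified element at the tail of this queue, waiting if necessary up to the specified wait time for space to become available.
boolean offer(E e, long timeout, TimeUnit unit)
// Retrieves, but does not remove, the head of this queue; returns null if this queue is empty.
E peek()
// Retrieves and removes the head of this queue; returns null if this queue is empty.
E poll()
// Retrieves and removes the head of this queue, waiting if necessary up to the specified wait time for an element to become available.
E poll(long timeout, TimeUnit unit)
// Inserts the specified element at the tail of this queue, waiting if necessary for space to become available.
void put(E e)
// Returns the number of additional elements that this queue can ideally (in the absence of memory or resource constraints) accept without blocking.
int remainingCapacity()
// Removes a single instance of the specified element from this queue, if it is present.
boolean remove(Object o)
// Returns the number of elements in this queue.
int size()
// Retrieves and removes the head of this queue, waiting if necessary until an element becomes available.
E take()
// Returns an array containing all of the elements in this queue, in proper sequence.
Object[] toArray()
// Returns an array containing all of the elements in this queue, in proper sequence; the runtime type of the returned array is that of the specified array.
<T> T[] toArray(T[] a)
// Returns a string representation of this collection.
String toString()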
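
The non-blocking and timed methods in the list above can be exercised in a short single-threaded sketch. QueueApiDemo and its run() helper are hypothetical names used only for this illustration; the queue methods themselves are the JDK ones listed above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; not part of the original article.
public class QueueApiDemo {
    /** Exercises a capacity-2 queue and returns a log of the observed results. */
    static List<String> run() {
        List<String> log = new ArrayList<String>();
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>(2);
        try {
            log.add("offer a: " + q.offer("a"));                                   // true
            log.add("offer b: " + q.offer("b"));                                   // true
            log.add("offer c: " + q.offer("c"));                                   // false, queue is full
            log.add("timed offer c: " + q.offer("c", 10, TimeUnit.MILLISECONDS));  // false after the timeout
            log.add("peek: " + q.peek());                                          // a (not removed)
            log.add("poll: " + q.poll());                                          // a (removed)
            log.add("remaining: " + q.remainingCapacity());                        // 1

            List<String> drained = new ArrayList<String>();
            int n = q.drainTo(drained);                                            // moves "b" out of the queue
            log.add("drained " + n + ": " + drained);

            log.add("poll on empty: " + q.poll());                                 // null
            log.add("timed poll: " + q.poll(10, TimeUnit.MILLISECONDS));           // null after the timeout
        } catch (InterruptedException e) {
            // The timed offer/poll calls can be interrupted; restore the flag and stop.
            Thread.currentThread().interrupt();
        }
        return log;
    }

    public static void main(String[] args) {
        for (String line : run()) {
            System.out.println(line);
        }
    }
}
```

Unlike put() and take(), none of the calls here block indefinitely: offer() and poll() return immediately, and their timed variants give up after the stated wait.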


 

LinkedBlockingQueue source code analysis (JDK 1.7.0_40)

The complete source of LinkedBlockingQueue.java is as follows:


/*
 * ORACLE PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

/*
 * Written by Doug Lea with assistance from members of JCP JSR-166
 * Expert Group and released to the public domain, as explained at
 * http://creativecommons.org/publicdomain/zero/1.0/
 */

package java.util.concurrent;

import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.AbstractQueue;
import java.util.Collection;
import java.util.Iterator;
import java.util.NoSuchElementException;

/**
 * An optionally-bounded {@linkplain BlockingQueue blocking queue} based on
 * linked nodes.
 * This queue orders elements FIFO (first-in-first-out).
 * The <em>head</em> of the queue is that element that has been on the
 * queue the longest time.
 * The <em>tail</em> of the queue is that element that has been on the
 * queue the shortest time. New elements
 * are inserted at the tail of the queue, and the queue retrieval
 * operations obtain elements at the head of the queue.
 * Linked queues typically have higher throughput than array-based queues but
 * less predictable performance in most concurrent applications.
 *
 * <p> The optional capacity bound constructor argument serves as a
 * way to prevent excessive queue expansion. The capacity, if unspecified,
 * is equal to {@link Integer#MAX_VALUE}.  Linked nodes are
 * dynamically created upon each insertion unless this would bring the
 * queue above capacity.
 *
 * <p>This class and its iterator implement all of the
 * <em>optional</em> methods of the {@link Collection} and {@link
 * Iterator} interfaces.
 *
 * <p>This class is a member of the
 * <a href="{@docRoot}/../technotes/guides/collections/index.html">
 * Java Collections Framework</a>.
 *
 * @since 1.5
 * @author Doug Lea
 * @param <E> the type of elements held in this collection
 *
 */
public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {
    private static final long serialVersionUID = -6903933977591709194L;

    /*
     * A variant of the "two lock queue" algorithm.  The putLock gates
     * entry to put (and offer), and has an associated condition for
     * waiting puts.  Similarly for the takeLock.  The "count" field
     * that they both rely on is maintained as an atomic to avoid
     * needing to get both locks in most cases. Also, to minimize need
     * for puts to get takeLock and vice-versa, cascading notifies are
     * used. When a put notices that it has enabled at least one take,
     * it signals taker. That taker in turn signals others if more
     * items have been entered since the signal. And symmetrically for
     * takes signalling puts. Operations such as remove(Object) and
     * iterators acquire both locks.
     *
     * Visibility between writers and readers is provided as follows:
     *
     * Whenever an element is enqueued, the putLock is acquired and
     * count updated.  A subsequent reader guarantees visibility to the
     * enqueued Node by either acquiring the putLock (via fullyLock)
     * or by acquiring the takeLock, and then reading n = count.get();
     * this gives visibility to the first n items.
     *
     * To implement weakly consistent iterators, it appears we need to
     * keep all Nodes GC-reachable from a predecessor dequeued Node.
     * That would cause two problems:
     * - allow a rogue Iterator to cause unbounded memory retention
     * - cause cross-generational linking of old Nodes to new Nodes if
     *   a Node was tenured while live, which generational GCs have a
     *   hard time dealing with, causing repeated major collections.
     * However, only non-deleted Nodes need to be reachable from
     * dequeued Nodes, and reachability does not necessarily have to
     * be of the kind understood by the GC.  We use the trick of
     * linking a Node that has just been dequeued to itself.  Such a
     * self-link implicitly means to advance to head.next.
     */

    /**
     * Linked list node class
     */
    static class Node<E> {
        E item;

        /**
         * One of:
         * - the real successor Node
         * - this Node, meaning the successor is head.next
         * - null, meaning there is no successor (this is the last node)
         */
        Node<E> next;

        Node(E x) { item = x; }
    }

    /** The capacity bound, or Integer.MAX_VALUE if none */
    private final int capacity;

    /** Current number of elements */
    private final AtomicInteger count = new AtomicInteger(0);

    /**
     * Head of linked list.
     * Invariant: head.item == null
     */
    private transient Node<E> head;

    /**
     * Tail of linked list.
     * Invariant: last.next == null
     */
    private transient Node<E> last;

    /** Lock held by take, poll, etc */
    private final ReentrantLock takeLock = new ReentrantLock();

    /** Wait queue for waiting takes */
    private final Condition notEmpty = takeLock.newCondition();

    /** Lock held by put, offer, etc */
    private final ReentrantLock putLock = new ReentrantLock();

    /** Wait queue for waiting puts */
    private final Condition notFull = putLock.newCondition();

    /**
     * Signals a waiting take. Called only from put/offer (which do not
     * otherwise ordinarily lock takeLock.)
     */
    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Signals a waiting put. Called only from take/poll.
     */
    private void signalNotFull() {
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            notFull.signal();
        } finally {
            putLock.unlock();
        }
    }

    /**
     * Links node at end of queue.
     *
     * @param node the node
     */
    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;
    }

    /**
     * Removes a node from head of queue.
     *
     * @return the node
     */
    private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

    /**
     * Lock to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlock to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

//     /**
//      * Tells whether both locks are held by current thread.
//      */
//     boolean isFullyLocked() {
//         return (putLock.isHeldByCurrentThread() &&
//                 takeLock.isHeldByCurrentThread());
//     }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}, initially containing the elements of the
     * given collection,
     * added in traversal order of the collection's iterator.
     *
     * @param c the collection of elements to initially contain
     * @throws NullPointerException if the specified collection or any
     *         of its elements are null
     */
    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }


    // this doc comment is overridden to remove the reference to collections
    // greater in size than Integer.MAX_VALUE
    /**
     * Returns the number of elements in this queue.
     *
     * @return the number of elements in this queue
     */
    public int size() {
        return count.get();
    }

    // this doc comment is a modified copy of the inherited doc comment,
    // without the reference to unlimited queues.
    /**
     * Returns the number of additional elements that this queue can ideally
     * (in the absence of memory or resource constraints) accept without
     * blocking. This is always equal to the initial capacity of this queue
     * less the current {@code size} of this queue.
     *
     * <p>Note that you <em>cannot</em> always tell if an attempt to insert
     * an element will succeed by inspecting {@code remainingCapacity}
     * because it may be the case that another thread is about to
     * insert or remove an element.
     */
    public int remainingCapacity() {
        return capacity - count.get();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary for space to become available.
     *
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

    /**
     * Inserts the specified element at the tail of this queue, waiting if
     * necessary up to the specified wait time for space to become available.
     *
     * @return {@code true} if successful, or {@code false} if
     *         the specified waiting time elapses before space is available.
     * @throws InterruptedException {@inheritDoc}
     * @throws NullPointerException {@inheritDoc}
     */
    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();
        long nanos = unit.toNanos(timeout);
        int c = -1;
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            while (count.get() == capacity) {
                if (nanos <= 0)
                    return false;
                nanos = notFull.awaitNanos(nanos);
            }
            enqueue(new Node<E>(e));
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

    /**
     * Inserts the specified element at the tail of this queue if it is
     * possible to do so immediately without exceeding the queue's capacity,
     * returning {@code true} upon success and {@code false} if this queue
     * is full.
     * When using a capacity-restricted queue, this method is generally
     * preferable to method {@link BlockingQueue#add add}, which can fail to
     * insert an element only by throwing an exception.
     *
     * @throws NullPointerException if the specified element is null
     */
    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();
        final AtomicInteger count = this.count;
        if (count.get() == capacity)
            return false;
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        putLock.lock();
        try {
            if (count.get() < capacity) {
                enqueue(node);
                c = count.getAndIncrement();
                if (c + 1 < capacity)
                    notFull.signal();
            }
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return c >= 0;
    }


    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                notEmpty.await();
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

    public E peek() {
        if (count.get() == 0)
            return null;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            Node<E> first = head.next;
            if (first == null)
                return null;
            else
                return first.item;
        } finally {
            takeLock.unlock();
        }
    }

    /**
     * Unlinks interior Node p with predecessor trail.
     */
    void unlink(Node<E> p, Node<E> trail) {
        // assert isFullyLocked();
        // p.next is not changed, to allow iterators that are
        // traversing p to maintain their weak-consistency guarantee.
        p.item = null;
        trail.next = p.next;
        if (last == p)
            last = trail;
        if (count.getAndDecrement() == capacity)
            notFull.signal();
    }

    /**
     * Removes a single instance of the specified element from this queue,
     * if it is present.  More formally, removes an element {@code e} such
     * that {@code o.equals(e)}, if this queue contains one or more such
     * elements.
     * Returns {@code true} if this queue contained the specified element
     * (or equivalently, if this queue changed as a result of the call).
     *
     * @param o element to be removed from this queue, if present
     * @return {@code true} if this queue changed as a result of the call
     */
    public boolean remove(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns {@code true} if this queue contains the specified element.
     * More formally, returns {@code true} if and only if this queue contains
     * at least one element {@code e} such that {@code o.equals(e)}.
     *
     * @param o object to be checked for containment in this queue
     * @return {@code true} if this queue contains the specified element
     */
    public boolean contains(Object o) {
        if (o == null) return false;
        fullyLock();
        try {
            for (Node<E> p = head.next; p != null; p = p.next)
                if (o.equals(p.item))
                    return true;
            return false;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence.
     *
     * <p>The returned array will be "safe" in that no references to it are
     * maintained by this queue.  (In other words, this method must allocate
     * a new array).  The caller is thus free to modify the returned array.
     *
     * <p>This method acts as bridge between array-based and collection-based
     * APIs.
     *
     * @return an array containing all of the elements in this queue
     */
    public Object[] toArray() {
        fullyLock();
        try {
            int size = count.get();
            Object[] a = new Object[size];
            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = p.item;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Returns an array containing all of the elements in this queue, in
     * proper sequence; the runtime type of the returned array is that of
     * the specified array.  If the queue fits in the specified array, it
     * is returned therein.  Otherwise, a new array is allocated with the
     * runtime type of the specified array and the size of this queue.
     *
     * <p>If this queue fits in the specified array with room to spare
     * (i.e., the array has more elements than this queue), the element in
     * the array immediately following the end of the queue is set to
     * {@code null}.
     *
     * <p>Like the {@link #toArray()} method, this method acts as bridge between
     * array-based and collection-based APIs.  Further, this method allows
     * precise control over the runtime type of the output array, and may,
     * under certain circumstances, be used to save allocation costs.
     *
     * <p>Suppose {@code x} is a queue known to contain only strings.
     * The following code can be used to dump the queue into a newly
     * allocated array of {@code String}:
     *
     * <pre>
     *     String[] y = x.toArray(new String[0]);</pre>
     *
     * Note that {@code toArray(new Object[0])} is identical in function to
     * {@code toArray()}.
     *
     * @param a the array into which the elements of the queue are to
     *          be stored, if it is big enough; otherwise, a new array of the
     *          same runtime type is allocated for this purpose
     * @return an array containing all of the elements in this queue
     * @throws ArrayStoreException if the runtime type of the specified array
     *         is not a supertype of the runtime type of every element in
     *         this queue
     * @throws NullPointerException if the specified array is null
     */
    @SuppressWarnings("unchecked")
    public <T> T[] toArray(T[] a) {
        fullyLock();
        try {
            int size = count.get();
            if (a.length < size)
                a = (T[])java.lang.reflect.Array.newInstance
                    (a.getClass().getComponentType(), size);

            int k = 0;
            for (Node<E> p = head.next; p != null; p = p.next)
                a[k++] = (T)p.item;
            if (a.length > k)
                a[k] = null;
            return a;
        } finally {
            fullyUnlock();
        }
    }

    public String toString() {
        fullyLock();
        try {
            Node<E> p = head.next;
            if (p == null)
                return "[]";

            StringBuilder sb = new StringBuilder();
            sb.append('[');
            for (;;) {
                E e = p.item;
                sb.append(e == this ? "(this Collection)" : e);
                p = p.next;
                if (p == null)
                    return sb.append(']').toString();
                sb.append(',').append(' ');
            }
        } finally {
            fullyUnlock();
        }
    }

    /**
     * Atomically removes all of the elements from this queue.
     * The queue will be empty after this call returns.
     */
    public void clear() {
        fullyLock();
        try {
            for (Node<E> p, h = head; (p = h.next) != null; h = p) {
                h.next = h;
                p.item = null;
            }
            head = last;
            // assert head.item == null && head.next == null;
            if (count.getAndSet(0) == capacity)
                notFull.signal();
        } finally {
            fullyUnlock();
        }
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c) {
        return drainTo(c, Integer.MAX_VALUE);
    }

    /**
     * @throws UnsupportedOperationException {@inheritDoc}
     * @throws ClassCastException            {@inheritDoc}
     * @throws NullPointerException          {@inheritDoc}
     * @throws IllegalArgumentException      {@inheritDoc}
     */
    public int drainTo(Collection<? super E> c, int maxElements) {
        if (c == null)
            throw new NullPointerException();
        if (c == this)
            throw new IllegalArgumentException();
        boolean signalNotFull = false;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            int n = Math.min(maxElements, count.get());
            // count.get provides visibility to first n Nodes
            Node<E> h = head;
            int i = 0;
            try {
                while (i < n) {
                    Node<E> p = h.next;
                    c.add(p.item);
                    p.item = null;
                    h.next = h;
                    h = p;
                    ++i;
                }
                return n;
            } finally {
                // Restore invariants even if c.add() threw
                if (i > 0) {
                    // assert h.item == null;
                    head = h;
                    signalNotFull = (count.getAndAdd(-i) == capacity);
                }
            }
        } finally {
            takeLock.unlock();
            if (signalNotFull)
                signalNotFull();
        }
    }

    /**
     * Returns an iterator over the elements in this queue in proper sequence.
     * The elements will be returned in order from first (head) to last (tail).
     *
     * <p>The returned iterator is a "weakly consistent" iterator that
     * will never throw {@link java.util.ConcurrentModificationException
     * ConcurrentModificationException}, and guarantees to traverse
     * elements as they existed upon construction of the iterator, and
     * may (but is not guaranteed to) reflect any modifications
     * subsequent to construction.
     *
     * @return an iterator over the elements in this queue in proper sequence
     */
    public Iterator<E> iterator() {
      return new Itr();
    }

    private class Itr implements Iterator<E> {
        /*
         * Basic weakly-consistent iterator.  At all times hold the next
         * item to hand out so that if hasNext() reports true, we will
         * still have it to return even if lost race with a take etc.
         */
        private Node<E> current;
        private Node<E> lastRet;
        private E currentElement;

        Itr() {
            fullyLock();
            try {
                current = head.next;
                if (current != null)
                    currentElement = current.item;
            } finally {
                fullyUnlock();
            }
        }

        public boolean hasNext() {
            return current != null;
        }

        /**
         * Returns the next live successor of p, or null if no such.
         *
         * Unlike other traversal methods, iterators need to handle both:
         * - dequeued nodes (p.next == p)
         * - (possibly multiple) interior removed nodes (p.item == null)
         */
        private Node<E> nextNode(Node<E> p) {
            for (;;) {
                Node<E> s = p.next;
                if (s == p)
                    return head.next;
                if (s == null || s.item != null)
                    return s;
                p = s;
            }
        }

        public E next() {
            fullyLock();
            try {
                if (current == null)
                    throw new NoSuchElementException();
                E x = currentElement;
                lastRet = current;
                current = nextNode(current);
                currentElement = (current == null) ? null : current.item;
                return x;
            } finally {
                fullyUnlock();
            }
837         }
838 
839         public void remove() {
840             if (lastRet == null)
841                 throw new IllegalStateException();
842             fullyLock();
843             try {
844                 Node<E> node = lastRet;
845                 lastRet = null;
846                 for (Node<E> trail = head, p = trail.next;
847                      p != null;
848                      trail = p, p = p.next) {
849                     if (p == node) {
850                         unlink(p, trail);
851                         break;
852                     }
853                 }
854             } finally {
855                 fullyUnlock();
856             }
857         }
858     }
859 
860     /**
861      * Save the state to a stream (that is, serialize it).
862      *
863      * @serialData The capacity is emitted (int), followed by all of
864      * its elements (each an {@code Object}) in the proper order,
865      * followed by a null
866      * @param s the stream
867      */
868     private void writeObject(java.io.ObjectOutputStream s)
869         throws java.io.IOException {
870 
871         fullyLock();
872         try {
873             // Write out any hidden stuff, plus capacity
874             s.defaultWriteObject();
875 
876             // Write out all elements in the proper order.
877             for (Node<E> p = head.next; p != null; p = p.next)
878                 s.writeObject(p.item);
879 
880             // Use trailing null as sentinel
881             s.writeObject(null);
882         } finally {
883             fullyUnlock();
884         }
885     }
886 
887     /**
888      * Reconstitute this queue instance from a stream (that is,
889      * deserialize it).
890      *
891      * @param s the stream
892      */
893     private void readObject(java.io.ObjectInputStream s)
894         throws java.io.IOException, ClassNotFoundException {
895         // Read in capacity, and any hidden stuff
896         s.defaultReadObject();
897 
898         count.set(0);
899         last = head = new Node<E>(null);
900 
901         // Read in all elements and place in queue
902         for (;;) {
903             @SuppressWarnings("unchecked")
904             E item = (E)s.readObject();
905             if (item == null)
906                 break;
907             add(item);
908         }
909     }
910 }
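
The drainTo(Collection, int) method that closes the first part of the listing above moves several elements out of the queue under a single takeLock acquisition. The following is only a small usage sketch; the class DrainToSketch and its helper drainSome are hypothetical names made up for illustration, and only LinkedBlockingQueue and drainTo come from the JDK.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the JDK source.
class DrainToSketch {

    // Fills a queue with 1..5 and moves at most maxElements of them into a new list.
    static List<Integer> drainSome(int maxElements) {
        LinkedBlockingQueue<Integer> queue = new LinkedBlockingQueue<Integer>();
        for (int i = 1; i <= 5; i++) {
            queue.add(i);
        }
        List<Integer> sink = new ArrayList<Integer>();
        // Removes up to maxElements elements from the head and appends them to sink.
        int moved = queue.drainTo(sink, maxElements);
        System.out.println("moved=" + moved + " sink=" + sink + " left=" + queue);
        return sink;
    }

    public static void main(String[] args) {
        drainSome(3);   // takes the three oldest elements
        drainSome(10);  // takes everything, because only five elements are available
    }
}
```

Draining in bulk is usually cheaper than calling poll() in a loop, since the lock is taken once rather than once per element.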

The following sections analyze LinkedBlockingQueue in four parts: creation, insertion, removal, and traversal.

1. Creation

The constructor LinkedBlockingQueue(int capacity) is used as the example.

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

Notes
(01) capacity is the capacity of the linked blocking queue.
(02) head and last are the head and tail nodes of the linked blocking queue. They are declared in LinkedBlockingQueue as follows:

// Capacity bound
private final int capacity;
// Current number of elements
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head; // head of the linked list (a dummy node whose item is null)
private transient Node<E> last; // tail of the linked list
// takeLock is the mutex that guards removals; notEmpty is the "not empty" condition bound to it
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// putLock is the mutex that guards insertions; notFull is the "not full" condition bound to it
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

The linked list node is defined as follows:

static class Node<E> {
    E item;         // the element
    Node<E> next;   // reference to the next node

    Node(E x) { item = x; }
}
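
As a quick check of the constructor behaviour described above, the sketch below creates a bounded and an unbounded queue and tries an invalid capacity. CapacitySketch and rejects are hypothetical names used only for this illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the JDK source.
class CapacitySketch {

    // Returns true if the constructor rejects the given capacity.
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException expected) {
            // thrown by the "capacity <= 0" check in the constructor
            return true;
        }
    }

    public static void main(String[] args) {
        LinkedBlockingQueue<String> bounded = new LinkedBlockingQueue<String>(2);
        LinkedBlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
        // remainingCapacity() is capacity minus the current count
        System.out.println(bounded.remainingCapacity());
        System.out.println(unbounded.remainingCapacity() == Integer.MAX_VALUE);
        System.out.println(rejects(0));
    }
}
```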

2. Insertion

The offer(E e) method is used as the example of how LinkedBlockingQueue adds an element.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    // If the queue is already full, return false to indicate that the insertion failed.
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    // Create a node holding e
    Node<E> node = new Node(e);
    final ReentrantLock putLock = this.putLock;
    // Acquire the insertion lock putLock
    putLock.lock();
    try {
        // Check again whether the queue is full, this time while holding the lock.
        // If it is not full, insert the node.
        if (count.get() < capacity) {
            // Link the node at the tail
            enqueue(node);
            // Increment the element count and keep the previous count
            c = count.getAndIncrement();
            // If the queue is still not full after this insertion, wake a thread waiting on notFull.
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        // Release the insertion lock putLock
        putLock.unlock();
    }
    // If the queue was empty before this insertion, wake a thread waiting on notEmpty.
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

Note: offer() appends element e to the tail of the queue without blocking. It returns true on success and false if the queue is full.
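
To make the non-blocking contract of offer() concrete, the sketch below fills a queue of capacity 1 and then tries to insert more. OfferSketch and fillBeyondCapacity are hypothetical names; the contrast with add(), which LinkedBlockingQueue inherits from AbstractQueue, is included for comparison.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the JDK source.
class OfferSketch {

    // Returns a summary: result of the first offer, result of the second offer, whether add() threw.
    static String fillBeyondCapacity() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>(1);
        boolean first = queue.offer("a");   // room available
        boolean second = queue.offer("b");  // queue full: returns false at once
        boolean addThrew = false;
        try {
            queue.add("c");                 // add() reports a full queue with an exception
        } catch (IllegalStateException full) {
            addThrew = true;
        }
        return first + "," + second + "," + addThrew;
    }

    public static void main(String[] args) {
        System.out.println(fillBeyondCapacity());
    }
}
```

When the caller should wait for space instead, put(E) or the timed offer(E, long, TimeUnit) is the method to use.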

The source of enqueue() is:

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

enqueue() links node after the current tail and makes node the new tail.

The source of signalNotEmpty() is:

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

signalNotEmpty() wakes one thread waiting on notEmpty. It acquires takeLock first because notEmpty belongs to takeLock, and a Condition can only be signalled while its lock is held.
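
The reason signalNotEmpty() takes the lock before signalling can be seen in isolation. The sketch below uses a standalone ReentrantLock and Condition rather than the queue's private fields; SignalNeedsLockSketch and its two methods are hypothetical names.

```java
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class; not part of the JDK source.
class SignalNeedsLockSketch {

    // Returns true if signal() is rejected when the owning lock is not held.
    static boolean signalWithoutLockFails() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        try {
            condition.signal();
            return false;
        } catch (IllegalMonitorStateException expected) {
            return true;
        }
    }

    // Same pattern as signalNotEmpty(): lock, signal, unlock in finally.
    static boolean signalWithLockSucceeds() {
        ReentrantLock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        lock.lock();
        try {
            condition.signal(); // nobody is waiting, so this has no effect
            return true;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) {
        System.out.println(signalWithoutLockFails());
        System.out.println(signalWithLockSucceeds());
    }
}
```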

 

3. Removal

The take() method is used as the example of how LinkedBlockingQueue removes an element.

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    // Acquire the removal lock; throws InterruptedException if the current thread is interrupted
    takeLock.lockInterruptibly();
    try {
        // Wait for as long as the queue is empty.
        while (count.get() == 0) {
            notEmpty.await();
        }
        // Remove the element at the head
        x = dequeue();
        // Decrement the element count and keep the previous count.
        c = count.getAndDecrement();
        // If elements remain after this removal, wake another thread waiting on notEmpty.
        if (c > 1)
            notEmpty.signal();
    } finally {
        // Release the removal lock
        takeLock.unlock();
    }
    // If the queue was full before this removal, wake a thread waiting on notFull.
    if (c == capacity)
        signalNotFull();
    return x;
}

Note: take() removes and returns the head of the queue. If the queue is empty, it waits until an element becomes available.
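
The blocking behaviour of take() can be observed with a producer that inserts late. This is a minimal sketch that assumes Java 8 or later for the lambda; TakeSketch and takeFromLateProducer are hypothetical names, and the 100 ms sleep is only a best-effort way to let the consumer block first.

```java
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the JDK source.
class TakeSketch {

    // Starts a producer that inserts after a short delay, then blocks in take() until it does.
    static String takeFromLateProducer() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        Thread producer = new Thread(() -> {
            try {
                Thread.sleep(100);   // best effort: give the consumer time to block first
                queue.put("hello");  // the queue was empty, so this insertion wakes a waiting taker
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        try {
            return queue.take();     // waits while the queue is empty
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        System.out.println(takeFromLateProducer());
    }
}
```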

The source of dequeue() is:

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

dequeue() removes the first element of the queue. head is a dummy node whose item is always null, so the method unlinks the old dummy (pointing its next at itself), makes the first real node the new dummy head, and returns the item that node held.
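
The dummy-head technique used by enqueue() and dequeue() is easier to follow without the locking. The class below is a simplified, single-threaded model written for illustration only; DummyHeadListSketch, isEmpty and demo are hypothetical names, and the real class additionally relies on putLock, takeLock and count.

```java
// Hypothetical single-threaded model of the linked list inside LinkedBlockingQueue.
class DummyHeadListSketch<E> {

    static final class Node<E> {
        E item;
        Node<E> next;
        Node(E item) { this.item = item; }
    }

    // head is a dummy node: head.item is always null, head.next is the first real node.
    private Node<E> head = new Node<E>(null);
    private Node<E> last = head;

    void enqueue(E e) {
        Node<E> node = new Node<E>(e);
        last.next = node;   // link after the current tail
        last = node;        // the new node becomes the tail
    }

    // The caller must make sure the list is not empty (take() does this by checking count).
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h;         // self-link the old dummy so it no longer references live nodes
        head = first;       // the first real node becomes the new dummy
        E x = first.item;
        first.item = null;  // restore the invariant head.item == null
        return x;
    }

    boolean isEmpty() {
        return head.next == null;
    }

    static String demo() {
        DummyHeadListSketch<String> list = new DummyHeadListSketch<String>();
        list.enqueue("a");
        list.enqueue("b");
        return list.dequeue() + list.dequeue() + list.isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Because there is always at least one node, enqueue() only ever touches last and dequeue() only ever touches head, which is what allows the real class to guard them with two different locks.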

The source of signalNotFull() is:

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

signalNotFull() wakes one thread waiting on notFull. It mirrors signalNotEmpty(), using putLock instead of takeLock.

 

4. Traversal

The traversal method of LinkedBlockingQueue is described below.

public Iterator<E> iterator() {
    return new Itr();
}

iterator() simply returns an Itr object.

The Itr class is defined as follows:

private class Itr implements Iterator<E> {
    // Node holding the element that the next call to next() will return
    private Node<E> current;
    // Node returned by the most recent call to next()
    private Node<E> lastRet;
    // Element of current, cached so it can still be returned if the node is dequeued in the meantime
    private E currentElement;

    Itr() {
        // Acquire both the insertion lock putLock and the removal lock takeLock
        fullyLock();
        try {
            // Start at the node after the dummy head, i.e. the first real node of the queue
            current = head.next;
            if (current != null)
                currentElement = current.item;
        } finally {
            // Release both putLock and takeLock
            fullyUnlock();
        }
    }

    // Returns true if there is another element to return
    public boolean hasNext() {
        return current != null;
    }

    // Returns the next live successor of p, or null if there is none.
    // A self-linked node (p.next == p) has been dequeued, so traversal restarts at head.next;
    // nodes whose item is null were removed from the interior and are skipped.
    private Node<E> nextNode(Node<E> p) {
        for (;;) {
            Node<E> s = p.next;
            if (s == p)
                return head.next;
            if (s == null || s.item != null)
                return s;
            p = s;
        }
    }

    // Returns the cached element and advances to the next live node
    public E next() {
        fullyLock();
        try {
            if (current == null)
                throw new NoSuchElementException();
            E x = currentElement;
            lastRet = current;
            current = nextNode(current);
            currentElement = (current == null) ? null : current.item;
            return x;
        } finally {
            fullyUnlock();
        }
    }

    // Removes the element most recently returned by next()
    public void remove() {
        if (lastRet == null)
            throw new IllegalStateException();
        fullyLock();
        try {
            Node<E> node = lastRet;
            lastRet = null;
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (p == node) {
                    unlink(p, trail);
                    break;
                }
            }
        } finally {
            fullyUnlock();
        }
    }
}
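
The effect of caching currentElement can be seen in a single thread. In the sketch below an element is dequeued after the iterator was created, yet the iterator still hands it out, and Iterator.remove() then deletes an interior element. WeakIteratorSketch and run are hypothetical names used only for this illustration.

```java
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the JDK source.
class WeakIteratorSketch {

    // Returns the elements seen by the iterator, then "|", then the final queue contents.
    static String run() {
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        Iterator<String> it = queue.iterator(); // the iterator already holds "a" to hand out
        queue.poll();                           // "a" is dequeued behind the iterator's back
        StringBuilder seen = new StringBuilder();
        while (it.hasNext()) {
            String s = it.next();               // never throws ConcurrentModificationException
            seen.append(s);
            if (s.equals("b")) {
                it.remove();                    // unlinks the node returned last
            }
        }
        return seen + "|" + queue;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The iterator reports "a" even though it is no longer in the queue, which is the weakly consistent behaviour the Javadoc describes: elements are traversed as they existed when the iterator was constructed, and later changes may or may not be reflected.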


 

LinkedBlockingQueue example

import java.util.*;
import java.util.concurrent.*;

/*
 *   LinkedBlockingQueue is a thread-safe queue, whereas LinkedList is not thread-safe.
 *
 *   This example has several threads modify and iterate over queue at the same time.
 *   (01) When queue is a LinkedBlockingQueue, the program runs normally.
 *   (02) When queue is a LinkedList, the program is likely to fail with a ConcurrentModificationException.
 *
 * @author skywang
 */
public class LinkedBlockingQueueDemo1 {

    // Note: the program is likely to fail if queue is a LinkedList.
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new LinkedBlockingQueue<String>();

    public static void main(String[] args) {
        // Start two threads that operate on queue at the same time.
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    private static void printAll() {
        Iterator<String> iter = queue.iterator();
        while (iter.hasNext()) {
            String value = iter.next();
            System.out.print(value + ", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            int i = 0;
            while (i++ < 6) {
                // thread name + sequence number
                String val = Thread.currentThread().getName() + i;
                queue.add(val);
                // Iterate over queue with an Iterator.
                printAll();
            }
        }
    }
}

Output of one run:

tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, ta3, 
tb1, ta1, ta2, ta3, ta4, 
tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, 
ta4, tb1, ta5, ta1, ta6, 
ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, 
ta5, ta6, tb2, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,

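Explanation of the result
The example starts two threads (ta and tb) that both operate on the same LinkedBlockingQueue. Thread ta builds a string from its thread name and a sequence number, adds the string to the queue, and then iterates over the queue and prints every element. Thread tb does the same, except that its name differs. Because both threads print at the same time, their output can interleave on a single line, as in the fifth line of the output above.
When queue is a LinkedBlockingQueue, the program runs normally. If queue is changed to a LinkedList, the program is likely to throw a ConcurrentModificationException, because LinkedList is not thread-safe and its iterator is fail-fast.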
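
Whether the multithreaded demo actually fails with a LinkedList depends on thread timing. The same difference between a fail-fast and a weakly consistent iterator can be reproduced deterministically in one thread, as in the sketch below; FailFastContrastSketch and iteratorFailsAfterAdd are hypothetical names.

```java
import java.util.ConcurrentModificationException;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical demo class; not part of the original example.
class FailFastContrastSketch {

    // Adds an element while an iterator is open and reports whether the iterator then fails.
    static boolean iteratorFailsAfterAdd(Queue<String> queue) {
        queue.add("a");
        queue.add("b");
        Iterator<String> it = queue.iterator();
        it.next();
        queue.add("c"); // structural modification while the iterator is in use
        try {
            while (it.hasNext()) {
                it.next();
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("LinkedList fails: "
                + iteratorFailsAfterAdd(new LinkedList<String>()));
        System.out.println("LinkedBlockingQueue fails: "
                + iteratorFailsAfterAdd(new LinkedBlockingQueue<String>()));
    }
}
```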

 

 

 

484             return null;
485         E x = null;
486         int c = -1;
487         final ReentrantLock takeLock = this.takeLock;
488         takeLock.lock();
489         try {
490             if (count.get() > 0) {
491                 x = dequeue();
492                 c = count.getAndDecrement();
493                 if (c > 1)
494                     notEmpty.signal();
495             }
496         } finally {
497             takeLock.unlock();
498         }
499         if (c == capacity)
500             signalNotFull();
501         return x;
502     }
503 
504     public E peek() {
505         if (count.get() == 0)
506             return null;
507         final ReentrantLock takeLock = this.takeLock;
508         takeLock.lock();
509         try {
510             Node<E> first = head.next;
511             if (first == null)
512                 return null;
513             else
514                 return first.item;
515         } finally {
516             takeLock.unlock();
517         }
518     }
519 
520     /**
521      * Unlinks interior Node p with predecessor trail.
522      */
523     void unlink(Node<E> p, Node<E> trail) {
524         // assert isFullyLocked();
525         // p.next is not changed, to allow iterators that are
526         // traversing p to maintain their weak-consistency guarantee.
527         p.item = null;
528         trail.next = p.next;
529         if (last == p)
530             last = trail;
531         if (count.getAndDecrement() == capacity)
532             notFull.signal();
533     }
534 
535     /**
536      * Removes a single instance of the specified element from this queue,
537      * if it is present.  More formally, removes an element {@code e} such
538      * that {@code o.equals(e)}, if this queue contains one or more such
539      * elements.
540      * Returns {@code true} if this queue contained the specified element
541      * (or equivalently, if this queue changed as a result of the call).
542      *
543      * @param o element to be removed from this queue, if present
544      * @return {@code true} if this queue changed as a result of the call
545      */
546     public boolean remove(Object o) {
547         if (o == null) return false;
548         fullyLock();
549         try {
550             for (Node<E> trail = head, p = trail.next;
551                  p != null;
552                  trail = p, p = p.next) {
553                 if (o.equals(p.item)) {
554                     unlink(p, trail);
555                     return true;
556                 }
557             }
558             return false;
559         } finally {
560             fullyUnlock();
561         }
562     }
563 
564     /**
565      * Returns {@code true} if this queue contains the specified element.
566      * More formally, returns {@code true} if and only if this queue contains
567      * at least one element {@code e} such that {@code o.equals(e)}.
568      *
569      * @param o object to be checked for containment in this queue
570      * @return {@code true} if this queue contains the specified element
571      */
572     public boolean contains(Object o) {
573         if (o == null) return false;
574         fullyLock();
575         try {
576             for (Node<E> p = head.next; p != null; p = p.next)
577                 if (o.equals(p.item))
578                     return true;
579             return false;
580         } finally {
581             fullyUnlock();
582         }
583     }
584 
585     /**
586      * Returns an array containing all of the elements in this queue, in
587      * proper sequence.
588      *
589      * <p>The returned array will be "safe" in that no references to it are
590      * maintained by this queue.  (In other words, this method must allocate
591      * a new array).  The caller is thus free to modify the returned array.
592      *
593      * <p>This method acts as bridge between array-based and collection-based
594      * APIs.
595      *
596      * @return an array containing all of the elements in this queue
597      */
598     public Object[] toArray() {
599         fullyLock();
600         try {
601             int size = count.get();
602             Object[] a = new Object[size];
603             int k = 0;
604             for (Node<E> p = head.next; p != null; p = p.next)
605                 a[k++] = p.item;
606             return a;
607         } finally {
608             fullyUnlock();
609         }
610     }
611 
612     /**
613      * Returns an array containing all of the elements in this queue, in
614      * proper sequence; the runtime type of the returned array is that of
615      * the specified array.  If the queue fits in the specified array, it
616      * is returned therein.  Otherwise, a new array is allocated with the
617      * runtime type of the specified array and the size of this queue.
618      *
619      * <p>If this queue fits in the specified array with room to spare
620      * (i.e., the array has more elements than this queue), the element in
621      * the array immediately following the end of the queue is set to
622      * {@code null}.
623      *
624      * <p>Like the {@link #toArray()} method, this method acts as bridge between
625      * array-based and collection-based APIs.  Further, this method allows
626      * precise control over the runtime type of the output array, and may,
627      * under certain circumstances, be used to save allocation costs.
628      *
629      * <p>Suppose {@code x} is a queue known to contain only strings.
630      * The following code can be used to dump the queue into a newly
631      * allocated array of {@code String}:
632      *
633      * <pre>
634      *     String[] y = x.toArray(new String[0]);</pre>
635      *
636      * Note that {@code toArray(new Object[0])} is identical in function to
637      * {@code toArray()}.
638      *
639      * @param a the array into which the elements of the queue are to
640      *          be stored, if it is big enough; otherwise, a new array of the
641      *          same runtime type is allocated for this purpose
642      * @return an array containing all of the elements in this queue
643      * @throws ArrayStoreException if the runtime type of the specified array
644      *         is not a supertype of the runtime type of every element in
645      *         this queue
646      * @throws NullPointerException if the specified array is null
647      */
648     @SuppressWarnings("unchecked")
649     public <T> T[] toArray(T[] a) {
650         fullyLock();
651         try {
652             int size = count.get();
653             if (a.length < size)
654                 a = (T[])java.lang.reflect.Array.newInstance
655                     (a.getClass().getComponentType(), size);
656 
657             int k = 0;
658             for (Node<E> p = head.next; p != null; p = p.next)
659                 a[k++] = (T)p.item;
660             if (a.length > k)
661                 a[k] = null;
662             return a;
663         } finally {
664             fullyUnlock();
665         }
666     }
667 
668     public String toString() {
669         fullyLock();
670         try {
671             Node<E> p = head.next;
672             if (p == null)
673                 return "[]";
674 
675             StringBuilder sb = new StringBuilder();
676             sb.append('[');
677             for (;;) {
678                 E e = p.item;
679                 sb.append(e == this ? "(this Collection)" : e);
680                 p = p.next;
681                 if (p == null)
682                     return sb.append(']').toString();
683                 sb.append(',').append(' ');
684             }
685         } finally {
686             fullyUnlock();
687         }
688     }
689 
690     /**
691      * Atomically removes all of the elements from this queue.
692      * The queue will be empty after this call returns.
693      */
694     public void clear() {
695         fullyLock();
696         try {
697             for (Node<E> p, h = head; (p = h.next) != null; h = p) {
698                 h.next = h;
699                 p.item = null;
700             }
701             head = last;
702             // assert head.item == null && head.next == null;
703             if (count.getAndSet(0) == capacity)
704                 notFull.signal();
705         } finally {
706             fullyUnlock();
707         }
708     }
709 
710     /**
711      * @throws UnsupportedOperationException {@inheritDoc}
712      * @throws ClassCastException            {@inheritDoc}
713      * @throws NullPointerException          {@inheritDoc}
714      * @throws IllegalArgumentException      {@inheritDoc}
715      */
716     public int drainTo(Collection<? super E> c) {
717         return drainTo(c, Integer.MAX_VALUE);
718     }
719 
720     /**
721      * @throws UnsupportedOperationException {@inheritDoc}
722      * @throws ClassCastException            {@inheritDoc}
723      * @throws NullPointerException          {@inheritDoc}
724      * @throws IllegalArgumentException      {@inheritDoc}
725      */
726     public int drainTo(Collection<? super E> c, int maxElements) {
727         if (c == null)
728             throw new NullPointerException();
729         if (c == this)
730             throw new IllegalArgumentException();
731         boolean signalNotFull = false;
732         final ReentrantLock takeLock = this.takeLock;
733         takeLock.lock();
734         try {
735             int n = Math.min(maxElements, count.get());
736             // count.get provides visibility to first n Nodes
737             Node<E> h = head;
738             int i = 0;
739             try {
740                 while (i < n) {
741                     Node<E> p = h.next;
742                     c.add(p.item);
743                     p.item = null;
744                     h.next = h;
745                     h = p;
746                     ++i;
747                 }
748                 return n;
749             } finally {
750                 // Restore invariants even if c.add() threw
751                 if (i > 0) {
752                     // assert h.item == null;
753                     head = h;
754                     signalNotFull = (count.getAndAdd(-i) == capacity);
755                 }
756             }
757         } finally {
758             takeLock.unlock();
759             if (signalNotFull)
760                 signalNotFull();
761         }
762     }
763 
764     /**
765      * Returns an iterator over the elements in this queue in proper sequence.
766      * The elements will be returned in order from first (head) to last (tail).
767      *
768      * <p>The returned iterator is a "weakly consistent" iterator that
769      * will never throw {@link java.util.ConcurrentModificationException
770      * ConcurrentModificationException}, and guarantees to traverse
771      * elements as they existed upon construction of the iterator, and
772      * may (but is not guaranteed to) reflect any modifications
773      * subsequent to construction.
774      *
775      * @return an iterator over the elements in this queue in proper sequence
776      */
777     public Iterator<E> iterator() {
778       return new Itr();
779     }
780 
781     private class Itr implements Iterator<E> {
782         /*
783          * Basic weakly-consistent iterator.  At all times hold the next
784          * item to hand out so that if hasNext() reports true, we will
785          * still have it to return even if lost race with a take etc.
786          */
787         private Node<E> current;
788         private Node<E> lastRet;
789         private E currentElement;
790 
791         Itr() {
792             fullyLock();
793             try {
794                 current = head.next;
795                 if (current != null)
796                     currentElement = current.item;
797             } finally {
798                 fullyUnlock();
799             }
800         }
801 
802         public boolean hasNext() {
803             return current != null;
804         }
805 
806         /**
807          * Returns the next live successor of p, or null if no such.
808          *
809          * Unlike other traversal methods, iterators need to handle both:
810          * - dequeued nodes (p.next == p)
811          * - (possibly multiple) interior removed nodes (p.item == null)
812          */
813         private Node<E> nextNode(Node<E> p) {
814             for (;;) {
815                 Node<E> s = p.next;
816                 if (s == p)
817                     return head.next;
818                 if (s == null || s.item != null)
819                     return s;
820                 p = s;
821             }
822         }
823 
824         public E next() {
825             fullyLock();
826             try {
827                 if (current == null)
828                     throw new NoSuchElementException();
829                 E x = currentElement;
830                 lastRet = current;
831                 current = nextNode(current);
832                 currentElement = (current == null) ? null : current.item;
833                 return x;
834             } finally {
835                 fullyUnlock();
836             }
837         }
838 
839         public void remove() {
840             if (lastRet == null)
841                 throw new IllegalStateException();
842             fullyLock();
843             try {
844                 Node<E> node = lastRet;
845                 lastRet = null;
846                 for (Node<E> trail = head, p = trail.next;
847                      p != null;
848                      trail = p, p = p.next) {
849                     if (p == node) {
850                         unlink(p, trail);
851                         break;
852                     }
853                 }
854             } finally {
855                 fullyUnlock();
856             }
857         }
858     }
859 
860     /**
861      * Save the state to a stream (that is, serialize it).
862      *
863      * @serialData The capacity is emitted (int), followed by all of
864      * its elements (each an {@code Object}) in the proper order,
865      * followed by a null
866      * @param s the stream
867      */
868     private void writeObject(java.io.ObjectOutputStream s)
869         throws java.io.IOException {
870 
871         fullyLock();
872         try {
873             // Write out any hidden stuff, plus capacity
874             s.defaultWriteObject();
875 
876             // Write out all elements in the proper order.
877             for (Node<E> p = head.next; p != null; p = p.next)
878                 s.writeObject(p.item);
879 
880             // Use trailing null as sentinel
881             s.writeObject(null);
882         } finally {
883             fullyUnlock();
884         }
885     }
886 
887     /**
888      * Reconstitute this queue instance from a stream (that is,
889      * deserialize it).
890      *
891      * @param s the stream
892      */
893     private void readObject(java.io.ObjectInputStream s)
894         throws java.io.IOException, ClassNotFoundException {
895         // Read in capacity, and any hidden stuff
896         s.defaultReadObject();
897 
898         count.set(0);
899         last = head = new Node<E>(null);
900 
901         // Read in all elements and place in queue
902         for (;;) {
903             @SuppressWarnings("unchecked")
904             E item = (E)s.readObject();
905             if (item == null)
906                 break;
907             add(item);
908         }
909     }
910 }
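
The listing above includes drainTo(Collection, int), which moves up to maxElements elements out of the queue in one call while holding only takeLock. A minimal usage sketch follows; the class name DrainDemo and its helper method are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

class DrainDemo {
    // Fills a queue with 0..n-1, then moves at most max elements into a list.
    static List<Integer> drain(int n, int max) {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<Integer>();
        for (int i = 0; i < n; i++) {
            q.add(i);
        }
        List<Integer> out = new ArrayList<Integer>();
        q.drainTo(out, max);   // elements leave the queue in FIFO order
        return out;
    }

    public static void main(String[] args) {
        System.out.println(drain(5, 3));
        System.out.println(drain(2, 10));
    }
}
```

When the queue holds fewer than max elements, drainTo simply takes everything that is there and does not block.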

The following analyzes LinkedBlockingQueue from four angles: creation, insertion, removal, and traversal.

1. Creation

The LinkedBlockingQueue(int capacity) constructor serves as the example.

public LinkedBlockingQueue(int capacity) {
    if (capacity <= 0) throw new IllegalArgumentException();
    this.capacity = capacity;
    last = head = new Node<E>(null);
}

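Notes
(01) capacity is the capacity of the linked blocking queue.
(02) head and last are the head and tail nodes of the linked blocking queue. head is a dummy node whose item is always null; the first real element is head.next. These fields, together with the locks and conditions, are declared in LinkedBlockingQueue as follows: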

// Capacity
private final int capacity;
// Current number of elements
private final AtomicInteger count = new AtomicInteger(0);
private transient Node<E> head; // head of the linked list
private transient Node<E> last; // tail of the linked list
// takeLock guards removal of elements; notEmpty is the "not empty" condition bound to it
private final ReentrantLock takeLock = new ReentrantLock();
private final Condition notEmpty = takeLock.newCondition();
// putLock guards insertion of elements; notFull is the "not full" condition bound to it
private final ReentrantLock putLock = new ReentrantLock();
private final Condition notFull = putLock.newCondition();

The linked-list node is defined as follows:

static class Node<E> {
    E item;         // the element
    Node<E> next;   // reference to the next node

    Node(E x) { item = x; }
}
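
As a quick check of the constructor's behavior, the sketch below builds a bounded queue, reads back its free space, and confirms that a non-positive capacity is rejected. The class CapacityDemo and its helper methods are hypothetical names used only for illustration.

```java
import java.util.concurrent.LinkedBlockingQueue;

class CapacityDemo {
    // Remaining capacity of a freshly created queue with the given bound.
    static int freshRemaining(int capacity) {
        return new LinkedBlockingQueue<String>(capacity).remainingCapacity();
    }

    // Returns true if the constructor rejects the given capacity.
    static boolean rejects(int capacity) {
        try {
            new LinkedBlockingQueue<String>(capacity);
            return false;
        } catch (IllegalArgumentException expected) {
            return true;   // thrown by the "capacity <= 0" check
        }
    }

    public static void main(String[] args) {
        System.out.println(freshRemaining(3));
        // The no-argument constructor uses Integer.MAX_VALUE as the capacity.
        System.out.println(new LinkedBlockingQueue<String>().remainingCapacity() == Integer.MAX_VALUE);
        System.out.println(rejects(0));
    }
}
```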

2. Insertion

offer(E e) serves as the example for the insertion methods.

public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
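    // If the queue is full, return false to indicate that the insertion failed.
    final AtomicInteger count = this.count;
    if (count.get() == capacity)
        return false;
    int c = -1;
    // Create the node that holds e
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    // Acquire the put lock putLock
    putLock.lock();
    try {
        // Check again whether the queue is full, this time under the lock.
        // If it is not full, insert the node.
        if (count.get() < capacity) {
            // Insert the node
            enqueue(node);
            // Increment the node count and return the previous count
            c = count.getAndIncrement();
            // If the queue is still not full after the insertion, wake a thread waiting on notFull.
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        // Release the put lock putLock
        putLock.unlock();
    }
    // If the queue was empty before the insertion, wake a thread waiting on notEmpty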
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

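Note: offer() appends element e to the tail of the queue without blocking. It returns true on success and false if the queue is full.

The source of enqueue() is: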

private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

enqueue() links node after the current tail and makes it the new tail.

The source of signalNotEmpty() is:

private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

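signalNotEmpty() wakes one thread waiting on notEmpty. It acquires takeLock first because notEmpty belongs to takeLock, and a Condition can only be signalled while its lock is held.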
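
The non-blocking behavior of offer(E e) is easiest to see on a small bounded queue. The sketch below is an illustration only: OfferDemo and its helper methods are hypothetical names, and the timed variant uses offer(E, long, TimeUnit) from the source listing.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class OfferDemo {
    // Offers n elements to a queue of the given capacity and counts the successes.
    static int accepted(int capacity, int n) {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<Integer>(capacity);
        int ok = 0;
        for (int i = 0; i < n; i++) {
            if (q.offer(i)) {   // never blocks; returns false once the queue is full
                ok++;
            }
        }
        return ok;
    }

    // Timed offer on a full queue: waits up to the timeout, then gives up.
    static boolean timedOfferOnFull(long millis) {
        LinkedBlockingQueue<Integer> q = new LinkedBlockingQueue<Integer>(1);
        q.offer(1);   // the queue is now full
        try {
            return q.offer(2, millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(accepted(2, 5));
        System.out.println(timedOfferOnFull(50));
    }
}
```

Since nothing ever removes an element in timedOfferOnFull, the timed offer runs out its timeout and reports failure.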

 

3. Removal

take() serves as the example for the removal methods.

public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
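    // Acquire the take lock; throws InterruptedException if the current thread is interrupted
    takeLock.lockInterruptibly();
    try {
        // While the queue is empty, keep waiting.
        while (count.get() == 0) {
            notEmpty.await();
        }
        // Remove the element
        x = dequeue();
        // Decrement the node count and return the previous count.
        c = count.getAndDecrement();
        // If elements remain after this removal, wake another thread waiting on notEmpty.
        if (c > 1)
            notEmpty.signal();
    } finally {
        // Release the take lock
        takeLock.unlock();
    }
    // If the queue was full before the removal, wake a thread waiting on notFull.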
    if (c == capacity)
        signalNotFull();
    return x;
}

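Note: take() removes and returns the head of the queue. If the queue is empty, it waits until an element becomes available or the thread is interrupted.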
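
The blocking hand-off between take() and put() can be sketched with one consumer thread and one producer. TakeDemo and handOff are hypothetical names; the consumer may or may not already be waiting when put() runs, and the result is the same either way.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class TakeDemo {
    // Starts a consumer that calls take(), then hands it one element.
    static String handOff(final String message) {
        final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();
        final String[] received = new String[1];
        Thread consumer = new Thread(new Runnable() {
            public void run() {
                try {
                    // Waits on notEmpty for as long as the queue is empty.
                    received[0] = queue.take();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        });
        consumer.start();
        try {
            queue.put(message);   // count goes from 0 to 1, so notEmpty is signalled
            consumer.join();      // join() makes received[0] visible to this thread
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return received[0];
    }

    public static void main(String[] args) {
        System.out.println(handOff("hello"));
    }
}
```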

The source of dequeue() is:

private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

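dequeue() removes the first element. head is a dummy node, so the method unlinks the old dummy (pointing its next at itself to help GC), makes the first real node the new dummy head, clears that node's item, and returns the item.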
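
The dummy-head technique used by enqueue() and dequeue() can be tried in isolation. The following is a single-threaded sketch without any locking; DummyHeadQueue is a hypothetical class, and unlike the JDK method its dequeue() returns null on an empty queue instead of relying on the caller to check the count first.

```java
class DummyHeadQueue<E> {
    static final class Node<E> {
        E item;
        Node<E> next;
        Node(E x) { item = x; }
    }

    // head is always a dummy node with a null item; head.next is the first real element.
    private Node<E> head = new Node<E>(null);
    private Node<E> last = head;

    // Links a new node after the tail and makes it the new tail.
    void enqueue(E x) {
        last = last.next = new Node<E>(x);
    }

    // Removes the first element, or returns null if there is none (sketch-only guard).
    E dequeue() {
        Node<E> h = head;
        Node<E> first = h.next;
        if (first == null) {
            return null;
        }
        h.next = h;          // self-link the old dummy to help GC
        head = first;        // the first real node becomes the new dummy
        E x = first.item;
        first.item = null;   // a dummy node never holds an item
        return x;
    }

    boolean isEmpty() {
        return head.next == null;
    }

    public static void main(String[] args) {
        DummyHeadQueue<String> q = new DummyHeadQueue<String>();
        q.enqueue("a");
        q.enqueue("b");
        System.out.println(q.dequeue() + " " + q.dequeue() + " " + q.isEmpty());
    }
}
```

Because enqueue() only touches last and dequeue() only touches head, the two ends can be guarded by different locks, which is what makes the putLock/takeLock split possible.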

The source of signalNotFull() is:

private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

signalNotFull() wakes one thread waiting on notFull. It acquires putLock first because notFull belongs to putLock.

 

4. Traversal

The traversal methods of LinkedBlockingQueue are described below.

public Iterator<E> iterator() {
  return new Itr();
}

iterator() simply returns an Itr object.

The Itr class is defined as follows:

private class Itr implements Iterator<E> {
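    // The node whose element the next call to next() will return
    private Node<E> current;
    // The node most recently returned by next()
    private Node<E> lastRet;
    // The element held by current
    private E currentElement;

    Itr() {
        // Acquire both the put lock putLock and the take lock takeLock
        fullyLock();
        try {
            // Start at the node after the dummy head, i.e. the first real node of the queue
            current = head.next;
            if (current != null)
                currentElement = current.item;
        } finally {
            // Release both putLock and takeLock
            fullyUnlock();
        }
    }

    // Returns true if there is another element to return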
    public boolean hasNext() {
        return current != null;
    }

    private Node<E> nextNode(Node<E> p) {
        for (;;) {
            Node<E> s = p.next;
            if (s == p)
                return head.next;
            if (s == null || s.item != null)
                return s;
            p = s;
        }
    }

    // Returns the next element and advances the iterator
    public E next() {
        fullyLock();
        try {
            if (current == null)
                throw new NoSuchElementException();
            E x = currentElement;
            lastRet = current;
            current = nextNode(current);
            currentElement = (current == null) ? null : current.item;
            return x;
        } finally {
            fullyUnlock();
        }
    }

    // Removes the element most recently returned by next()
    public void remove() {
        if (lastRet == null)
            throw new IllegalStateException();
        fullyLock();
        try {
            Node<E> node = lastRet;
            lastRet = null;
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                if (p == node) {
                    unlink(p, trail);
                    break;
                }
            }
        } finally {
            fullyUnlock();
        }
    }
}
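
The iterator's behavior under modification can be checked in a single thread. In the sketch below (IteratorDemo is a hypothetical name), one element is removed through Iterator.remove() and another is added to the queue while the iteration is still in progress; neither operation throws.

```java
import java.util.Iterator;
import java.util.concurrent.LinkedBlockingQueue;

class IteratorDemo {
    // Removes "b" through the iterator, adds "d" mid-iteration,
    // and returns the final contents of the queue.
    static String run() {
        LinkedBlockingQueue<String> q = new LinkedBlockingQueue<String>();
        q.add("a");
        q.add("b");
        q.add("c");
        Iterator<String> it = q.iterator();
        while (it.hasNext()) {
            String s = it.next();
            if (s.equals("b")) {
                it.remove();   // unlinks the node last returned by next()
                q.add("d");    // structural change during iteration, no exception
            }
        }
        return q.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Whether an element added during the traversal is also visited is left open by the weakly consistent contract, so the sketch only inspects the final queue contents.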

 

LinkedBlockingQueue example

import java.util.*;
import java.util.concurrent.*;

/*
 *   LinkedBlockingQueue is a thread-safe queue, whereas LinkedList is not.
 *
 *   The example below has several threads modify and traverse queue at the same time.
 *   (01) When queue is a LinkedBlockingQueue, the program runs normally.
 *   (02) When queue is a LinkedList, the program may throw ConcurrentModificationException.
 *
 * @author skywang
 */
public class LinkedBlockingQueueDemo1 {

    // TODO: when queue is a LinkedList, the program fails.
    //private static Queue<String> queue = new LinkedList<String>();
    private static Queue<String> queue = new LinkedBlockingQueue<String>();
    public static void main(String[] args) {

        // Start two threads that operate on queue at the same time.
        new MyThread("ta").start();
        new MyThread("tb").start();
    }

    private static void printAll() {
        String value;
        Iterator<String> iter = queue.iterator();
        while(iter.hasNext()) {
            value = iter.next();
            System.out.print(value+", ");
        }
        System.out.println();
    }

    private static class MyThread extends Thread {
        MyThread(String name) {
            super(name);
        }
        @Override
        public void run() {
            int i = 0;
            while (i++ < 6) {
                // "thread name" + "sequence number"
                String val = Thread.currentThread().getName()+i;
                queue.add(val);
                // Traverse queue through an Iterator.
                printAll();
            }
        }
    }
}

Output (from one run)

tb1, ta1, 
tb1, ta1, ta2, 
tb1, ta1, ta2, ta3, 
tb1, ta1, ta2, ta3, ta4, 
tb1, ta1, tb1, ta2, ta1, ta3, ta2, ta4, ta3, ta5, 
ta4, tb1, ta5, ta1, ta6, 
ta2, tb1, ta3, ta1, ta4, ta2, ta5, ta3, ta6, ta4, tb2, 
ta5, ta6, tb2, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, 
tb1, ta1, ta2, ta3, ta4, ta5, ta6, tb2, tb3, tb4, tb5, tb6,

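Explanation of the result
The example starts two threads (ta and tb) that operate on the same LinkedBlockingQueue. Thread ta builds a string from its thread name and a sequence number, adds that string to the LinkedBlockingQueue, and then traverses the queue and prints every element. Thread tb does the same, only under a different thread name. Because the two threads print without coordinating with each other, their output can interleave within a line, as in the run above.
When queue is a LinkedBlockingQueue, the program runs normally. If queue is changed to a LinkedList, the program may throw ConcurrentModificationException.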
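
The difference between the two queue types does not depend on thread timing; it can be reproduced deterministically in one thread by modifying the queue in the middle of an iteration. The sketch below assumes a hypothetical helper class FailFastDemo.

```java
import java.util.ConcurrentModificationException;
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.LinkedBlockingQueue;

class FailFastDemo {
    // Adds an element partway through an iteration and reports
    // whether the iterator threw ConcurrentModificationException.
    static boolean throwsOnModification(Queue<String> queue) {
        queue.add("a");
        queue.add("b");
        try {
            for (String s : queue) {
                if (s.equals("a")) {
                    queue.add("c");   // structural change while iterating
                }
            }
            return false;
        } catch (ConcurrentModificationException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(throwsOnModification(new LinkedList<String>()));
        System.out.println(throwsOnModification(new LinkedBlockingQueue<String>()));
    }
}
```

LinkedList's iterator is fail-fast and detects the change on its next call to next(), while the iterator of LinkedBlockingQueue is weakly consistent and keeps going.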

 

 

    Original author: JUC
    Original URL: https://blog.csdn.net/m0_37955444/article/details/79179635