细说并发:Java 阻塞队列源码分析(下)

上一篇 细说并发4:Java 阻塞队列源码分析(上) 我们了解了 ArrayBlockingQueue, LinkedBlockingQueuePriorityBlockingQueue,这篇文章来了解剩下的四种阻塞队列。

读完本文你将了解:

七种阻塞队列的后四种

DelayQueue

DelayQueue 是一个支持延时获取元素的、无界阻塞队列。

队列使用 PriorityQueue 实现,队列中的元素必须实现 Delayed 接口:

public class DelayQueue<E extends Delayed> extends AbstractQueue<E>
    implements BlockingQueue<E> {...}

Delayed 接口:

public interface Delayed extends Comparable<Delayed> {

    //返回当前对象的剩余执行时间
    long getDelay(TimeUnit unit);
}

可以看到,实现 Delayed 的类也需要实现 Comparable 接口,即实现 compareTo() 方法,保证集合中元素的顺序和 getDelay() 一致。

因此创建元素时可以指定多久才能从队列中获取当前元素。

DelayQueue 的关键属性

private final transient ReentrantLock lock = new ReentrantLock();
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;

/**
 * Condition signalled when a newer element becomes available
 * at the head of the queue or a new thread may need to
 * become leader.
 */
private final Condition available = lock.newCondition();

可以看到,DelayQueue 的属性只有四个,却都不简单:

  1. ReentrantLock lock
    • 读写锁
  2. PriorityQueue q
    • 无界的、优先级队列
  3. Thread leader
    • Leader-Follower 模型中的 leader
  4. Condition available
    • 队首有新元素可用或者有新线程成为 leader 时触发的 condition

简单介绍下关键属性。

1 PriorityQueue 是一个用数组实现的,基于二叉堆(元素[n] 的子孩子是 元素[2*n+1] 和元素[2*(n+1)] )数据结构的集合。

/**
 * Priority queue represented as a balanced binary heap: the two
 * children of queue[n] are queue[2*n+1] and queue[2*(n+1)].  The
 * priority queue is ordered by comparator, or by the elements'
 * natural ordering, if comparator is null: For each node n in the
 * heap and each descendant d of n, n <= d.  The element with the
 * lowest value is in queue[0], assuming the queue is nonempty.
 */
transient Object[] queue;

在添加元素时如果超出限制也会扩容:

public boolean offer(E e) {
    if (e == null)
        throw new NullPointerException();
    modCount++;
    int i = size;
    if (i >= queue.length)
        grow(i + 1);
    size = i + 1;
    if (i == 0)
        queue[0] = e;
    else
        siftUp(i, e);
    return true;
}

所以是无界的。

2.Leader-Follower 模型

《细说并发:Java 阻塞队列源码分析(下)》

这种模型中所有线程会有三种身份中的一种:leader、follower,以及一个干活中的状态:proccesser。

它的基本原则就是,永远最多只有一个 leader。而所有 follower 都在等待成为 leader。

线程池启动时会自动产生一个 Leader 负责等待事件,当有一个事件产生时,Leader 线程首先通知一个 Follower 线程将其提拔为新的 Leader,然后自己就去干活了,去处理这个事件。处理完毕后加入 Follower 线程等待队列,等待下次成为 Leader。

这种方法可以增强 CPU 高速缓存相似性,及消除动态内存分配和线程间的数据交换。这种模式是为了最小化任务等待时间,当一个线程成为 leader 后,它只需要等待下一个可执行任务的出现,而其他线程要无限制地等待。

这部分摘自:blog.csdn.net/goldlevi/ar…

实现 Delayed 接口

前面提到了,DelayQueue 的元素必须实现 Delayed 接口,我们以 JDK 中的 ScheduledFutureTask 为例,看下如何实现:

private class ScheduledFutureTask<V>
        extends FutureTask<V> implements RunnableScheduledFuture<V> {

    //1.初始化
    ScheduledFutureTask(Runnable r, V result, long ns, long period) {
    super(r, result);
    this.time = ns;
    this.period = period;
    this.sequenceNumber = sequencer.getAndIncrement();
    }
    //...
    //2.
public long getDelay(TimeUnit unit) {
    return unit.convert(time - now(), NANOSECONDS);
}

//3.
public int compareTo(Delayed other) {
    if (other == this) // compare zero if same object
        return 0;
    if (other instanceof ScheduledFutureTask) {
        ScheduledFutureTask<?> x = (ScheduledFutureTask<?>)other;
        long diff = time - x.time;
        if (diff < 0)
            return -1;
        else if (diff > 0)
            return 1;
        else if (sequenceNumber < x.sequenceNumber)
            return -1;
        else
            return 1;
    }
    long diff = getDelay(NANOSECONDS) - other.getDelay(NANOSECONDS);
    return (diff < 0) ? -1 : (diff > 0) ? 1 : 0;
}
//...
}

可以看到,实现 Delayed 接口大概有三步:

  1. 构造函数中初始化基本数据,比如执行时间等数据
  2. 实现 getDelay() 方法,返回当前元素还需要延时多久执行
  3. 实现 compareTo() 方法,指定不同元素如何比较谁先执行

延时阻塞队列如何实现

DelayQueue 中只有延迟时间到了才能从队列中取出元素。

那这个是怎么实现的呢?我们看一下获取元素的实现,以 take() 为例:

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        for (;;) {
            E first = q.peek();    //先获取队首元素,不删除
            if (first == null)    //如果为空就阻塞等待
                available.await();
            else {
                long delay = first.getDelay(NANOSECONDS);
                if (delay <= 0L)    //比较元素延时时间是否到达
                    return q.poll();    //如果是就移除并返回
                first = null; // don't retain ref while waiting
                if (leader != null)    //如果有 leader 线程,依然阻塞等待
                    available.await();
                else {        //如果没有 leader 线程,指定当前线程,然后等待任务的待执行时间
                    Thread thisThread = Thread.currentThread();
                    leader = thisThread;
                    try {
                        available.awaitNanos(delay);
                    } finally {
                        if (leader == thisThread)
                            leader = null;
                    }
                }
            }
        }
    } finally {        //最后等待时间到了后,就通知阻塞的线程
        if (leader == null && q.peek() != null)
            available.signal();
        lock.unlock();
    }
}

//PriorityQueue.peek()
public E peek() {
    return (size == 0) ? null : (E) queue[0];
}

可以看到,在取元素时,会根据元素的延时执行时间是否为 0 进行判断,如果延时执行时间已经没有了,就直接返回;否则就要等待执行时间到达后再返回。其中的 Leader-Follower 模型的调度过程这里就不分析了,越分析内容越多 – -。

DelayQueue 使用场景:

  • 缓存系统的设计
    • DelayQueue 保存元素的有效期,用一个线程来循环查询 DelayQueue ,能查到元素,就说明缓存的有效期到了
  • 定时任务调度
    • DelayQueue 保存定时执行的任务和执行时间,同样有一个循环查询线程,获取到任务就执行
    • TimerQueue 就是使用 DelayQueue 实现的

SynchronousQueue

SynchronousQueue 支持公平访问队列,根据构造函数的参数不同,有两种实现方式:TransferQueueTransferStack,默认情况下是 false:

private transient volatile Transferer<E> transferer;

public SynchronousQueue() {
    this(false);
}

public SynchronousQueue(boolean fair) {
    transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}

SynchronousQueue 是一个不存储元素的阻塞队列。

这里的“不存储元素”指的是,SynchronousQueue 容量为 0,每添加一个元素必须等待被取走后才能继续添加元素。

我们看下它的 put() 的实现:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {    
        Thread.interrupted();
        throw new InterruptedException();
    }
}

可以看到,它的添加是调用的 transferer.transfer(),如果返回 null 就调用 Thread.interrupted() 将中断标志位复位(设为 false),然后抛出异常。

看下 TransferStack.transfer():

/**
 * Puts or takes an item.
 */
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
    SNode s = null; 
    int mode = (e == null) ? REQUEST : DATA;    //判断是添加还是获取

    for (;;) {
        SNode h = head;      //获取栈顶节点  
        if (h == null || h.mode == mode) {  // empty or same-mode
            if (timed && nanos <= 0) {      // can't wait
                if (h != null && h.isCancelled())    //如果头节点无法获取,就去获取下一个
                    casHead(h, h.next);     // pop cancelled node
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) {
                //设置头节点
                SNode m = awaitFulfill(s, timed, nanos);
                if (m == s) {               // wait was cancelled
                    clean(s);
                    return null;
                }
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // try to fulfill
            if (h.isCancelled())            // already cancelled
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // loop until matched or waiters disappear
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // all waiters are gone
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // lost match
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // help a fulfiller
            SNode m = h.next;               // m is h's match
            if (m == null)                  // waiter is gone
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // help match
                    casHead(h, mn);         // pop both h and m
                else                        // lost match
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

逻辑比较复杂,主要就是三步:

  1. 栈是空的或者栈顶元素的模式和当前要进行的操作一致
    • 将节点推到堆栈上并等待匹配
    • 等待参数中的时间后返回
    • 如果取消就返回 null
  2. 如果栈不为空且栈顶元素模式与当前要进行的操作不一致,如果这个元素的模式是相反的模式(取对应放)
    • 尝试将栈中一个模式匹配要求的节点推到堆栈上,与相应的等待节点匹配并返回
  3. 如果栈顶已经拥有另一个模式 匹配的节点
    • 通过执行 POP 操作来找到匹配的元素,然后继续

看着有点晕,简单概括就是一个添加操作后必须等待一个获取操作才可以继续添加。

SynchronousQueue 的吞吐量高于 LinkedBlockingQueueArrayBlockingQueue,有位前辈做了测试,可以点击 这篇文章 查看。这里引用一下结论:

LinkedBlockingQueue 性能表现远超 ArrayBlcokingQueue,不管线程多少,不管 Queue 长短,LinkedBlockingQueue 都胜过 ArrayBlockingQueue。

SynchronousQueue 表现很稳定,而且在 20 个线程之内不管 Queue 长短,SynchronousQueue 性能表现是最好的,(其实SynchronousQueue 跟 Queue 长短没有关系),如果 Queue 的 capability 只能是 1,那么毫无疑问选择 SynchronousQueue,这也是设计 SynchronousQueue 的目的吧。

但大家也可以看到当超过 1000 个线程时,SynchronousQueue 性能就直线下降了,只有最高峰的一半左右,而且当 Queue 大于 30 时,LinkedBlockingQueue 性能就超过 SynchronousQueue。

相较于其他队列有缓存的作用,SynchronousQueue 适用于单线程同步传递性场景,比如:消费者没拿走当前的产品,生产者是不能再给产品的,这样可以控制生产者生产的速率和消费者一致。

LinkedTransferQueue

LinkedTransferQueue 实现了 TransferQueue 接口, 是一个由链表组成的、无界阻塞队列。

public class LinkedTransferQueue<E> extends AbstractQueue<E>
    implements TransferQueue<E>, java.io.Serializable {...}

TransferQueue

TransferQueue 也是一种阻塞队列,它用于生产者需要等待消费者消费事件的场景,与前面一节的 SynchronousQueue 有相似之处。它定义的方法如下:

public interface TransferQueue<E> extends BlockingQueue<E> {
    //尽可能快地转移元素给一个等待的消费者
    //如果在这之前有其他线程调用了 taked() 或者 poll(long,TimeUnit) 方法,就返回 true
    //否则返回 false
    boolean tryTransfer(E e);

    //转移元素给一个消费者,在有的情况下会等待直到被取走
    //
    void transfer(E e) throws InterruptedException;

    //在 timeout 时间内将元素转移给一个消费者,如果这段时间内传递出去了就返回 true
    //否则返回 false
    boolean tryTransfer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

    //如果至少有一个等待的消费者,就返回 true
    boolean hasWaitingConsumer();

    //返回等待获取元素的消费者个数
    //这个值用于监控
    int getWaitingConsumerCount();
}

tryTransfer() 和 transfer()

相对于其他阻塞队列,LinkedTransferQueue 多了两个关键地方法:tryTransfer()transfer()

分别来看看它是如何实现的。

1.transfer()

transfer() 方法的作用是:如果有等待接收元素的消费者线程,直接把生产者传入的元素 transfer 给消费者;如果没有消费者线程,transfer() 会将元素存放到队列尾部,并等待元素被消费者取走才返回:

public void transfer(E e) throws InterruptedException {
    if (xfer(e, true, SYNC, 0) != null) {
        Thread.interrupted(); // failure possible only due to interrupt
        throw new InterruptedException();
    }
}
private E xfer(E e, boolean haveData, int how, long nanos) {
    if (haveData && (e == null))
        throw new NullPointerException();
    Node s = null;                        // the node to append, if needed

    retry:
    for (;;) {                            // restart on append race

        for (Node h = head, p = h; p != null;) { // find & match first node
            boolean isData = p.isData;
            Object item = p.item;
            if (item != p && (item != null) == isData) { // unmatched
                if (isData == haveData)   // can't match
                    break;
                if (p.casItem(item, e)) { // match
                    for (Node q = p; q != h;) {
                        Node n = q.next;  // update by 2 unless singleton
                        if (head == h && casHead(h, n == null ? q : n)) {
                            h.forgetNext();
                            break;
                        }                 // advance and retry
                        if ((h = head)   == null ||
                            (q = h.next) == null || !q.isMatched())
                            break;        // unless slack < 2
                    }
                    LockSupport.unpark(p.waiter);
                    @SuppressWarnings("unchecked") E itemE = (E) item;
                    return itemE;
                }
            }
            Node n = p.next;
            p = (p != n) ? n : (h = head); // Use head if p offlist
        }

        if (how != NOW) {                 // No matches available
            if (s == null)
                s = new Node(e, haveData);
            Node pred = tryAppend(s, haveData);    //尝试添加到队尾
            if (pred == null)
                continue retry;           // lost race vs opposite mode
            if (how != ASYNC)
                return awaitMatch(s, pred, e, (how == TIMED), nanos);
        }
        return e; // not waiting
    }
}

awaitMatch() 方法的作用是:CPU 自旋等待消费者取走元素,为了避免长时间消耗 CPU,在自旋一定次数后会调用 Thread.yield() 暂停当前正在执行的线程,改为执行其他线程。

2.tryTransfer()

tryTransfer() 的作用是:试探生产者传入的元素是否能 直接传递给消费者

  • 如果有等待接收的消费者,返回 true
  • 没有则返回 false
public boolean tryTransfer(E e, long timeout, TimeUnit unit)
    throws InterruptedException {
    if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
        return true;
    if (!Thread.interrupted())
        return false;
    throw new InterruptedException();
}

可以看到,和 transfer() 必须等到消费者取出元素才返回不同的是,tryTransfer() 无论是否有消费者接收都会立即返回。

LinkedBlockingDeque

LinkedBlockingDeque 是一个由链表组成的、双向阻塞队列。

关键属性

static final class Node<E> {
    E item;
    Node<E> prev;
    Node<E> next;
    Node(E x) {
        item = x;
    }
}

transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();

可以看到,LinkedBlockingDeque 中持有队列首部和尾部节点,每个节点也是双向的。

双向的作用是:可以从队列两端插入和移除元素。多了一个操作队列的方向,在多线程同时入队时,可以减少一半的竞争。

除了 remove(Object) 等移除操作,LinkedBlockingDeque 的大多数操作的时间复杂度都是 O(n)。

LinkedBlockingDeque 多了获取和查询的 XXXFirstXXXLast 的方法。

《细说并发:Java 阻塞队列源码分析(下)》

7 种阻塞队列的特点

这篇文章介绍的 4 种加上上一篇 细说并发4:Java 阻塞队列源码分析(上) 中 3 种,总共 7 种阻塞队列,这么多队列看的眼都花了。

这里简单总结下 Java 中 7 种阻塞队列的特点:

  1. ArrayBlockingQueue
    • 环形数组实现的、有界的队列,一旦创建后,容量不可变
    • 基于数组,在添加删除上性能还是不如链表
  2. LinkedBlockingQueue:
    • 基于链表、有界阻塞队列
    • 添加和获取是两个不同的锁,所以并发添加/获取效率更高些
    • Executors.newFixedThreadPool() 使用了这个队列
  3. PriorityBlockingQueue
    • 基于数组的、支持优先级的、无界阻塞队列
    • 使用自然排序或者定制排序指定排序规则
    • 添加元素时,当数组中元素大于等于容量时,会扩容(当前队列中元素个数小于 64 个,数组容量就乘 3;否则就乘 2 加 2),拷贝数组
  4. DelayQueue
    • 支持延时获取元素的、无界阻塞队列
    • 添加元素时如果超出限制也会扩容
    • Leader-Follower 模型
  5. SynchronousQueue
    • 容量为 0
    • 一个添加操作后必须等待一个获取操作才可以继续添加
    • 吞吐量高于 LinkedBlockingQueueArrayBlockingQueue
  6. LinkedTransferQueue
    • 由链表组成的、无界阻塞队列
    • 实现了 TransferQueue 接口
    • CPU 自旋等待消费者取走元素,自旋一定次数后结束
  7. LinkedBlockingDeque
    • 由双向链表组成的、双向阻塞队列
    • 可以从队列两端插入和移除元素
    • 多了一个操作队列的方向,在多线程同时入队时,可以减少一半的竞争

总结

在实际开发中可能接触不到阻塞队列,线程池或者其他池都将这些细节封装好了,但是在看一些开源框架的时候经常看到有使用它们,因此如果想要自己写牛逼的框架,这些底层的东西还是需要了解的。

我们结合源码和《Java 并发编程的艺术》相关章节分两篇文章介绍了 Java 中的阻塞队列,了解了 7 种阻塞队列的大致源码实现,后面遇到需要使用阻塞队列时心里应该有些底了。

学基础就是这样,不能指望立即有用,古话说得好:无用之用是为大用,不一定哪天就派上用场了!

Thanks

《Java 并发编程的艺术》
blog.csdn.net/goldlevi/ar…
stevex.blog.51cto.com/4300375/128…

    原文作者:java集合源码分析
    原文地址: https://juejin.im/entry/59530bdbf265da6c3d6c0c03
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞