JUC源码分析-集合篇(九):LinkedBlockingQueue

LinkedBlockingQueue 是单向链表结构的自定义容量的阻塞队列,元素操作按照** FIFO **(first-in-first-out 先入先出) 的顺序,使用显式锁 ReentrantLock 和 Condition 来保证线程安全。链表结构的队列通常比基于数组的队列(ArrayBlockingQueue)有更高的吞吐量,但是在并发环境下性能却不如数组队列。因为比较简单,本章本来是不在笔者的写作范围内的,但是在后面的线程池源码中用到了LinkedBlockingQueue,我们我们就来简单看一下,加深一下印象。

本章应该是队列篇的终章了,还有LinkedBlockingDeque、ArrayBlockingQueue这些比较简单的队列就不再讲解了,后面我们会开始线程池相关源码分析。

概述

LinkedBlockingQueue(后称LBQ)队列容量可通过参数来自定义,并且内部是不会自动扩容的。如果未指定容量,将取最大容量Integer.MAX_VALUE。 如果你理解了前几篇我们所讲的队列,那么你会发现 LBQ 非常容易理解,内部没有太多复杂的算法,数据结构也是使用了简单的链表结构。

数据结构

《JUC源码分析-集合篇(九):LinkedBlockingQueue》

LinkedBlockingQueue 继承关系

标准的队列继承关系,不多赘述。

重要属性

//容量
private final int capacity;

//元素个数
private final AtomicInteger count = new AtomicInteger();

//链表头
transient Node<E> head;

//链表尾
private transient Node<E> last;

//出列锁
private final ReentrantLock takeLock = new ReentrantLock();

//等待获取(出队)条件
private final Condition notEmpty = takeLock.newCondition();

//入列锁
private final ReentrantLock putLock = new ReentrantLock();

//等待插入(入列)条件
private final Condition notFull = putLock.newCondition();

LBQ 在实现多线程对竞争资源的互斥访问时,对于入列和出列操作分别使用了不同的锁。对于入列操作,通过putLock进行同步;对于出列操作,通过takeLock进行同步。
此外,插入锁putLock和出列条件notFull相关联,出列锁takeLock和出列条件notEmpty相关联。通过notFullnotEmpty更细腻的控制锁。

  • 若某线程(线程A)要取出数据时,队列正好为空,则该线程会执行notEmpty.await()进行等待;当其它某个线程(线程B)向队列中插入了数据之后,会调用notEmpty.signal()唤醒notEmpty上的等待线程。此时,线程A会被唤醒从而得以继续运行。 此外,线程A在执行取操作前,会获取takeLock,在取操作执行完毕再释放takeLock
  • 若某线程(线程H)要插入数据时,队列已满,则该线程会它执行notFull.await()进行等待;当其它某个线程(线程I)取出数据之后,会调用notFull.signal()唤醒notFull上的等待线程。此时,线程H就会被唤醒从而得以继续运行。 此外,线程H在执行插入操作前,会获取putLock,在插入操作执行完毕才释放putLock

源码解析

put(E e)

LBQ 的添加元素的方法有offer()、put()put是在队列已满的情况下等待,而offer则直接返回结果,它们内部操作都一致。所这里我们只对put进行解析

//尾部插入节点,队列满时会一直等待可用,响应中断
public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
 
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;//获取入列锁
    final AtomicInteger count = this.count;//获取元素数
    putLock.lockInterruptibly();//响应中断式加锁
    try {
       
        while (count.get() == capacity) {
            notFull.await();//队列已满,等待
        }
        enqueue(node);//节点添加到队列尾
        c = count.getAndIncrement();
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
}

说明:看源码吧。

poll()

LBQ 的获取元素的方法有poll()、take()、peek()take在队列为空的情况下会一直等待,poll不等待直接返回结果,peek是获取但不移除头结点元素,内部操作都差不多。这里我们只对take进行解析:

/**获取并消除头节点,会一直等待队列可用,响应中断*/
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;//获取出列锁
    takeLock.lockInterruptibly();//响应中断式加锁
    try {
        while (count.get() == 0) {
            notEmpty.await();//队列为空,等待
        }
        x = dequeue();//首节点出列
        c = count.getAndDecrement();
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

说明:略。。。

小结

本章只是为了加深同学们的印象,为之后线程池源码解析做准备,随便看看就行了。

作者:泰迪的bagwell
链接:https://www.jianshu.com/p/b888a1689822
來源:简书
简书著作权归作者所有,任何形式的转载都请联系作者获得授权并注明出处。

    原文作者:JUC
    原文地址: https://blog.csdn.net/yongchao940/article/details/83026901
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞