JUC LinkedBlockingQueue 分析

基本介绍

  • JUC里面提供的有界阻塞队列,基于链表方式实现
  • 默认队列大小为Integer.MAX_VALUE,给人的感觉是近乎无界,在构造的时候可以自行指定队列的容量
  • 内部维护两把锁(putLock与takeLock),用以优化存取的并发性能;put/offer只需要获取putLock,而take/poll只需要获取takeLock,细化了锁粒度,使得存取操作可以并发进行,相比ArrayBlockingQueue存取元素都使用同一个lock来说具有更高的吞吐量
  • 只有在remove,contains, clear,循环迭代器的操作中,才需要同时获取takeLock和putLock(这些操作的程使用场景相对比较少)

推荐LinkedBlockingQueue在构造的时候传入具体的容量大小,否则若生产者(put)的生产速度远远大于消费者(take)的消费速度的情况下,很可能会把堆内存撑爆

使用场景与示例

最常用的使用场景也还是用于实现生产者消费者模式,可参考JUC ArrayBlockingQueue 分析里面的使用场景和使用示例

源码分析

看看重要的成员变量:

// 队列容量
private final int capacity;
// 指定当前队列元素个数,用原子变量解决多线程并发访问
private final AtomicInteger count = new AtomicInteger();
// 队列head,哨兵元素,null值,方便链表insert, delete
transient Node<E> head;
// 指向队列尾元素
private transient Node<E> last;
// take锁,用于保证并发下take, poll操作的线程安全
private final ReentrantLock takeLock = new ReentrantLock();
// take操作阻塞等待条件
private final Condition notEmpty = takeLock.newCondition();
// put锁,用于维护并发下的put, offer操作
private final ReentrantLock putLock = new ReentrantLock();
// put操作阻塞等待条件
private final Condition notFull = putLock.newCondition();

入队和出队操作,比较简单,方法会被put, offer, take , poll等方法调用,多线程并发安全保证使用putLock和takeLock来保证(外层调用方法里面):

// 入队
private void enqueue(Node<E> node) {
    // assert putLock.isHeldByCurrentThread();
    // assert last.next == null;
    last = last.next = node;
}

// 出队
private E dequeue() {
    // assert takeLock.isHeldByCurrentThread();
    // assert head.item == null;
    Node<E> h = head;
    Node<E> first = h.next;
    h.next = h; // help GC
    head = first;
    E x = first.item;
    first.item = null;
    return x;
}

分析put / offer操作:

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    final AtomicInteger count = this.count;
    putLock.lockInterruptibly();
    try {
        // 达到容量限制,阻塞等待
        while (count.get() == capacity) {
            notFull.await();
        }
        enqueue(node); // 将元素添加到队列尾部
        c = count.getAndIncrement();
        // 此处若判断容量还没满,则唤醒其他可能等待阻塞的生产者线程?这里会产生很多无谓的唤醒动作?因为很多时候容量都没达到上限?
        if (c + 1 < capacity)
            notFull.signal();
    } finally {
        putLock.unlock();
    }
    // c = count.getAndIncrement, 若为0,则证明有元素经过该方法保存到队列里面,且之前是0,代表可能有线程阻塞等待take操作(notEmpty condition),因此这个条件下,需要进行唤醒
    if (c == 0)
        signalNotEmpty();
}


// 唤醒阻塞的take线程,该方法会被put/offer操作调用
private void signalNotEmpty() {
    final ReentrantLock takeLock = this.takeLock;
    // 注意此处应该尝试获取的是takeLock
    takeLock.lock();
    try {
        notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
}

// 与put的逻辑大致相同,只是offer操作当容量达到上限的时候,不阻塞等待添加,而是直接返回添加失败
public boolean offer(E e) {
    if (e == null) throw new NullPointerException();
    final AtomicInteger count = this.count;
    // 容量达到上限,无法添加,返回添加失败
    if (count.get() == capacity)
        return false;
    int c = -1;
    Node<E> node = new Node<E>(e);
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        if (count.get() < capacity) {
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        }
    } finally {
        putLock.unlock();
    }
    if (c == 0)
        signalNotEmpty();
    return c >= 0;
}

分析take / poll 方法:

// 移除队头元素,若队列为空,则阻塞等待
public E take() throws InterruptedException {
    E x;
    int c = -1;
    final AtomicInteger count = this.count;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lockInterruptibly();
    try {
        // 队列为空,阻塞等待
        while (count.get() == 0) {
            notEmpty.await();
        }
        x = dequeue(); // 取队头元素
        c = count.getAndDecrement();
        // 若发现队列还有元素,尝试唤醒其他等待的消费者线程
        if (c > 1)
            notEmpty.signal();
    } finally {
        takeLock.unlock();
    }
    // c = count.getAndDecrement,若c原先为容量上限,证明此次take之后,队列不满,需要唤醒可能在阻塞等待的生产者线程(调用put操作的线程)
    if (c == capacity)
        signalNotFull();
    return x;
}

// 对等待put(notFull condition)的生产者线程进行唤醒
private void signalNotFull() {
    final ReentrantLock putLock = this.putLock;
    putLock.lock();
    try {
        notFull.signal();
    } finally {
        putLock.unlock();
    }
}

// poll在队列为空的情况下不阻塞等待,直接返回null,其他逻辑与
public E poll() {
    final AtomicInteger count = this.count;
    if (count.get() == 0)
        return null;
    E x = null;
    int c = -1;
    final ReentrantLock takeLock = this.takeLock;
    takeLock.lock();
    try {
        if (count.get() > 0) {
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        }
    } finally {
        takeLock.unlock();
    }
    if (c == capacity)
        signalNotFull();
    return x;
}

超时的offer,poll方法就不展开了,重点在于使用condition的超时阻塞等待,然后加以判断:

// 超时offer实现的关键
while (count.get() == capacity) {
    if (nanos <= 0)
        return false;
    nanos = notFull.awaitNanos(nanos);
}

// 超时poll的实现关键
while (count.get() == 0) {
    if (nanos <= 0)
        return null;
    nanos = notEmpty.awaitNanos(nanos);
}
    原文作者:JUC
    原文地址: https://blog.csdn.net/d6619309/article/details/80672290
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞