JUC ArrayBlockingQueue 分析

先说说BlockingQueue

BlockingQueue,JUC中定义的阻塞队列接口,为阻塞队列的实现定义了一些通用的方法,其中包括非阻塞操作以及阻塞操作

  • 非阻塞操作:
    • offer,add(add不常用):队列添加元素(队尾添加)
    • poll,remove(remove不常用):移除队列元素(队头移除)
  • 阻塞操作:
    • put:队列添加元素,通常在有界队列实现里面,当队列元素个数达到容量上限,则阻塞等待,直到队列不满且添加元素成功,则解除阻塞(队尾添加)
    • take:移除队列元素,当队列为空,则阻塞等待,直到队列非空获取到元素,则解除阻塞(队头移除)

使用场景

阻塞队列在多线程编程里面,常用于实现生产者-消费者模式,生成者往往队列里面丢数据,而消费者从队列里面获取数据进行处理

经典的案例就是Java里面的线程池实现,任务会往任务队列(阻塞队列)里面丢(调用线程池的线程即是生产者),等待线程池里面的线程(消费者)去消费并处理

ArrayBlockingQueue基本介绍

阻塞队列的一种实现,特点如下:

  • 队列为有界队列,存在队列满的情况
  • add或offer方法不阻塞,offer添加成功返回true,否则返回false(队列满的情况下则添加失败),而add方法在队列满的情况下,抛异常,通常不建议使用add方法
  • poll或remove方法不阻塞,poll方法在队列无元素的情况下返回null,remove方法在队列满的情况下,会抛异常,通常不建议使用remove方法
  • put方法在队列满的情况下,线程会阻塞等待,直到队列不满,添加元素成功,则解除阻塞等待
  • take方法在队列空的情况下,线程会阻塞等待,直到队列非空,取得元素,则解除阻塞等待

阻塞队列作为队列的一种实现,也实现了peek(取队列首元素)等普通队列所具备的方法,这里分析的时候,这些方法就不作为关键点来描述了

示例

展示一个示例:

master线程(生产者)生成task,交由worker线程(消费者)进行处理:

package com.crazypig.juc;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ArrayBlockingQueueTest {

    public static void main(String[] args) {

        BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(1024);
        Thread masterThread = new Thread(new Master(taskQueue), "masterThread");
        Thread workerThread = new Thread(new Worker(taskQueue), "workerThread");

        workerThread.start();

        masterThread.start();

        for (;;) {
            try {
                TimeUnit.SECONDS.sleep(3);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

    }

    static class Master implements Runnable {

        private BlockingQueue<Task> taskQueue;

        public Master(BlockingQueue<Task> taskQueue) {
            this.taskQueue = taskQueue;
        }

        public void run() {

            for (;;) {
                try {
                    TimeUnit.SECONDS.sleep(3);
                    taskQueue.put(new Task("task" + System.currentTimeMillis()));
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

    }

    static class Worker implements Runnable {

        private BlockingQueue<Task> taskQueue;

        public Worker(BlockingQueue<Task> taskQueue) {
            this.taskQueue = taskQueue;
        }

        public void run() {

            for (;;) {
                try {
                    Task task = taskQueue.take();
                    task.doTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

        }

    }

    static class Task {

        private String taskName;

        public Task(String taskName) {
            this.taskName = taskName;
        }

        public void doTask() {
            System.out.println("do task " + taskName);
        }

    }

}

源码分析

  • 内部维护一个数组存放队列元素,通过控制putIndex和takeIndex的下表来进行队列元素的增删,同时也有记录队列元素个数的计数count
  • 内部实际上利用ReentrantLock来实现多线程并发操作(队列入队,出队,取队头等操作,都需要上锁)
  • 同时,利用ReentrantLock提供的Condition来实现阻塞的put和take,里面用到两个condition: notFull和notEmpty,分别用于实现阻塞的put和take操作

先看看ArrayBlockingQueue内部重要的成员变量:

final Object[] items; // 队列存储元素的容器(数组)
int takeIndex; // 指向下一个可获取元素的位置
int putIndex;  // 指向下一个可存放元素的位置
int count; // 记录队列中元素个数

// 用于实现多线程操作和阻塞take和put的关键
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;

我们重点分析阻塞操作put和take的实现:

非阻塞操作的实现在理解了阻塞操作的基础上,就相当容易理解了


public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}

public E take() throws InterruptedException {
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == 0)
            notEmpty.await();
        return dequeue();
    } finally {
        lock.unlock();
    }
}

可以看出与传统的wait/notify实现几乎一致:

synchronized (obj) { while (条件不满足) { wait... } notify }

那么我们可以判断,实际上在enqueue和dequeue方法,就会实现类似于Object notify的功能,对应于Condition,应该是调用signal,具体看代码:


private void enqueue(E x) {
    final Object[] items = this.items;
    items[putIndex] = x;
    if (++putIndex == items.length)
        putIndex = 0;
    count++;
    notEmpty.signal();
}

private E dequeue() {
    final Object[] items = this.items;
    @SuppressWarnings("unchecked")
    E x = (E) items[takeIndex];
    items[takeIndex] = null;
    if (++takeIndex == items.length)
        takeIndex = 0;
    count--;
    if (itrs != null)
        itrs.elementDequeued();
    notFull.signal();
    return x;
}
  1. ArrayBlockingQueue还提供了响应超时的相关方法,具体实现也就是调用coniditon的await超时方法,这里就不具体展开分析了,感兴趣地直接翻下源码就好
  2. 注意ArrayBlockingQueue里面的数组是循环使用的,take和put指向下标超过数组大小,则从头开始循环使用
    原文作者:JUC
    原文地址: https://blog.csdn.net/d6619309/article/details/80658606
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞