9.JUC组件拓展-BlockingQueue

在Java中,BlockingQueue是一个接口,它的实现类有ArrayBlockingQueue、DelayQueue、 LinkedBlockingDeque、LinkedBlockingQueue、PriorityBlockingQueue、SynchronousQueue等,它们的区别主要体现在存储结构上或对元素操作上的不同,但是对于take与put操作的原理,却是类似的。

BlockingQueue

《9.JUC组件拓展-BlockingQueue》

BlockingQueue 是一个先进先出的队列(Queue),为什么说是阻塞(Blocking)的呢?是因为 BlockingQueue 支持当获取队列元素但是队列为空时,会阻塞等待队列中有元素再返回;也支持添加元素时,如果队列已满,那么等到队列可以放入新元素时再放入。

《9.JUC组件拓展-BlockingQueue》

  • add(anObject):把anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则抛出异常
  • offer(anObject):表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false.
  • put(anObject):把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续.
  • poll(time):取走BlockingQueue里排在首位的对象,若不能立即取出,则可以等time参数规定的时间,取不到时返回null
  • take():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到Blocking有新的对象被加入为止

注意:BlockingQueue 不接受null 元素。试图add、put 或offer 一个null 元素时,某些实现会抛出NullPointerException。null 被用作指示poll 操作失败的警戒值。

BlockingQueue 的各个实现都遵循了这些规则,当然我们也不用死记这个表格,知道有这么回事,然后写代码的时候根据自己的需要去看方法的注释来选取合适的方法即可。

一个 BlockingQueue 可能是有界的,如果在插入的时候,发现队列满了,那么 put 操作将会阻塞。通常,在这里我们说的无界队列也不是说真正的无界,而是它的容量是 Integer.MAX_VALUE(21亿多)。

BlockingQueue 实现主要用于生产者-使用者队列,但它另外还支持Collection 接口。因此,举例来说,使用remove(x) 从队列中移除任意一个元素是有可能的。然而,这种操作通常不 会有效执行,只能有计划地偶尔使用,比如在取消排队信息时。

BlockingQueue 的实现都是线程安全的,但是批量的集合操作如 addAll, containsAll, retainAll 和 removeAll 不一定是原子操作。如 addAll(c) 有可能在添加了一些元素后中途抛出异常,此时 BlockingQueue 中已经添加了部分元素,这个是允许的,取决于具体的实现。

BlockingQueue 的实现类

1. ArrayBlockingQueue

  • 构造函数必须带一个int参数来指明其大小
  • 一个由数组结构组成的有界阻塞队列.
  • 此队列按 FIFO(先进先出)原则对元素进行排序.
  • 其并发控制采用可重入锁来控制,不管是插入操作还是读取操作,都需要获取到锁才能进行操作。
  • 如果队列为空,这个时候读操作的线程进入到读线程队列排队,等待写线程写入新的元素,然后唤醒读线程队列的第一个等待线程。
  • 如果队列已满,这个时候写操作的线程进入到写线程队列排队,等待读线程将队列元素移除腾出空间,然后唤醒写线程队列的第一个等待线程。
  • 支持公平锁和非公平锁。【注:每一个线程在获取锁的时候可能都会排队等待,如果在等待时间上,先获取锁的线程的请求一定先被满足,那么这个锁就是公平的。反之,这个锁就是不公平的。公平的获取锁,也就是当前等待时间最长的线程先获取锁】

2. LinkedBlockingQueue

  • 大小不定的BlockingQueue
  • 若其构造函数带一个规定大小的参数,生成的BlockingQueue有大小限制
  • 若不带大小参数,所生成的BlockingQueue的大小由Integer.MAX_VALUE来决定
  • 其所含的对象是以FIFO(先入先出)顺序排序的
  • 链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低
  • 最新插入的数据在尾部,最新移除的对象再头部

3. PriorityBlockingQueue

  • 类似于LinkedBlockQueue,但其所含对象的排序不是FIFO,而是依据对象的自然排序顺序或者是构造函数的Comparator决定的顺序
  • 一个无界的阻塞队列

4. SynchronousQueue

  • 特殊的BlockingQueue,对其的操作必须是放和取交替完成的
  • 一种阻塞队列,其中每个 put 必须等待一个 take,反之亦然
  • SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。据此,把这个类称作一个队列显然是夸大其词了。它更多像是一个汇合点。

5. DelayQueue

  • DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口。

生产者与消费者模式

阻塞队列的最常使用的例子就是生产者消费者模式,也是各种实现生产者消费者模式方式中首选的方式。使用者不用关心什么阻塞生产,什么时候阻塞消费,使用非常方便。

LinkedBlockingQueue来实现一个生产者与消费者模型:

public class BlockingQueueTest {
    //生产者
    public static class Producer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        private Random random;

        public Producer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
            flag=false;
            random=new Random();

        }
        public void run() {
            while(!flag){
                int info=random.nextInt(100);
                try {
                    blockingQueue.put(info);
                    System.out.println(Thread.currentThread().getName()+" produce "+info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    //消费者
    public static class Consumer implements Runnable{
        private final BlockingQueue<Integer> blockingQueue;
        private volatile boolean flag;
        public Consumer(BlockingQueue<Integer> blockingQueue) {
            this.blockingQueue = blockingQueue;
        }
        public void run() {
            while(!flag){
                int info;
                try {
                    info = blockingQueue.take();
                    System.out.println(Thread.currentThread().getName()+" consumer "+info);
                    Thread.sleep(50);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }
        public void shutDown(){
            flag=true;
        }
    }
    public static void main(String[] args){
        BlockingQueue<Integer> blockingQueue = new LinkedBlockingQueue<Integer>(10);
        //BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<>(10);
        Producer producer=new Producer(blockingQueue);
        Consumer consumer=new Consumer(blockingQueue);
        //创建5个生产者,5个消费者
        for(int i=0;i<10;i++){
            if(i<5){
                new Thread(producer,"producer"+i).start();
            }else{
                new Thread(consumer,"consumer"+(i-5)).start();
            }
        }

        try {
            Thread.sleep(1000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producer.shutDown();
        consumer.shutDown();

    }
}

运行效果:

producer1 produce 95
producer3 produce 36
consumer0 consumer 95
consumer2 consumer 36
producer0 produce 27
consumer4 consumer 27
producer2 produce 75
......

ArrayBlockingQueue和LinkedBlockingQueue的区别

  • 队列中锁的实现不同

ArrayBlockingQueue实现的队列中的锁是没有分离的,即生产和消费用的是同一个锁;

LinkedBlockingQueue实现的队列中的锁是分离的,即生产用的是putLock,消费是takeLock

  • 在生产或消费时操作不同

ArrayBlockingQueue实现的队列中在生产和消费的时候,是直接将枚举对象插入或移除的;

LinkedBlockingQueue实现的队列中在生产和消费的时候,需要把枚举对象转换为Node进行插入或移

  • 队列大小初始化方式不同

ArrayBlockingQueue实现的队列中必须指定队列的大小;

LinkedBlockingQueue实现的队列中可以不指定队列的大小,但是默认是Integer.MAX_VALUE

    原文作者:JUC
    原文地址: https://blog.csdn.net/sunweiguo1/article/details/80384152
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞