java并发编程(二十四)----(JUC集合)ArrayBlockingQueue和LinkedBlockingQueue介绍

这一节我们来了解阻塞队列(BlockingQueue),BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,当容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。首先我们来看ArrayBlockingQueue和LinkedBlockingQueue.

ArrayBlockingQueue

ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。

我们看他的构造函数实现:

//默认是非公平的,初始指定队列容量
public ArrayBlockingQueue(int capacity) {
       this(capacity, false);
   }

//该构造方法可以设置队列的公平性。当然如果为公平的,则对性能会产生影响
//访问者的公平性是使用可重入锁实现的
public ArrayBlockingQueue(int capacity, boolean fair) {
       if (capacity <= 0)
           throw new IllegalArgumentException();
       this.items = new Object[capacity];
       lock = new ReentrantLock(fair);
       notEmpty = lock.newCondition();
       notFull =  lock.newCondition();
   }

使用很简单我们直接看一个实例:

public class ProducerConsumerTest {
   public static void main(String[] args) {

        final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);

        ExecutorService service = Executors.newFixedThreadPool(10);

        for(int i = 0;i<4;i++){
            service.execute(new ProducerAndConsumer(blockingQueue));
        }
    }
}

class ProducerAndConsumer implements Runnable{
    private boolean flag = false;

    private Integer j = 1;

    private Lock lock = new ReentrantLock();

    Condition pro_con = lock.newCondition();

    Condition con_con = lock.newCondition();

    private BlockingQueue<Integer> blockingQueue;

    public ProducerAndConsumer(BlockingQueue<Integer> blockingQueue){
        this.blockingQueue= blockingQueue;
    }

    //生产
    public void put(){
        try {
            lock.lock();
            while(flag)
                pro_con.await();
            System.out.println("正在准备放入数据。。。");
            Thread.sleep(new Random().nextInt(10)*100);
            Integer value = new Random().nextInt(30);
            blockingQueue.put(value);
            System.out.println(Thread.currentThread().getName()+" 放入的数据 "+value);
            flag = true;
            con_con.signal();
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally{
            lock.unlock();
        }
    }

    public void get(){
        try {
            lock.lock();
            while(!flag)
                con_con.await();
            System.out.println("正在准备取数据。。。");
            Thread.sleep(new Random().nextInt(10)*1000);
            System.out.println(Thread.currentThread().getName()+" 取到的数据为"+blockingQueue.take());
            flag = false;
            pro_con.signal();
        } catch (Exception e) {
            e.printStackTrace();
        }
        finally{
            lock.unlock();
        }
    }

    @Override
    public void run() {

        while(true){
            if(j==1){
                put();
            }
            else{
                get();
            }
            j=(j+1)%2;
        }
    }
}

输出为:

正在准备放入数据。。。
正在准备放入数据。。。
正在准备放入数据。。。
正在准备放入数据。。。
pool-1-thread-2   放入的数据    13
正在准备取数据。。。
pool-1-thread-3   放入的数据    4
正在准备取数据。。。
pool-1-thread-3   取到的数据为13
正在准备放入数据。。。
pool-1-thread-1   放入的数据    11
正在准备取数据。。。
pool-1-thread-4   放入的数据    26
正在准备取数据。。。
pool-1-thread-1   取到的数据为4
正在准备放入数据。。。
pool-1-thread-2   取到的数据为11
正在准备放入数据。。。
pool-1-thread-3   放入的数据    18
正在准备取数据。。。
...
...

LinkedBlockingQueue

LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
先看一下他的构造函数:

public LinkedBlockingQueue() {
  this(Integer.MAX_VALUE);  //MAX_VALUE=2147483647
 }

public LinkedBlockingQueue(int capacity) {
     if (capacity <= 0) throw new IllegalArgumentException();
     this.capacity = capacity;
     last = head = new Node<E>(null);
 }

我们还是直接开看一个例子:

public class BlockingQueueTest {

    /** * 定义装苹果的篮子 */
    public static class Basket {
        // 篮子,能够容纳3个苹果
        // BlockingQueue<String> basket = new ArrayBlockingQueue<String>(3);
        BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);

        // 生产苹果,放入篮子
        public void produce() throws InterruptedException {
            // put方法放入一个苹果,若basket满了,等到basket有位置
            basket.put("An apple");
        }

        // 消费苹果,从篮子中取走
        public String consume() throws InterruptedException {
            // get方法取出一个苹果,若basket为空,等到basket有苹果为止
            return basket.take();
        }
    }

    //  测试方法
    public static void testBasket() {

        // 建立一个装苹果的篮子
        final Basket basket = new Basket();

        // 定义苹果生产者
        class Producer implements Runnable {
            public String instance = "";

            public Producer(String a) {
                instance = a;
            }

            public void run() {
                try {
                    while (true) {
                        // 生产苹果
                        System.out.println("生产者准备生产苹果:" + instance);
                        basket.produce();
                        System.out.println("! 生产者生产苹果完毕:" + instance);
                        // 休眠300ms
                        Thread.sleep(300);
                    }
                } catch (InterruptedException ex) {
                }
            }
        }

        // 定义苹果消费者
        class Consumer implements Runnable {
            public String instance = "";

            public Consumer(String a) {
                instance = a;
            }

            public void run() {
                try {
                    while (true) {
                        // 消费苹果
                        System.out.println("消费者准备消费苹果:" + instance);
                        basket.consume();
                        System.out.println("! 消费者消费苹果完毕:" + instance);
                        // 休眠1000ms
                        Thread.sleep(1000);
                    }
                } catch (InterruptedException ex) {
                }
            }
        }

        ExecutorService service = Executors.newCachedThreadPool();
        Producer producer = new Producer("P1");
        Producer producer2 = new Producer("P2");
        Consumer consumer = new Consumer("C1");
        service.submit(producer);
        service.submit(producer2);
        service.submit(consumer);

        // 程序运行3s后,所有任务停止
        try {
            Thread.sleep(3000);
        } catch (InterruptedException e) {
        }

        service.shutdownNow();
    }

    public static void main(String[] args) {
        BlockingQueueTest.testBasket();
    }
}

输出为:

生产者准备生产苹果:P1
消费者准备消费苹果:C1
! 生产者生产苹果完毕:P1
生产者准备生产苹果:P2
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P2
生产者准备生产苹果:P2
! 生产者生产苹果完毕:P2
生产者准备生产苹果:P1
! 生产者生产苹果完毕:P1
生产者准备生产苹果:P2
生产者准备生产苹果:P1
消费者准备消费苹果:C1
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P2
生产者准备生产苹果:P2
消费者准备消费苹果:C1
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P1
生产者准备生产苹果:P1
消费者准备消费苹果:C1
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P2

Process finished with exit code 0
    原文作者:JUC
    原文地址: https://blog.csdn.net/a953713428/article/details/58671796
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞