JUC与生产者消费者

前面一篇文章介绍了生产者消费者模式,这篇来看看JUC包下的哪些类与该模式有关。
生产者消费者模式中有个中间类Channel,对于数据Data起到通道作用,还确保了Producer与Consumer这些线程的协调运行。在前篇文章所举的例子中的Table类担任这个角色,内部是使用数组实现的队列配合synchronized关键字。而在JUC包下提供了BlockingQueue接口及其实现类,它们相当于Channel角色。
《JUC与生产者消费者》BlockingQueue接口——阻塞队列
BlockingQueue接口表示的是在达到合适的状态之前线程一直阻塞的队列。是Queue的子接口,拥有多个方法如offer,poll等,但实际实现阻塞功能的方法是BlockingQueue所固有的put,take方法

ArrayBlockingQueue——基于数组的BlockingQueue
基于数组实现的BlockingQueue,元素个数有最大限制。

LinkedBlockingQueue——基于链表的BlockingQueue
元素个数没有最大限制,基于链表实现。

PriorityBlockingQueue——带有优先级的BlockingQueue
带有优先级的BlockingQueue,优先级依据Comparable接口的自然排序,或者构造函数传进的Comparator比较器的规则。

DelayQueue——一定时间后才可以take的BlockingQueue
表示的是用于存储java.util.concurrent.Delayed对象的队列。当从该队列take时,只有在该元素指定的时间到期后才能take。取出顺序按到期时间的长短排,最长的先被take出。

SynchronousQueue——直接传递的BlockingQueue
该类用于执行由Producer到Consumer的”直接传递“ 。如果Producer先put,在Consumer take之前,Producer线程将一直阻塞。反之亦然。

ConcurrentLinkedQueue——元素个数没有最大限制的线程安全队列
首先该类不同于上面那些,它并非BlockingQueue,也没有继承BlockingQueue。它表示的是元素个数没有限制的线程安全队列。

BlockingQueue是JUC提供的Channel,那么用ArrayBlocking来改写上一篇的Table。

public class TableByJUC extends ArrayBlockingQueue<String> {

    public TableByJUC(int capacity) {
        super(capacity);
    }

    public void put(String cake) throws InterruptedException {
        System.out.println(Thread.currentThread().getName() + " puts " + cake);
        super.put(cake);
    }

    public String take() throws InterruptedException {
        String cake = super.take();
        System.out.println(Thread.currentThread().getName() + " take " + cake);
        return cake;
    }
}

java.util.concurrent.Exchanger

用于让两个线程安全的交换对象
举个例子:两个线程交换缓冲区
ProducerThread填充缓冲区直到充满,调用exchange方法将填满的缓冲区传递给ConsumerThread,传递缓冲区后 作为交换接受空的缓冲区。ConsumerThread调用exchange方法将空的缓冲区传递给ProducerThread,传递后作为交换接受被填满字符的缓冲区,打印缓冲区中字符。

Main:
将buffer1传给ProducerThread,buffer2传给ConsumerThread,同时将通用的Exchanger实例传给两者。

public class Main {
    public static void main(String[] args) {
        Exchanger<char[]> exchanger = new Exchanger<>();
        char[] buffer1 = new char[5];
        char[] buffer2 = new char[5];
        ProducerThread producerThread = new ProducerThread(exchanger, buffer1, 654987);
        ConsumerThread consumerThread = new ConsumerThread(exchanger, buffer2, 123654);
        producerThread.start();
        consumerThread.start();

        try {
            Thread.sleep(10000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        producerThread.interrupt();
        consumerThread.interrupt();
    }
}

ProducerThread

public class ProducerThread extends Thread {

    private final Exchanger<char[]> exchanger;
    private char[] buffer;
    private char index;
    private final Random random;

    public ProducerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) {
        super("ProducerThread");
        this.exchanger = exchanger;
        this.buffer = buffer;
        this.random = new Random(seed);
        this.index = 0;
    }

    @Override
    public void run() {
        try {
            while(true) {
                for (int i = 0; i < buffer.length; i++) {
                    // 向缓冲区添加数据
                    buffer[i] = nextChar();
                    System.out.println(Thread.currentThread().getName() + ": " +
                            buffer[i] + " -> ");
                }
                System.out.println(Thread.currentThread().getName() + ": BEFORE exchange");
                buffer = exchanger.exchange(buffer);
                System.out.println(Thread.currentThread().getName() + ": AFTER exchange");
            }
        } catch (InterruptedException e) {
        }
    }

    // 生成字符
    private char nextChar() throws InterruptedException {
        char c = (char) ('A' + index%26);
        index++;
        Thread.sleep(random.nextInt(1000));
        return c;
    }
}

ConsumerThread

public class ConsumerThread extends Thread {
    private final Exchanger<char[]> exchanger;
    private final Random random;
    private char[] buffer;

    public ConsumerThread(Exchanger<char[]> exchanger, char[] buffer, long seed) {
        super("ConsumerThread");
        this.exchanger = exchanger;
        this.buffer = buffer;
        this.random = new Random(seed);
    }

    @Override
    public void run() {
        try {
            while (true) {
                // 交换缓冲区
                System.out.println(Thread.currentThread().getName() + ": BEFORE exchange");
                buffer = exchanger.exchange(buffer);
                System.out.println(Thread.currentThread().getName() + ": AFTER exchange");

                // 从缓冲区取出数据
                for (int i = 0; i < buffer.length; i++) {
                    System.out.println(Thread.currentThread().getName() + ": -> " + buffer[i]);
                    Thread.sleep(random.nextInt(1000));
                }
            }
        } catch (InterruptedException e) {
        }
    }
}

运行结果:

ConsumerThread: BEFORE exchange   //ConsumerThread等待echange方法被执行
ProducerThread: A ->   //ProducerThread向buffer1填充A-E
ProducerThread: B -> 
ProducerThread: C -> 
ProducerThread: D -> 
ProducerThread: E -> 
ProducerThread: BEFORE exchange  //ProducerThread的exchange执行(进行交换)
ProducerThread: AFTER exchange // 执行完毕接受到空的缓冲区buffer2
ConsumerThread: AFTER exchange // ConsumerThread的exchange执行后接受到被填满字符的buffer1
ConsumerThread: -> A // ConsumerThread开始依此输出
ProducerThread: F ->  // ProducerThread向buffer2中填充F-K
ConsumerThread: -> B
ProducerThread: G -> 
ConsumerThread: -> C
ProducerThread: H -> 
ConsumerThread: -> D
ProducerThread: I -> 
ConsumerThread: -> E
ConsumerThread: BEFORE exchange //ConsumerThread耗尽buffer1,开始等待新的被填满字符的buffer2
ProducerThread: J -> 
ProducerThread: BEFORE exchange // buffer2填满完毕,开始交换
ProducerThread: AFTER exchange // 交换完毕,收到空的buffer1
ConsumerThread: AFTER exchange // 交换完毕,收到满的buffer2
ConsumerThread: -> F
ConsumerThread: -> G
ProducerThread: K -> 
ConsumerThread: -> H
ProducerThread: L -> 
ConsumerThread: -> I
ProducerThread: M -> 
ProducerThread: N -> 
ProducerThread: O -> 
ProducerThread: BEFORE exchange
ConsumerThread: -> J
ConsumerThread: BEFORE exchange
ConsumerThread: AFTER exchange
ConsumerThread: -> K
ProducerThread: AFTER exchange
ConsumerThread: -> L
ConsumerThread: -> M

从运行结果可以看出被填满字符的缓冲区从ProducerThread流向ConsumerThread,即生成的字符单向流向ConsumerThread。而空缓冲区它的流动方向相反.

这里有一片关于Exchanger在实际业务中的应用的文章:http://lixuanbin.iteye.com/blog/2166772
这一链接是关于Exchanger源码分析:https://www.cnblogs.com/aniao/p/aniao_exchanger.html

    原文作者:JUC
    原文地址: https://blog.csdn.net/sinat_34976604/article/details/86561307
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞