Java并发---生产者消费者实现

生产者消费者

生产者消费者模型是并发时线程之间同步和通信重要的实现,本文主要用一下四种方式来实现

  1. wait()/notify()方法
  2. 显式Lock和Condition
  3. BlockingQueue阻塞队列方法
  4. PipedWriter/PipedReader方法

wait()/notify()方法

wait() / nofity()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,让出CPU和放弃锁,使自己处于等待状态,让其他线程执行。
notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品时,向其他等待的线程发出可执行的通知,同时放弃锁,使自己处于等待状态。
实现代码如下:

package concurrency.interview;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerAndConsumer_1 {
    static class Factory {

        private final int MAX_SIZE = 10;
        private int number = 0;

        private Object obj = new Object();

        // private LinkedList<Object> list = new LinkedList<>();

        public void produce() throws Exception {

            synchronized (obj) {
                while (number == MAX_SIZE) {
                    System.out.println("仓库已满!请先消费");
                    obj.wait();
                }

                number++;
                System.out.println("生产成功");
                TimeUnit.SECONDS.sleep(1);
                obj.notifyAll();
            }

        }

        public void consumer() throws Exception {
            synchronized (obj) {
                while (number <= 0) {
                    System.out.println("仓库是空的,不能消费");
                    obj.wait();
                }

                number--;
                System.out.println("消费成功");
                TimeUnit.SECONDS.sleep(1);
                obj.notifyAll();
            }

        }

    }

    static class Producer implements Runnable {
        Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {

            try {
                while (!Thread.interrupted()) {
                    factory.produce();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    static class Consumer implements Runnable {
        Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    factory.consumer();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Factory factory = new Factory(); 
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Producer(factory));
        executor.execute(new Consumer(factory));

        TimeUnit.SECONDS.sleep(2);

        executor.shutdownNow();

        // Thread producer = new Thread(new Producer(factory));
        // Thread consumer = new Thread(new Consumer(factory));
        //
        // producer.start();
        // consumer.start();
        //
        // TimeUnit.SECONDS.sleep(5);
        //
        // producer.interrupt();
        // consumer.interrupt();
    }
}

显式Lock和Condition

JDK1.5引入了Lock和Condition,使用它们会更安全。可以通过在Condition上调用await()方法来挂起任务,类似wait(),调用signal()/signalAll(),类似notify()/notifyAll()

代码如下:

package concurrency.interview;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

public class ProducerAndConsumer_2 {
    static class Factory {

        private final int MAX_SIZE = 10;
        private int number = 0;

        
        private Lock lock = new ReentrantLock();
        private Condition condition = lock.newCondition();

        public void produce() throws Exception {
            lock.lock();
            try {
                while (number == MAX_SIZE) {
                    System.out.println("仓库已满!请先消费");
                    condition.await();
                }

                number++;
                System.out.println("生产成功");
                TimeUnit.SECONDS.sleep(1);
                condition.signalAll();
            } finally {
                lock.unlock();
            }

        }

        public void consumer() throws Exception {
            lock.lock();
            try {
                while (number <= 0) {
                    System.out.println("仓库是空的,不能消费");
                    condition.await();
                }

                number--;
                System.out.println("消费成功");
                TimeUnit.SECONDS.sleep(1);
                condition.signalAll();
            } finally {
                lock.unlock();
            }

        }

    }

    static class Producer implements Runnable {
        Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {

            try {
                while (!Thread.interrupted()) {
                    factory.produce();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    static class Consumer implements Runnable {
        Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    factory.consumer();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Producer(factory));
        executor.execute(new Consumer(factory));

        TimeUnit.SECONDS.sleep(2);

        executor.shutdownNow();

        // Thread producer = new Thread(new Producer(factory));
        // Thread consumer = new Thread(new Consumer(factory));
        //
        // producer.start();
        // consumer.start();
        //
        // TimeUnit.SECONDS.sleep(5);
        //
        // producer.interrupt();
        // consumer.interrupt();
    }
}

BlockingQueue阻塞队列方法

BlockingQueue也是JDK1.5的新增内容,它是已经在内部实现了同步的队列。主要有以下两个方法
put()方法:容量达到最大时,自动阻塞。
take()方法:容量为0时,自动阻塞。
我们可以看看ArrayBlockingQueue的上面两个方法实现

public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }

可以看到ArrayBlockingQueue内部的同步就是使用的Lock和Condition。
使用BlockingQueue生产者和消费者代码如下:

package concurrency.interview;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerAndConsumer_3 {
    static class Factory {

        private final int MAX_SIZE = 10;

        private BlockingQueue<Object> queue = new ArrayBlockingQueue<>(MAX_SIZE);

        public void produce() throws Exception {

            queue.put(new Object());
            TimeUnit.SECONDS.sleep(1);
            System.out.println("生产成功");

        }

        public void consumer() throws Exception {

            queue.take();
            TimeUnit.SECONDS.sleep(1);
            System.out.println("消费成功");

        }

    }

    static class Producer implements Runnable {
        Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {

            try {
                while (!Thread.interrupted()) {
                    factory.produce();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    static class Consumer implements Runnable {
        Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    factory.consumer();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Producer(factory));
        executor.execute(new Producer(factory));
        executor.execute(new Consumer(factory));

        TimeUnit.SECONDS.sleep(5);

        executor.shutdownNow();

        // Thread producer = new Thread(new Producer(factory));
        // Thread consumer = new Thread(new Consumer(factory));
        //
        // producer.start();
        // consumer.start();
        //
        // TimeUnit.SECONDS.sleep(5);
        //
        // producer.interrupt();
        // consumer.interrupt();
    }
}

PipedWriter/PipedReader方法

在jdk引入BlockingQueue之前大多是使用这种方式来实现同步和通信,它基本上可以看做是一个阻塞队列,代码如下:

package concurrency.interview;

import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ProducerAndConsumer_4 {
    static class Factory {

        private final int MAX_SIZE = 10;

//      private BlockingQueue<Object> queue = new ArrayBlockingQueue<>(MAX_SIZE);
        private PipedWriter out = null;
        private PipedReader in = null;
        
        Factory() throws IOException {
            out = new PipedWriter();
            in = new PipedReader(out);
        }
        
        public void produce() throws Exception {

            out.write("hello world!");;
            TimeUnit.SECONDS.sleep(1);
            System.out.println("生产成功");

        }

        public void consumer() throws Exception {

            char[] buf = new char[12];
            in.read(buf);
            TimeUnit.SECONDS.sleep(1);
            System.out.println("消费成功");
            for (char c : buf) {
                System.out.print(c);
            }
            System.out.println();
        }

    }

    static class Producer implements Runnable {
        Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {

            try {
                while (!Thread.interrupted()) {
                    factory.produce();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    static class Consumer implements Runnable {
        Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.interrupted()) {
                    factory.consumer();
                }
            } catch (Exception e) {
                System.out.println("结束");
            }
        }

    }

    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        executor.execute(new Producer(factory));
        executor.execute(new Producer(factory));
        executor.execute(new Consumer(factory));

        TimeUnit.SECONDS.sleep(5);

        executor.shutdownNow();

        // Thread producer = new Thread(new Producer(factory));
        // Thread consumer = new Thread(new Consumer(factory));
        //
        // producer.start();
        // consumer.start();
        //
        // TimeUnit.SECONDS.sleep(5);
        //
        // producer.interrupt();
        // consumer.interrupt();
    }
}

参考资料

http://blog.csdn.net/monkey_d_meng/article/details/6251879/
https://zhuanlan.zhihu.com/p/20300609
《Java编程思想》

    原文作者:杨同不爱吃洋葱
    原文地址: https://www.cnblogs.com/yangtong/p/7155078.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞