生产者消费者
生产者消费者模型是并发编程中线程之间同步和通信的经典模型,本文主要用以下四种方式来实现:
wait()/notify()方法
wait() / notify()方法是基类Object的两个方法,也就意味着所有Java类都会拥有这两个方法,这样,我们就可以为任何对象实现同步机制。
wait()方法:当缓冲区已满/空时,生产者/消费者线程停止自己的执行,让出CPU和放弃锁,使自己处于等待状态,让其他线程执行。
notifyAll()方法:当生产者/消费者向缓冲区放入/取出一个产品后,唤醒所有在该对象上等待的线程。注意notifyAll()本身不会释放锁,也不会让调用线程进入等待状态;被唤醒的线程要等调用线程退出synchronized块、释放锁之后,才能重新竞争锁并继续执行。
实现代码如下:
package concurrency.interview;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Producer/consumer demo built on the intrinsic monitor methods
 * {@code wait()} and {@code notifyAll()}.
 */
public class ProducerAndConsumer_1 {

    /**
     * A bounded counter standing in for a warehouse. All access to the counter
     * is guarded by a private monitor object.
     */
    static class Factory {
        private static final int DEFAULT_MAX_SIZE = 10;
        private static final long DEFAULT_DELAY_MILLIS = 1000L;

        private final int maxSize;
        private final long delayMillis;
        private final Object obj = new Object();
        private int number = 0;

        /** Creates a factory holding at most 10 items with a one second pause per operation. */
        Factory() {
            this(DEFAULT_MAX_SIZE, DEFAULT_DELAY_MILLIS);
        }

        /**
         * @param maxSize     maximum number of items that may be stored; must be positive
         * @param delayMillis pause after every produce/consume call; must not be negative
         * @throws IllegalArgumentException if either argument is out of range
         */
        Factory(int maxSize, long delayMillis) {
            if (maxSize <= 0) {
                throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
            }
            if (delayMillis < 0) {
                throw new IllegalArgumentException("delayMillis must not be negative: " + delayMillis);
            }
            this.maxSize = maxSize;
            this.delayMillis = delayMillis;
        }

        /**
         * Adds one item, waiting while the factory is full.
         *
         * @throws InterruptedException if the thread is interrupted while waiting or pausing
         */
        public void produce() throws InterruptedException {
            synchronized (obj) {
                // Loop, not "if": the condition must be re-checked after every wake-up.
                while (number >= maxSize) {
                    System.out.println("仓库已满!请先消费");
                    obj.wait();
                }
                number++;
                System.out.println("生产成功");
                // Notify before leaving the monitor so an interrupt during the pause
                // below can never lose the notification.
                obj.notifyAll();
            }
            // Pause outside the monitor so the other side is not blocked while we idle.
            TimeUnit.MILLISECONDS.sleep(delayMillis);
        }

        /**
         * Removes one item, waiting while the factory is empty.
         *
         * @throws InterruptedException if the thread is interrupted while waiting or pausing
         */
        public void consumer() throws InterruptedException {
            synchronized (obj) {
                while (number <= 0) {
                    System.out.println("仓库是空的,不能消费");
                    obj.wait();
                }
                number--;
                System.out.println("消费成功");
                obj.notifyAll();
            }
            TimeUnit.MILLISECONDS.sleep(delayMillis);
        }

        /** Returns the number of items currently stored. */
        int size() {
            synchronized (obj) {
                return number;
            }
        }
    }

    /** Task that keeps producing until its thread is interrupted. */
    static class Producer implements Runnable {
        final Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                // isInterrupted() leaves the flag set, unlike Thread.interrupted().
                while (!Thread.currentThread().isInterrupted()) {
                    factory.produce();
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interrupt.
                Thread.currentThread().interrupt();
            }
            System.out.println("结束");
        }
    }

    /** Task that keeps consuming until its thread is interrupted. */
    static class Consumer implements Runnable {
        final Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    factory.consumer();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("结束");
        }
    }

    /**
     * Runs one producer and one consumer for two seconds, then interrupts both.
     * The same effect can be had with two plain {@code Thread}s that are started
     * and later stopped through {@code interrupt()}.
     */
    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new Producer(factory));
            executor.execute(new Consumer(factory));
            TimeUnit.SECONDS.sleep(2);
        } finally {
            // shutdownNow() interrupts the workers, which ends their loops.
            executor.shutdownNow();
        }
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
显式Lock和Condition
JDK1.5引入了显式的Lock和Condition,使用它们可以更灵活地控制同步。在Condition上调用await()方法可以挂起当前任务,作用类似wait();调用signal()/signalAll()可以唤醒等待的任务,作用类似notify()/notifyAll()。
代码如下:
package concurrency.interview;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/**
 * Producer/consumer demo built on an explicit {@link Lock} and its
 * {@link Condition} objects.
 */
public class ProducerAndConsumer_2 {

    /**
     * A bounded counter standing in for a warehouse. The counter is guarded by
     * one lock with two conditions, so producers and consumers wait separately.
     */
    static class Factory {
        private static final int DEFAULT_MAX_SIZE = 10;
        private static final long DEFAULT_DELAY_MILLIS = 1000L;

        private final int maxSize;
        private final long delayMillis;
        private final Lock lock = new ReentrantLock();
        /** Producers wait here while the factory is full. */
        private final Condition notFull = lock.newCondition();
        /** Consumers wait here while the factory is empty. */
        private final Condition notEmpty = lock.newCondition();
        private int number = 0;

        /** Creates a factory holding at most 10 items with a one second pause per operation. */
        Factory() {
            this(DEFAULT_MAX_SIZE, DEFAULT_DELAY_MILLIS);
        }

        /**
         * @param maxSize     maximum number of items that may be stored; must be positive
         * @param delayMillis pause after every produce/consume call; must not be negative
         * @throws IllegalArgumentException if either argument is out of range
         */
        Factory(int maxSize, long delayMillis) {
            if (maxSize <= 0) {
                throw new IllegalArgumentException("maxSize must be positive: " + maxSize);
            }
            if (delayMillis < 0) {
                throw new IllegalArgumentException("delayMillis must not be negative: " + delayMillis);
            }
            this.maxSize = maxSize;
            this.delayMillis = delayMillis;
        }

        /**
         * Adds one item, waiting while the factory is full.
         *
         * @throws InterruptedException if the thread is interrupted while acquiring
         *                              the lock, waiting or pausing
         */
        public void produce() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                // Loop, not "if": the condition must be re-checked after every wake-up.
                while (number >= maxSize) {
                    System.out.println("仓库已满!请先消费");
                    notFull.await();
                }
                number++;
                System.out.println("生产成功");
                // One new item can satisfy exactly one waiting consumer.
                notEmpty.signal();
            } finally {
                lock.unlock();
            }
            // Pause outside the lock so the other side is not blocked while we idle.
            TimeUnit.MILLISECONDS.sleep(delayMillis);
        }

        /**
         * Removes one item, waiting while the factory is empty.
         *
         * @throws InterruptedException if the thread is interrupted while acquiring
         *                              the lock, waiting or pausing
         */
        public void consumer() throws InterruptedException {
            lock.lockInterruptibly();
            try {
                while (number <= 0) {
                    System.out.println("仓库是空的,不能消费");
                    notEmpty.await();
                }
                number--;
                System.out.println("消费成功");
                // One freed slot can satisfy exactly one waiting producer.
                notFull.signal();
            } finally {
                lock.unlock();
            }
            TimeUnit.MILLISECONDS.sleep(delayMillis);
        }

        /** Returns the number of items currently stored. */
        int size() {
            lock.lock();
            try {
                return number;
            } finally {
                lock.unlock();
            }
        }
    }

    /** Task that keeps producing until its thread is interrupted. */
    static class Producer implements Runnable {
        final Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                // isInterrupted() leaves the flag set, unlike Thread.interrupted().
                while (!Thread.currentThread().isInterrupted()) {
                    factory.produce();
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interrupt.
                Thread.currentThread().interrupt();
            }
            System.out.println("结束");
        }
    }

    /** Task that keeps consuming until its thread is interrupted. */
    static class Consumer implements Runnable {
        final Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    factory.consumer();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("结束");
        }
    }

    /**
     * Runs one producer and one consumer for two seconds, then interrupts both.
     * The same effect can be had with two plain {@code Thread}s that are started
     * and later stopped through {@code interrupt()}.
     */
    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new Producer(factory));
            executor.execute(new Consumer(factory));
            TimeUnit.SECONDS.sleep(2);
        } finally {
            // shutdownNow() interrupts the workers, which ends their loops.
            executor.shutdownNow();
        }
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
BlockingQueue阻塞队列方法
BlockingQueue也是JDK1.5的新增内容,它是已经在内部实现了同步的队列。主要有以下两个方法
put()方法:容量达到最大时,自动阻塞。
take()方法:容量为0时,自动阻塞。
我们可以看看ArrayBlockingQueue的上面两个方法实现
// Adds e through enqueue(), waiting while count equals the length of the items array.
// Excerpt quoted from ArrayBlockingQueue: the fields lock, notFull, count and items and
// the helpers checkNotNull/enqueue are declared outside this excerpt.
// Declared to throw InterruptedException, which lockInterruptibly() and await() can raise.
public void put(E e) throws InterruptedException {
// Helper not shown here; presumably rejects a null element - confirm against the JDK source.
checkNotNull(e);
// Copy the lock field into a local before using it.
final ReentrantLock lock = this.lock;
// Interruptible acquisition: a pending interrupt surfaces as InterruptedException.
lock.lockInterruptibly();
try {
// Loop rather than "if" so the condition is re-checked after every wake-up.
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
// Runs on every exit path, including when await() throws.
lock.unlock();
}
}
// Returns the result of dequeue(), waiting while count is zero.
// Excerpt quoted from ArrayBlockingQueue: the fields lock, notEmpty and count and the
// helper dequeue are declared outside this excerpt.
// Declared to throw InterruptedException, which lockInterruptibly() and await() can raise.
public E take() throws InterruptedException {
// Copy the lock field into a local before using it.
final ReentrantLock lock = this.lock;
// Interruptible acquisition: a pending interrupt surfaces as InterruptedException.
lock.lockInterruptibly();
try {
// Loop rather than "if" so the condition is re-checked after every wake-up.
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
// Runs on every exit path, including the return above and when await() throws.
lock.unlock();
}
}
可以看到ArrayBlockingQueue内部的同步就是使用的Lock和Condition。
使用BlockingQueue生产者和消费者代码如下:
package concurrency.interview;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Producer/consumer demo that delegates all synchronization to a
 * {@link BlockingQueue}.
 */
public class ProducerAndConsumer_3 {

    /** A warehouse backed by a bounded {@link ArrayBlockingQueue}. */
    static class Factory {
        private static final int DEFAULT_MAX_SIZE = 10;
        private static final long DEFAULT_DELAY_MILLIS = 1000L;

        private final long delayMillis;
        private final BlockingQueue<Object> queue;

        /** Creates a factory holding at most 10 items with a one second pause per operation. */
        Factory() {
            this(DEFAULT_MAX_SIZE, DEFAULT_DELAY_MILLIS);
        }

        /**
         * @param maxSize     capacity of the backing queue; must be positive
         * @param delayMillis pause after every produce/consume call; must not be negative
         * @throws IllegalArgumentException if either argument is out of range
         */
        Factory(int maxSize, long delayMillis) {
            if (delayMillis < 0) {
                throw new IllegalArgumentException("delayMillis must not be negative: " + delayMillis);
            }
            // ArrayBlockingQueue itself rejects a capacity below one.
            this.queue = new ArrayBlockingQueue<>(maxSize);
            this.delayMillis = delayMillis;
        }

        /**
         * Puts one item into the queue, blocking while it is full, then pauses.
         *
         * @throws InterruptedException if the thread is interrupted while blocked or pausing
         */
        public void produce() throws InterruptedException {
            queue.put(new Object());
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            System.out.println("生产成功");
        }

        /**
         * Takes one item from the queue, blocking while it is empty, then pauses.
         *
         * @throws InterruptedException if the thread is interrupted while blocked or pausing
         */
        public void consumer() throws InterruptedException {
            queue.take();
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            System.out.println("消费成功");
        }

        /** Returns the number of items currently queued. */
        int size() {
            return queue.size();
        }
    }

    /** Task that keeps producing until its thread is interrupted. */
    static class Producer implements Runnable {
        final Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                // isInterrupted() leaves the flag set, unlike Thread.interrupted().
                while (!Thread.currentThread().isInterrupted()) {
                    factory.produce();
                }
            } catch (InterruptedException e) {
                // Restore the flag so the owning executor can observe the interrupt.
                Thread.currentThread().interrupt();
            }
            System.out.println("结束");
        }
    }

    /** Task that keeps consuming until its thread is interrupted. */
    static class Consumer implements Runnable {
        final Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    factory.consumer();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            System.out.println("结束");
        }
    }

    /**
     * Runs two producers and one consumer for five seconds, then interrupts them.
     * The same effect can be had with plain {@code Thread}s that are started and
     * later stopped through {@code interrupt()}.
     */
    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new Producer(factory));
            executor.execute(new Producer(factory));
            executor.execute(new Consumer(factory));
            TimeUnit.SECONDS.sleep(5);
        } finally {
            // shutdownNow() interrupts the workers, which ends their loops.
            executor.shutdownNow();
        }
        executor.awaitTermination(5, TimeUnit.SECONDS);
    }
}
PipedWriter/PipedReader方法
在jdk引入BlockingQueue之前大多是使用这种方式来实现同步和通信,它基本上可以看做是一个阻塞队列,代码如下:
package concurrency.interview;
import java.io.IOException;
import java.io.PipedReader;
import java.io.PipedWriter;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
/**
 * Producer/consumer demo that passes messages through a connected
 * {@link PipedWriter} / {@link PipedReader} pair.
 */
public class ProducerAndConsumer_4 {

    /** Owns the pipe: producers write a fixed message, consumers read it back. */
    static class Factory {
        private static final String MESSAGE = "hello world!";
        private static final long DEFAULT_DELAY_MILLIS = 1000L;

        private final long delayMillis;
        private final PipedWriter out;
        private final PipedReader in;

        /**
         * Creates a factory with a one second pause per operation.
         *
         * @throws IOException if the pipe ends cannot be connected
         */
        Factory() throws IOException {
            this(DEFAULT_DELAY_MILLIS);
        }

        /**
         * @param delayMillis pause after every produce/consume call; must not be negative
         * @throws IOException              if the pipe ends cannot be connected
         * @throws IllegalArgumentException if {@code delayMillis} is negative
         */
        Factory(long delayMillis) throws IOException {
            if (delayMillis < 0) {
                throw new IllegalArgumentException("delayMillis must not be negative: " + delayMillis);
            }
            this.delayMillis = delayMillis;
            this.out = new PipedWriter();
            this.in = new PipedReader(out);
        }

        /**
         * Writes one message into the pipe, then pauses.
         *
         * @throws IOException          if writing to the pipe fails
         * @throws InterruptedException if the thread is interrupted while pausing
         */
        public void produce() throws IOException, InterruptedException {
            out.write(MESSAGE);
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            System.out.println("生产成功");
        }

        /**
         * Reads one message from the pipe, pauses, then prints what was read.
         *
         * @throws IOException          if reading from the pipe fails
         * @throws InterruptedException if the thread is interrupted while pausing
         */
        public void consumer() throws IOException, InterruptedException {
            char[] buf = new char[MESSAGE.length()];
            int filled = 0;
            // read() may return fewer characters than requested, so keep reading
            // until the message is complete or the writer has closed the pipe.
            while (filled < buf.length) {
                int count = in.read(buf, filled, buf.length - filled);
                if (count < 0) {
                    break;
                }
                filled += count;
            }
            TimeUnit.MILLISECONDS.sleep(delayMillis);
            System.out.println("消费成功");
            // Print only the characters actually read, never unfilled buffer slots.
            System.out.println(new String(buf, 0, filled));
        }

        /**
         * Closes both ends of the pipe.
         *
         * @throws IOException if closing either end fails
         */
        void close() throws IOException {
            try {
                out.close();
            } finally {
                in.close();
            }
        }
    }

    /** Task that keeps producing until its thread is interrupted or the pipe fails. */
    static class Producer implements Runnable {
        final Factory factory;

        Producer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                // isInterrupted() leaves the flag set, unlike Thread.interrupted().
                while (!Thread.currentThread().isInterrupted()) {
                    factory.produce();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                // The piped streams report an interrupt as InterruptedIOException;
                // anything else is a genuine pipe failure worth reporting.
                if (e instanceof java.io.InterruptedIOException) {
                    Thread.currentThread().interrupt();
                } else {
                    System.err.println("pipe failure: " + e);
                }
            }
            System.out.println("结束");
        }
    }

    /** Task that keeps consuming until its thread is interrupted or the pipe fails. */
    static class Consumer implements Runnable {
        final Factory factory;

        Consumer(Factory factory) {
            this.factory = factory;
        }

        @Override
        public void run() {
            try {
                while (!Thread.currentThread().isInterrupted()) {
                    factory.consumer();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } catch (IOException e) {
                if (e instanceof java.io.InterruptedIOException) {
                    Thread.currentThread().interrupt();
                } else {
                    System.err.println("pipe failure: " + e);
                }
            }
            System.out.println("结束");
        }
    }

    /**
     * Runs two producers and one consumer for five seconds, then interrupts them
     * and closes the pipe. The same effect can be had with plain {@code Thread}s
     * that are started and later stopped through {@code interrupt()}.
     */
    public static void main(String[] args) throws Exception {
        Factory factory = new Factory();
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            executor.execute(new Producer(factory));
            executor.execute(new Producer(factory));
            executor.execute(new Consumer(factory));
            TimeUnit.SECONDS.sleep(5);
        } finally {
            // shutdownNow() interrupts the workers, which ends their loops.
            executor.shutdownNow();
            try {
                executor.awaitTermination(5, TimeUnit.SECONDS);
            } finally {
                factory.close();
            }
        }
    }
}
参考资料
http://blog.csdn.net/monkey_d_meng/article/details/6251879/
https://zhuanlan.zhihu.com/p/20300609
《Java编程思想》