模拟通过线程实现消费者和订阅者模式:
首先,定义一个店员:店员包含进货、卖货方法;其次,定义一个生产者,生产者负责给店员生产产品;再者,定义一个消费者,消费者负责从店员那里消费产品。
店员:
/** * 店员 */ class Clerk { private int product = 0; /** * 进货 */ public synchronized void purchase() { if (product >= 10) { System.out.println("产品已满。。。"); } else { System.out.println(Thread.currentThread().getName() + ":" + ++product); } } /** * 卖货 */ public synchronized void sell() { if (product <= 0) { System.out.println("产品缺貨。。。"); } else { System.out.println(Thread.currentThread().getName() + ":" + --product); } } }
生产者
/** * 生产者 不断的生产产品给店员 * */ class Productor implements Runnable{ private Clerk clerk; public Productor(Clerk clerk){ this.clerk=clerk; } public void run() { for(int i=0;i<20;i++){ clerk.purchase(); } } }
消费者
/** * 消费者 不断的从店员那里消费产品 * */ class Consumer implements Runnable{ private Clerk clerk; public Consumer(Clerk clerk){ this.clerk=clerk; } public void run() { for(int i=0;i<20;i++){ clerk.sell(); } } }
此时,运行程序,运行结果如下:
Productor-A:1 Productor-A:2 Productor-A:3 Productor-A:4 Productor-A:5 Productor-A:6 Productor-A:7 Productor-A:8 Productor-A:9 Productor-A:10 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 产品已满。。。 Consumer-A:9 Consumer-A:8 Consumer-A:7 Consumer-A:6 Consumer-A:5 Consumer-A:4 Consumer-A:3 Consumer-A:2 Consumer-A:1 Consumer-A:0 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。 产品缺貨。。。
从运行打印结果可以发现这里存在两个问题:
1)一旦生产者发现店员产品已满时,仍然没有停止生产产品,在不断地生产生产产品;
2)一旦消费者发现店员产品缺货时,依然时不断地消费消费。
这里明显是有缺陷的,现实中应该是:一旦发现货物满时,就不在进货,而是开启卖货行为;当卖货行为发现无货时,开始进货行为。
针对生产者消费者改进:
消费者、生产者、客户端调用代码不变,只修改店员类:
/** * 店员 */ class Clerk { private int product = 0; /** * 进货 */ public synchronized void purchase() { if (product >= 10) { System.out.println("产品已满。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } } /** * 卖货 */ public synchronized void sell() { if (product <= 0) { System.out.println("产品缺貨。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } } }
此时运行结果:
Productor-A:1 Consumer-A:0 产品缺貨。。。 Productor-A:1 Productor-A:2 Productor-A:3 Productor-A:4 Productor-A:5 Productor-A:6 Productor-A:7 Productor-A:8 Productor-A:9 Productor-A:10 产品已满。。。 Consumer-A:9 Consumer-A:8 Consumer-A:7 Consumer-A:6 Consumer-A:5 Consumer-A:4 Consumer-A:3 Consumer-A:2 Consumer-A:1 Consumer-A:0 产品缺貨。。。 Productor-A:1 Productor-A:2 Productor-A:3 Productor-A:4 Productor-A:5 Productor-A:6 Productor-A:7 Productor-A:8 Consumer-A:7 Consumer-A:6 Consumer-A:5 Consumer-A:4 Consumer-A:3 Consumer-A:2 Consumer-A:1
此时,从结果运行来说是按照我们希望的结果出现了。
改进后带来问题一:
修改店员类的最大进货数为1,把生产者一次生产20修改为2,消费者一次消费20也修改为2。
/** * 生产者 不断的生产产品给店员 */ class Productor implements Runnable { private Clerk clerk; public Productor(Clerk clerk) { this.clerk = clerk; } public void run() { for (int i = 0; i < 2; i++) { clerk.purchase(); } } } /** * 消费者 不断的从店员那里消费产品 */ class Consumer implements Runnable { private Clerk clerk; public Consumer(Clerk clerk) { this.clerk = clerk; } public void run() { for (int i = 0; i < 2; i++) { clerk.sell(); } } }
View Code
店员类:
/** * 店员 */ class Clerk { private int product = 0; /** * 进货 */ public synchronized void purchase() { if (product >= 1) { System.out.println("产品已满。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } } /** * 卖货 */ public synchronized void sell() { if (product <= 0) { System.out.println("产品缺貨。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } else { System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } } }
此时运行结果:
从运行结果上来看,程序是一个死锁现象。
为什么会发生死锁问题?
从运行结果上来看分析:
分析:
1)当purchase()运行“2”是处于this.wait()等待状态时,此时sell()开始运行;
2)sell()运行时,第一次走3当运行到this.notifyAll()时,开始运行4和purchase()等待向下执行(一旦向下执行purchase将不再被调用,原因生产者只有两次循环机会),而运行‘4’时打印‘产品缺货’,而且代码走入this.wait()处于一直等待状态。
因此会看到程序一直未结束状态,这个属于代码的一个BUG。
解决方法:
修改店员类:
/** * 店员 */ class Clerk { private int product = 0; /** * 进货 */ public synchronized void purchase() { if (product >= 1) { System.out.println("产品已满。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } /** * 卖货 */ public synchronized void sell() { if (product <= 0) { System.out.println("产品缺貨。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } }
在运行发现程序正常结束,打印结果如下:
Productor-A:1 产品已满。。。 Consumer-A:0 产品缺貨。。。 Productor-A:1 Consumer-A:0
改进后带来问题二(虚假唤醒):
此时修改客户端调用代码:
public class SpuriousWakeupsTest { public static void main(String[] args) { Clerk clerk = new Clerk(); Productor productor = new Productor(clerk); Consumer consumer = new Consumer(clerk); new Thread(productor, "Productor-A").start(); new Thread(consumer, "Consumer-A").start(); new Thread(productor, "Productor-B").start(); new Thread(consumer, "Consumer-B").start(); } }
之前只有一个生产者和一个消费者,修改后让其拥有两个生产者和两个消费者,此时运行代码如下:
Consumer-A:-1
产品缺貨。。。
Productor-A:0
Productor-A:1
Consumer-A:0
Consumer-B:-1
产品缺貨。。。
Productor-B:0
Productor-B:1
Consumer-B:0
从代码分析逻辑来看,输出解雇貌似毫无逻辑,此现象就是一个虚假唤醒现象。
问题分析:
1)缺货时,两个consumer线程都进入wait状态;
2)当另外一个生产者生产了产品并调用了notifyall,此时两个consumer线程都被唤醒并跳过wait,进入消费代码
System.out.println(Thread.currentThread().getName() + ":" + --product);
,因此导致一个输出产品数为0,另外一个产品数为-1。
解决办法:
基于while来反复判断进入正常操作的临界条件是否满足:
synchronized (obj) { while (<condition does not hold>) obj.wait(); ... // Perform action appropriate to condition }
此处理方案来此ava.lang.Object API的wati方法说明信息中。
店员修改:
/** * 店员 */ class Clerk { private int product = 0; /** * 进货 */ public synchronized void purchase() { while (product >= 1) { System.out.println("产品已满。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + ++product); this.notifyAll(); } /** * 卖货 */ public synchronized void sell() { while (product <= 0) { System.out.println("产品缺貨。。。"); try { this.wait(); } catch (InterruptedException e) { e.printStackTrace(); } } System.out.println(Thread.currentThread().getName() + ":" + --product); this.notifyAll(); } }
运行结果:
Productor-A:1 产品已满。。。 Consumer-B:0 产品缺貨。。。 Productor-B:1 产品已满。。。 Consumer-A:0 产品缺貨。。。 Productor-B:1 Consumer-B:0 Productor-A:1 Consumer-A:0