JUC线程进阶篇06:生产者消费者案例与Condition线程通信
标签: 多线程
使用synchronized解决线程安全问题
生产者消费者的问题
生产者消费者案例有可能产生什么样的问题?
添加和创建数据的线程叫生产者线程,删除和销毁数据的线程叫消费者线程。如果生产者线程或快,另一边接收不到,可能会造成数据丢失的情况;如果消费者线程过快,另一边已经不发了还在接收,可能会有错误的重复的数据。
/** * 生产者和消费者 */
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"生产者A").start();
new Thread(consumer,"消费者B").start();
}
}
// 店员
class Clerk {
private int product = 0;
// 进货
public synchronized void get() {
if (product >= 10) {
System.out.println("产品已满,无法添加");
} else {
System.out.println(Thread.currentThread().getName() + "进货:" + ++product);
}
}
// 卖货
public synchronized void sale() {
if (product <= 0) {
System.out.println("缺货");
} else {
System.out.println(Thread.currentThread().getName() + "卖货:" + --product);
}
}
}
// 生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0 ; i < 20 ; i++) {
clerk.get();
}
}
}
// 消费者
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0 ; i < 20 ; i++) {
clerk.sale();
}
}
}
等待唤醒机制
当生产者生产满了,就不能继续生产了;消费者获取完了,就不能继续获取了。
所以要添加wait()
和notifyAll()
,当不能继续时要进入线程等待,当可以继续时,要唤醒
package sort;
/** * Created by japson on 4/22/2018. */
/** * 生产者和消费者 */
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor,"生产者A").start();
new Thread(consumer,"消费者B").start();
}
}
// 店员
class Clerk {
private int product = 0;
// 进货
public synchronized void get() {
if (product >= 10) {
System.out.println("产品已满,无法添加");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + "进货:" + ++product);
this.notifyAll();
}
}
// 卖货
public synchronized void sale() {
if (product <= 0) {
System.out.println("缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
} else {
System.out.println(Thread.currentThread().getName() + "卖货:" + --product);
this.notifyAll();
}
}
}
// 生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0 ; i < 20 ; i++) {
clerk.get();
}
}
}
// 消费者
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0 ; i < 20 ; i++) {
clerk.sale();
}
}
}
问题1:线程死循环
在上面的代码中,好使有一些隐性的问题。我们将问题放大。
- 我们将店员进货的囤积上限设定为1
- 生产者延时0.2s
然后运行程序。会发现程序没有自动结束,还一直在执行。说明程序陷入了死循环。
这是因为,假设循坏到最后一轮时,由于产品已满引发了get()进货方法的wait()操作,然后生产者线程等待,随后消费者消费了一份产品,并唤醒等待的生产者线程。此时,被唤醒的生产者线程由于循环结束,直接结束了线程的执行。但是另一边,消费者线程没有结束,而且由于将产品消费完后再次进入了等待,但是生产者线程此时已经结束了,不能再唤醒消费者线程,所以便进入了死循环。
解决这种问题的方法时去掉Clerk类中get方法和sale方法的else,并将原来else中的代码直接提出,这样,就算线程结束,也会先再次唤醒等待的线程:
/** * 生产者和消费者 */
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor, "生产者A").start();
new Thread(consumer, "消费者B").start();
}
}
// 店员
class Clerk {
private int product = 0;
// 进货
public synchronized void get() {
if (product >= 1) {
System.out.println("产品已满,无法添加");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "进货:" + ++product);
this.notifyAll();
}
// 卖货
public synchronized void sale() {
if (product <= 0) {
System.out.println("缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "卖货:" + --product);
this.notifyAll();
}
}
// 生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
// 消费者
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}
问题2 :虚假唤醒
现在有两个(多个)消费者线程和生产者线程,情况会如何呢?
程序运行的结果中,产品数量出现了负数,这肯定是错误的。
错误的原因在于,当一个消费者线程A遇到产品为0时,等待,并释放锁标志,然后另外一个消费者线程C获取到该锁标志,由于产品仍然为0,也等待,并释放锁标志。此时消费者线程A、C都在等待中
这时候,一个生产者线程获取到锁,在生产一个产品后,执行notifyAll()唤醒所有线程,这时候,一个消费者线程A消费一个产品使得产品为0,另外一个消费者线程C再消费一个产品使得产品变为了负数。
这种现象称为虚假唤醒。在Object.wait()方法的javadoc中叙述了该如何解决这种问题:
对于某一个参数的版本,实现中断和虚假唤醒是可能的,而且此方法应始终在循环中使用:
synchronized (obj) {
while (<condition does not hold>)
obj.wait();
... // Perform action appropriate to condition
}
此方法只应由作为此对象监视器的所有者的线程来调用。有关线程能够成为监视器所有者的方法的描述,请参阅 notify 方法。
即,将get和sale方法中的if都改为while,这样,每次被唤醒后,都会再次判断产品数是否>=0:
/** * 生产者和消费者 */
public class TestProductorAndConsumer {
public static void main(String[] args) {
Clerk clerk = new Clerk();
Productor productor = new Productor(clerk);
Consumer consumer = new Consumer(clerk);
new Thread(productor, "生产者A").start();
new Thread(consumer, "消费者B").start();
new Thread(productor, "生产者C").start();
new Thread(consumer, "消费者D").start();
}
}
// 店员
class Clerk {
private int product = 0;
// 进货
public synchronized void get() {
while (product >= 1) {
System.out.println("产品已满,无法添加");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "进货:" + ++product);
this.notifyAll();
}
// 卖货
public synchronized void sale() {
while (product <= 0) {
System.out.println("缺货");
try {
this.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "卖货:" + --product);
this.notifyAll();
}
}
// 生产者
class Productor implements Runnable {
private Clerk clerk;
public Productor(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0; i < 20; i++) {
try {
Thread.sleep(200);
} catch (InterruptedException e) {
e.printStackTrace();
}
clerk.get();
}
}
}
// 消费者
class Consumer implements Runnable {
private Clerk clerk;
public Consumer(Clerk clerk) {
this.clerk = clerk;
}
public void run() {
for (int i = 0; i < 20; i++) {
clerk.sale();
}
}
}
同步锁的方式解决线程安全问题
可以使用Lock来代替synchronized,需要在原同步方法中加锁,然后在finally中释放锁。这就要用到线程间的通信
线程通信
Condition工具类
在原来的synchronized内部,我们可以调用object的wait与notify方法,那么使用lock之后,如何进行线程的通信呢。
在原隐式锁中对应的wait()
、notify()
、notifyAll()
方法,在显式锁Lock中对应的是Condition对象的方法。
Condition接口描述了可能会与锁Lock有关的条件变量。分别是await()
、signal()
和signalAll()
。Condition实例实质上被绑定到一个锁上,要为特定Lock实例获得Condition实例,使用其newCondition()方法。
使用方法如下:
class Clerk {
private int product = 0;
private Lock lock = new ReentrantLock();
private Condition condition = lock.newCondition();
// 进货
public void get() {
lock.lock();
try {
while (product >= 1) {
System.out.println("产品已满,无法添加");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "进货:" + ++product);
condition.signalAll();
} finally {
lock.unlock();
}
}
// 卖货
public void sale() {
lock.lock();
try {
while (product <= 0) {
System.out.println("缺货");
try {
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(Thread.currentThread().getName() + "卖货:" + --product);
condition.signalAll();
}finally {
lock.unlock();
}
}
}
线程按序交替
编写一个程序,开启三个线程,这三个线程的ID分别为A、B、C,每个线程将自己的ID在屏幕上打印10遍,要求输出的结果必须按顺序显示。如:ABCABC…
也要使用线程通信,其基本思想是:对于每一个子线程,运行一个打印A或B或C的方法。交替执行这些方法,首先需要一个标记来记录当前应该打印那个字母。然后不应该打印的时候,调用await()方法阻塞该线程;能够打印的时候,打印并改变标记,然后唤醒下一个线程signal()。
通过标记确认应该打印那个,通过线程间的通信,阻塞或唤醒线程,来控制交替打印。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
/** * Created by japson on 4/24/2018. */
public class TestABCAlternate {
public static void main(String[] args) {
AlternateDemo alternateDemo = new AlternateDemo();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0 ; i < 10 ; i++) {
alternateDemo.loopA(i);
}
}
},"A线程").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0 ; i < 10 ; i++) {
alternateDemo.loopB(i);
}
}
},"B线程").start();
new Thread(new Runnable() {
@Override
public void run() {
for (int i = 0 ; i < 10 ; i++) {
alternateDemo.loopC(i);
}
}
},"C线程").start();
}
}
class AlternateDemo {
int number = 1; // 用来标记应该执行的线程。1 2 3对应a b c
private Lock lock = new ReentrantLock();
// 线程通信对象
private Condition condition1 = lock.newCondition();
private Condition condition2 = lock.newCondition();
private Condition condition3 = lock.newCondition();
public void loopA(int totalLoop) {
lock.lock();
try {
// 如果不应该当前线程执行,则该线程等待
if (number != 1) {
condition1.await();
}
// 打印A
System.out.println("A");
// 唤醒下一个线程
number = 2;
condition2.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void loopB(int totalLoop) {
lock.lock();
try {
// 如果不应该当前线程执行,则该线程等待
if (number != 2) {
condition2.await();
}
// 打印A
System.out.println("B");
// 唤醒下一个线程
number = 3;
condition3.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
public void loopC(int totalLoop) {
lock.lock();
try {
// 如果不应该当前线程执行,则该线程等待
if (number != 3) {
condition3.await();
}
// 打印A
System.out.println("C");
// 唤醒下一个线程
number = 1;
condition1.signal();
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}