JDK5.0之前,用java实现生产者和消费者的唯一方式就是使用synchronized内置锁和wait/notify条件通知机制。JDK5.0之后提供了显示锁Lock和条件队列Condition,与内置锁和内置条件队列相对应,但是显示的锁和条件队列,功能更强大,更灵活。此外JDK5.0之后还提供了大量很有用的并发工具类,如BlockingQueue等,基于这些数据结构,能够方便、快速、高效的构建自己应用需要的效果。这里我们简单使用下显示锁和条件队列,来模拟有界缓存的实现,功能类似于JDK内置的阻塞队列BlockingQueue。
生产者和消费者问题,这里不再介绍了,网上有很多资料。这里提一个概念:条件谓词。条件谓词就是使某个依赖状态的操作能够成功的前提条件。对于有界缓存来说,只有当缓存不为空,消费者才能get数据,否则消费者必须等待;只有当缓存未满,生产者才能put数据,否者生产者必须等待。对于生产者线程来说,它的条件谓词是”缓存未满“;对于消费者来说,它的条件谓词是”缓存非空“。为什么要提到条件谓词呢?因为如果我们找到了所有的条件谓词,才能够清楚线程之间的关系和依赖,才能实现正确的等待和唤醒。《java并发编程实践》一书14.2节很好地阐了锁、条件队列、条件谓词这3者之间的联系。
首先我们使用最原始的synchronized和wait/notify机制,实现生产者和消费者问题中的有界缓存,代码如下:
package waitnotify;
import java.util.ArrayList;
import java.util.List;
public class InSideLock
{
// 保护共享数据的锁
private Object lockObj = new Object();
// 缓冲池
private List<String> pools = null;
// 最大容量
private int CAPACITY = 0;
public InSideLock(int capacity)
{
this.pools = new ArrayList<String>(capacity);
this.CAPACITY = capacity;
}
public void put(String e) throws InterruptedException
{
synchronized (lockObj)
{
while (pools.size() == CAPACITY)
{
// 调用wait,必须要持有锁,而且必须在while循环中测试条件
lockObj.wait();
}
pools.add(e);
lockObj.notifyAll();
}
}
public String get() throws InterruptedException
{
synchronized (lockObj)
{
while (pools.size() == 0)
{
lockObj.wait();
}
String value = pools.remove(0);
lockObj.notifyAll();
return value;
}
}
}
使用显示锁和条件队列代码如下:
package waitnotify;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class VisibleLock
{
// 保护共享数据的锁
private Lock mainLock = new ReentrantLock();
// 消费者等待的非空谓词
private Condition notEmpty = mainLock.newCondition();
// 生产者等待的非满谓词
private Condition notFull = mainLock.newCondition();
private List<String> pools = null;
private int CAPACITY = 0;
public VisibleLock(int capacity)
{
this.pools = new ArrayList<String>(capacity);
this.CAPACITY = capacity;
}
public void put(String e) throws InterruptedException
{
mainLock.lock();
try
{
while (pools.size() == CAPACITY)
{
notFull.await();
}
pools.add(e);
// 使用条件队列的显示API,特别注意这里不能使用notify
notEmpty.signalAll();
} finally
{
mainLock.unlock();
}
}
public String get() throws InterruptedException
{
mainLock.lock();
try
{
while (pools.size() == 0)
{
notEmpty.await();
}
String result = pools.remove(0);
notFull.signalAll();
return result;
} finally
{
mainLock.unlock();
}
}
}
下面是测试生产者和消费者的代码:
package waitnotify;
public class Test
{
public static void main(String[] args) throws Exception
{
final InSideLock pc = new InSideLock(10);
Thread t1 = new Thread(new Runnable() {
@Override
public void run()
{
while (true)
{
try
{
System.out.println("get=" + pc.get());
} catch (InterruptedException e)
{
}
}
}
});
Thread t2 = new Thread(new Runnable() {
@Override
public void run()
{
while (true)
{
try
{
long time = System.currentTimeMillis();
System.out.println("put=" + time);
pc.put(time + "");
} catch (InterruptedException e)
{
}
}
}
});
t1.start();
t2.start();
}
}
可以发现使用内置锁和内置条件队列,与使用显示锁和显示条件队列代码几乎是一样的。