这一节我们来了解阻塞队列(BlockingQueue),BlockingQueue接口定义了一种阻塞的FIFO queue,每一个BlockingQueue都有一个容量,当容量满时往BlockingQueue中添加数据时会造成阻塞,当容量为空时取元素操作会阻塞。首先我们来看ArrayBlockingQueue和LinkedBlockingQueue.
ArrayBlockingQueue
ArrayBlockingQueue是一个用数组实现的有界阻塞队列。此队列按照先进先出(FIFO)的原则对元素进行排序。默认情况下不保证访问者公平的访问队列,所谓公平访问队列是指阻塞的所有生产者线程或消费者线程,当队列可用时,可以按照阻塞的先后顺序访问队列,即先阻塞的生产者线程,可以先往队列里插入元素,先阻塞的消费者线程,可以先从队列里获取元素。通常情况下为了保证公平性会降低吞吐量。
我们看他的构造函数实现:
//默认是非公平的,初始指定队列容量
public ArrayBlockingQueue(int capacity) {
this(capacity, false);
}
//该构造方法可以设置队列的公平性。当然如果为公平的,则对性能会产生影响
//访问者的公平性是使用可重入锁实现的
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
使用很简单我们直接看一个实例:
public class ProducerConsumerTest {
public static void main(String[] args) {
final BlockingQueue<Integer> blockingQueue = new ArrayBlockingQueue<Integer>(3);
ExecutorService service = Executors.newFixedThreadPool(10);
for(int i = 0;i<4;i++){
service.execute(new ProducerAndConsumer(blockingQueue));
}
}
}
class ProducerAndConsumer implements Runnable{
private boolean flag = false;
private Integer j = 1;
private Lock lock = new ReentrantLock();
Condition pro_con = lock.newCondition();
Condition con_con = lock.newCondition();
private BlockingQueue<Integer> blockingQueue;
public ProducerAndConsumer(BlockingQueue<Integer> blockingQueue){
this.blockingQueue= blockingQueue;
}
//生产
public void put(){
try {
lock.lock();
while(flag)
pro_con.await();
System.out.println("正在准备放入数据。。。");
Thread.sleep(new Random().nextInt(10)*100);
Integer value = new Random().nextInt(30);
blockingQueue.put(value);
System.out.println(Thread.currentThread().getName()+" 放入的数据 "+value);
flag = true;
con_con.signal();
} catch (Exception e) {
e.printStackTrace();
}
finally{
lock.unlock();
}
}
public void get(){
try {
lock.lock();
while(!flag)
con_con.await();
System.out.println("正在准备取数据。。。");
Thread.sleep(new Random().nextInt(10)*1000);
System.out.println(Thread.currentThread().getName()+" 取到的数据为"+blockingQueue.take());
flag = false;
pro_con.signal();
} catch (Exception e) {
e.printStackTrace();
}
finally{
lock.unlock();
}
}
@Override
public void run() {
while(true){
if(j==1){
put();
}
else{
get();
}
j=(j+1)%2;
}
}
}
输出为:
正在准备放入数据。。。
正在准备放入数据。。。
正在准备放入数据。。。
正在准备放入数据。。。
pool-1-thread-2 放入的数据 13
正在准备取数据。。。
pool-1-thread-3 放入的数据 4
正在准备取数据。。。
pool-1-thread-3 取到的数据为13
正在准备放入数据。。。
pool-1-thread-1 放入的数据 11
正在准备取数据。。。
pool-1-thread-4 放入的数据 26
正在准备取数据。。。
pool-1-thread-1 取到的数据为4
正在准备放入数据。。。
pool-1-thread-2 取到的数据为11
正在准备放入数据。。。
pool-1-thread-3 放入的数据 18
正在准备取数据。。。
...
...
LinkedBlockingQueue
LinkedBlockingQueue是一个用链表实现的有界阻塞队列。此队列的默认和最大长度为Integer.MAX_VALUE。此队列按照先进先出的原则对元素进行排序。
先看一下他的构造函数:
public LinkedBlockingQueue() {
this(Integer.MAX_VALUE); //MAX_VALUE=2147483647
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
我们还是直接开看一个例子:
public class BlockingQueueTest {
/** * 定义装苹果的篮子 */
public static class Basket {
// 篮子,能够容纳3个苹果
// BlockingQueue<String> basket = new ArrayBlockingQueue<String>(3);
BlockingQueue<String> basket = new LinkedBlockingQueue<String>(3);
// 生产苹果,放入篮子
public void produce() throws InterruptedException {
// put方法放入一个苹果,若basket满了,等到basket有位置
basket.put("An apple");
}
// 消费苹果,从篮子中取走
public String consume() throws InterruptedException {
// get方法取出一个苹果,若basket为空,等到basket有苹果为止
return basket.take();
}
}
// 测试方法
public static void testBasket() {
// 建立一个装苹果的篮子
final Basket basket = new Basket();
// 定义苹果生产者
class Producer implements Runnable {
public String instance = "";
public Producer(String a) {
instance = a;
}
public void run() {
try {
while (true) {
// 生产苹果
System.out.println("生产者准备生产苹果:" + instance);
basket.produce();
System.out.println("! 生产者生产苹果完毕:" + instance);
// 休眠300ms
Thread.sleep(300);
}
} catch (InterruptedException ex) {
}
}
}
// 定义苹果消费者
class Consumer implements Runnable {
public String instance = "";
public Consumer(String a) {
instance = a;
}
public void run() {
try {
while (true) {
// 消费苹果
System.out.println("消费者准备消费苹果:" + instance);
basket.consume();
System.out.println("! 消费者消费苹果完毕:" + instance);
// 休眠1000ms
Thread.sleep(1000);
}
} catch (InterruptedException ex) {
}
}
}
ExecutorService service = Executors.newCachedThreadPool();
Producer producer = new Producer("P1");
Producer producer2 = new Producer("P2");
Consumer consumer = new Consumer("C1");
service.submit(producer);
service.submit(producer2);
service.submit(consumer);
// 程序运行3s后,所有任务停止
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
}
service.shutdownNow();
}
public static void main(String[] args) {
BlockingQueueTest.testBasket();
}
}
输出为:
生产者准备生产苹果:P1
消费者准备消费苹果:C1
! 生产者生产苹果完毕:P1
生产者准备生产苹果:P2
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P2
生产者准备生产苹果:P2
! 生产者生产苹果完毕:P2
生产者准备生产苹果:P1
! 生产者生产苹果完毕:P1
生产者准备生产苹果:P2
生产者准备生产苹果:P1
消费者准备消费苹果:C1
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P2
生产者准备生产苹果:P2
消费者准备消费苹果:C1
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P1
生产者准备生产苹果:P1
消费者准备消费苹果:C1
! 消费者消费苹果完毕:C1
! 生产者生产苹果完毕:P2
Process finished with exit code 0