这篇文章将使用经典的生产者消费者的例子来进一步巩固java多线程通信,介绍使用阻塞队列来简化程序
下面是一个经典的生产者消费者的例子:
假设使用缓冲区存储整数,缓冲区的大小是受限制的。缓冲区提供write(int)方法将一个整数添加到缓冲区,还体统read()方法从缓冲区中读取并删除一个整数。为了同步操作,使用具有两个条件的锁,notEmpty(缓冲区非空)和notFull(缓冲区未满)。当任务相缓冲区添加一个int时,如果缓冲区是满的,那么任务将等待notFull状态,当任务从缓冲区总删除一个int时,如果缓冲区是空的,那么任务将等待notEmpty状态。
import java.util.LinkedList; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; public class ConsumerProducer { private static Buffer buffer = new Buffer(); public static void main(String[] args) { // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new Producertask()); executor.execute(new Consumertask()); executor.shutdown(); } private static class Producertask implements Runnable { @Override public void run() { try { int i = 1; while (true) { System.out.println("Producer writes " + i); buffer.write(i++); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Consumertask implements Runnable { public void run() { try { while (true) { System.out.println("\t\t\tConsumer reads " + buffer.read()); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Buffer { private static final int CAPACITY = 1; // buffer size LinkedList<Integer> queue = new LinkedList<Integer>(); // 创建锁 private static Lock lock = new ReentrantLock(); // 创建两个条件 private static Condition notEmpty = lock.newCondition(); private static Condition notFull = lock.newCondition(); public void write(int value) { lock.lock(); // 请求锁 try { while (queue.size() == CAPACITY) { System.out.println("Wait for notFull condition"); notFull.await(); } queue.offer(value); notEmpty.signal(); // notEmpty条件信号 } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); // 释放锁 } } @SuppressWarnings("finally") public int read() { int value = 0; lock.lock(); try { while (queue.isEmpty()) { System.out.println("\t\t\tWait for notEmpty condition"); notEmpty.await(); } value = queue.remove(); notFull.signal(); } catch (Exception e) { e.printStackTrace(); } finally { lock.unlock(); return value; } } } }
阻塞列队
阻塞列队在试图想一个满列队添加元素或这从空列队删除元素时会导致线程阻塞。BlockQueue接口扩展java.util.queue,并且提供同步的put和take方法想列队头部添加元素,以及从列队尾删除元素
java支持三个具体的阻塞列队ArrayBlockingQueue、LinkedblockingQueue 和 PriorityBlockingQueue ,他们都在java.util.concurrent包中。
ArrayBlockingQueue
一个由数组支持的有界阻塞队列。此队列按 FIFO(先进先出)原则对元素进行排序。队列的头部 是在队列中存在时间最长的元素。队列的尾部 是在队列中存在时间最短的元素。新元素插入到队列的尾部,队列获取操作则是从队列头部开始获得元素。
这是一个典型的“有界缓存区”,固定大小的数组在其中保持生产者插入的元素和使用者提取的元素。一旦创建了这样的缓存区,就不能再增加其容量。试图向已满队列中放入元素会导致操作受阻塞;试图从空队列中提取元素将导致类似阻塞。
此类支持对等待的生产者线程和使用者线程进行排序的可选公平策略。默认情况下,不保证是这种排序。然而,通过将公平性 (fairness) 设置为 true 而构造的队列允许按照 FIFO 顺序访问线程。公平性通常会降低吞吐量,但也减少了可变性和避免了“不平衡性”。
LinkedBlockingQueue
一个基于已链接节点的、范围任意的 blocking queue。此队列按 FIFO(先进先出)排序元素。队列的头部 是在队列中时间最长的元素。队列的尾部 是在队列中时间最短的元素。新元素插入到队列的尾部,并且队列获取操作会获得位于队列头部的元素。链接队列的吞吐量通常要高于基于数组的队列,但是在大多数并发应用程序中,其可预知的性能要低。
可选的容量范围构造方法参数作为防止队列过度扩展的一种方法。如果未指定容量,则它等于Integer.MAX_VALUE
。除非插入节点会使队列超出容量,否则每次插入后会动态地创建链接节点。
PriorityBlockingQueue
一个无界阻塞队列,它使用与类 PriorityQueue
相同的顺序规则,并且提供了阻塞获取操作。虽然此队列逻辑上是无界的,但是资源被耗尽时试图执行 add 操作也将失败(导致 OutOfMemoryError)。此类不允许使用 null 元素。依赖自然顺序的优先级队列也不允许插入不可比较的对象(这样做会导致抛出 ClassCastException)。
使用ArrayBlockingQueue简化后的代码如下:
import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ConsumerProducerUsingBlockingQueue { private static ArrayBlockingQueue<Integer> buffer = new ArrayBlockingQueue<Integer>(2); public static void main(String[] args) { // 创建线程池 ExecutorService executor = Executors.newFixedThreadPool(2); executor.execute(new Producertask()); executor.execute(new Consumertask()); executor.shutdown(); } private static class Producertask implements Runnable { @Override public void run() { try { int i = 1; while (true) { System.out.println("Producer writes " + i); buffer.put(i++); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } private static class Consumertask implements Runnable { public void run() { try { while (true) { System.out.println("\t\t\tConsumer reads " + buffer.take()); Thread.sleep((int) (Math.random() * 10000)); } } catch (InterruptedException e) { e.printStackTrace(); } } } }
可以看到,代码减少了一半,主要是因为ArrayBlockingQueue中已经实现了同步,所以无需手动编码