先说说BlockingQueue
BlockingQueue
,JUC中定义的阻塞队列接口,为阻塞队列的实现定义了一些通用的方法,其中包括非阻塞操作以及阻塞操作
- 非阻塞操作:
- offer,add(add不常用):队列添加元素(队尾添加)
- poll,remove(remove不常用):移除队列元素(队头移除)
- 阻塞操作:
- put:队列添加元素,通常在有界队列实现里面,当队列元素个数达到容量上限,则阻塞等待,直到队列不满且添加元素成功,则解除阻塞(队尾添加)
- take:移除队列元素,当队列为空,则阻塞等待,直到队列非空获取到元素,则解除阻塞(队头移除)
使用场景
阻塞队列在多线程编程里面,常用于实现生产者-消费者模式,生成者往往队列里面丢数据,而消费者从队列里面获取数据进行处理
经典的案例就是Java里面的线程池实现,任务会往任务队列(阻塞队列)里面丢(调用线程池的线程即是生产者),等待线程池里面的线程(消费者)去消费并处理
ArrayBlockingQueue基本介绍
阻塞队列的一种实现,特点如下:
- 队列为有界队列,存在队列满的情况
- add或offer方法不阻塞,offer添加成功返回true,否则返回false(队列满的情况下则添加失败),而add方法在队列满的情况下,抛异常,通常不建议使用add方法
- poll或remove方法不阻塞,poll方法在队列无元素的情况下返回null,remove方法在队列满的情况下,会抛异常,通常不建议使用remove方法
- put方法在队列满的情况下,线程会阻塞等待,直到队列不满,添加元素成功,则解除阻塞等待
- take方法在队列空的情况下,线程会阻塞等待,直到队列非空,取得元素,则解除阻塞等待
阻塞队列作为队列的一种实现,也实现了peek(取队列首元素)等普通队列所具备的方法,这里分析的时候,这些方法就不作为关键点来描述了
示例
展示一个示例:
master线程(生产者)生成task,交由worker线程(消费者)进行处理:
package com.crazypig.juc;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
public class ArrayBlockingQueueTest {
public static void main(String[] args) {
BlockingQueue<Task> taskQueue = new ArrayBlockingQueue<Task>(1024);
Thread masterThread = new Thread(new Master(taskQueue), "masterThread");
Thread workerThread = new Thread(new Worker(taskQueue), "workerThread");
workerThread.start();
masterThread.start();
for (;;) {
try {
TimeUnit.SECONDS.sleep(3);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
static class Master implements Runnable {
private BlockingQueue<Task> taskQueue;
public Master(BlockingQueue<Task> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
for (;;) {
try {
TimeUnit.SECONDS.sleep(3);
taskQueue.put(new Task("task" + System.currentTimeMillis()));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Worker implements Runnable {
private BlockingQueue<Task> taskQueue;
public Worker(BlockingQueue<Task> taskQueue) {
this.taskQueue = taskQueue;
}
public void run() {
for (;;) {
try {
Task task = taskQueue.take();
task.doTask();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
static class Task {
private String taskName;
public Task(String taskName) {
this.taskName = taskName;
}
public void doTask() {
System.out.println("do task " + taskName);
}
}
}
源码分析
- 内部维护一个数组存放队列元素,通过控制putIndex和takeIndex的下表来进行队列元素的增删,同时也有记录队列元素个数的计数count
- 内部实际上利用
ReentrantLock
来实现多线程并发操作(队列入队,出队,取队头等操作,都需要上锁) - 同时,利用
ReentrantLock
提供的Condition
来实现阻塞的put和take,里面用到两个condition: notFull和notEmpty,分别用于实现阻塞的put和take操作
先看看ArrayBlockingQueue
内部重要的成员变量:
final Object[] items; // 队列存储元素的容器(数组)
int takeIndex; // 指向下一个可获取元素的位置
int putIndex; // 指向下一个可存放元素的位置
int count; // 记录队列中元素个数
// 用于实现多线程操作和阻塞take和put的关键
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
我们重点分析阻塞操作put和take的实现:
非阻塞操作的实现在理解了阻塞操作的基础上,就相当容易理解了
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await();
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
可以看出与传统的wait/notify实现几乎一致:
synchronized (obj) { while (条件不满足) { wait... } notify }
那么我们可以判断,实际上在enqueue和dequeue方法,就会实现类似于Object notify的功能,对应于Condition,应该是调用signal,具体看代码:
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
if (++putIndex == items.length)
putIndex = 0;
count++;
notEmpty.signal();
}
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
notFull.signal();
return x;
}
- ArrayBlockingQueue还提供了响应超时的相关方法,具体实现也就是调用coniditon的await超时方法,这里就不具体展开分析了,感兴趣地直接翻下源码就好
- 注意ArrayBlockingQueue里面的数组是循环使用的,take和put指向下标超过数组大小,则从头开始循环使用