简介
在java.util.concurrent
包下实现并发队列的方式有两种:阻塞队列和非阻塞队列。下面一一来讲解。
一、阻塞队列
阻塞队列指的是在多线程下,当前线程如果无法完成入队(队列已满)/出队(队列为空)操作的时候就阻塞等待,直到条件满足而被其他线程唤醒后再完成入队/出队操作。
阻塞队列都实现了java.util.concurrent
包下的BlockingQueue
接口(对应Deque
的是BlockingDeque
)。这个接口是对Queue
接口的一个扩展。
BlockingQueue(BlockingDeque)不仅仅实现了队列的基本功能,同时还实现了在多线程并发的环境下自动管理线程的等待和唤醒的功能。
我们来看看BlockingQueue接口的定义:
public interface BlockingQueue<E> extends Queue<E> {
// 非阻塞入队,如队列满了则抛出IllegalStateException异常
boolean add(E e);
// 非阻塞入队,成功返回true,如队列满了则返回false
boolean offer(E e);
// 新增:阻塞方式入队,当队列满了的时候会阻塞等待,这里新增了两个参数来指定阻塞等待的时间,
// 如果在指定的时间内成功入队则返回true,否则返回false
boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 新增:阻塞方式入队,当队列满了的时候会阻塞等待,直到队列有空间存放数据
void put(E e) throws InterruptedException;
// 非阻塞出队,如果队列为空则抛出NoSuchElementException异常
E remove();
// 非阻塞出队,如果队列为空则返回null
E poll();
// 新增:阻塞方式出队,当队列为空的时候会阻塞等待,这里新增了两个参数来指定阻塞等待的时间
E poll(long timeout, TimeUnit unit) throws InterruptedException;
// 新增:阻塞方式出队,当队列为空的时候会阻塞等待,直到队列有数据入队
E take() throws InterruptedException;
// 获取队列的剩余容量
int remainingCapacity();
boolean remove(Object o);
boolean contains(Object o);
// 一次性取出队里中的全部元素,将其存放在指定的容器c里面,返回值是取的个数
int drainTo(Collection<? super E> c);
// 一次性取出队里中的多个元素,将其存放在指定的容器c里面,maxElements表示取的最大个数,返回值是取的个数
int drainTo(Collection<? super E> c, int maxElements);
可见,BlockingQueue在Queue的基础上增加了阻塞机制,并新增了几个用于实现阻塞的扩展方法。
基于BlockingQueue实现的数据结构类有ArrayBlockingQueue
、LinkedBlockingQueue
、PriorityBlockingQueue
、DelayQueue
、SynchronousQueue
、LinkedTransferQueue
六个。
1.1 ArrayBlockingQueue
ArrayBlockingQueue
内部是使用数组来实现的,它是一个有界队列,在初始化的时候需要指定容量,一旦指定就不能更改。当然,它也是一个循环队列(解释见上章的ArrayDeque)。
另外,ArrayBlockingQueue内部是使用ReentrantLock
锁来实现同步的,在初始化的时候我们可以指定使用公平锁或非公平锁。
ReentrantLock锁会维持一个获取锁的线程队列,当锁被占用时,后来的请求线程会依次入队等待。
- 公平锁:各线程按照其入队的顺序依次获取锁;
- 非公平锁:如果某个线程在发出锁请求的那一刻锁正好处于空闲状态,则该线程会立即获得锁(而不是将该线程入队,然后将锁给队列第一个线程),否则正常入队等待按顺序获得锁。
ReentrantLock默认采用的是非公平锁。
来看源码:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
// 队列内部用于存放数据的数组,注意这里是用了final关键字来修饰,也就是说明其容量是不可更改的
final Object[] items;
// 全局锁,所有访问操作(入队、出队、查看等等)都需要先获得该锁
final ReentrantLock lock;
// 队列不为空的条件对象,当队列已有元素时,用来唤醒等待出队的线程
private final Condition notEmpty;
// 队列未满的条件对象,当队列有空位时,用来唤醒等待入队的线程
private final Condition notFull;
...
public ArrayBlockingQueue(int capacity) {
// 指定队列容量,并默认使用非公平锁
this(capacity, false);
}
// 指定队列容量,同时指定是否使用公平锁
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
...
}
ArrayBlockingQueue的入队/出队操作方法可以分为以下四套。
第一套方法(Queue接口定义的方法):非阻塞、失败抛出异常。
public boolean add(E e) {
// 调用父类的add方法
return super.add(e);
}
// 这个是上面的super.add方法实现,放到这里只是为了给大家看看它的具体实现
public boolean add(E e) {
// 可见,它其实调用的就是offer方法
if (offer(e))
// 入队成功就返回true
return true;
else
// 入队失败(队列满了)就抛出IllegalStateException异常
throw new IllegalStateException("Queue full");
}
// 该方法也是在父类中,和前面的add方法对应
public E remove() {
E x = poll();
if (x != null)
// 成功返回删除的元素
return x;
else
// 当队列为空时抛出NoSuchElementException异常
throw new NoSuchElementException();
}
第二套方法(Queue接口定义的方法):非阻塞,失败返回false或null。
public boolean offer(E e) {
// 检查要插入的元素e是否为null,如果是则抛出NullPointerException异常
Objects.requireNonNull(e);
// 为了保证线程安全,需要先获取到锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
if (count == items.length)
// 如果当前队列已满,则返回false,可以看出这里是非阻塞的
return false;
else {
// 将元素入队
enqueue(e);
return true;
}
} finally {
// 释放锁
lock.unlock();
}
}
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 队列为空则返回null,非阻塞方式,和前面的offer(e)方法对应
return (count == 0) ? null : dequeue();
} finally {
lock.unlock();
}
}
第三套方法(BlockingQueue接口新增的方法):阻塞,但是可以控制阻塞超时时间,失败返回false或null,如果线程发生中断则会抛出InterruptedException异常。
// 相比前面的那个offer方法,这里增加了两个用来设置阻塞超时时间的参数
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
Objects.requireNonNull(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly(); -- 解释1
try {
while (count == items.length) {
// 如果已经超时则返回false表示入队失败
if (nanos <= 0L)
return false;
// 阻塞等待指定时间
// 这里awaitNanos方法在两种情况下会返回:
// 一是当有元素出队后调用了notFull.signal()来结束阻塞
// 二是awaitNanos方法自己返回结束了阻塞
// 当自己返回时又有两种情况:
// 一是到了设定的超时时间,此时awaitNanos的返回值不会大于0
// 二是还没到超时时间,此时awaitNanos的返回值大于0,表示还要继续阻塞的时间
nanos = notFull.awaitNanos(nanos);
}
enqueue(e);
return true;
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0) {
if (nanos <= 0L)
return null;
// 阻塞等待指定时间,和前面的offer(e, timeout, unit)方法对应
nanos = notEmpty.awaitNanos(nanos);
}
return dequeue();
} finally {
lock.unlock();
}
}
解释1:
注意这里ReentrantLock的lock()
和lockInterruptibly()
方法的区别。主要是在获取锁过程中如果线程发生了中断时进行了不同的处理:
lock():如果在获取锁过程发生了中断,会继续尝试获取锁,直到获取成功才返回。当返回后再来判断获取锁过程中是否发生过中断,如果发生过则将该线程设置为中断状态。即获取锁过程中是不可被中断的。
lockInterruptibly():如果在获取锁过程发生了中断,则会立即停止获取锁,并抛出一个中断异常(InterruptedException)。即获取锁过程中是可以被中断的。
第四套方法(BlockingQueue接口新增的方法):阻塞直到成功,如果线程发生中断则会抛出InterruptedException异常。
public void put(E e) throws InterruptedException {
Objects.requireNonNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
// 如果队列以满,则进行等待,直到有元素出队
// 这里和上面offer里面的notFull.awaitNanos(nanos)的区别就是这里是一直等待
// 在出队(dequeue)的时候,会调用notFull.signal()方法让notFull的await方法返回
notFull.await();
// 将元素入队
enqueue(e);
} finally {
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
// 如果队列为空则阻塞等待,直到有元素入队,和前面的put方法对应
// 在入队(enqueue)的时候,会调用notEmpty.signal()方法让notEmpty的await方法返回
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
其他方法:
// 入队
private void enqueue(E x) {
final Object[] items = this.items;
items[putIndex] = x;
// 如果putIndex已经指向的是数组的最后一个位置,则将其指向数组第一个位置(循环队列)
if (++putIndex == items.length) putIndex = 0;
count++;
// 唤醒处于出队等待的线程
notEmpty.signal();
}
// 出队
private E dequeue() {
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
if (++takeIndex == items.length) takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
// 唤醒处于入队等待的线程
notFull.signal();
return x;
}
// 查看队列第一个元素
public E peek() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return itemAt(takeIndex); // null when queue is empty
} finally {
lock.unlock();
}
}
// 获取队列的可用空间
public int remainingCapacity() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return items.length - count;
} finally {
lock.unlock();
}
}
通过上面的介绍,相信大家对ArrayBlockingQueue的特性都有一定的了解了。
1.2 LinkedBlockingQueue
相比于上面的ArrayBlockingQueue,LinkedBlockingQueue
内部是使用单链表来实现的,而且它内部使用了两个ReentrantLock锁来进行同步(一把锁负责出队,一把锁负责入队),以此来提高链表的操作性能。
LinkedBlockingQueue也是一个有界队列,和ArrayBlockingQueue的区别是LinkedBlockingQueue有个默认的容量,大小为Integer.MAX_VALUE
。另外,LinkedBlockingQueue的两个锁都是非公平锁。源码如下:
// 链表节点类
static class Node<E> {
// 节点数据
E item;
// 指向下一个节点的指针
Node<E> next;
Node(E x) { item = x; }
}
// 队列容量,用final修饰的,所以容量也是不可变的
private final int capacity;
// 队列元素计数器
private final AtomicInteger count = new AtomicInteger();
// 队列(链表)头指针
transient Node<E> head;
// 队列(链表)尾指针
private transient Node<E> last;
// 出队锁
private final ReentrantLock takeLock = new ReentrantLock();
// 入队锁
private final ReentrantLock putLock = new ReentrantLock();
public LinkedBlockingQueue() {
// 默认大小为Integer.MAX_VALUE
this(Integer.MAX_VALUE);
}
public LinkedBlockingQueue(int capacity) {
if (capacity <= 0) throw new IllegalArgumentException();
this.capacity = capacity;
last = head = new Node<E>(null);
}
它和ArrayBlockingQueue一样,入队/出队操作也有四套方法。
第一套方法(Queue接口定义的方法):非阻塞,失败抛出异常。
// 这两个方法都在父类AbstractQueue里面,实现和ArrayBlockingQueue一样
public boolean add(E e) {
if (offer(e))
// 成功返回true
return true;
else
// 失败抛出IllegalStateException异常
throw new IllegalStateException("Queue full");
}
public E remove() {
E x = poll();
if (x != null)
// 成功返回删除的元素
return x;
else
// 失败抛出NoSuchElementException异常
throw new NoSuchElementException();
}
第二套方法(Queue接口定义的方法):非阻塞,失败返回false或null。
public boolean offer(E e) {
// 元素不能为空
if (e == null) throw new NullPointerException();
final AtomicInteger count = this.count;
// 如果队列已满则返回false
if (count.get() == capacity)
return false;
int c = -1;
// 将要插入的元素封装成一个node对象
Node<E> node = new Node<E>(e);
// 获取入队锁
final ReentrantLock putLock = this.putLock;
putLock.lock();
try {
// 如果队列没满,则进行入队操作
if (count.get() < capacity) {
enqueue(node);
// 如果入队操作后队列还是没满,则发送一个队列未满的信号
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
}
} finally {
putLock.unlock();
}
// 如果队列在入队之前是空的,则在入队操作后发送一个队列不为空的信号
if (c == 0)
signalNotEmpty();
return c >= 0;
}
public E poll() {
final AtomicInteger count = this.count;
// 如果队列为空则返回null
if (count.get() == 0)
return null;
E x = null;
int c = -1;
// 获取出队锁
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
// 如果队列不为空,则执行出队操作
if (count.get() > 0) {
x = dequeue();
c = count.getAndDecrement();
if (c > 1)
// 如果队列在出队操作之后还是不为空,则发送一个队列不为空的信号
notEmpty.signal();
}
} finally {
takeLock.unlock();
}
// 如果队列在出队操作之前是满的,则在出队操作后发送一个队列未满的信号
if (c == capacity)
signalNotFull();
return x;
}
第三套方法(BlockingQueue接口新增的方法):阻塞,但是可以控制最大等待时间,失败返回false或null,如果线程发生中断则会抛出InterruptedException异常。
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 入队的元素不能为空
if (e == null) throw new NullPointerException();
long nanos = unit.toNanos(timeout);
int c = -1;
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 阻塞等待(最多等待指定的时间)
while (count.get() == capacity) {
// 如果在指定的时间里队列依然是满的则返回false
if (nanos <= 0L)
return false;
nanos = notFull.awaitNanos(nanos);
}
// 入队
enqueue(new Node<E>(e));
// 如果队列在入队操作后还是没满,则发送一个未满的信号
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列在入队操作之前是空的,则在入队操作之后发送一个队列不为空的信号
if (c == 0)
signalNotEmpty();
return true;
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
E x = null;
int c = -1;
long nanos = unit.toNanos(timeout);
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 阻塞等待(最多等待指定的时间)
while (count.get() == 0) {
// 如果在指定的时间里队列依然是空的则返回null
if (nanos <= 0L)
return null;
nanos = notEmpty.awaitNanos(nanos);
}
// 出队
x = dequeue();
// 如果队列在出队操作之后还是不为空,则发送一个队列不为空的信号
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列在出队操作之前是满的,则在出队操作后发送一个队列未满的信号
if (c == capacity)
signalNotFull();
return x;
}
第四套方法(BlockingQueue接口新增的方法):阻塞等待直到成功,如果线程发生中断则会抛出InterruptedException异常。
public void put(E e) throws InterruptedException {
// 要插入的元素不能为空
if (e == null) throw new NullPointerException();
int c = -1;
Node<E> node = new Node<E>(e);
final ReentrantLock putLock = this.putLock;
final AtomicInteger count = this.count;
putLock.lockInterruptibly();
try {
// 阻塞等待,直到队列不再是满的
while (count.get() == capacity) {
notFull.await();
}
// 入队
enqueue(node);
// 如果队列在入队操作之后依然没满,则发送一个队列未满的信号
c = count.getAndIncrement();
if (c + 1 < capacity)
notFull.signal();
} finally {
putLock.unlock();
}
// 如果队列在入队操作之前是空的,则在入队操作之后发送一个队列不为空的信号
if (c == 0)
signalNotEmpty();
}
public E take() throws InterruptedException {
E x;
int c = -1;
final AtomicInteger count = this.count;
final ReentrantLock takeLock = this.takeLock;
takeLock.lockInterruptibly();
try {
// 阻塞等待,直到队列不为空
while (count.get() == 0) {
notEmpty.await();
}
// 出队
x = dequeue();
// 如果队列在入队操作之后还是不为空,则发送一个队列不为空的信号
c = count.getAndDecrement();
if (c > 1)
notEmpty.signal();
} finally {
takeLock.unlock();
}
// 如果队列在出队操作之前是满的,则在出队操作之后发送一个队列未满的信号
if (c == capacity)
signalNotFull();
return x;
}
其他方法:
// 入队
private void enqueue(Node<E> node) {
last = last.next = node;
}
// 出队
private E dequeue() {
Node<E> h = head;
Node<E> first = h.next;
h.next = h; // help GC
head = first;
E x = first.item;
first.item = null;
return x;
}
// 查看队列第一个元素
public E peek() {
if (count.get() == 0)
return null;
final ReentrantLock takeLock = this.takeLock;
takeLock.lock();
try {
return (count.get() > 0) ? head.next.item : null;
} finally {
takeLock.unlock();
}
}
// 获取队列的可用空间
public int remainingCapacity() {
return capacity - count.get();
}
可见,LinkedBlockingQueue和ArrayBlockingQueue是非常相似的。
对比:
- 锁实现不同;
由于LinkedBlockingQueue使用的是两把锁,将入队锁和出队锁分开了,降低了阻塞的概率,因此在吞吐量上它要比ArrayBlockingQueue高一些。
- 数组和链表的区别;
2.1 内存利用率上ArrayBlockingQueue比LinkedBlockingQueue低,数组需要的是连续的存储空间,无法使用碎片化的空间;
2.2 内存占用上ArrayBlockingQueue比LinkedBlockingQueue低,链表每个元素需要额外的空间。同时在入队/出队的时候LinkedBlockingQueue需要创建/销毁额外的对象,则在长时间的高并发情况下对内存还是有一定影响;
2.3 在容量的使用率不高的情况下,LinkedBlockingQueue在内存使用方面更友好。比如需要一个能存放1000个元素的队列,ArrayBlockingQueue在初始化的时候就会创建一个长度为1000的数组,每个数组长度为4个字节(假设存放的是对象引用),则一开始ArrayBlockingQueue内部就会申请4000字节的元素存储空间,而LinkedBlockingQueue只会初始化一个Node对象(而不是直接创建1000个),在需要的时候再实时创建。这样当队列容量的利用率不高的时候,ArrayBlockingQueue申请的空间就有些浪费了。
1.3 PriorityBlockingQueue
PriorityBlockingQueue
是优先队列(PriorityQueue)的多线程并发实现,它是在优先队列的功能上增加了同步阻塞的功能。
PriorityBlockingQueue是一个没有大小限制的队列(默认容量和PriorityQueue一样都是11),所以入队操作永远不会阻塞,只有在出队的时候如果没有元素才会阻塞。
入队操作:
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
// 调用offer(e)方法
offer(e); // never need to block
}
public boolean offer(E e, long timeout, TimeUnit unit) {
// 直接调用offer(e)方法,忽略参数2和参数3
return offer(e); // never need to block
}
public boolean offer(E e) {
// 元素不能为null
if (e == null)
throw new NullPointerException();
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
// 检查扩容
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
// 入队操作,和PriorityQueue逻辑一样
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
// 唤醒阻塞等待出队的线程
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
可见,入队操作不是阻塞的。
再来看看出队操作:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 出队操作(dequeue)和PriorityQueue逻辑也基本一样,所以这个poll方法不是阻塞的
return dequeue();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 如果没有元素,则阻塞等待新元素入队后被唤醒,或则等待时间达到了指定的时间,所以这个方法也是阻塞的
while ( (result = dequeue()) == null && nanos > 0)
nanos = notEmpty.awaitNanos(nanos);
} finally {
lock.unlock();
}
return result;
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
E result;
try {
// 如果没有元素,则阻塞等待新元素入队后被唤醒,所以这个方法是阻塞的
while ( (result = dequeue()) == null)
notEmpty.await();
} finally {
lock.unlock();
}
return result;
}
分析了的源码后,大家可以看出PriorityBlockingQueue还是很简单的,只是在PriorityQueue的基础上增加了多线程的一些功能。
1.4 DelayQueue
DelayQueue
中的元素只有在该元素指定的时间到了才能出队。
DelayQueue内部是用优先队列(PriorityQueue
)来保存数据的,是对优先队列的进一步封装,所以它是一个特殊的优先队列。也因此它和PriorityQueue一样,对容量是没有限制的,所以入队操作永远不会阻塞,只有在出队的时候才会阻塞。
DelayQueue的定义如下:
public class DelayQueue<E extends Delayed> extends AbstractQueue<E> implements BlockingQueue<E> {
// 为了实现同步,依然使用了ReentrantLock锁
private final transient ReentrantLock lock = new ReentrantLock();
// 内部使用PriorityQueue来存储元素,这里也能看出是没有限制容量大小的(PriorityQueue会自动进行扩容)
private final PriorityQueue<E> q = new PriorityQueue<E>();
private Thread leader;
private final Condition available = lock.newCondition();
public DelayQueue() {}
public DelayQueue(Collection<? extends E> c) {
this.addAll(c);
}
}
看上面的定义会发现,DelayQueue存储的元素必须实现java.util.concurrent.Delayed
接口。定义如下:
public interface Delayed extends Comparable<Delayed> {
// 获取该元素还需要延迟多久,如果返回的值不大于表示已经到了指定的延迟时间了
long getDelay(TimeUnit unit);
}
DelayQueue将延迟时间短的排在队列前面,延迟时间长的排在队列后面。它的元素需要实现两个方法:
// 以一个Item类来举例
public class Item implement Delayed {
...
// 过期时间,该值是将来的某个时间点,不是一个相对时间(如3秒)
private long expTime;
...
// Comparable接口的方法:该方法用来在优先队列中进行排序
@Override
public int compareTo(Delayed other) {
// 用于延迟时间的排序,因为是要和队列中其他元素进行比较,
// 也就是为什么上面说不能用相对时间而是要用绝对时间(当然,也要视具体情况而定)
return Long.valueOf(expTime).compareTo(Long.valueOf(((Item) other).expTime);
}
// Delayed接口的方法:该方法用来判断元素是否过期(返回剩余的延迟时间)
@Override
public long getDelay(TimeUnit unit) {
// 获取当前时刻还需要延迟的时间,注意该值会随着时间在不停减小(因为离指定的延迟时间点越来越近)而不是固定不变的
return expTime - System.currentTimeMillis();
}
}
下面来看看DelayQueue的入队操作:
public boolean add(E e) {
return offer(e);
}
public void put(E e) {
offer(e);
}
public boolean offer(E e, long timeout, TimeUnit unit) {
return offer(e);
}
public boolean offer(E e) {
// 获取锁
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 获取锁后就直接入队,可见入队操作不是阻塞的
q.offer(e);
// 如果队里第一个元素是刚才入队的元素则说明队列以前是空的或则新入队的元素的延迟时间最短,
// 这时需要唤醒等待出队的线程,让它们重新计算等待时间
if (q.peek() == e) {
leader = null;
available.signal();
}
return true;
} finally {
lock.unlock();
}
}
可见,入队操作最终都是调用的offer(e)
方法,而这个方法不是阻塞的,所以DelayQueue的入队操作不是阻塞的。
再来看看出队操作:
public E poll() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E first = q.peek();
// 查看队列第一个元素是否到期,如果到期了则将这个元素出队返回,否则返回null
// 可见,这个方法不是阻塞的
return (first == null || first.getDelay(NANOSECONDS) > 0) ? null : q.poll();
} finally {
lock.unlock();
}
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 这是一个无线循环,直到取到元素或到了指定的等待时间为止,所以它是一个阻塞方法
for (;;) {
// 查看队列第一个元素
E first = q.peek();
// 如果队列为空,则阻塞等待新元素入队(最多等待nanos时间)
if (first == null) {
if (nanos <= 0L)
return null;
else
nanos = available.awaitNanos(nanos);
} else {
// 该元素还需要延迟的时间
long delay = first.getDelay(NANOSECONDS);
// 检查这个元素是否过期,如果过期则直接将这个元素出队并返回
if (delay <= 0L)
return q.poll();
// 如果等待时间到了则返回null
if (nanos <= 0L)
return null;
// 如果没有过期,当前线程就会进入阻塞等待阶段
// 这里将first设置为null,为的是在等待期间不要持有该元素的引用
first = null; // don't retain ref while waiting
if (nanos < delay || leader != null)
// 等待指定的时间
nanos = available.awaitNanos(nanos);
else {
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
long timeLeft = available.awaitNanos(delay);
nanos -= delay - timeLeft;
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 这是一个无线循环,直到取到元素为止,所以它是一个阻塞方法
for (;;) {
// 查看队列第一个元素
E first = q.peek();
// 如果队列为空,则阻塞等待新元素入队
if (first == null)
available.await();
else {
// 该元素还需要延迟的时间
long delay = first.getDelay(NANOSECONDS);
// 检查这个元素是否过期,如果过期则直接将这个元素出队并返回
if (delay <= 0L)
return q.poll();
// 如果没有过期,当前线程就会进入阻塞等待阶段
// 这里将first设置为null,为的是在等待期间不要持有该元素的引用
first = null; // don't retain ref while waiting
// leader是当前正处于等待的线程,如果不为空表示当前有其他线程正在等待
if (leader != null)
// 如果有其他线程处于等待阶段,则当前线程就一直等待,直到有新元素入队被唤醒
// 当前线程不会再去处理这个first元素,而让leader线程去处理,
// 因为leader线程的等待时间较短(见下面else逻辑)
available.await();
else {
// 如果当前没有其他线程处于阻塞等待,那么将就当前线程设置为leader
Thread thisThread = Thread.currentThread();
leader = thisThread;
try {
// 只阻塞等待该元素将要过期的时间
// 相对上面的available.await(),这里会在delay时间后自动唤醒,然后来处理这个元素
available.awaitNanos(delay);
} finally {
if (leader == thisThread)
leader = null;
}
}
}
}
} finally {
if (leader == null && q.peek() != null)
available.signal();
lock.unlock();
}
}
用途
DelayQueue主要用来管理超时任务。比如可以用来实现订单的定时取消:将订单保存在一个DelayQueue队列中,然后用一个线程来轮询尝试取出订单,能取出的订单都是已经到期的订单了。
1.5 SynchronousQueue
SynchronousQueue
的入队和出队是一对一的关系,即有一个入队就必须等待一个出队,否则后面的元素无法入队,所以SynchronousQueue是一种无缓冲的等待队列,也就是其内部没有存放元素的队列(因为它同时最多只会存在一个元素)。
入队操作:会一直阻塞等待,直到有出队操作处理了这个元素才返回;
出队操作:会一直阻塞等待,直到有入队操作添加了元素,才会取到这个元素并返回。
SynchronousQueue内部维护着一个存放操作线程的队列(前面几个队列的操作线程队列由ReentrantLock锁在维护),当有多个入队操作线程或出队操作线程时,会将他们添加到一个队列中依次处理,处理方式有两种:公平模式和非公平模式。
- 公平模式:采取FIFO的模式(队列),即先来的先处理;
- 非公平模式(默认):采取LIFO的模式(栈),后来的先处理。
非公平模式很容易出现饥渴情况,比如入队速度快于出队速度时,那么早期入队的可能永远得不到处理;
先来看下它的构造方法:
public SynchronousQueue() {
// 默认采用的非公平模式
this(false);
}
public SynchronousQueue(boolean fair) {
// transferer就是用来处理元素的,
// 入队时调用transferer来执行入队操作并等待出队操作,没有出队操作时就会阻塞等待
// 出队时调用transferer来执行出队操作,没有元素时就会阻塞等待
//
// 可见,公平模式使用的是TransferQueue对象,是一个队列(FIFO)
// 非公平模式使用的是TransferStack对象,是一个栈(LIFO)
// 注意这里的FIFO或LIFO指的是处理的线程而不是元素
// 比如有五个入队操作的线程,那么这五个线程处理的顺序将采用FIFO或LIFO的方式进行排序处理
transferer = fair ? new TransferQueue<E>() : new TransferStack<E>();
}
再来看看这里Transferer的定义:
abstract static class Transferer<E> {
// 这个方法有两个作用:
// 入队:使用transfer方法将要入队的元素存放到transfer对象中,此时:
// 参数1:要入队的元素(不能为null)
// 参数2: 是否允许超时,如果允许超时,那么第三个参数表示阻塞的超时时间,否则就永不超时,一直阻塞等待,此时第三个参数无意义
// 返回值:入队成功返回这次入队的元素(也就是e),失败返回null
//
// 出队:返回transfer对象中存放的元素
// 参数1:固定传null,用null表示是一个出队请求(不为null是一个入队请求且是要入队的元素)
// 参数2:是否允许超时,如果允许超时,那么第三个参数表示阻塞的超时时间,否则就永不超时,一直阻塞等待,此时第三个参数无意义
// 返回值:返回要出队的元素,失败返回null
abstract E transfer(E e, boolean timed, long nanos);
}
Transferer有TransferQueue
和TransferStack
两个具体的实现类(前面构造方法那里已经见到过了),这里我们先不看它们的具体实现,先来看看入队操作和出队操作。
入队:
public boolean offer(E e) {
// 不支持null
if (e == null) throw new NullPointerException();
// 将要入队的元素交给transferer处理,第二、三个参数说明这是一个非阻塞方法
return transferer.transfer(e, true, 0) != null;
}
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 不支持null
if (e == null) throw new NullPointerException();
// 将要入队的元素交给transferer处理,transfer方法最多阻塞等待的时间为unit.toNanos(timeout)
if (transferer.transfer(e, true, unit.toNanos(timeout)) != null)
// 如果成功则返回true
return true;
// 如果不是发生了线程中断,则返回false
if (!Thread.interrupted())
return false;
// 否则抛出中断异常
throw new InterruptedException();
}
public void put(E e) throws InterruptedException {
// 不支持null
if (e == null) throw new NullPointerException();
// 将要入队的元素交给transferer处理,transfer第二个参数为false表示永不超时,只要没处理就一直阻塞等待
if (transferer.transfer(e, false, 0) == null) {
// 如果失败则抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
}
出队:
public E poll() {
// 将transferer中的元素出队,二、三参数说明这是一个非阻塞方法
return transferer.transfer(null, true, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 将transferer中的元素出队,最多阻塞等待的时间为unit.toNanos(timeout)
E e = transferer.transfer(null, true, unit.toNanos(timeout));
// 如果成功出队,则返回出队的元素,否则如果没有发生线程中断,则返回null
if (e != null || !Thread.interrupted())
return e;
// 否则抛出中断异常
throw new InterruptedException();
}
public E take() throws InterruptedException {
// 将transferer中的元素出队,第二个参数说明如果没有元素则无线等待
E e = transferer.transfer(null, false, 0);
// 如果成功出队,则返回出队的元素
if (e != null)
return e;
// 否则抛出中断异常
Thread.interrupted();
throw new InterruptedException();
}
可见,不管是入队还是出队,最终都是交给Transfer去处理的。
通过上面的代码我们发现一个问题,就是全程好像都没有使用到锁(transferer.transfer()
方法里面也没有用到锁,这个看后面)。没有使用锁SynchronousQueue又是这么实现线程安全的呐?这里就涉及到java的悲观锁和乐观锁两种锁概念了。
悲观锁:悲观的认为将要操作的数据一定会同时被其他线程操作从而发生冲突,于是在操作之前先要加上锁,防止同时被其他线程操作,比如前面的ReentrantLock就是一个悲观锁。
乐观锁:乐观的认为将要操作的数据不会同时被其他线程操作而发生冲突,所以操作前不需要加锁。当操作因冲突而失败的时候才用重试的方式来进行解决。所以乐观锁不是一种具体的锁,而是一种实现方式。在乐观锁中用到了CAS(Compare and Swap)机制。
CAS的作用是将比较和赋值两步操作合并成一步原子操作,具体过程为:比较指定内存位置的值是否是预期的值,如果是则将该位置的值替换为指定的新的值,否则就什么都不做,不管是否成功都返回旧的值。CAS的操作数涉及到三个:内存地址、期望值、新的值。
在java中
sun.misc.Unsafe
这个类实现了CAS的功能。通过下面这段伪代码来简单说明下它的用法:class SNode { ... volatile Object next; ... } // 通过Unsafe的静态方法getUnsafe来获取它的实例,注意这个方法我们不能直接使用 sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe(); // 获取SNode对象里next字段的内存地址,注意这里是相对地址(相对这个对象起始地址的偏移量) long NEXT = U.objectFieldOffset(SNode.class.getDeclaredField("next")); // 实例化SNode对象 SNode node = ...; // 期望的值 Object expectValue = ...; // 新的值 Object newValue = ...; // 查看node对象里地址的相对偏移量是NEXT位置的字段, // 如果该位置字段的值是expectValue,则将该字段的值替换为newValue U.compareAndSwapObject(node, NEXT, expectValue, newValue); // compareAndSwapObject方法的效果和下面这个if效果是一样的 // 区别是compareAndSwapObject的整个比较和替换的过程是原子的 if (node.next == expectValue) { node.next = newValue; } // 当然,除了compareAndSwapObject方法之外,Unsafe还有许多其他方法,这里就不一一介绍了。
接着我们来看看TransferQueue
的源码(TransferStack的逻辑和TransferQueue相似,只是在队列的处理顺序上不一样,这里就不再列出讲解了):
static final class TransferQueue<E> extends Transferer<E> {
// 它内部是使用一个链表来保存处理的线程
static final class QNode {
// 下一个节点的
volatile QNode next;
// 当前节点对应的线程
volatile Thread waiter;
// 当前线程需要执行的操作:isData为true表示入队,此时item存放的是入队的元素;否则表示出队,item为null
final boolean isData;
// 如果是入队线程则item为入队的元素,如果是出队线程则item为null
volatile Object item;
// 初始化Unsafe和item、next两个字段的内存地址偏移量ITEM、NEXT
private static final sun.misc.Unsafe U = sun.misc.Unsafe.getUnsafe();
private static final long ITEM;
private static final long NEXT;
static {
try {
ITEM = U.objectFieldOffset(QNode.class.getDeclaredField("item"));
NEXT = U.objectFieldOffset(QNode.class.getDeclaredField("next"));
} catch (ReflectiveOperationException e) {
throw new Error(e);
}
}
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 以原子操作方式设置next字段的值(如果当前next指向的是cmp对象,则将next重新指向val对象),
// 成功返回true,否则返回false
boolean casNext(QNode cmp, QNode val) {
return next == cmp && U.compareAndSwapObject(this, NEXT, cmp, val);
}
// 以原子操作方式设置item字段的值(如果当前item指向的是cmp对象,则将next重新指向val对象),
// 成功返回true,否则返回false
boolean casItem(Object cmp, Object val) {
return item == cmp && U.compareAndSwapObject(this, ITEM, cmp, val);
}
// 以原子操作方式尝试将该对象设置为取消状态,
// 这里取消状态的定义是:当item指向当前对象本身时表示已取消
void tryCancel(Object cmp) {
U.compareAndSwapObject(this, ITEM, cmp, this);
}
// 当前是否是以取消状态
boolean isCancelled() {
return item == this;
}
}
// 以上就是QNode节点的定义
// 队列头部指针
transient volatile QNode head;
// 队列尾部指针
transient volatile QNode tail;
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // initialize to dummy node.
head = h;
tail = h;
}
// 将head移动到nh节点上(如果当前head指向的是h节点的话),并且将h节点从链表中断开
// 作用就是取出链表头部的元素
void advanceHead(QNode h, QNode nh) {
if (h == head && U.compareAndSwapObject(this, HEAD, h, nh))
// 将h节点从链表中断开,断开的方式就是将其next指向自己
// 其他地方如果发现某个节点的next指向了自己,就表示这个节点已经被取出了队列
h.next = h; // forget old next
}
// 将tail移动到nt节点上(如果当前tail指向的是t节点的话)
void advanceTail(QNode t, QNode nt) {
if (tail == t)
U.compareAndSwapObject(this, TAIL, t, nt);
}
// 这个方法里没有使用悲观锁,而是采用乐观锁原理来实现的线程安全
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
// isDatae为当前线程的操作类型,如果e不为null表示当前为入队线程的入队,否则表示出队线程的出队操作
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // saw uninitialized value
continue;
// 见后面 --> 解释1
if (h == t || t.isData == isData) { // empty or same-mode
// 这里的主要逻辑就是将当前操作入队并阻塞等待被后面新来的操作唤醒
// 被唤醒后该操作就会从队列中移除
// 下面的代码可以分为四部分
// 第一部分:入队前的检查,看是否有其他线程在这期间修改过队列,如果队列被修改了则重新循环
QNode tn = t.next;
// 1.1 如果在这之前有其他线程修改了tail的值,则重新循环处理
if (t != tail) // inconsistent read
continue;
// 1.2 如果tn不为空,表示其他线程在对尾加入了一个节点,但是还没有将tail移动到对尾
if (tn != null) { // lagging tail
// 帮助将tail移动到对尾,并重新循环处理
advanceTail(t, tn);
continue;
}
// 如果已经处理超时,则返回null
if (timed && nanos <= 0L) // can't wait
return null;
// 第二部分:将当前操作添加到队列结尾
// 走到这里说明t就是最有一个节点,那么将当前操作线程添加到节点结尾(队尾)
if (s == null)
s = new QNode(e, isData);
// 将s添加到t的next上
if (!t.casNext(null, s)) // failed to link in
// 2.1 如果添加失败,则进入下次循环重试
continue;
// 2.2 如果添加成功,则将tail移动到这个新增的节点上
advanceTail(t, s); // swing tail and wait
// 第三部分:阻塞等待后来的操作将其唤醒,并将结果返回
// 如果能及时处理则不会进入阻塞等待状态
// 这里的返回值见awaitFulfill方法的注释
Object x = awaitFulfill(s, e, timed, nanos);
// 第四部分:唤醒后返回操作结果
// 检查返回值(有三种可能):
// 一:返回了一个null,表示本次是一个入队操作,正常
// 二:返回了一个E对象,表示本次是一个出队操作(取到了入队的元素),正常
// 三:返回了一个s对象,表示本次操作失败(由于发生了线程中断或超时而操作被取消了)
if (x == s) { // wait was cancelled
clean(t, s);
return null;
}
// 判断s节点是否已经从队列中移除了,如果还没有被移除,则调用advanceHead将其移除
if (!s.isOffList()) { // not already unlinked
advanceHead(t, s); // unlink if head
if (x != null) // and forget fields
s.item = s;
s.waiter = null;
}
// 返回处理结果,是出队且成功则返回一个E对象,是入队或操作失败则返回null
return (x != null) ? (E)x : e;
} else {
// else表示队列中有和当前操作对应的处理操作
// 比如当前是一个入队操作,而队尾的是一个出队操作在等待,那么整个队列里面必然全是出队操作,
// 因为不可能同时出现入队操作和出队操作在队列中,
// 所以此时只需要直接取队列头部的节点即可
// 检查队列是否被其他线程修改过,如果被修改了则重新循环
QNode m = h.next; // node to fulfill
if (t != tail || m == null || h != head)
continue; // inconsistent read
Object x = m.item;
// 如果m不可用则重新循环 --> 解释3
if (isData == (x != null) || // m already fulfilled
x == m || // m cancelled
!m.casItem(x, e)) { // lost CAS
advanceHead(h, m); // dequeue and retry
continue;
}
// 将找到的m节点从队列中移除
advanceHead(h, m); // successfully fulfilled
// 唤醒该节点对应的线程来处理当前的操作
LockSupport.unpark(m.waiter);
// 返回处理结果,是出队且成功则返回一个E对象,是入队或操作失败则返回null
return (x != null) ? (E)x : e;
}
}
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (head.next == s) ? (timed ? MAX_TIMED_SPINS : MAX_UNTIMED_SPINS) : 0;
for (;;) {
// 入多当前线程发生中断,则尝试将该节点取消掉
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
// e为s.item的初始值,x为s.item现在的值
// 如果它们不相等,说明该节点从队列中被取出用来执行后来的操作了,则返回行的数据
if (x != e)
// 这里的x有三种情况:
// 一:当前操作是一个出队操作,后来了一个入队操作,并将其入队的数据给了该节点,那么这里返回的就是入队的数据(一次正常的入队/出队)
// 二:当前操作是一个入队操作,后来了一个出队操作,并将该数据取走了(取走后这里会被设置为null),那么x就是null(一次正常的入队/出队)
// 三:当前线程发生了中断,操作被取消了(见上面的s.tryCancel(e),取消的操作就是将item指向s),那么这里返回的就是s
return x;
// 如果是一个可超时的阻塞,且已经超时,则将该操作取消,下次循环的时候会进if (x != e)返回退出
if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
// 不会立即进入阻塞等待,而是先循环最多固定次数后如果还没有被处理才进入阻塞状态
if (spins > 0)
// 如果还有循环次数可用,则将其减一并进入下一次循环
--spins;
else if (s.waiter == null)
// 如果waiter还没有被设置,则设置waiter后再循环一次
s.waiter = w;
else if (!timed)
// 如果是不可超时的阻塞,则一直阻塞等待被唤醒
// 对于LockSupport见 --> 解释2
LockSupport.park(this);
else if (nanos > SPIN_FOR_TIMEOUT_THRESHOLD)
// 如果是可超时的阻塞且还没有超时,则阻塞等待指定时间
LockSupport.parkNanos(this, nanos);
}
}
}
解释1:
这里主要是判断当前队列里是否有对应的处理操作。比如当前为入队操作,那么对应的操作就是出队操作。
- 有:则将其从队列中取出来执行;
- 没有:则将当前操作入队,并阻塞等待被后面来的对应的操作唤醒。
h == t
表示队列为空,t.isData == isData
表示对尾的操作和当前操作样(都是入队或都是出队)。这两种情况都说明当前队列中没有和当前操作对应的操作。队列中只会最多存在一种操作,即要么队列为空,要么只有入队操作,要么只有出队操作。
举个例子:比如当前队列中有一个出队操作在等待,当来了一个入队操作时,必然会将这个出队操作从队列中取出,用来执行这个入队操作。而不会将这个入队操作入队的。
解释2:
LockSupport的park()
和unpark()
方法(实际上调用的是Unsafe的park()
和unpark()
方法)有点象是ReentrantLock的lock()
和unlock()
。区别如下:
- 当对一个线程调用了
park()
后,该线程进入阻塞状态,当其他线程对这个线程调用unpark()
后唤醒这个线程,此时和ReentrantLock的lock()
和unlock()
效果样; -
unpark()
可以在park()
方法之前被调用。也就是说对一个线程可以先调用unpark()
,然后再调用park()
,此时这个线程实际上是不会阻塞的; - 当对一个线程多次调用
unpark()
方法后,只需要对其调用一次park()
即可唤醒这个线程,如果再次调用park()
则会进入阻塞状态;
解释3:
假设
isData
为true
,说明当前是一个入队操作,那么m
就是一个出队操作(反之亦然),则m.item == null
。而如果此时
m.item
的值(也就是x)不为null
,说明m
已经被其他入队操作执行了(x的类型是E)或则被取消了(x == m
),从而导致item被赋予了值。
m.casItem(x, e)
表示如果m.item
的值还是x
则将e
赋值给m.item
。如果发生上述情况,则将m移除队列,重新进入循环。
可以看出,主要逻辑都在Transferer里面,这里有些复杂,如果看不太懂的请多看几遍。
1.6 LinkedTransferQueue
前面提到的几个队列他们有个特点就是多线程操作是会加锁并阻塞,这样在操作频繁的时候就很耗时,有一个采使用CAS机制来实现的非阻塞队列ConcurrentLinkedQueue
(后面会介绍),提高了队列的吞吐量。大家还记得上一小节将到SynchronousQueue队列,它可以实现一进一出功能。
LinkedTransferQueue
可以认为就是ConcurrentLinkedQueue
和SynchronousQueue
的一个集合,具有他们的所有功能,并且进行了高效实现。
LinkedTransferQueue
是java1.7版本才添加的一个数据结构。它也是一个无界的FIFO队列。
LinkedTransferQueue实现了TransferQueue
接口(该接口继承至BlockingQueue
)。TransferQueue接口在BlockingQueue接口的基础上增加了几个方法:
public interface TransferQueue<E> extends BlockingQueue<E> {
// 若当前存在一个正在等待获出队的线程(使用take()或者poll()函数),则立即将元素交给这个线程出队;
// 若不存在,则返回false而不会进入队列等待,所以这是一个非阻塞操作
boolean tryTransfer(E e);
// 若当前存在一个正在等待出队的线程,则立即将元素交给这个线程出队;
// 否则将该元素插入到队列尾部,并且等待进入阻塞状态,
// 直到有出队线程取走该元素(返回true)或则达到了指定的等待时间(返回false同时移除该元素)。
boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 若当前存在一个正在等待出队的线程,则立即将元素交给这个线程出队;
// 否则将该元素插入到队列尾部,并且等待进入阻塞状态,直到有出队线程取走该元素。
void transfer(E e) throws InterruptedException;
// 判断是否存在出队线程
boolean hasWaitingConsumer();
// 获取所有等待出队的线程数量
int getWaitingConsumerCount();
}
来看看入队操作。先看BlockingQueue的几个方法:
public boolean add(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public void put(E e) {
xfer(e, true, ASYNC, 0);
}
public boolean offer(E e) {
xfer(e, true, ASYNC, 0);
return true;
}
public boolean offer(E e, long timeout, TimeUnit unit) {
xfer(e, true, ASYNC, 0);
return true;
}
因为LinkedTransferQueue是无界的,所以这几个方法不会阻塞(如果没有立即处理就将元素放到队列上),会直接返回true。
再来看TransferQueue的方法:
public boolean tryTransfer(E e) {
// 该方法也不会阻塞,如果没有立即处理就返回false表示失败(不会将e添加到队列中)
return xfer(e, true, NOW, 0) == null;
}
public boolean tryTransfer(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 如果没有立即处理就会阻塞等待(会将e添加到队列中),但最多等待unit.toNanos(timeout)时间
if (xfer(e, true, TIMED, unit.toNanos(timeout)) == null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
public void transfer(E e) throws InterruptedException {
// 如果没有立即处理就会阻塞等待(会将e添加到队列中),直到被处理
if (xfer(e, true, SYNC, 0) != null) {
Thread.interrupted(); // failure possible only due to interrupt
throw new InterruptedException();
}
}
再来看看出队操作:
public E poll() {
// 不会阻塞(如果没有立即处理不会将该操作放到队列上),直接返回成功或失败。
return xfer(null, false, NOW, 0);
}
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
// 如果没有立即处理会将该操作放到队列上并阻塞等待一段时间。
// 成功返回获取到的元素,失败返回null,如果发生线程中断则抛出InterruptedException异常。
E e = xfer(null, false, TIMED, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return e;
throw new InterruptedException();
}
public E take() throws InterruptedException {
// 如果没有立即处理会将该操作放到队列上并一直阻塞等待。
// 成功返回获取到的元素,否则抛出InterruptedException异常。
E e = xfer(null, false, SYNC, 0);
if (e != null)
return e;
Thread.interrupted();
throw new InterruptedException();
}
可以发现,入队和出队都是调用的xfer()
方法,那么我们再来看一下xfer方法的实现:
/**
* 参数1:如果是入队,则是入队的元素,如果是出队则为null
* 参数2:true表示入队操作,否则表示出队操作
* 参数3:有四个值:
* NOW:立即得到结果成功或失败,不会阻塞;
* ASYNC:异步处理,见数据放到内部队列上后就立即返回成功,不会阻塞;
* SYNC:异步处理,见数据放到内部队列上后进行阻塞等待处理结果;
* TIMED:异步处理,见数据放到内部队列上后进行阻塞等待处理结果,可以设置等待超时;
* 参数4:阻塞等待的时间,参数3是TIMED时才有用
*/
private E xfer(E e, boolean haveData, int how, long nanos) {
// 如果是入队操作,不允许null作为入队元素
if (haveData && (e == null))
throw new NullPointerException();
Node s = null; // the node to append, if needed
retry:
for (;;) { // restart on append race
// 查找并匹配第一个还没有匹配的节点
for (Node h = head, p = h; p != null;) { // find & match first node
boolean isData = p.isData;
Object item = p.item;
// 如果找到了没有匹配的节点
if (item != p && (item != null) == isData) { // unmatched
// 如果处理类型一样(都是入队或都是出队)则退出
if (isData == haveData) // can't match
break;
// 将e存放在p节点的item上
if (p.casItem(item, e)) { // match
for (Node q = p; q != h;) {
Node n = q.next; // update by 2 unless singleton
if (head == h && casHead(h, n == null ? q : n)) {
h.forgetNext();
break;
} // advance and retry
if ((h = head) == null ||
(q = h.next) == null || !q.isMatched())
break; // unless slack < 2
}
// 唤醒p节点对应的线程,来处理存放在该节点上的数据
LockSupport.unpark(p.waiter);
@SuppressWarnings("unchecked") E itemE = (E) item;
return itemE;
}
}
Node n = p.next;
p = (p != n) ? n : (h = head); // Use head if p offlist
}
// 如果上面没有找到匹配的对应的处理节点,则对不同how进行不同的处理
if (how != NOW) { // No matches available
// 如果处理方式不是NOW,则将当前线程添加到队列中
if (s == null)
s = new Node(e, haveData);
Node pred = tryAppend(s, haveData);
if (pred == null)
continue retry; // lost race vs opposite mode
// 如果不是异步处理,则阻塞等待
if (how != ASYNC)
return awaitMatch(s, pred, e, (how == TIMED), nanos);
}
return e; // not waiting
}
}
可以看出,LinkedTransferQueue是一个非常灵活的队列。
1.7 LinkedBlockingDeque
LinkedBlockingDeque
是基于BlockingDeque
接口实现的。BlockingDeque接口在BlockingQueue的基础上新增了下面四对个方法:
public interface BlockingDeque<E> extends BlockingQueue<E>, Deque<E> {
// 添加元素到队列头部,当队列满了的时候会阻塞等待一段时间,
// 成功返回true,失败返回false,线程发生中断抛出InterruptedException异常
boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 添加元素到队列尾部,当队列满了的时候会阻塞等待一段时间,
// 成功返回true,失败返回false,线程发生中断抛出InterruptedException异常
boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException;
// 添加元素到队列头部,当队列满了的时候会阻塞等待,线程发生中断抛出InterruptedException异常
void putFirst(E e) throws InterruptedException;
// 添加元素到队列尾部,当队列满了的时候会阻塞等待,线程发生中断抛出InterruptedException异常
void putLast(E e) throws InterruptedException;
// 从队列尾部获取数据,当队列为空的时候会阻塞等待一段时间,
// 成功返回获取到的数据,失败返回null,线程发生中断抛出InterruptedException异常
E pollFirst(long timeout, TimeUnit unit) throws InterruptedException;
// 从队列尾部获取数据,当队列为空的时候会阻塞等待一段时间,
// 成功返回获取到的数据,失败返回null,线程发生中断抛出InterruptedException异常
E pollLast(long timeout, TimeUnit unit) throws InterruptedException;
// 从队列头部获取数据,当队列为空的时候会阻塞等待,线程发生中断抛出InterruptedException异常
E takeFirst() throws InterruptedException;
// 从队列尾部获取数据,当队列为空的时候会阻塞等待,线程发生中断抛出InterruptedException异常
E takeLast() throws InterruptedException;
}
LinkedBlockingDeque
和LinkedBlockingQueue非常相似,这里简单说下上面这几个新增的方法就行了:
// 以阻塞方式入队元素e到队列头部,如果在指定时间内不能成功入队则返回失败
public boolean offerFirst(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 入队的元素不能为null
if (e == null) throw new NullPointerException();
// 将入队的元素e封装成一个node节点
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 尝试插入到队列头部,如果失败则阻塞等待
while (!linkFirst(node)) {
// 如果超时则返回失败
if (nanos <= 0L)
return false;
// 阻塞等待指定时间
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
// 以阻塞方式入队元素e到队列尾部,如果在指定时间内不能成功入队则返回失败
public boolean offerLast(E e, long timeout, TimeUnit unit) throws InterruptedException {
// 入队的元素不能为null
if (e == null) throw new NullPointerException();
// 将入队的元素e封装成一个node节点
Node<E> node = new Node<E>(e);
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
// 尝试插入到队列尾部,如果失败则阻塞等待
while (!linkLast(node)) {
// 如果超时则返回失败
if (nanos <= 0L)
return false;
// 阻塞等待指定时间
nanos = notFull.awaitNanos(nanos);
}
return true;
} finally {
lock.unlock();
}
}
// 以阻塞方式入队元素e到队列头部,如果不能成功入队则一直阻塞,直到能成功入队
public void putFirst(E e) throws InterruptedException {
// 入队的元素不能为null
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 尝试插入到队列头部,如果失败则阻塞等待
while (!linkFirst(node))
// 一直阻塞等待,直到被唤醒
notFull.await();
} finally {
lock.unlock();
}
}
// 以阻塞方式入队元素e到队列头部,如果不能成功入队则一直阻塞,直到能成功入队
public void putLast(E e) throws InterruptedException {
// 入队的元素不能为null
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 尝试插入到队列尾部,如果失败则阻塞等待
while (!linkLast(node))
// 一直阻塞等待,直到被唤醒
notFull.await();
} finally {
lock.unlock();
}
}
// 以阻塞方式出队队列头部元素,如果在指定时间内不能成功出队则返回null
public E pollFirst(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
// 尝试取出队列头部元素,如果失败则阻塞等待
while ( (x = unlinkFirst()) == null) {
// 如果等待超时则返回null
if (nanos <= 0L)
return null;
// 阻塞等待指定时间
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
// 以阻塞方式出队队列尾部元素,如果在指定时间内不能成功出队则返回null
public E pollLast(long timeout, TimeUnit unit) throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
E x;
// 尝试取出队列尾部元素,如果失败则阻塞等待
while ( (x = unlinkLast()) == null) {
// 如果等待超时则返回null
if (nanos <= 0L)
return null;
// 阻塞等待指定时间
nanos = notEmpty.awaitNanos(nanos);
}
return x;
} finally {
lock.unlock();
}
}
// 以阻塞方式出队队列头部元素,如果不能成功出队则一直等待,直到能成功出队
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 尝试取出队列头部元素,如果失败则阻塞等待
while ( (x = unlinkFirst()) == null)
// 一直阻塞等待,直到被唤醒
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
// 以阻塞方式出队队列尾部元素,如果不能成功出队则一直等待,直到能成功出队
public E takeLast() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
// 尝试取出队列尾部元素,如果失败则阻塞等待
while ( (x = unlinkLast()) == null)
// 一直阻塞等待,直到被唤醒
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
二、非阻塞队列
java.util.concurrent
包下的非阻塞队列有ConcurrentLinkedQueue
和ConcurrentLinkedDeque
两个。因为Deque只是在Queue的基础上增加了栈的操作API,其他没什么区别,所以这里就只拿ConcurrentLinkedQueue来讲解。
ConcurrentLinkedQueue是一个无界的线程安全队列,它采用了 wait-free 算法(采用的CAS机制)来实现。
入队操作:
public boolean offer(E e) {
// 将e封装成一个Node对象,e不能为null
final Node<E> newNode = newNode(Objects.requireNonNull(e));
// 无限循环,直到成功入队
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
// 检查p是否是尾节点,正常情况下p是尾节点,如果中途被其他线程执行了入队操作,那么p就不再是尾节点了,此时p.next也就是q就不等于null了
if (q == null) {
// p is last node
// 如果p还是尾节点,则设置p.next = newNode
// 如果失败表示p.next已经不是null(参考前面的CAS)说明有其他线程修改的p.next的值
if (casNext(p, null, newNode)) {
// Successful CAS is the linearization point
// for e to become an element of this queue,
// and for newNode to become "live".
if (p != t) // hop two nodes at a time
// 移动tail,让其重新指向对尾元素newNode
// 注意这里没有检查casTail的返回值,因为即便是移动失败也是没问题的,其他线程会帮助移动,即便没有移动也没关系,因为每次入队都是要找到尾节点的
casTail(t, newNode); // Failure is OK.
return true;
}
// 如果casNext设置失败,则进入下次循环来重新设置
// Lost CAS race to another thread; re-read next
}
else if (p == q)
// We have fallen off list. If tail is unchanged, it
// will also be off-list, in which case we need to
// jump to head, from which all live nodes are always
// reachable. Else the new tail is a better bet.
// 在节点出队的时候会将节点的next指向自己 -- 解释1
// 如果这个时候正好有一个线程将p出队了,那么p.next就会等于p
// 如果tail发生了改变,则将p和t重新指向tail
// 如果tail没有发生改变,因为tail指向的节点也就是当前p指向的节点已经出队,所以tail不能再用(无法再通过它找到尾节点了),
// 这将p指向head,从head位置重新开始遍历找到尾节点
p = (t != (t = tail)) ? t : head;
else
// Check for tail updates after two hops.
//
p = (p != t && t != (t = tail)) ? t : q;
}
}
出队操作:
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
// 如果下面的if条件成立,表示p节点有数据且被成功取出
// 否则表示p节点的数据已经被其他线程取走了,这里还能获取到p节点是因为那个线程还没来得及将p节点移除
if (item != null && casItem(p, item, null)) {
// Successful CAS is the linearization point
// for item to be removed from this queue.
if (p != h) // hop two nodes at a time
// 将head后移一位,指向下一个节点
updateHead(h, ((q = p.next) != null) ? q : p);
// 返回取到的数据
return item;
}
// p.next为null表示p还没有被其它线程从队列中移除,且没有后续节点了,也就是队列为空了
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
// p == q表示该p节点已经被其它线程从队列中移除了,则需要重新从head开始查找
else if (p == q)
continue restartFromHead;
else
// p还没有被其它线程从队列中移除,且还有后续节点,则将p指向p的下一个节点(p.next)
p = q;
}
}
}
解释1:
在出队方法里,如果一个节点成功出队,则会调用updateHead(node1, node2)
来更新head,其中node1是将要出队的节点,node2是重新作为head的节点。来看下源码:
final void updateHead(Node<E> h, Node<E> p) {
// assert h != null && p != null && (h == p || h.item == null);
if (h != p && casHead(h, p))
// casHead用来将head指向p,如果成功,则调用lazySetNext方法,注意它的两个参数都是h
// 看下面的lazySetNext方法可见,就是将参数1的next设置为参数2,即:h.next = h
// 所以,出队后的节点它的next都是指向自己的
lazySetNext(h, h);
}
static <E> void lazySetNext(Node<E> node, Node<E> val) {
U.putOrderedObject(node, NEXT, val);
}
总结
一、特点
功能 | 内部存储 | 容量 | 锁 | 出队规则 | |
---|---|---|---|---|---|
ArrayBlockingQueue | 队列 | 数组 | 有界,需要指定大小 | 悲观锁(一把) | FIFO |
LinkedBlockingQueue | 队列 | 单链表 | 有界,默认为Integer.MAX_VALUE | 悲观锁(二把) | FIFO |
LinkedBlockingDeque | 队列+栈 | 双向链表 | 有界,默认为Integer.MAX_VALUE | 悲观锁(一把) | FIFO+LIFO |
PriorityBlockingQueue | 队列 | 数组 | 无界,默认11 | 悲观锁(一把) | 自定义 |
DelayQueue | 队列 | PriorityQueue | 无界,默认11 | 悲观锁(一把) | 自定义 |
SynchronousQueue | 队列 | – | 最多一个元素 | 乐观锁CAS | 一进一出 |
LinkedTransferQueue | 队列 | 单链表 | 无界 | 乐观锁CAS | 一进一出+FIFO |
ConcurrentLinkedQueue | 队列 | 单链表 | 无界 | 乐观锁CAS | FIFO |
ConcurrentLinkedDeque | 队列+栈 | 双向链表 | 无界 | 乐观锁CAS | FIFO+LIFO |
二、方法阻塞情况
入队操作
add(e) | offer(e) | offer(e, timeout, unit) | put(e) | tryTransfer(e) | tryTransfer(e, timeout, unit) | transfer(e) | |
---|---|---|---|---|---|---|---|
ArrayBlockingQueue | 否 | 否 | 是 | 是 | – | – | – |
LinkedBlockingQueue | 否 | 否 | 是 | 是 | – | – | – |
PriorityBlockingQueue | 否 | 否 | 否 | 否 | – | – | – |
DelayQueue | 否 | 否 | 否 | 否 | – | – | – |
SynchronousQueue | 是 | 是 | 是 | 是 | – | – | – |
LinkedTransferQueue | 否 | 否 | 否 | 否 | 否 | 是 | 是 |
LinkedBlockingDeque | 否 | 否 | 是 | 是 | – | – | – |
ConcurrentLinkedQueue | 否 | 否 | 否 | 否 | – | – | – |
ConcurrentLinkedDeque | 否 | 否 | 否 | 否 | – | – | – |
出队操作
remove() | poll() | poll(e, timeout, unit) | take() | |
---|---|---|---|---|
ArrayBlockingQueue | 否 | 否 | 是 | 是 |
LinkedBlockingQueue | 否 | 否 | 是 | 是 |
PriorityBlockingQueue | 否 | 否 | 是 | 是 |
DelayQueue | 否 | 否 | 是 | 是 |
SynchronousQueue | 是 | 是 | 是 | 是 |
LinkedTransferQueue | 否 | 否 | 是 | 是 |
LinkedBlockingDeque | 否 | 否 | 是 | 是 |
ConcurrentLinkedQueue | 否 | 否 | 否 | 否 |
ConcurrentLinkedDeque | 否 | 否 | 否 | 否 |
是:表示该方法会阻塞;否:表示该方法不会阻塞。