阻塞队列继承结构说明:
根据继承结构可知:
1.Array 和 Linked 实现了统一的继承结构,不同的地方在于内部实现不用阻塞机制和队列数据存放结构
2.它们都实现了Iterable 接口,表示它们都是可迭代的,即实现了iterator() 方法,iterator()方法会返回一个Iterator ,Iterator为迭代器 其中主要的方法为next hasNext 等, 那么队列中对迭代器的实现使用内部类实现,在ArrayBlockingQueue 中即为Itr 这个私有的内部类,我们发现在每次调用iterator() 方法时都会生成一个Itr , 在ArrayBlockingQueue 中还有一个Itrs内部类 Itrs使用单向链表的方式维护生成的多个迭代器,迭代器的实现原理会在其它的博客中进行剖析,这里就不多说了
3. ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象(lock)。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。
主要方法:
要想了解这两个阻塞队列,先分析一下队列提供的主要方法 BlockingQueue
// 将指定元素插入到队列尾部 如果成功返回true
// 如果队列已满 抛出IllegalStateException 其内部调用的offer
boolean add(E e);
//将指定元素插入队列尾部,如果成功返回true 如果队列已满返回false
boolean offer(E e);
//将指定元素插入队列尾部,如果队列已满进行阻塞 直到notFull条件唤醒
void put(E e) throws InterruptedException;
//将指定元素插入队列尾部,如果队列已满则等待指定时间
boolean offer(E e, long timeout, TimeUnit unit)
throws InterruptedException;
//移除并获取队列头部元素,如果队列为空则阻塞 直到notEmpty条件唤醒
E take() throws InterruptedException;
//移除并获取队列头部元素,如果队列为空则等待指定时间
E poll(long timeout, TimeUnit unit)
throws InterruptedException;
//队列剩余可使用容量 items.length - count
int remainingCapacity();
//移除队列中指定元素 改方法调用removeAt 两个方法之后会详细说明,因为remove移除的可能不是头部元素所以需要队列中元素重新排列和修改putIndex值
boolean remove(Object o);
//是否包含指定元素
public boolean contains(Object o);
//移除队列中所有的可用元素,并添加到Collection中
int drainTo(Collection<? super E> c);
//移除指定个数的可用元素,并添加到Collection中,从takeIndex开始
int drainTo(Collection<? super E> c, int maxElements);
//移除并获取队列头部元素,如果为空则抛NoSuchElementException 内部调用poll
E remove();
//移除并获取队列头元素,如果为空则返回null
E poll();
//只获取不移除队列头部元素,如果为空则抛NoSuchElementException 内部调用peek
E element();
//只获取不移除队列头部元素,如果队列为空则返回null 内部调用itemAt
E peek();
ArrayBlockingQueue 构造方法和成员表量说明:
//存放队列元素的数组
final Object[] items;
//下次take, poll, peek or remove 元素的位置
int takeIndex;
//下次 put, offer, or add 元素的位置
int putIndex;
//队列元素的数量
int count;
//进行队列同步操作的重入锁
final ReentrantLock lock;
//不为空条件 当调用take队列为空时进行wait
private final Condition notEmpty;
//不满条件 当调用put队列已满时进行wait
private final Condition notFull;
//维护队列迭代器 Itrs 内部使用单向链表维护多个迭代器
transient Itrs itrs = null;
提供三个构造方法:
//指定数组容量
public ArrayBlockingQueue(int capacity) {
//默认ReentrantLock 为非公平模式
this(capacity, false);
}
//指定数组容量并设置ReentrantLock 公平非公平模式
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
//除了上面两个设置,还设置队列初始元素
public ArrayBlockingQueue(int capacity, boolean fair,
Collection<? extends E> c) {
this(capacity, fair);
final ReentrantLock lock = this.lock;
lock.lock(); // Lock only for visibility, not mutual exclusion
try {
int i = 0;
try {
for (E e : c) {
checkNotNull(e);
items[i++] = e;
}
} catch (ArrayIndexOutOfBoundsException ex) {
throw new IllegalArgumentException();
}
count = i;
putIndex = (i == capacity) ? 0 : i;
} finally {
lock.unlock();
}
}
ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。
非阻塞方法我们先不说明,先详细剖析一下两个阻塞方法put take
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列已满则进行阻塞等待 即添加到notFull Condition的等待队列中
while (count == items.length)
notFull.await();
//如果没满则直接入队
enqueue(e);
} finally {
lock.unlock();
}
}
private void enqueue(E x) {
// assert lock.getHoldCount() == 1;
// assert items[putIndex] == null;
final Object[] items = this.items;
items[putIndex] = x;
//索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
if (++putIndex == items.length)
putIndex = 0;
count++;
//有元素入队之后唤醒阻塞在notEmpty Condition 上的take操作
notEmpty.signal();
}
put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况,一是,队列已满,那么新到来的put线程将添加到notFull的条件队列中等待,二是,有移除线程执行移除操作,移除成功同时唤醒put线程,如下图所示
上图为队列已满进行put操作的示意图
当有线程进行take操作取出元素,示意图如下:
接下来我们看下take操作,take操作在队列为空的时候阻塞,当有put操作时唤醒notEmpty Condition
我们先看下源码:
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
//如果队列为空进行等待
while (count == 0)
notEmpty.await();
return dequeue();
} finally {
lock.unlock();
}
}
private E dequeue() {
// assert lock.getHoldCount() == 1;
// assert items[takeIndex] != null;
final Object[] items = this.items;
@SuppressWarnings("unchecked")
E x = (E) items[takeIndex];
items[takeIndex] = null;
//取值索引到头之后重置为0
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
itrs.elementDequeued();
//唤醒等待在notFull上的put操作
notFull.signal();
return x;
}
从代码上看逻辑还是很简单的, 这里就不画示意图了, 逻辑跟put相反
到这里主要的方法基本上已经都说明了, 但是最后还要说一下remove方法,这个方法的逻辑稍微有点复杂,现在我们剖析一下remove方法
public boolean remove(Object o) {
if (o == null) return false;
final Object[] items = this.items;
final ReentrantLock lock = this.lock;
lock.lock();
try {
//不为空才删除,为空则直接返回false
if (count > 0) {
final int putIndex = this.putIndex;
int i = takeIndex;
//循环队列中的可用元素,如果有指定remove的对象则调用removeAt
do {
if (o.equals(items[i])) {
removeAt(i);
return true;
}
if (++i == items.length)
i = 0;
} while (i != putIndex);
}
return false;
} finally {
lock.unlock();
}
}
void removeAt(final int removeIndex) {
// assert lock.getHoldCount() == 1;
// assert items[removeIndex] != null;
// assert removeIndex >= 0 && removeIndex < items.length;
final Object[] items = this.items;
//如果要移除的元素位置正好是takeIndex 即在上面的remove循环中第一次就命中
//这是一种特殊情况,其实就相当于take操作了
if (removeIndex == takeIndex) {
// removing front item; just advance
items[takeIndex] = null;
if (++takeIndex == items.length)
takeIndex = 0;
count--;
if (itrs != null)
//重新设置迭代器元素
itrs.elementDequeued();
//如果移除的并非takeIndex元素
} else {
// an "interior" remove
// slide over all others up through putIndex.
final int putIndex = this.putIndex;
//此处循环正是移除的逻辑,根据removeIndex 把removeIndex 后面的可用元素统一向前
//挪动 然后把putIndex之前的值设置为null 重新设置putIndex
for (int i = removeIndex;;) {
int next = i + 1;
if (next == items.length)
next = 0;
if (next != putIndex) {
items[i] = items[next];
i = next;
} else {
items[i] = null;
this.putIndex = i;
break;
}
}
count--;
if (itrs != null)
//对应更新迭代器数据
itrs.removedAt(removeIndex);
}
//唤醒notFull put 操作
notFull.signal();
}
只看代码对于移除可能还不是很清除,那我们就画下图理解一下:
到此位置ArrayBlockingQueue的实现原理基本上算是说完了,但是关于迭代器的实现,还在在并发操作中迭代器原理,还有在移除时迭代器是如何整理数据的, 这个之后单独说明