await – signal – signalAll
以下代码,分别展示了wait/notify, 和Condition的await/signal的用法
Object o = new Object();
synchronized(o) //线程1
{
...
o.wait(); //内部,会先释放锁。被其他线程notify之后,会再次拿锁。
...
}
synchronized(o) //线程2
{
...
o.notify();
...
}
ReentrantLock l = new ReentrantLock();
Condition c1 = l.newCondition();
Condition c2 = l.newCondition();
l.lock; //线程1
try
{
...
c1.await(); //内部,会先释放锁。被其他线程signal之后,会再次拿锁。
...
c2.signal();
...
}finally
{
l.unlock();
}
l.lock; //线程2
try
{
...
c2.await(); //内部,会先释放锁。被其他线程notify之后,会再次拿锁。
...
c1.signal();
...
}finally
{
l.unlock();
}
通过以上代码,明确3点:
(1)Condition必须与锁协同使用:对应synchronized来说,wait()的object必须是synchronized对应的同步对象;对应ReentrantLock来说,Condition是通过ReentrantLock.newCondition()得到的。
(2)wait()/await()的时候,会先释放锁,然后进入阻塞,然后被notify/signal唤醒之后,会再去拿锁!也就是其内部有3个环节:
//释放锁
//进入阻塞
//被唤醒,拿锁,执行后续代码
(3)await/signal在使用上,比wait/notify更加灵活:
wait/notify只能附属在一个条件上,所有的阻塞线程都在这1个条件上;
而Lock可以创建多个condition,每个condition都有wait/notify,每个condition都有一个自己的阻塞线程队列。
后面所讲的BlockingQueue,将很好的展示condition的这个优点。
Condition源码分析
从上面的第(2)条可以看出,await()的时候,线程要进入阻塞。所以每个Condition内部,都维护了一个链表,或者说队列,存储所有阻塞在这个条件上的线程。
以下代码,展示了Condition的内部结构:
//ConditionObject是AQS的一个内部类,实现了Condition接口
public class ConditionObject implements Condition, java.io.Serializable {
。。。
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter; //阻塞队列
。。。
下面看一下Condition.await()的源码:
//ReentrantLock
public Condition newCondition() {
return sync.newCondition();
}
//ReentrantLock的Sync内部类
final ConditionObject newCondition() {
return new ConditionObject();
}
//AQS的ConditionObject内部类
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException(); //有人置了中段标志位,先响应中断
Node node = addConditionWaiter(); //把线程加入该condition的阻塞队列
int savedState = fullyRelease(node); //释放锁
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //开始阻塞
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) //被中断唤醒,跳出阻塞
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE) //被唤醒之后,重新拿锁
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
由上述代码可以看出,await()是会响应中断的。下面看一下屏蔽中断的await(),即awaitUninterruptibly()
public final void awaitUninterruptibly() {
Node node = addConditionWaiter(); //加入阻塞队列
int savedState = fullyRelease(node); //释放锁
boolean interrupted = false;
while (!isOnSyncQueue(node)) {
LockSupport.park(this); //进入阻塞
if (Thread.interrupted()) //被中断唤醒,没有break,继续循环, 再次进入阻塞
interrupted = true;
}
if (acquireQueued(node, savedState) || interrupted) //阻塞出来,拿锁
selfInterrupt(); //此时,再响应中断
}
下面看一下signal()的源码:
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first); //唤醒队列里面第1个
}
private void doSignal(Node first) {
do {
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
final boolean transferForSignal(Node node) {
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread); //唤醒
return true;
}
关键点:无论await(), 还是signal(),都是在拿到锁之后执行的,所以其内部的入队/出队,都不需要加锁!
ArrayBlockingQueue
通常的Queue,一边是生产者,一边是消费者。一边进,一边出,有一个判空函数,一个判满函数。
而所谓的BlockingQueue,就是指当为空的时候,阻塞消费者线程;当为满的时候,阻塞生产者线程。
以下是ArrayBlockingQueue的核心结构:
public class ArrayBlockingQueue<E> extends AbstractQueue<E>
implements BlockingQueue<E>, java.io.Serializable {
。。。
/** The queued items */
final Object[] items;
/** items index for next take, poll, peek or remove */
int takeIndex;
/** items index for next put, offer, or add */
int putIndex;
/** Number of elements in the queue */
int count;
//其核心就是1把锁 + 2个条件
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
。。。
}
以下为其主要的构造函数:
public ArrayBlockingQueue(int capacity, boolean fair) {
if (capacity <= 0)
throw new IllegalArgumentException();
this.items = new Object[capacity];
lock = new ReentrantLock(fair);
notEmpty = lock.newCondition();
notFull = lock.newCondition();
}
以下为其put()/take()源代码
public void put(E e) throws InterruptedException {
checkNotNull(e);
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == items.length)
notFull.await(); //put的时候,队列满了,阻塞
insert(e);
} finally {
lock.unlock();
}
}
private void insert(E x) {
items[putIndex] = x;
putIndex = inc(putIndex);
++count;
notEmpty.signal(); //put进去之后,通知非空条件
}
public E take() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lockInterruptibly();
try {
while (count == 0)
notEmpty.await(); //take的时候,队列为空,阻塞
return extract();
} finally {
lock.unlock();
}
}
private E extract() {
final Object[] items = this.items;
E x = this.<E>cast(items[takeIndex]);
items[takeIndex] = null;
takeIndex = inc(takeIndex);
--count;
notFull.signal(); //take完了,通知非满条件
return x;
}
顺便说一句:上述2个函数,都是响应中断,并且阻塞的。
另外还有不响应中断的,不阻塞的成员函数,在此就不再详述了。
LinkedBlockingQueue
LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为头和尾是2个指针分开操作的,所以用了2把锁 + 2个条件,同时一个AtomicInteger的原子变量记录count数。
private final AtomicInteger count = new AtomicInteger(0);
/** Head of linked list */
private transient Node<E> head;
/** Tail of linked list */
private transient Node<E> last;
/** Lock held by take, poll, etc */
private final ReentrantLock takeLock = new ReentrantLock();
/** Wait queue for waiting takes */
private final Condition notEmpty = takeLock.newCondition();
/** Lock held by put, offer, etc */
private final ReentrantLock putLock = new ReentrantLock();
/** Wait queue for waiting puts */
private final Condition notFull = putLock.newCondition();
LinkedBlockingDeque
其原理和ArrayBlockingQueue是一样的,也是1把锁 + 2个条件。只是其数据结构不是数组,而是一个双向链表。
有一个小细节:链表不是无限长吗,怎么会满呢?这里是人为设置了一个最大长度:
public LinkedBlockingDeque() {
this(Integer.MAX_VALUE); //最大长度是整数的最大值
}
下面是其主要结构:
public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {
...
//双向链表的Node
static final class Node<E> {
E item;
Node<E> prev;
Node<E> next;
Node(E x) {
item = x;
}
}
//队列的头,尾
transient Node<E> first;
transient Node<E> last;
private transient int count;
private final int capacity;
//1把锁 + 2个条件
final ReentrantLock lock = new ReentrantLock();
private final Condition notEmpty = lock.newCondition();
private final Condition notFull = lock.newCondition();
。。。
}
下面是其put/take函数,其原理和ArrayBlockQueue的put/take类似:
public void put(E e) throws InterruptedException {
putLast(e);
}
public void putLast(E e) throws InterruptedException {
if (e == null) throw new NullPointerException();
Node<E> node = new Node<E>(e);
final ReentrantLock lock = this.lock;
lock.lock();
try {
while (!linkLast(node))
notFull.await();
} finally {
lock.unlock();
}
}
private boolean linkLast(Node<E> node) {
// assert lock.isHeldByCurrentThread();
if (count >= capacity)
return false;
Node<E> l = last;
node.prev = l;
last = node;
if (first == null)
first = node;
else
l.next = node;
++count;
notEmpty.signal();
return true;
}
public E take() throws InterruptedException {
return takeFirst();
}
public E takeFirst() throws InterruptedException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
E x;
while ( (x = unlinkFirst()) == null)
notEmpty.await();
return x;
} finally {
lock.unlock();
}
}
private E unlinkFirst() {
// assert lock.isHeldByCurrentThread();
Node<E> f = first;
if (f == null)
return null;
Node<E> n = f.next;
E item = f.item;
f.item = null;
f.next = f; // help GC
first = n;
if (n == null)
last = null;
else
n.prev = null;
--count;
notFull.signal();
return item;
}
PriorityBlockingQueue
和上面的BlockingQueue有2个区别:
(1)是无界的,所以只有notEmpty一个条件。put不会阻塞,只有take会阻塞
(2)通过2叉堆,实现Priority
public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
...
private transient Object[] queue; //2叉堆实现
//1把锁 + 1个条件
private final ReentrantLock lock;
private final Condition notEmpty;
...
}
SynchronousQueue
SynchronousQueue是一种特殊队列,内部不是用Lock + Condition实现的。后续会单独用一篇专门阐述。