《Java源码分析》:Condition
如下这篇博文讲解的Condition真心将的好,自己结合了源码也基本上对Condition有了一个大致的了解。写篇博客记录下。
参考博客地址如下:http://ifeve.com/understand-condition/。
API文档中对Condtion类列出的方法的说明如下
1、 void await()
造成当前线程在接到信号或被中断之前一直处于等待状态。
2、 boolean await(long time, TimeUnit unit)
造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
3、long awaitNanos(long nanosTimeout)
造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
4、 void awaitUninterruptibly()
造成当前线程在接到信号之前一直处于等待状态。
5、boolean awaitUntil(Date deadline)
造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
6、 void signal() 唤醒一个等待线程。
7、 void signalAll() 唤醒所有等待线程。
下面以一个例子来开始介绍。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ConditionDemo {
private static Lock lock = new ReentrantLock();
private static Condition condition = lock.newCondition();
public static void main(String[] args) {
Thread thread1 = new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName()+"正在运行。。。。");
try {
Thread.sleep(2000);
System.out.println(Thread.currentThread().getName()+"停止运行,等待一个signal");
condition.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"获得一个signal,继续执行");
lock.unlock();
}
},"waitThread");
thread1.start();
try {
Thread.sleep(1000);//保证线程1先执行,否则线程1将一直等待signal信号
} catch (InterruptedException e1) {
e1.printStackTrace();
}
Thread thread2 = new Thread(new Runnable(){
@Override
public void run() {
lock.lock();
System.out.println(Thread.currentThread().getName()+"正在运行。。。。");
condition.signal();//发送信号,唤醒其它线程
System.out.println(Thread.currentThread().getName()+"发送一个signal");
System.out.println(Thread.currentThread().getName()+"发送一个signal后,结束");
lock.unlock();
}
},"signalThread");
thread2.start();
}
}
运行结果:
waitThread正在运行。。。。
waitThread停止运行,等待一个signal
signalThread正在运行。。。。
signalThread发送一个signal
signalThread发送一个signal后,结束
waitThread获得一个signal,继续执行
Condition的执行方式是这样的:
1、当Thread1拿到锁之后,开始执行,当调用condition.await()方法之后,thread1开始睡眠并释放锁
2、thread1开始睡眠并释放锁之后,thread2拿到锁,拿到锁之后开始运行,并调用condition.signal()发射一个信号来唤醒正在等待此条件condition的线程。发射信号之后thread2会继续执行,执行完毕后thread2释放锁。
3、当thread2释放锁之后,thread1拿到锁开始继续运行直至结束。
从上面的可以看出:Condition是一个多线程协调通信的一个工具类。使得某个或者某些线程一起等待某个条件(Condition),只有当该条件具备( signal 或者 signalAll方法被带调用)时 ,这些等待线程才会被唤醒,从而重新争夺锁。
看了上面的例子,你可能会有这样的疑问:当thread1拿到锁之后开始工作,然后调用condition.await()方法开始睡眠等待信号的达到。但是没有看见此线程释放锁呀,当thread2发出signal信号且释放锁之后也没有看见它重新获取锁呀??
有这样的困惑就太对了,这样才能促进我们思考嘛,是吧。
我们都知道,ReentrantLock是独占锁,一个线程拿到锁后如果不释放,那么另外一个线程肯定是拿不到锁,所以在lock.lock()和lock.unlock()之间可能有一次释放锁的操作(同样也必然还有一次获取锁的操作)。我们再回头看代码,thread1在进入lock.lock()后唯一可能释放锁的操作就是await()了。也就是说await()操作实际上就是释放锁,然后挂起线程,一旦条件满足就被唤醒,再次获取锁!
以上只是我们的猜测,下面我们就从源码的角度来分析到底await方法是如何进行了:释放锁,然后挂起线程,一旦条件满足就被唤醒以及再次获取锁等操作。
ReentrantLock类中的newCondition方法的代码如下:
public Condition newCondition() {
return sync.newCondition();
}
此方法张直接调用了AbstractQueuedSynchronizer的实现类Sync中的newCondition()方法。
Sync类中的newCondition()方法的代码如下:
final ConditionObject newCondition() {
return new ConditionObject();
}
直接new 了一个 ConditionObject类的对象。ConditionObject类是Condition的实现类,ConditionObject是AbstractQueuedSynchronizer同步器中的一个内部类。
因此,在前面的例子中当调用condition.await方法时,就是调用的ConditionObject类中的await()方法。
下面就开始分析这个await方法的内部实现。
分析await()方法的实现
源码如下:
public final void await() throws InterruptedException {
if (Thread.interrupted())//判断当前线程是否被中断
throw new InterruptedException();
//将当前线程作为内容构造的节点node放入到条件队列中并返回此节点
Node node = addConditionWaiter();
//释放当前线程所拥有的锁,返回值为AQS的状态位(即此时有几个线程拥有锁(考虑ReentrantLock的重入))。
int savedState = fullyRelease(node);
int interruptMode = 0;
/* 检测此节点是否在同步队列上,如果不在,说明此线程还没有资格竞争锁,此线程就继续挂起睡觉。 直到检测到此节点在同步队列上(在上面时候加入的呢?在有线程发出signal信号的时候), */
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
//并检测此线程有没有被中断
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//此线程尝试的获取锁,可参考博文ReentrantLock.lock方法分析。
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
//清理下条件队列中的不是在等待条件的节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//报告异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
上面的注释的比较详细,方便大家对照的源码分析。
await方法的大概思想为:首先将此代表该当前线程的节点加入到条件队列中去,然后释放该线程所有的锁并开始睡眠,最后不停的检测AQS队列中是否出现了此线程节点。如果收到signal信号之后就会在AQS队列中检测到,检测到之后,说明此线程又参与了竞争锁。
关于await方法中调用的其它方法的源码分析如下,这些方法都有一定的注释,也比较简单,就不仔细分析了
/** * Adds a new waiter to wait queue. * @return its new wait node */
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
/* CONDITION,值为-2,表示当前节点在等待condition,也就是在condition队列中; 如果此节点的状态不是CONDITION,则需要将此节点在条件队列中移除 */
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;//获取最后一个在等待的节点
}
//将此线程作为内容构造一个节点加入到条件队列末尾。
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
/** * Invokes release with current state value; returns saved state. * Cancels node and throws exception on failure. * @param node the condition node for this wait * @return previous sync state */
/* 函数功能:释放锁, 如果失败,则抛异常并将此节点的类型设置为:CANCELLED,为之后从条件队列中移除此节点。 */
final int fullyRelease(Node node) {
boolean failed = true;
try {
int savedState = getState();
if (release(savedState)) {//释放锁
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
// Internal support methods for Conditions
/** * Returns true if a node, always one that was initially placed on * a condition queue, is now waiting to reacquire on sync queue. *翻译:如果一个节点刚开始在条件队列上,现在在同步队列上获取锁则返回true */
final boolean isOnSyncQueue(Node node) {
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
return findNodeFromTail(node);
}
/* 函数功能:将waitStatus不是CONDITION的节点全部删除。 */
private void unlinkCancelledWaiters() {
Node t = firstWaiter;
Node trail = null;
while (t != null) {
Node next = t.nextWaiter;
if (t.waitStatus != Node.CONDITION) {
t.nextWaiter = null;
if (trail == null)
firstWaiter = next;
else
trail.nextWaiter = next;
if (next == null)
lastWaiter = trail;
}
else
trail = t;
t = next;
}
}
回到上面的demo,锁被释放后,线程1开始沉睡,这个时候线程因为线程1沉睡时调用fullyRelease方法释放锁,接着会唤醒AQS队列中的头结点,所以线程2会开始竞争锁,并获取到,开始工作,线程2会调用signal方法,“发出”signal信号。
开始分析signal()方法
public final void signal() {
if (!isHeldExclusively())//检测当前线程是否为拥有锁的独占线程
throw new IllegalMonitorStateException();
/* firstWaiter为condition自己维护的一个链表的头结点, 取出第一个节点后开始唤醒操作 */
Node first = firstWaiter;
if (first != null)
doSignal(first);//开始唤醒
}
说明下,其实Condition内部维护了等待队列的头结点和尾节点,该队列的作用是存放等待signal信号的线程,该线程被封装为Node节点后存放于此。
下面为ConditionObject类中维护等待队列的头结点和尾节点的声明。
public class ConditionObject implements Condition, java.io.Serializable {
private static final long serialVersionUID = 1173984872572414699L;
/** First node of condition queue. */
private transient Node firstWaiter;
/** Last node of condition queue. */
private transient Node lastWaiter;
这里又出现了一个条件队列,可能我们就有点晕了,了解AbstractQueuedSynchronizer同步器的都知道,这个类中还维护着一个队列,AQS自己维护的队列是当前等待资源(这里的资源就是锁)的队列,AQS会在资源被释放后,依次唤醒队列中从前到后的所有节点,使他们对应的线程恢复执行。直到队列为空。
而Condition自己也维护了一个队列,该队列的作用是维护一个等待signal信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个,流程是这样的:
用上面的Demo的两个线程来描述
1、首先,线程1调用lock.lock()时,由于此时锁并没有被其它线程占用,因此线程1直接获得锁并不会进入AQS同步队列中进行等待。
2、在线程1执行期间,线程2调用lock.lock()时由于锁已经被线程1占用,因此,线程2进入AQS同步队列中进行等待。
3、在线程1中执行condition.await()方法后,线程1释放锁并进入条件队列Condition中等待signal信号的到来。
4、线程2,因为线程1释放锁的关系,会唤醒AQS队列中的头结点,所以线程2会获取到锁。
5、线程2调用signal方法,这个时候Condition的等待队列中只有线程1一个节点,于是它被取出来,并被加入到AQS的等待队列中。注意,这个时候,线程1 并没有被唤醒。只是加入到了AQS等待队列中去了
6、待线程2执行完成之后并调用lock.unlock()释放锁之后,会唤醒此时在AQS队列中的头结点.所以线程1开始争夺锁(由于此时只有线程1在AQS队列中,因此没人与其争夺),如果获得锁继续执行。
- 直到线程1释放锁整个过程执行完毕。
可以看到,整个协作过程是靠结点在AQS的等待队列和Condition的等待队列中来回移动实现的,Condition作为一个条件类,很好的自己维护了一个等待信号的队列,并在适时的时候将结点加入到AQS的等待队列中来实现的唤醒操作。
有了以上的知识,我们继续来看signal方法。
public final void signal() {
if (!isHeldExclusively())//检测当前线程是否为拥有锁的独占线程
throw new IllegalMonitorStateException();
/* firstWaiter为condition自己维护的一个链表的头结点, 取出第一个节点后开始唤醒操作 */
Node first = firstWaiter;
if (first != null)
doSignal(first);//开始唤醒
}
此方法干了两件事:取出Condition条件队列中的头结点,然后调用doSignal开始唤醒。
下面介绍下doSignal是如何来完成唤醒操作的。
private void doSignal(Node first) {
do {
//修改头结点,完成旧头结点的移出工作
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
doSignal(Node first)方法干了两件事:第一件事为修改条件队列中的头结点,第二件事为完成旧的头结点的移出工作,即从Condition队列中移出到AQS同步队列中去。
节点的移出工作是调用transferForSignal(Node node)来完成的。transferForSignal(Node node)函数的代码如下:
/** * Transfers a node from a condition queue onto sync queue. * Returns true if successful. * @param node the node * @return true if successfully transferred (else the node was * cancelled before signal) *翻译:从条件队列中转移一个节点到同步队列中去 */
final boolean transferForSignal(Node node) {
/* * If cannot change waitStatus, the node has been cancelled. *翻译:如果不能改变waitStatus的值,则说明此节点已经被取消了 */
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
Node p = enq(node);//将节点加入到syn队列中去,返回的是syn队列中node节点前面的一个节点
int ws = p.waitStatus;
//如果结点p的状态为cancel 或者修改waitStatus失败,则直接唤醒。
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
LockSupport.unpark(node.thread);
return true;
}
可以看到,正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。
只有到发送signal信号的线程调用reentrantLock.unlock()后,因为它已经被加到AQS的等待队列中,所以才可能会被唤醒。
以上就是关于Condition的相关知识。
最后以一个Condition实现的生产消费者模型的例子结束。
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class ProductDemo {
private Lock lock = new ReentrantLock();
private Condition nonFull = lock.newCondition();
private Condition nonEmpty = lock.newCondition();
private Object [] items;
private int head,tail,count;
public ProductDemo(int maxSize){
items = new Object[maxSize];
count = 0;
head = tail = 0;
}
public ProductDemo(){
this(100);
}
public void put(Object o) throws InterruptedException{
lock.lock();
try{
while(count==items.length){
nonFull.await();
}
items[tail++]=o;
if(tail==items.length){
tail = 0;
}
count++;
nonEmpty.signalAll();
}finally{
lock.unlock();
}
}
public Object take() throws InterruptedException{
lock.lock();
try{
while(count<=0){
nonEmpty.await();
}
count --;
Object o = items[head];
head++;
if(head==items.length){
head = 0;
}
nonFull.signalAll();
return o;
}finally{
lock.unlock();
}
}
}