Jdk1.6 JUC源码解析(11)-CyclicBarrier
作者:大飞
功能简介:
- CyclicBarrier是一种可重复使用的栅栏机制,可以让一组线程在某个点上相互等待,这个点就可以类比为栅栏。并且这个栅栏是可重复使用的,这点可以和前面分析过的CountDownLatch做对比,CountDownLatch只能用一次。
- CyclicBarrier还支持在所有线程到达栅栏之后,在所有线程从等待状态转到可运行状态之前,执行一个命令(或者说是动作)。
- 当然,在某些情况下,栅栏可以被打破。比如某个线程无法在规定的时间内到达栅栏。
源码分析:
- 先看下CyclicBarrier内部的结构:
public class CyclicBarrier {
/**
* 每次对栅栏的使用可以表示为一个generation。栅栏每次开放或者重置,
* generation都会发生改变。使用栅栏的线程可以关联多个generations,
* 由于等待线程可能会以多种方式请求锁,但是在特定的时间只有一个是
* 可用的,其他的要么被打破,要么开放。
* 如果一个栅栏已经被打破。且没有后续的重置动作,那么可以不存在可
* 用的generation。
*/
private static class Generation {
boolean broken = false;
}
/** 用于保护栅栏的锁 */
private final ReentrantLock lock = new ReentrantLock();
/** 栅栏开放的条件 */
private final Condition trip = lock.newCondition();
/** 表示当前使用栅栏的使用方(线程)数量 */
private final int parties;
/* 当栅栏开放时,要使用的命令(动作) */
private final Runnable barrierCommand;
/** The current generation */
private Generation generation = new Generation();
/**
* 处于等待状态的使用方(线程)的数量,在每一个generation上从
* parties递减为0。当新建generation(栅栏开放)或者栅栏被打破
* 时,重置为parties。
*/
private int count;
从上述的结构细节中可见,CyclicBarrier内部使用ReentrantLock来实现,并包含一个trip条件,来作为栅栏模拟栅栏的行为(所有使用方都在这个条件上等待)。
- 具体使用栅栏时,各个线程会在要互相等待的地方调用一个”等待”方法,然后在这个方法处等待。当所有线程都到达次方法时,栅栏打开,所有线程从等待出继续执行。接下来就从这个”等待”方法入手开始分析:
/**
* 线程调用此方法后等待,直到所有parties都调用当前barrier的这个方法。
*
* 如果当前线程是不最后一个到达此方法的线程,那么会阻塞,直到下面的
* 事情发生:
*
* 最后的线程也到达此方法。
* 其他线程中断了当前线程。
* 其他线程中断了在栅栏处等待的某个线程。
* 某个线程调用了栅栏的reset方法。
*/
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
/**
* 和上面的方法相比,多了超时的情况。
*/
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
看下上述等待方法中调用的dowait方法:
/**
* 栅栏主体代码,涵盖了所有情况。
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//获取当前的generation实例。
final Generation g = generation;
if (g.broken) //如果当前generation状态为broken,说明栅栏被打破,抛出BrokenBarrierException异常。
throw new BrokenBarrierException();
if (Thread.interrupted()) {
//如果当前线程被中断,打破栅栏,然后抛出中断异常。
breakBarrier();
throw new InterruptedException();
}
//计算当前到达线程的下标。
int index = --count;
//下标为0表示当前线程为最后一个使用栅栏的线程。
if (index == 0) { // 栅栏开放。
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)//如果有栅栏命令,执行栅栏命令。
command.run();//看来栅栏的命令是由最后一个到达栅栏的线程执行。
ranAction = true;
//产生新的generation。
nextGeneration();
return 0;
} finally {
if (!ranAction) //如果栅栏命令未执行,打破栅栏。
breakBarrier();
}
}
// 等待中的主循环,直到栅栏开放、栅栏被打破、线程被打断或者超时时退出。
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
//如果出于当前generation 且generation状态为未打破,那么打破栅栏。
breakBarrier();
throw ie;
} else {
// 如果没被中断的话,我们即将完成等待。
// 所以这个中断被算作下一次执行的中断。
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)//如果generation改变了,说明之前的栅栏已经开放,返回index。
return index;
if (timed && nanos <= 0L) {
breakBarrier();//如果超时,打破栅栏,并返回超时异常。
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
从dowait方法中可以看到,当所有使用者都到达时,栅栏开放,会调用nextGeneration方法;如果有其他情况(超时、中断等)发生,会调用breakBarrier方法。先看下nextGeneration:
/**
* 更新栅栏状态,唤醒所有在栅栏处等待的线程。
* 这个方法只有在持有锁的情况下被调用。
*/
private void nextGeneration() {
//唤醒所有在栅栏处等待的线程。
trip.signalAll();
//重置count。
count = parties;
//产生新的generation。
generation = new Generation();
}
再看下breakBarrier方法:
/**
* 设置当前栅栏generation状态为打破状态,并唤醒栅栏处的等待线程。
* 这个方法只有在持有锁的情况下被调用。
*/
private void breakBarrier() {
//设置"打破"状态。
generation.broken = true;
//重置count。
count = parties;
//唤醒所有在栅栏处等待的线程。
trip.signalAll();
}
dowait内部为栅栏的主要逻辑,经过上面的分析,应该很清晰了。最后看下其他的方法:
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
public CyclicBarrier(int parties) {
this(parties, null);
}
public int getParties() {
return parties;
}
public boolean isBroken() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return generation.broken;
} finally {
lock.unlock();
}
}
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
//重置逻辑,先打破当前的栅栏,然后建立一个新的。
breakBarrier();
nextGeneration();
} finally {
lock.unlock();
}
}
/**
* 获取在栅栏处等待的使用方(线程)数量。
*/
public int getNumberWaiting() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
return parties - count;
} finally {
lock.unlock();
}
}
小总结一下:
1.当建立一个使用方数量为n的栅栏时,栅栏内部有一个为n的计数。当使用方调用await方法时,如果其他n-1个使用方没有全部到达await方法(内部计数减1后,不等于0),那么使用方(线程)阻塞等待。
2.当第n个使用方调用await时,栅栏开放(内部计数减1后等于0),会唤醒所有在await方法上等待着的使用方(线程),大家一起通过栅栏,然后重置栅栏(内部计数又变成n),栅栏变成新建后的状态,可以再次使用。
CyclicBarrier的代码解析完毕! 参见:
Jdk1.6 JUC源码解析(7)-locks-ReentrantLock