CyclicBarrier源码分析
首先分析CyclicBarrier的核心方法await
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
public int await(long timeout, TimeUnit unit)
throws InterruptedException,
BrokenBarrierException,
TimeoutException {
return dowait(true, unit.toNanos(timeout));
}
可以看出不管是否带超时,但将会调用方法dowait方法
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
在代码执行前,将会使用一个ReentrantLock对象(CyclicBarrier的实例域)进行加锁,保证屏障的线程安全。
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
接下来将检查屏障是否被打破,还有检查当前线程是否被中断
那么是如何检查屏障是否被打破,使用到的即是Generation对象,将根据该对象的实例域broken(一个布尔类型的值)来判断是否被打破
private static class Generation {
boolean broken = false;
}
Generation类只有一个布尔类型的实例域broken,用于作为屏障是否打破的标记
int index = --count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
接下来将开始计数,将屏障计数减一,然后判断是否为0,若为0则全部线程到达屏障,屏障将打开
在打开屏障前,将会执行CyclicBarrier的Runnable实例域barrierCommand,barrierCommand就是在构造函数中传参进行来的Runnable对象,即用户想要在所有线程到达后,而在屏障打开前锁要执行的任务
可以看出,如上说的一样,改任务是在屏障打开前执行的,而且是由最后一个到达屏障的线程执行
接下来可以看看打开屏障的方法nextGeneration
private void nextGeneration() {
// signal completion of last generation
trip.signalAll();
// set up next generation
count = parties;
generation = new Generation();
}
trip是一个Condition对象
private final Condition trip = lock.newCondition();
用于实现屏障的阻塞功能,当然,在所有线程到达屏障后,将会唤醒所有阻塞的线程
count是用于到达屏障的计数,而parties则是指定屏障的需要线程到达的数量,当屏障打开时,count将复原为需要到达数量
最后,将重新实例化一个Generation对象,然后替换掉原来的。
前面提到,Generation有个布尔类型实例域broken用于判断屏障是否被打破,那么直接用一个boolean类型不就行了吗,为什么还需要定义个Generation类。Generation所代表的是”一代”,而屏障需要标记的,不单只是是否被打破,还要标记屏障是否完成了一轮的任务,即被正常打开,显然这样需要两个标记值。而使用Generation对象用作表示每轮的屏障的状态,当验证屏障是否被打开时,只需要验证是否为同一个Generation对象即可。如果不抽象出一个Generation类,对于这种屏障状态的表达,将显得复杂而难以控制
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
若然到达屏障的不是最后一个线程,将进入一个死循环,并且根据是否带超时,执行Condition对象的await方法进行阻塞,以此等待其他线程到达屏障,该过程在死循环中执行,是为了防止线程意外唤醒,破坏屏障的规则
若然线程在阻塞的过程中被中断,将会判断屏障是否完好,若然完好,则会将屏障打破,唤醒其他线程,这种情况是任何一个到达屏障的线程被中断都会引发的。若然线程在中断时,屏障已经打开或者被打破了,将再次执行当前线程的中断操作,而不做更多的操作
线程从阻塞中返回后,将有三个判断
第一个判断判断于屏障是否被打破,则线程是因为屏障被打破而唤醒的,抛出BrokenBarrierException异常
第二个判断是判断generation是否为同一个对象,则判断”本轮操作”完成了吗,屏障被正常打开,将返回一个整型index,index表达的是线程到达屏障的顺序,数值越大越早到达,index为0则表明该线程是最后一个到达的
第三个判断是用于超时判断,若线程由于超时而被唤醒,将会打破屏障,唤醒其他线程,并且抛出TimeoutException
打破屏障的方法
private void breakBarrier() {
generation.broken = true;
count = parties;
trip.signalAll();
}
屏障打破后将唤醒所有在屏障中等待的线程,而且被打破后需要重置屏障,否则不能再使用该屏障
重置屏障方法
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
breakBarrier(); // 打破当前屏障
nextGeneration(); // 下一代,即重新开始屏障计数
} finally {
lock.unlock();
}
}