JUC代码浅析[6]——基于AQS的CyclicBarrier
CyclicBarrier是一种同步机制允许一组线程相互等待,等到所有线程都到达一个屏障点才退出await方法,它没有直接实现AQS而是借助ReentrantLock来实现的同步机制。它是可循环使用的,而CountDownLatch是一次性的,另外它体现的语义也跟CountDownLatch不同,CountDownLatch减少计数到达条件采用的是release方式,而CyclicBarrier走向屏障点(await)采用的是Acquire方式,Acquire是会阻塞的,这也实现了CyclicBarrier的另外一个特点,只要有一个线程中断那么屏障点就被打破,所有线程都将被唤醒(CyclicBarrier自己负责这部分实现,不是由AQS调度的),这样也避免了因为一个线程中断引起永远不能到达屏障点而导致其他线程一直等待。屏障点被打破的CyclicBarrier将不可再使用(会抛出BrokenBarrierException)除非执行reset操作。
下面是一个使用的例子
class Solver {
final int N;
final float[][] data;
final CyclicBarrier barrier;
class Worker implements Runnable {
int myRow;
Worker(int row) { myRow = row; }
public void run() {
while (!done()) {
processRow(myRow);
try {
barrier.await();
} catch (InterruptedException ex) {
return;
} catch (BrokenBarrierException ex) {
return;
}
}
}
}
public Solver(float[][] matrix) {
data = matrix;
N = matrix.length;
barrier = new CyclicBarrier(N,
new Runnable() {
public void run() {
mergeRows(…);
}
});
for (int i = 0; i < N; ++i)
new Thread(new Worker(i)).start();
waitUntilDone();
}
}
这个例子的场景是每个Worker线程处理完一行数据后,等待其他线程到达屏障点。当所有线程都到达屏障点时进行一次合并操作(mergeRows方法),然后每个Worker线程继续处理下一行数据.如此反复。
下面的代码片段说明了如何实现这种机制,整个过程为
1首先检查屏障点有没有打破,如果打破了就直接抛出异常退出。
2检查当前线程是否被中断了,如果中断了就打破屏障点,并抛出异常退出。
3检查当前线程是否最后一个到达屏障点,如果是的话就设置下一轮屏障,并唤醒所有线程,如果需要的话执行barrierCommand,退出。
4.以上都不满足的话,循环等待在trip这个condition上。如果设置了等待超时时间,那么只要有一个线程等待超时后就会打破屏障点唤醒所有线程,并抛出异常TimeoutException退出,其他线程可能会抛出BrokenBarrierException异常。所以如果不设置超时时间并且最后一个线程由于某种原因不能到达屏障点,那么所有线程都一直死等待了,因此才需要上面几步的检查。
整个过程还有很多逻辑来处理执行屏障点任务引发的异常、park线程超时引发的中断异常和超时异常等
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = —count;
if (index == 0) { // tripped
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();
ranAction = true;
nextGeneration();
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed)
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();
throw ie;
} else {
// We’re about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// “belong” to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
CyclicBarrier对外API只有7个:
l public CyclicBarrier(int parties) 创建一个CyclicBarrier,指定参与者数量
l public CyclicBarrier(int parties, Runnable barrierAction)创建一 个CyclicBarrier,指定参与者数量。并且指定所有参与者到达屏障点之后需要执行的动作。
l public int await() throws InterruptedException, BrokenBarrierException在所 有参与者到达屏障点之前一直等待(也就是说等待所有参与者都调用了await方法才往下执行)
l public int await(long timeout, TimeUnit unit)
throws InterruptedException,BrokenBarrierException,TimeoutException 跟await()方法一样,只是指定了等待超时时间,具体实现请看上面的分析
l public boolean isBroken()屏障点是否被打破
l public void reset()重置屏障点(屏障点打破之后只能调用这个方法才可重新使用)
l public int getNumberWaiting()处于等待状态的参与者的个数,也就是调用了await方法的参与者 个数