基本概念
CyclicBarrier ,也称可重用屏障是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待。通俗来讲,表示 n 个线程,大家相互等待,只要有一个没完成,所有人都得等着。
CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。
调用方法
假设有场比赛,需要 3 个选手才能比赛。若存在选手未到达,则必须等待选手都到齐了了才能开始。
- 选手类
public class Player implements Runnable {
private CyclicBarrier cb;
private String name;
public Player(CyclicBarrier cb, String name) {
this.cb = cb;
this.name = name;
}
@Override
public void run() {
try {
cb.await(); // 表示到达
System.out.println(name + " 开始比赛 ...");
} catch (Exception e) {
e.printStackTrace();
}
}
}
- 调用过程
public class Test {
public static void main(String[] args) {
final CyclicBarrier cb = new CyclicBarrier(3);
new Thread(new Player(cb, "选手甲")).start();
new Thread(new Player(cb, "选手乙")).start();
try {
System.out.println("等待选手丙到达");
Thread.sleep(1000); // 确保甲、乙先到
cb.await(); // 到达
System.out.println("选手丙 开始比赛...");
} catch (InterruptedException e) {
e.printStackTrace();
} catch (BrokenBarrierException e) {
e.printStackTrace();
}
}
}
- 输出结果
等待选手丙到达
选手甲 开始比赛 ...
选手乙 开始比赛 ...
选手丙 开始比赛...
内部构造
首先需要明白 CyclicBarrier 内部几个重要的成员变量:
// 需要到达 barrier 的线程数量,参与的线程数,即参与的线程数
private final int parties;
// 未到达 barrier 的线程数量
private int count;
// 由最后一个进入 barrier 的线程执行的操作,即所有线程被唤醒前的操作
private final Runnable barrierCommand;
// 代,用于 CyclicBarrier 的重用
private Generation generation = new Generation();
再来看它的构造函数,构建 CyclicBarrier 需要指定参与的线程数(parties),并可以附带打破 barrier 前需要执行的任务(barrierAction)。
public CyclicBarrier(int parties) {
this(parties, null);
}
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0){
// 抛出异常...
}
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}
Genation
Genation,即代。由于 CyclicBarrier 是可重用的,那么就需要一定的结构来维持每次重用的信息。这个结构就是 genation 。
private static class Generation {
// 默认为 false,表示还存在线程未到达 barrier ,即 barrier 还不能被破坏
boolean broken = false;
}
当所有线程到达 barrier 时,说明本次 CyclicBarrier 的任务已经完成了,也就不需要再通过 Genation 来维护信息了, 就把 broken 设置为 true。表示 barrier 已被打破。
generation.broken = true;
若想要重用 CyclicBarrier ,就需要开启新的 Genation。
generation = new Generation();
await
线程调用 await 操作后,表示其到达 barrier。
public int await() throws ... {
try {
return dowait(false, 0L);
} catch (TimeoutException toe) {
// 抛出异常...
}
}
CyclicBarrier 的 await 操作都会调用 dowait 方法。
private final ReentrantLock lock = new ReentrantLock();
private final Condition trip = lock.newCondition();
private int dowait(boolean timed, long nanos) throws .... {
// 加锁,确保线程安全
final ReentrantLock lock = this.lock;
lock.lock();
try {
// ①判断 Generation 是否失效
final Generation g = generation;
if (g.broken){
// 抛出异常...
}
// ②判断线程中断标记位
if (Thread.interrupted()) {
// 唤醒所有到达 barrier 线程
breakBarrier();
throw new InterruptedException();
}
// ③判断是否所有线程都到达 barrier 时
// 为 0 表示所有线程都已到达 barrier,则唤醒所有到达的线程
int index = --count;
if (index == 0) {
boolean ranAction = false;
try {
// 线程被唤醒前需要执行的任务
final Runnable command = barrierCommand;
if (command != null){
command.run();
}
ranAction = true;
// 创建新的代
nextGeneration();
return 0;
} finally {
if (!ranAction){
breakBarrier();
}
}
}
// 不为 0 说明线程未到达 barrier,需要放在循环里是防止虚假唤醒
for (;;) {
try {
// 线程进入等待阻塞,并释放锁
if (!timed){
trip.await();
}else if (nanos > 0L){
nanos = trip.awaitNanos(nanos);
}
} catch (InterruptedException ie) {
if (g == generation && !g.broken) {
// 打破当前的代
breakBarrier();
throw ie;
} else {
Thread.currentThread().interrupt();
}
}
if (g.broken){
throw new BrokenBarrierException();
}
if (g != generation){
return index;
}
if (timed && nanos <= 0L) {
breakBarrier();
// 抛出异常...
}
}
} finally {
lock.unlock();
}
}
1.创建新的代
private void nextGeneration() {
// 唤醒到达 barrier 的线程
trip.signalAll();
// 重置 parties,并创建新的 generation ,体现了重用性。
count = parties;
generation = new Generation();
}
2.打破当前的代
private void breakBarrier() {
// 打破 barrier
generation.broken = true;
// 重置 parties
count = parties;
// 唤醒到达 barrier 的线程
trip.signalAll();
}
reset
释放操作
public void reset() {
final ReentrantLock lock = this.lock;
lock.lock();
try {
// 打破当前的代
breakBarrier();
// 开启新的代
nextGeneration();
} finally {
lock.unlock();
}
}