12.JUC 锁- CyclicBarrier

基本概念

CyclicBarrier ,也称可重用屏障是一个线程同步工具,用于一组互相等待线程之间的协调,在到达某个临界点之前,这些线程必须互相等待。通俗来讲,表示 n 个线程,大家相互等待,只要有一个没完成,所有人都得等着。

CyclicBarrier 还允许指定一个任务,在所有线程到达临界点时执行,由最后到达的线程执行此任务。

调用方法

假设有场比赛,需要 3 个选手才能比赛。若存在选手未到达,则必须等待选手都到齐了了才能开始。

  • 选手类
public class Player implements Runnable {
    private CyclicBarrier cb;
    private String name;

    public Player(CyclicBarrier cb, String name) {
        this.cb = cb;
        this.name = name;
    }

    @Override
    public void run() {
        try {
            cb.await();  // 表示到达
            System.out.println(name + " 开始比赛 ...");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  • 调用过程
public class Test {
    public static void main(String[] args) {
        final CyclicBarrier cb = new CyclicBarrier(3);
        new Thread(new Player(cb, "选手甲")).start();
        new Thread(new Player(cb, "选手乙")).start();

        try {
            System.out.println("等待选手丙到达");
            Thread.sleep(1000); // 确保甲、乙先到
            cb.await(); // 到达
            System.out.println("选手丙 开始比赛...");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } catch (BrokenBarrierException e) {
            e.printStackTrace();
        }
    }
}
  • 输出结果
等待选手丙到达
选手甲 开始比赛 ...
选手乙 开始比赛 ...
选手丙 开始比赛...

内部构造

首先需要明白 CyclicBarrier 内部几个重要的成员变量:

// 需要到达 barrier 的线程数量,参与的线程数,即参与的线程数
private final int parties;

// 未到达 barrier 的线程数量
private int count;

// 由最后一个进入 barrier 的线程执行的操作,即所有线程被唤醒前的操作
private final Runnable barrierCommand;

// 代,用于 CyclicBarrier 的重用
private Generation generation = new Generation();

再来看它的构造函数,构建 CyclicBarrier 需要指定参与的线程数(parties),并可以附带打破 barrier 前需要执行的任务(barrierAction)。

public CyclicBarrier(int parties) {
    this(parties, null);
}

public CyclicBarrier(int parties, Runnable barrierAction) {
    if (parties <= 0){
        // 抛出异常...
    }

    this.parties = parties;
    this.count = parties;
    this.barrierCommand = barrierAction;
}

Genation

Genation,即代。由于 CyclicBarrier 是可重用的,那么就需要一定的结构来维持每次重用的信息。这个结构就是 genation 。

private static class Generation {
    // 默认为 false,表示还存在线程未到达 barrier ,即 barrier 还不能被破坏
    boolean broken = false;
}

当所有线程到达 barrier 时,说明本次 CyclicBarrier 的任务已经完成了,也就不需要再通过 Genation 来维护信息了, 就把 broken 设置为 true。表示 barrier 已被打破。

generation.broken = true;

若想要重用 CyclicBarrier ,就需要开启新的 Genation。

generation = new Generation();

await

线程调用 await 操作后,表示其到达 barrier。

public int await() throws ... {
    try {
        return dowait(false, 0L);
    } catch (TimeoutException toe) {
        // 抛出异常...
    }
}

CyclicBarrier 的 await 操作都会调用 dowait 方法。

private final ReentrantLock lock = new ReentrantLock();

private final Condition trip = lock.newCondition();

private int dowait(boolean timed, long nanos) throws .... {

    // 加锁,确保线程安全
    final ReentrantLock lock = this.lock;
    lock.lock();

    try {
        // ①判断 Generation 是否失效
        final Generation g = generation;
        if (g.broken){
            // 抛出异常...
        }

        // ②判断线程中断标记位
        if (Thread.interrupted()) {
            // 唤醒所有到达 barrier 线程 
            breakBarrier();
            throw new InterruptedException();
        }

        // ③判断是否所有线程都到达 barrier 时

        // 为 0 表示所有线程都已到达 barrier,则唤醒所有到达的线程
        int index = --count;
        if (index == 0) { 
            boolean ranAction = false;
            try {
                // 线程被唤醒前需要执行的任务
                final Runnable command = barrierCommand;
                if (command != null){
                    command.run();
                }
                ranAction = true;

                // 创建新的代
                nextGeneration();

                return 0;
            } finally {
                if (!ranAction){
                    breakBarrier();
                }
            }
        }

        // 不为 0 说明线程未到达 barrier,需要放在循环里是防止虚假唤醒
        for (;;) {
            try {
                // 线程进入等待阻塞,并释放锁
                if (!timed){
                    trip.await();
                }else if (nanos > 0L){
                    nanos = trip.awaitNanos(nanos);
                }
            } catch (InterruptedException ie) {
                if (g == generation && !g.broken) {
                    // 打破当前的代
                    breakBarrier();
                    throw ie;
                } else {
                    Thread.currentThread().interrupt();
                }
            }

            if (g.broken){
                throw new BrokenBarrierException();
            }

            if (g != generation){
                return index;
            }

            if (timed && nanos <= 0L) {
                breakBarrier();
                // 抛出异常...
            }
        }
    } finally {
        lock.unlock();
    }
}

1.创建新的代

private void nextGeneration() {
    // 唤醒到达 barrier 的线程
    trip.signalAll();

    // 重置 parties,并创建新的 generation ,体现了重用性。
    count = parties;
    generation = new Generation();
}

2.打破当前的代

private void breakBarrier() {
    // 打破 barrier 
    generation.broken = true;

    // 重置 parties
    count = parties;

    // 唤醒到达 barrier 的线程
    trip.signalAll();
}

reset

释放操作

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        // 打破当前的代
        breakBarrier(); 

        // 开启新的代
        nextGeneration();
    } finally {
        lock.unlock();
    }
}

    原文作者:JUC
    原文地址: https://blog.csdn.net/u012420654/article/details/56521175
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞