Java多线程 -- JUC包源码分析11 -- CyclicBarrier源码分析

在前面的篇章中,讲解了ReentrantLock + Condition,并讲述了2者结合的一个典型应用:ArrayBlockingQueue/LinkedBlockingQueue。

今天讲述2者结合的另一个典型应用:CyclicBarrier。

CyclicBarrier的概念

要介绍CyclicBarrier这个概念,可以从下面这个比喻开始:一个小公司的所有人周六要出去团建,大家提前商定好周六早上9点公司门口集合,然后到了目的地,在分开行动。

在这里,“公司门口集合地“就是一个CyclicBarrier:大家本来是分散的,在某个时间点集中到一块,等所有人到齐,然后到了目的地再分散行动。

具体到代码中,就是

CyclicBarrier cb = new CyclicBarrier(5);

cb.await();  //5个线程,各自分别调await,都阻塞。直到5个线程都掉了await()的那个时间点,5个线程同时解锁,然后各自继续各自的事情!

ReentrantLock + Condition

public class CyclicBarrier {

    private final ReentrantLock lock = new ReentrantLock();
    private final Condition trip = lock.newCondition();  //用于线程之间互相唤醒
    private final int parties;   //类似于AQS那个state原子变量,也就是总共的线程个数

    ...
}
    private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;
            if (g.broken)
                throw new BrokenBarrierException();
            if (Thread.interrupted()) {         //响应中断
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;  //每个线程调用1次await(),count--
           if (index == 0) {  //count减到0的时候,此线程,唤醒其他所有线程
               boolean ranAction = false;
               try {
           final Runnable command = barrierCommand;
                   if (command != null)  //一起唤醒之后,还可以执行一个回调。
                       command.run();
                   ranAction = true;
                   nextGeneration(); //唤醒其他所有线程,同时把count值复原,用于下一次的CyclicBarrier(因为CyclicBarrier可以重复使用)
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            for (;;) {   //count > 0,说明人没到齐,阻塞自己
                try {
                    if (!timed)
                        trip.await(); //关键点:await阻塞自己的同时,会把锁释放掉,这样别的线程就会拿到锁,执行上面的index = count--
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                       throw ie;
                    } else {
                       Thread.currentThread().interrupt();
                     }
                }

                if (g.broken)
                    throw new BrokenBarrierException();
                if (g != generation)   //从阻塞中唤醒,返回
                    return index;
                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

    private void nextGeneration() {
        // signal completion of last generation
        trip.signalAll();
        // set up next generation
        count = parties;
        generation = new Generation();
    }

整个代码唯一不太好明白的是Generation,这个东西主要是用来复原CyclicBarrier,从而可以下1次重用。不像CountDownLatch,用完1次,就不能再用了。

    原文作者:JUC
    原文地址: https://blog.csdn.net/chunlongyu/article/details/52476365
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞