CyclicBarrier源码分析

CyclicBarrier源码分析

首先分析CyclicBarrier的核心方法await

    public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen;
        }
    }
    public int await(long timeout, TimeUnit unit)
        throws InterruptedException,
               BrokenBarrierException,
               TimeoutException {
        return dowait(true, unit.toNanos(timeout));
    }

可以看出不管是否带超时,但将会调用方法dowait方法

   private int dowait(boolean timed, long nanos)
        throws InterruptedException, BrokenBarrierException,
               TimeoutException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            final Generation g = generation;

            if (g.broken)
                throw new BrokenBarrierException();

            if (Thread.interrupted()) {
                breakBarrier();
                throw new InterruptedException();
            }

           int index = --count;
           if (index == 0) {  // tripped
               boolean ranAction = false;
               try {
                   final Runnable command = barrierCommand;
                   if (command != null)
                       command.run();
                   ranAction = true;
                   nextGeneration();
                   return 0;
               } finally {
                   if (!ranAction)
                       breakBarrier();
               }
           }

            // loop until tripped, broken, interrupted, or timed out
            for (;;) {
                try {
                    if (!timed)
                        trip.await();
                    else if (nanos > 0L)
                        nanos = trip.awaitNanos(nanos);
                } catch (InterruptedException ie) {
                    if (g == generation && ! g.broken) {
                        breakBarrier();
                        throw ie;
                    } else {
                        // We're about to finish waiting even if we had not
                        // been interrupted, so this interrupt is deemed to
                        // "belong" to subsequent execution.
                        Thread.currentThread().interrupt();
                    }
                }

                if (g.broken)
                    throw new BrokenBarrierException();

                if (g != generation)
                    return index;

                if (timed && nanos <= 0L) {
                    breakBarrier();
                    throw new TimeoutException();
                }
            }
        } finally {
            lock.unlock();
        }
    }

在代码执行前,将会使用一个ReentrantLock对象(CyclicBarrier的实例域)进行加锁,保证屏障的线程安全。

final Generation g = generation;

if (g.broken)
    throw new BrokenBarrierException();

if (Thread.interrupted()) {
    breakBarrier();
    throw new InterruptedException();
}

接下来将检查屏障是否被打破,还有检查当前线程是否被中断
那么是如何检查屏障是否被打破,使用到的即是Generation对象,将根据该对象的实例域broken(一个布尔类型的值)来判断是否被打破

private static class Generation {
    boolean broken = false;
}

Generation类只有一个布尔类型的实例域broken,用于作为屏障是否打破的标记

int index = --count;
if (index == 0) {  // tripped
    boolean ranAction = false;
    try {
        final Runnable command = barrierCommand;
        if (command != null)
            command.run();
            ranAction = true;
            nextGeneration();
            return 0;
        } finally {
            if (!ranAction)
                breakBarrier();
        }
    }

接下来将开始计数,将屏障计数减一,然后判断是否为0,若为0则全部线程到达屏障,屏障将打开
在打开屏障前,将会执行CyclicBarrier的Runnable实例域barrierCommand,barrierCommand就是在构造函数中传参进行来的Runnable对象,即用户想要在所有线程到达后,而在屏障打开前锁要执行的任务
可以看出,如上说的一样,改任务是在屏障打开前执行的,而且是由最后一个到达屏障的线程执行

接下来可以看看打开屏障的方法nextGeneration

private void nextGeneration() {
    // signal completion of last generation
    trip.signalAll();
    // set up next generation
    count = parties;
    generation = new Generation();
}

trip是一个Condition对象

private final Condition trip = lock.newCondition();

用于实现屏障的阻塞功能,当然,在所有线程到达屏障后,将会唤醒所有阻塞的线程
count是用于到达屏障的计数,而parties则是指定屏障的需要线程到达的数量,当屏障打开时,count将复原为需要到达数量
最后,将重新实例化一个Generation对象,然后替换掉原来的。
前面提到,Generation有个布尔类型实例域broken用于判断屏障是否被打破,那么直接用一个boolean类型不就行了吗,为什么还需要定义个Generation类。Generation所代表的是”一代”,而屏障需要标记的,不单只是是否被打破,还要标记屏障是否完成了一轮的任务,即被正常打开,显然这样需要两个标记值。而使用Generation对象用作表示每轮的屏障的状态,当验证屏障是否被打开时,只需要验证是否为同一个Generation对象即可。如果不抽象出一个Generation类,对于这种屏障状态的表达,将显得复杂而难以控制

for (;;) {
    try {
        if (!timed)
            trip.await();
        else if (nanos > 0L)
            nanos = trip.awaitNanos(nanos);
    } catch (InterruptedException ie) {
        if (g == generation && ! g.broken) {
            breakBarrier();
            throw ie;
        } else {
            // We're about to finish waiting even if we had not
            // been interrupted, so this interrupt is deemed to
            // "belong" to subsequent execution.
            Thread.currentThread().interrupt();
        }
    }

    if (g.broken)
        throw new BrokenBarrierException();

    if (g != generation)
        return index;

    if (timed && nanos <= 0L) {
        breakBarrier();
        throw new TimeoutException();
    }
}

若然到达屏障的不是最后一个线程,将进入一个死循环,并且根据是否带超时,执行Condition对象的await方法进行阻塞,以此等待其他线程到达屏障,该过程在死循环中执行,是为了防止线程意外唤醒,破坏屏障的规则
若然线程在阻塞的过程中被中断,将会判断屏障是否完好,若然完好,则会将屏障打破,唤醒其他线程,这种情况是任何一个到达屏障的线程被中断都会引发的。若然线程在中断时,屏障已经打开或者被打破了,将再次执行当前线程的中断操作,而不做更多的操作
线程从阻塞中返回后,将有三个判断
第一个判断判断于屏障是否被打破,则线程是因为屏障被打破而唤醒的,抛出BrokenBarrierException异常
第二个判断是判断generation是否为同一个对象,则判断”本轮操作”完成了吗,屏障被正常打开,将返回一个整型index,index表达的是线程到达屏障的顺序,数值越大越早到达,index为0则表明该线程是最后一个到达的
第三个判断是用于超时判断,若线程由于超时而被唤醒,将会打破屏障,唤醒其他线程,并且抛出TimeoutException

打破屏障的方法

private void breakBarrier() {
    generation.broken = true;
    count = parties;
    trip.signalAll();
}

屏障打破后将唤醒所有在屏障中等待的线程,而且被打破后需要重置屏障,否则不能再使用该屏障
重置屏障方法

public void reset() {
    final ReentrantLock lock = this.lock;
    lock.lock();
    try {
        breakBarrier();   // 打破当前屏障
        nextGeneration(); // 下一代,即重新开始屏障计数
    } finally {
        lock.unlock();
    }
}
    原文作者:移动开发
    原文地址: https://my.oschina.net/bingzhong/blog/1559853
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞