CountDownLatch源码-JUC线程同步工具2

CountDownLatch源码-JUC线程同步工具2

上篇博文将的线程同步工具信号量Semaphore源码 以停车车辆坑位表示并发量来展开阐述的。

何为CountDownLatch

CountDownLatch能够使一个线程在等待另外一些线程完成各自工作之后,再继续执行。使用一个计数器进行实现。计数器初始值为线程的数量。当每一个线程完成自己任务后,计数器的值就会减一。当计数器的值为0时,表示所有的线程都已经完成了任务,然后在CountDownLatch上等待的线程就可以恢复执行任务。

说的依然比较抽象:简单来说,A线程在等待另外几个线程完成某个工作之后再A在继续执行。
在说白点A线程能收集其他几个线程的执行结果。

举个例子
三个人组了一个小团,导游等三个人到齐进大巴了,然后出发去旅行。

例子中的导游就是上面说的A线程,其他几个线程处理完就是这里的三个人都到齐进大巴了。

代码使用

public static void main(String[] args) throws InterruptedException {
    CountDownLatch cdl = new CountDownLatch(3);

    new Person("李四", cdl).start();
    new Person("王五", cdl).start();
    new Person("张三", cdl).start();

    System.out.println("导游正在等待"+cdl.getCount()+"个人来大巴集合");
    cdl.await();

    System.out.println("所有人到期上大巴去旅游....");  
}

static class Person extends Thread{

    private String name;

    private CountDownLatch cdl;

    public Person(String name,CountDownLatch cdl) {
        this.name = name;
        this.cdl = cdl;
    }

    @Override
    public void run() {
        try {
            System.out.println("person:"+name+"开始启程去大巴集合点");
            Thread.sleep(new Random().nextInt(10000));
            cdl.countDown();
            System.out.println("person:"+name+"【到达】启程去大巴集合点");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

运行结果:
person:李四开始启程去大巴集合点
person:张三开始启程去大巴集合点
导游正在等待3个人来大巴集合
person:王五开始启程去大巴集合点
person:张三【到达】启程去大巴集合点
person:李四【到达】启程去大巴集合点
person:王五【到达】启程去大巴集合点
所有人到期上大巴去旅游….

任何一个人没有到达之前(其它几个线程没有执行完)导游是await()卡在哪里的,等所有人到了导游才能领到大家一起去旅行。

重要属性
依赖是有一个内被抽象类继承了AQS
《CountDownLatch源码-JUC线程同步工具2》
跟以往看到的Semaphore和ReentrantLock不一样的是这里没有两个公平锁和非公平锁都继承Sync,为啥? 因为不需要,只有调用await()的那个线程才会阻塞,只有一个线程会阻塞所以跟公平与不公平没关系就它自己阻塞

构造方法

public CountDownLatch(int count) {
    if (count < 0) throw new IllegalArgumentException("count < 0");
    this.sync = new Sync(count);
}

上一篇博文Semaphore类似,也是将count值赋值给AQS.state= count来表示等待count个线程完成业务。

CountDownLatch基本上没什么方法,它本身就是一个线程同步工具类。跟以往的风格一样,我们还会从使用入手,来看await()方法和countDown(1)方法

await()方法

public void await() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);
}
//依然进入AQS模板类 跟Semaphore一样的
public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
    if (Thread.interrupted())
        throw new InterruptedException();
    if (tryAcquireShared(arg) < 0)
        doAcquireSharedInterruptibly(arg);
}

我们还是主要看子类是如何重写tryAcquireShared(1)的,

protected int tryAcquireShared(int acquires) {
    return (getState() == 0) ? 1 : -1;
}

getState() ==0了,说明CountDownLatch(count)个数量已经被其他几个线程都进行countDown()消耗,此时改方法返回0。
进入AQS.doAcquireSharedInterruptibly(arg) 这个我们就比较熟悉了,不熟悉的进入友情链接:Semaphore源码,还是addWaiter自旋,将自己挂起。

countDown(1)方法

public void countDown() {
    sync.releaseShared(1);
}
//依然是AQS模板方法
public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {
        doReleaseShared();
        return true;
    }
    return false;
}
//子类cdl
protected boolean tryReleaseShared(int releases) {
    // Decrement count; signal when transition to zero
    for (;;) {
        int c = getState();
        if (c == 0)
            return false;
        int nextc = c-1;
        if (compareAndSetState(c, nextc))
            return nextc == 0;
    }
}
//唤醒head.next
private void doReleaseShared() {
    for (;;) {
        Node h = head;
        if (h != null && h != tail) {
            int ws = h.waitStatus;
            if (ws == Node.SIGNAL) {
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                    continue;            // loop to recheck cases
                unparkSuccessor(h);
            }
            else if (ws == 0 &&
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                continue;                // loop on failed CAS
        }
        if (h == head)                   // loop if head changed
            break;
    }
}

逻辑是很简单分:

当前线程调用countDown就是state-1
唤醒head.next 即调用await()的
调用await()的线程被唤醒[LockSupport.unpark(head.next.thread)]

这里我们再次贴出await之后线程阻塞的自旋方法。

private void doAcquireSharedInterruptibly(int arg)
        throws InterruptedException {
    final Node node = addWaiter(Node.SHARED);
    boolean failed = true;
    try {
        for (;;) {
            final Node p = node.predecessor();
            if (p == head) {
                int r = tryAcquireShared(arg);//被唤醒执行
                if (r >= 0) {
                    setHeadAndPropagate(node, r);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())//阻塞
                throw new InterruptedException();
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}
    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

上文分析过await()就是getState() == 0?1:-1此时返回1进入的就是上面的方法并阻塞在parkAndCheckInterrupt,当被唤醒之后调用tryAcquireShared 即又一次判断getState() == 0?1:-1 等到getState() == 0了返回-1 await()就不阻塞了,开始执行业务。

总结CountDownLatch:

它也是计数器线程同步工具
它没有公平锁与非公平锁Sync的两个子类,以为阻塞的就是调用await()的那个一个线程(就一个)
构造方法CountDownLatch(count) 就是调用await的线程要等待的其他count各线程执行完业务逻辑的数量

    原文作者:JUC
    原文地址: https://blog.csdn.net/mayongzhan_csdn/article/details/81035455
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞