JUC CountDownLatch 分析

简介

JUC提供的并发组件CountDownLatch可以实现类似Thread.join的效果,实现等待多个线程执行结束以后才执行接下来逻辑的功能(一个线程阻塞等待其他多个并发任务线程执行完任务之后才解除阻塞)

eg:

package com.crazypig.juc;

import java.util.concurrent.CountDownLatch;

public class CountDownLatchTest {

    public static void main(String[] args) {

        final int CNT = 2;
        final CountDownLatch cdl = new CountDownLatch(CNT);

        Thread[] thds = new Thread[CNT];
        for (int i = 0; i < thds.length; i++) {
            thds[i] = new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        Thread.sleep(3000);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    } finally {
                        System.out.println(Thread.currentThread().getName() + " end!");
                        cdl.countDown();
                    }
                }
            }, "thd" + i);
            thds[i].start();
        }

        System.out.println(Thread.currentThread().getName() + " wait cdl ...");
        try {
            cdl.await();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        System.out.println(Thread.currentThread().getName() + " end!");
    }

}

输出:

main wait cdl ...
thd0 end!
thd1 end!
main end!

一般CountDownLatch主要是用于等待往线程池提交的多个任务执行结束,再执行相关逻辑的场景,因为往线程池里面执行的任务,无法通过thread.join来执行等待

实现分析

其内部实现也是借助于AQS,并使用其共享语义实现模式,内部类Sync初始化state为CountDownLatch需要等待的资源(通常是任务数or线程数)的个数。等待线程调用await底层直观逻辑是需要等到state为0,否则则阻塞等待;其他线程的每一次countDown操作,都使用state减1,直到countDown使得state为0,那么调用await的等待线程将会从阻塞状态中恢复。

源码分析

CountDownLatch的具体实现源码不多,直接通过在源码上+注释来达到分析的效果:

package java.util.concurrent;
import java.util.concurrent.locks.*;
import java.util.concurrent.atomic.*;


public class CountDownLatch {

    private static final class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 4982264981922014374L;

        Sync(int count) {
            setState(count);
        }

        int getCount() {
            return getState();
        }

        protected int tryAcquireShared(int acquires) {
            return (getState() == 0) ? 1 : -1;
        }

        protected boolean tryReleaseShared(int releases) {
            // Decrement count; signal when transition to zero
            for (;;) {
                int c = getState();
                if (c == 0)
                    return false;
                int nextc = c-1;
                if (compareAndSetState(c, nextc))
                    return nextc == 0; // 只有 c == 0, 才执行唤醒动作
            }
        }
    }

    private final Sync sync;

    public CountDownLatch(int count) {
        if (count < 0) throw new IllegalArgumentException("count < 0");
        // 设置state = count
        this.sync = new Sync(count);
    }

    // 线程阻塞等待count为0才会返回, 响应中断也会返回
    public void await() throws InterruptedException {
        // 最终会调用sync.tryAcquireShared,根据内部类实现的逻辑,需要阻塞等待直到state = 0
        sync.acquireSharedInterruptibly(1);
    }

    // 相比await(),加入超时机制, 阻塞等待具有最长时间, 不会无限制地等待
    public boolean await(long timeout, TimeUnit unit)
        throws InterruptedException {
        // 最终会调用sync.tryAcquireShared
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    // 使得count - 1,即AQS中的state - 1
    public void countDown() {
        sync.releaseShared(1);
    }

    // 返回AQS中的state(count等价于state)
    public long getCount() {
        return sync.getCount();
    }

    public String toString() {
        return super.toString() + "[Count = " + sync.getCount() + "]";
    }
}
    原文作者:JUC
    原文地址: https://blog.csdn.net/d6619309/article/details/80642134
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞