背景介绍
CountDownLatch
是JUC下的一个类,通过调用继承了AQS的内部类完成同步操作.
CountDownLatch
的语义是一个计数器,你可以向CountDownLatch对象设置一个初始的数字作为计数值,任何调用这个对象上的await()方法都会阻塞,直到这个计数器的计数值被其他的线程减为0为止。
源码分析
CountDownLatch.await()
先看下源码
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
可知是调用可中断且共享的acquire(),可中断与不可中断的区别见JUC-ReentrantLock源码分析,我们一起看下共享和独占的区别
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
//此方法非常简单,仅判断state是否为0
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//nextWaiter为Node.SHARED
final Node node = addWaiter(Node.SHARED);
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
int r = tryAcquireShared(arg);
if (r >= 0) {
//重点
setHeadAndPropagate(node, r);
p.next = null; // help GC
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
可以看出与非共享的acquire
相比只有两处不同:
- 生成Node.SHARED模式的Node(非共享的
acquire
生成Node.EXCLUSIVE模式的Node) - 成功获取锁资源后,调用
setHeadAndPropagate(node, r);
(非共享的acquire
只会setHead
而不会向后传播)
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
//如果可传播,则执行doReleaseShared()
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}
private void doReleaseShared() {
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!h.compareAndSetWaitStatus(Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!h.compareAndSetWaitStatus(0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
在doReleaseShared()
中有个死循环,还有一句unparkSuccessor(h)
,可知是通过unparkSuccessor()
进行线程的唤醒.比较奇怪的地方是for循环的break条件是loop if head changed
,那什么时候head 才会 changed呢?
在上述代码中,被挂起的第一个线程唤醒了被挂起的第二个线程,此时第二个线程便会执行与第一个线程相似的代码,即setHeadAndPropagate()
–>doReleaseShared()
,调用setHead()
时便修改了head.
CountDownLatch.countDown()
public void countDown() {
sync.releaseShared(1);
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
//使用CAS将state原子减1
int c = getState();
if (c == 0)
return false;
int nextc = c - 1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
可以发现在tryReleaseShared()
将state减1,减至0时还是执行doReleaseShared()
唤醒被挂起的线程
ReentrantLock与CountDownLatch语义分析
ReentrantLock
在ReentrantLock中
- 初始时state为0
- lock()–>若state为0或已获得锁资源,state加1;否则,挂起
- unlock()–>state减一,若state为0,则唤醒AQS队列的第一个线程
CountDownLatch
在CountDownLatch中
- state初始为一正整数
- await()–>若state不为0,挂起
- countDown()–>state减1,若state为0,唤醒AQS队列的所有线程
Semphore 允许多个线程同时访问
- state初始为一正整数
- acquire()–>若state小于0,挂起
- release()–>释放许可,若有被挂起的线程,则唤醒线程
总结
CountDownLatch使用AQS的共享模式,在release时唤醒AQS队列上所有线程