之前看了CountDownLatch,他的实现是使用了aqs,提前设置好state的值,如果state不是0的时候调用await就会阻塞当前线程,加入到aqs的队列中,调用countDown就会减小state的值,当state的值时0的时候就会释放锁,将队列中的所有的线程释放,开始运行。这个CountDownLatch可以用于不同种类的线程之间,比如我们在连zk的时候,因为zkClient内部是使用的另一个线程链接zk,我们的主线程要在zk链接之后才能继续操作,所以就使用了CountDownLatch,这是用在不同种类的线程中。CountDownLatch也可以用在相同种类的线程中,比如有这么一个需求:要去一个饭店开一个生日宴会,这个宴会必须等待所有的朋友来了才可以一起开始,这个时候可以使用下面的代码:
public static void main(String[] args) {
final CountDownLatch latch = new CountDownLatch(10);
for(int i=0;i<10;i++){
new Thread(new Runnable() {
public void run() {
System.out.println(Thread.currentThread().getName() + " 已经来到了。");//一个线程来了,代表一个顾客
latch.countDown();//表示又来了一个人
try {
latch.await();//当前的人等待其他人来
System.out.println("既然都来了,就开始吧");//当所有的人都来了,则开始
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).start();
}
}
上面的代码中,CountDownLatch是用在同种类的线程中,表示所有的线程在都达到某个状态后才会执行下一个步骤。在jdk中有一个更加好用的类,可以提供类似的作用,并且更简单,即CyclicBarrier类。CyclicBarrier类表示一个阻碍,在构造这个类时就要穿入一个值,表示标记的大小。在条件不满足时,所有调用await的线程都会被挂起,同时将一个标记减一,再将标记变为0之后,就会将所有阻塞的线程变为可以执行的。我们看一下他的源码:
1、构造方法:
public CyclicBarrier(int parties, Runnable barrierAction) {//parties表示要满足的调用await的个数,也就是上面说的要进行的聚会至少要来的人的个数。第二个参数是个可以执行的runnable,表示当条件满足之后(也就是partie变为0之后),要做的一件事,由最后触发await的线程执行。
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;//这个是不变的,供 一个方法返回。
this.count = parties;//这个count是会不断减小的,每调用一次awati就会减一。
this.barrierCommand = barrierAction;//条件满足后要执行的任务
}
这个类中还有好多属性,
ReentrantLock lock :用于在调用await的时候加锁,防止并发修改count的值造成不准确。
Condition trip:从上面的lock中产生的condition,用于在不满足条件时将调用await的线程放到等待的队列中,当条件满足后将所有的等待的移动到lock(内部是aqs)的队列中。
Generation generation:这个属性狠简单,因为CyclicBarrier是一个可以重复使用的barrier,这个generation表示第几次使用,每当parties变为0之后这一代就算完了,再进行下一代的操作。它里面封装了一个boolean属性,表示当前的这一代有没有被破坏。当某个被阻塞的线程被调用interrupt之后就会变为true,即当前的一代被破坏了。
2、await方法
public int await() throws InterruptedException, BrokenBarrierException {
try {
return dowait(false, 0L);//从await中调用的是没有等待时间限制的dowait
} catch (TimeoutException toe) {
throw new Error(toe); // cannot happen;
}
}
第一个参数表示是否有时间限制,第二个参数是如果有时间限制的话,限制的时间
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock;
lock.lock();//因为里面要修改count,所以加锁,防止并发修改
try {
final Generation g = generation;
if (g.broken)
throw new BrokenBarrierException();//关于broken的我没有研究,所以不再对broken的情况进行注释。
if (Thread.interrupted()) {
breakBarrier();
throw new InterruptedException();
}
int index = --count;//减小count,
if (index == 0) { // 刚上来不会是0的,这个是等到指定的次数被调用之后触发
boolean ranAction = false;
try {
final Runnable command = barrierCommand;//要执行的任务
if (command != null)
command.run();//由最后调用await的线程调用,注意这里在执行run的时候可能会抛异常,就会进入finally的beakBarrier方法,其他线程在重新执行后就会全部抛出BrokenBarrierException异常。
ranAction = true;
nextGeneration();//开启下一代,操作包括:1、将count重新设置为parties,2、更新generation的值,3、将全部condtion中阻塞的线程释放
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {//这个是当await没有调用指定的次数时
try {
if (!timed)//如果没有时间限制,则调用condtion.await,也就是放入到conditon的队列中,将线程挂起
trip.await();
else if (nanos > 0L)
nanos = trip.awaitNanos(nanos);//有时间限制的话,等待指定的时间,
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier();//
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)//如果因为某个原因导致broken,则所有唤醒之后的线程都会抛出异常
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) {//如果是有时间限制的,如果等待超时候就会抛异常。
breakBarrier();//这个方法会将generation设置为broken,然后将condition中阻塞的线程全部释放。
throw new TimeoutException();
}
}
} finally {
lock.unlock();
}
}
在不考虑barrierBroken的前提下,这个类还是很容易看懂的。