CyclicBarrier
同步屏障,如果有人问你如何让N个线程同时之行某件事,请告诉他CyclicBarrier 那么他是如何来确保这一点的呢? 通过await使所有线程进入barrier。当达到预设计数器或await timeout 条件时,所有await线程开始继续执行
结合场景来说,今天我们要去抢火车票。我们为了公平起见,告诉大家今天中午12点开始放票,大家都在12点以前进入准备状态。12点的时候去抢票。
- 首先我们告诉所有人,我们最多只允许10个人同时排队来抢票。int num = 10;
- 我们定义了一个CyclicBarrier ,来告诉程序我们支持10个线程同时启动,条件是ThreadNum ==10,定义了一个线程Runnable,当所有线程准备好后,我们先执行定义的线程中内容
- 线程Grab中调用了await,则CyclicBarrier内部计数器就 -1,并阻塞当前线程,当与我们在构造时传入的num一致时,解开阻塞。当我们再次调用await时,他的计数器会重新计算N-1,这也是与countdownlatch最大的区别,所以CyclicBarrier是可以重用的
- public int await(long timeout, TimeUnit unit) 是来设定await最大等待时间,而timeunit是来设定long timeout参数的单位
package juc.ccs;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
/**
*
* @author Allen 2017年9月7日
*
*/
public class GrabTicketCyclicBarrier {
int num = 10;
CyclicBarrier cyclicBarrier = new CyclicBarrier(num, new Runnable() {
@Override
public void run() {
System.out.println("公告:~~~~~开始放票");
}
});
class Grab implements Runnable {
CyclicBarrier cb;
public Grab(CyclicBarrier cb) {
this.cb = cb;
}
@Override
public void run() {
System.out.printf("[%s--%s]\n", Thread.currentThread().getId(), "准备好了");
try {
cb.await();
} catch (InterruptedException | BrokenBarrierException e) {
e.printStackTrace();
}
System.out.printf("%s~~%s \n", "抢到票了", Thread.currentThread().getId());
}
}
public void execute() {
for (int i = 0; i < num; i++) {
new Thread(new Grab(cyclicBarrier)).start();
}
}
}
[11--准备好了]
[14--准备好了]
[18--准备好了]
[13--准备好了]
[19--准备好了]
[12--准备好了]
[20--准备好了]
[17--准备好了]
[15--准备好了]
[16--准备好了]
公告:~~~~~开始放票
抢到票了~~16
抢到票了~~15
抢到票了~~17
抢到票了~~20
抢到票了~~12
抢到票了~~19
抢到票了~~18
抢到票了~~13
抢到票了~~14
抢到票了~~11
看看数据结果.所有线程都进入了准备阶段,通过匿名内部类打印了”开始放票”,所有线程开始继续执行.进行抢票 我们可以看到如果在await之后我们就没有任何任务,那么在CyclicBarrier构造中的runnable可以作为,全部线程结束后的一个任务执行 如果await之后我们有其他任务要执行,那么runnable就作为其他任务之前执行了。
下面我们改一改来看看多个await 可以看到当所有线程准备好后,开始一起抢票,然后再等待大家抢票后,一起回家。
@Override
public void run() {
try {
System.out.printf("[%s--%s]\n", Thread.currentThread().getId(), "准备好了");
cb.await();
System.out.printf("%s~~%s \n", "抢到票了", Thread.currentThread().getId());
cb.await();
System.out.printf("%s~~ \n", "一起回家");
} catch (InterruptedException | BrokenBarrierException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
[11--准备好了]
[18--准备好了]
[20--准备好了]
[19--准备好了]
[13--准备好了]
[12--准备好了]
[14--准备好了]
[15--准备好了]
[16--准备好了]
[17--准备好了]
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
抢到票了~~17
抢到票了~~14
抢到票了~~13
抢到票了~~12
抢到票了~~19
抢到票了~~20
抢到票了~~18
抢到票了~~11
抢到票了~~16
抢到票了~~15
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
一起回家~~
核心源码
/**
* Main barrier code, covering the various policies.
*/
private int dowait(boolean timed, long nanos)
throws InterruptedException, BrokenBarrierException,
TimeoutException {
final ReentrantLock lock = this.lock; //ReetrantLock是独占锁
lock.lock(); //锁住当前进程
try {
final Generation g = generation; //来得到判断线程状态每次重置都会赋予一个新的(nextGeneration)。
if (g.broken)
throw new BrokenBarrierException();
if (Thread.interrupted()) {//线程断了,抛出异常
breakBarrier();//重置计数,generation.broken = true 唤醒所有阻塞中的线程
throw new InterruptedException();
}
int index = --count; //计数器减少
if (index == 0) { // 计数器为0
boolean ranAction = false;
try {
final Runnable command = barrierCommand;
if (command != null)
command.run();//执行我们在构造对象中创建的runnable
ranAction = true;
nextGeneration();//是Condition.singleAll唤醒所有下面的阻塞,Condition是来配合ReentrantLock使用的,提供了阻塞队列
return 0;
} finally {
if (!ranAction)
breakBarrier();
}
}
// loop until tripped, broken, interrupted, or timed out
for (;;) {
try {
if (!timed) //确保不在await long timeout
trip.await();
else if (nanos > 0L) //如果是 long timeout 则进入计时等待
nanos = trip.awaitNanos(nanos);
} catch (InterruptedException ie) {
if (g == generation && ! g.broken) {
breakBarrier(); //上面解释过了
throw ie;
} else {
// We're about to finish waiting even if we had not
// been interrupted, so this interrupt is deemed to
// "belong" to subsequent execution.
Thread.currentThread().interrupt();
}
}
if (g.broken)
throw new BrokenBarrierException();
if (g != generation)
return index;
if (timed && nanos <= 0L) { //当long timeout超时了,夜之星breakBarrier
breakBarrier();
throw new TimeoutException();
}
}
} finally {
lock.unlock(); //reentrantlock必须要在finally中进行unlock否则会一直占用资源这也是通常我们使用synchronized的一个原因,以后再说这块儿
}
}
private void nextGeneration() {
// signal completion of last generation
trip.signalAll(); //提供了计数器重置,在上面的方法中,当计数器唯0的时候,在后面会调用他来重置计数器并解除Cndition的阻塞
// set up next generation
count = parties;//重置计数器 parties是构造函数中我们传入的num
generation = new Generation();
}
CountDownLatch
一个计数的阻塞,确保线程任务同步的工具类,他只执行一次。他与CyclicBarrier类似,不过他的计数器没有任何重置方法提供 通过AQS中双向链表Node公平锁来阻塞和解阻塞await
- 首先他只有一个构造,就是设置num,这里的num与CyclicBarrier的不同,CyclicBarrier类似一个阈值,await计数到num时候,开始解除阻塞。
- ContDownLath通过来countDown来进行num -1 ,当countDown后num值变为0后,执行await后语句
- 可以理解为一个是自动,一个是半自动。
package juc.ccs;
import java.util.concurrent.CountDownLatch;
/**
*
* @author Allen 2017年9月7日
*
*/
public class GrabTicketCountDownLatch {
int num = 10;
CountDownLatch countDownLatch = new CountDownLatch(num);
class Grab implements Runnable {
CountDownLatch cdl;
public Grab(CountDownLatch cdl) {
this.cdl = cdl;
}
@Override
public void run() {
System.out.printf("%s~~%s \n", "抢到票了", Thread.currentThread().getId());
cdl.countDown();
}
}
public void execute() throws InterruptedException {
for (int i = 0; i < num; i++) {
new Thread(new Grab(countDownLatch)).start();
}
countDownLatch.await();
System.out.printf("%s个人都抢完票了--%s\n", num, "我去回收了");
}
}
抢到票了~~16
抢到票了~~20
抢到票了~~19
抢到票了~~18
抢到票了~~17
抢到票了~~15
抢到票了~~14
抢到票了~~12
抢到票了~~11
抢到票了~~13
10个人都抢完票了--我去回收了
核心源码
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;
Sync(int count) {//构造中传入的num就是这里的count,如下注释中的
setState(count);
}
/* 这里是构造源码
public CountDownLatch(int count) {
if (count < 0) throw new IllegalArgumentException("count < 0");
this.sync = new Sync(count);
}*/
int getCount() {
return getState(); //getState就是获取我们构造时传入的num
}
protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0) //已经是0了那么这次就直接false了,无需收尾操作
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))//替换总数-1 CAS原子操作Usafe jvm 调用cpu实现,
return nextc == 0; //为0证明c进入的时候是1.
}
}
}
public void countDown() { //释放一个计数器 AQS
sync.releaseShared(1);
}
AbstractQueuedSynchronizer.class
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) { //调用CountDownlatch中,它的基类sync中对tryReleaseShared的实现,见上面解释
doReleaseShared();
return true;
}
return false; //计数器
}
AbstractQueuedSynchronizer.class
private void doReleaseShared() { //释放共享锁
for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
if (h == head) // loop if head changed
break;
}
}
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);//获取共享锁
}
AQS.class
public final void acquireSharedInterruptibly(int arg) //获取共享锁
throws InterruptedException {
if (Thread.interrupted()) //中断则抛出
throw new InterruptedException();
if (tryAcquireShared(arg) < 0) //通过sync class 来判断是否已结束
doAcquireSharedInterruptibly(arg);
}
AQS.class
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
final Node node = addWaiter(Node.SHARED); //将插入节点到队列
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {//如果当前节点的前一个节点就是头部那么下一次就是他来尝试获得锁。FIFO原则
int r = tryAcquireShared(arg);
if (r >= 0) {
setHeadAndPropagate(node, r);//获取成功,那么它就是下一次的头
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}
AQS.class
private Node addWaiter(Node mode) {
//一个双向链表谁先执行了compareAndSetTail谁就先入队,
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail; //得到尾部
if (pred != null) { //尾队列不为空
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
Semaphore
许可访问。Semaphore限制了最多访问人数,每个人拿到一个acquire许可,得到许可的人最多只能有num,只有别人release释放了许可,新的人才可以进来执行。否则就是阻塞
- Semaphore(int permits, boolean fair) fair保证了等得越久得到许可的优先级越高
- 下例我们建立了20个线程,但是我们只授予了10个许可
- 前十个得到许可的线程执行完回调用release来释放许可。这样后面的十个线程就可以依次得到许可并执行
package juc.ccs;
import java.util.concurrent.Semaphore;
/**
*
* @author Allen 2017年9月7日
*
*/
public class GrabTicketSemaphore {
int num = 10;
Semaphore semaphore = new Semaphore(num);//最多10个人抢票
class Grab implements Runnable {
Semaphore sp;
public Grab(Semaphore sp) {
this.sp = sp;
}
@Override
public void run() {
try {
semaphore.acquire();
System.out.printf("%s~~%s \n", "抢到票了", Thread.currentThread().getId());
Thread.sleep(2000);
semaphore.release();
System.out.println("release");
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void execute() throws InterruptedException {
for (int i = 0; i < num<<1; i++) {
new Thread(new Grab(semaphore)).start();
}
}
}
核心源码
AOS.class
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this); //park调用Usafe.park来等待“许可”并阻塞
return Thread.interrupted();
}
public static void unpark(Thread thread) {
if (thread != null) //释放锁
UNSAFE.unpark(thread);
}
总结
CyclicBarrier 主要通过独占锁来阻塞线程
CountDownLatch与Semaphore 主要通过AOS 公平锁、非公平锁,Node双向链表来资源进行竞争维护。
AOS源码多等我再好好看看源码拜读拜读文献。再和大家分享