基本概念
Semaphore 即计数信号量,它本身维护着一组permit(许可)。它本质是共享锁,通过修改 permit 的值来调整可以被多少个线程同时持有。
当信号量中有可用的许可时,线程能获取该许可;否则线程必须等待,直到有可用的许可为止。
- acquire ,表示获取信号量的许可;调用该方法时,在没有可用的许可前线程将阻塞。
- release ,表示添加一个可用许可;调用该方法时,可以添加一个许可,即潜在地释放一个阻塞的获取线程。
调用方法
假设一间店最多只能接待两个客户。那么多余的客户就必须等待前面的客户离开后才能进入。
- 客户类
public class Client implements Runnable {
private Semaphore sem;
private String name;
public Client (Semaphore sem, String name) {
this.sem = sem;
this.name = name;
}
@Override
public void run() {
try {
sem.acquire();
System.out.println(name + " 开始咨询 ..."+System.currentTimeMillis());
Thread.sleep(1000);
sem.release();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
- 调用过程
final Semaphore sem = new Semaphore(2);
new Thread(new Client(sem, "客户甲")).start();
new Thread(new Client(sem, "客户乙")).start();
new Thread(new Client(sem, "客户丙")).start();
new Thread(new Client(sem, "客户丁")).start();
- 输出结果,同一时间只能接待两位客户
客户甲 开始咨询 ...1487776676598
客户丙 开始咨询 ...1487776676598
客户丁 开始咨询 ...1487776677604
客户乙 开始咨询 ...1487776677604
内部构造
- 同步器,与 ReetrantLock 一样,Semaphore 内部定义了基于 AQS 的同步器 Sync。并且定义了 FairSync、NonfairSync 两种同步策略。
abstract static class Sync extends AbstractQueuedSynchronizer {...}
final static class NonfairSync extends Sync {...}
final static class FairSync extends Sync {...}
- 构造函数,构建 Semaphore 时,需指定它的信号量(即许可量),且默认会采用不公平的同步策略。
private final Sync sync;
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
public Semaphore(int permits, boolean fair) {
sync = ( fair ) ? new FairSync(permits) : new NonfairSync(permits);
}
- 信号量,也称许可量(permits),类似时锁的重入计数。同 ReetrantLock 一样,这里用 AQS的 状态来表示许可的数量。它表示 Semaphore 可以同时被线程 acquire 的数量。
Sync(int permits) {
setState(permits);
}
NonfairSync(int permits) {
super(permits);
}
FairSync(int permits) {
super(permits);
}
acquire
这里只介绍 acquire 方法,不分析 acquireUninterruptibly、tryAcquire 等方法。它表示获取许可。
public void acquire() throws InterruptedException {
// 调用 AQS 的方法,表示尝试获取锁,不忽略中断。
sync.acquireSharedInterruptibly(1);
}
整个调用过程如下:
AQS.acquireSharedInterruptibly ->Semaphore.tryAcquireShared ->AQS.doAcquireSharedInterruptibly
因此重点来看 tryAcquireShared 这个方法。该方法在 FairSync、NonfairSync 中作了不同实现。
1.公平方式获取许可量
FairSync 类中关于 tryAcquireShared 的定义:
protected int tryAcquireShared(int acquires) {
for (;;) {
// 关键 -> 判断该节点在等待中是否还有前继节点
// 由于等待队列采用了 FIFO 原则,所以体现公平性
if (hasQueuedPredecessors()){
return -1;
}
// 计算剩余信号量(即许可证)
int available = getState();
int remaining = available - acquires;
// 修改许可证
if (remaining < 0 || compareAndSetState(available, remaining))
return remaining;
}
}
2.非公平方式获取许可量
NonfairSync 类中关于 tryAcquireShared 的定义,与 FairSync 的区别是少了 hasQueuedPredecessors 方法判断。
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
// Sync
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 || compareAndSetState(available, remaining)){
return remaining;
}
}
}
relaase
该操作释放许可。
public void release() {
// 调用 AQS.releaseShared 方法
sync.releaseShared(1);
}
public final boolean releaseShared(int arg) {
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}
整个调用过程如下:
AQS.releaseShared ->Semaphore.tryReleaseShared->AQS.doReleaseShared
下面来看 tryReleaseShared 的实现,该方法在其内部类 Sync 中定义:
// Sync
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current){
// 抛出异常...
}
if (compareAndSetState(current, next)){
return true;
}
}
}