基本介绍
Semaphore
,JUC中提供的一个并发组件,根据字面理解是信号量的意思。Semaphore
能够实现的功能是允许多个线程同时获取共享资源,实际是共享锁(基于AQS的共享实现模式)的实现。获取不到资源的线程会进行阻塞等待,直到其他线程释放资源,当前阻塞线程尝试获取共享资源成功,则从阻塞等待中返回,否则继续阻塞等待。每个资源称为一个permit(许可),由Semaphore
内部持有。Semaphore
实际上也是基于AQS来实现其功能,permit对应的就是AQS中的同步状态state。通过静态内部类实现共享锁的获取和释放方法,同时提供公平和非公平的实现,类似于ReentrantLock
中的实现。
可以用Semaphore实现限行的功能,比如限制任务执行一次并发只能有N个(定义一个具有N个permit的Semaphore)
基本方法
重要方法列表
方法 | 说明 |
---|---|
void acquire() throws InterruptedException | 获取一个许可,允许中断 |
void acquireUninterruptibly() | 同acquire()获取一个许可,不同的是该方法不响应中断 |
boolean tryAcquire() | 尝试获取一个许可,立刻返回,成功返回true,失败返回false |
boolean tryAcquire(long timeout, TimeUnit unit) | 同tryAcquire(),尝试获取一个许可,不同的是该方法支持超时机制 |
void acquire(int permits) throws InterruptedException | acquire高级版,尝试一次获取多个许可 |
void acquireUninterruptibly(int permits) | acquireUninterruptibly高级版,尝试一次获取多个许可 |
void tryAcquire(int permits) | tryAcquire高级版 |
boolean tryAcquire(int permits, long timeout, TimeUnit unit) | tryAcquire(long timeout, TimeUnit unit)高级版 |
void release() | 释放一个许可 |
void release(int permits) | 释放多个许可 |
Semaphore使用示例
public class SemaphoreTest {
public static void main(String[] args) {
final Semaphore res = new Semaphore(2);
Thread[] thds = new Thread[10];
Runnable r = new Runnable() {
@Override
public void run() {
try {
res.acquire();
} catch (InterruptedException e1) {
e1.printStackTrace();
return ;
}
try {
System.out.println("thread : " + Thread.currentThread().getName() + " got permit");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
res.release();
}
}
};
for (int i = 0; i < thds.length; i++) {
thds[i] = new Thread(r, "thd" + i);
thds[i].start();
}
for (int i = 0; i < thds.length; i++) {
try {
thds[i].join();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
}
示例输出,每次只有2个线程能够获取到许可(看到两两输出):
thread : thd0 got permit
thread : thd1 got permit
thread : thd3 got permit
thread : thd2 got permit
thread : thd4 got permit
thread : thd5 got permit
thread : thd7 got permit
thread : thd6 got permit
thread : thd8 got permit
thread : thd9 got permit
源码分析
Semaphore
代码不多,下面直接根据源码 + 注释的形式来理解:
public class Semaphore implements java.io.Serializable {
private static final long serialVersionUID = -3222578661600680210L;
private final Sync sync;
// 基于AQS实现,使用AQS中同步状态state来表示permits
// 分非公平实现和公平实现,默认使用非公平实现
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;
Sync(int permits) {
setState(permits);
}
final int getPermits() {
return getState();
}
// 非公平尝试获取共享资源(对该实现类而言,是尝试获取许可permits)
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
// 释放共享资源,资源释放无所谓公平性(对该实现类而言,是释放许可permits)
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
if (compareAndSetState(current, next))
return true;
}
}
// 动态减少共享资源(许可)
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
if (compareAndSetState(current, next))
return;
}
}
// 直接修改共享资源(许可)为0
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}
/** * 非公平实现 */
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;
NonfairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}
/** * 公平实现 */
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;
FairSync(int permits) {
super(permits);
}
protected int tryAcquireShared(int acquires) {
for (;;) {
// 公平性的关键: 判断同步队列是否有其他节点在等待获取共享资源
// 非公平性就是一上来就尝试抢占,不管同步队列里面是否有其他节点在等待获取共享资源
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}
/** * Semaphore默认使用非公平实现 */
public Semaphore(int permits) {
sync = new NonfairSync(permits);
}
// 可选择公平性的构造函数
public Semaphore(int permits, boolean fair) {
sync = fair ? new FairSync(permits) : new NonfairSync(permits);
}
// 请求获取许可,若许可不足则阻塞,直到有其他线程调用release,或者当前线程被中断(若被中断,则会抛InterruptedException并且中断标志位会被重置)
public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
// 请求获取许可,不响应线程中断
public void acquireUninterruptibly() {
sync.acquireShared(1);
}
// 尝试获取一个许可(非公平获取),成功获取返回true,失败则返回false
public boolean tryAcquire() {
return sync.nonfairTryAcquireShared(1) >= 0;
}
// 尝试获取一个许可,获取成功则返回true,如果无法获取,等待直到超时或者被中断(无法获取返回false)
public boolean tryAcquire(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
// 释放一个许可,使得阻塞于获取许可的线程有机会得到许可
public void release() {
sync.releaseShared(1);
}
// acquire()的高级版,一次请求获取多个许可
public void acquire(int permits) throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireSharedInterruptibly(permits);
}
// acquireUninterruptibly()的高级版,一次请求获取多个许可
public void acquireUninterruptibly(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.acquireShared(permits);
}
// tryAcquire()高级版,尝试一次获取多个许可
public boolean tryAcquire(int permits) {
if (permits < 0) throw new IllegalArgumentException();
return sync.nonfairTryAcquireShared(permits) >= 0;
}
public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
throws InterruptedException {
if (permits < 0) throw new IllegalArgumentException();
return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
}
// 释放多个许可
public void release(int permits) {
if (permits < 0) throw new IllegalArgumentException();
sync.releaseShared(permits);
}
// 返回当前可用的许可数量
public int availablePermits() {
return sync.getPermits();
}
// 请求获取剩余的所有许可
public int drainPermits() {
return sync.drainPermits();
}
// 直接减少许可数量,不阻塞当前调用线程
protected void reducePermits(int reduction) {
if (reduction < 0) throw new IllegalArgumentException();
sync.reducePermits(reduction);
}
public boolean isFair() {
return sync instanceof FairSync;
}
public final boolean hasQueuedThreads() {
return sync.hasQueuedThreads();
}
public final int getQueueLength() {
return sync.getQueueLength();
}
protected Collection<Thread> getQueuedThreads() {
return sync.getQueuedThreads();
}
public String toString() {
return super.toString() + "[Permits = " + sync.getPermits() + "]";
}
}