AQS是同步框架,它进行两个方面的工作:资源的管理和资源申请者的管理。对应由两部分组成:一个volatile int state(代表共享资源)和一个FIFO线程等待队列(多线程争用资源被阻塞时会进入此队列)。state的访问方式有三种:
getState()
setState()
compareAndSetState()
AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch)。
实现自定义同步器时只需要实现共享资源state的获取与释放方式即可,至于具体线程等待队列的维护(如获取资源失败入队/唤醒出队等),AQS已经在顶层实现好了。
自定义同步器实现时主要实现以下几种方法:
isHeldExclusively():该线程是否正在独占资源。只有用到condition才需要去实现它。
tryAcquire(int):独占方式。尝试获取资源,成功则返回true,失败则返回false。
tryRelease(int):独占方式。尝试释放资源,成功则返回true,失败则返回false。
tryAcquireShared(int):共享方式。尝试获取资源。负数表示失败;0表示成功,但没有剩余可用资源;正数表示成功,且有剩余资源。
tryReleaseShared(int):共享方式。尝试释放资源,成功则返回true,失败则返回false。
以ReentrantLock为例,state初始化为0,表示未锁定状态。A线程lock()时,会调用tryAcquire()独占该锁并将state+1。此后,其他线程再tryAcquire()时就会失败,直到A线程unlock()到state=0(即释放锁)为止,其它线程才有机会获取该锁。当然,释放锁之前,A线程自己是可以重复获取此锁的(state会累加),这就是可重入的概念。但要注意,获取多少次就要释放多么次,这样才能保证state是能回到零态的。
再以CountDownLatch以例,任务分为N个子线程去执行,state也初始化为N(注意N要与线程个数一致)。这N个子线程是并行执行的,每个子线程执行完后countDown()一次,state会CAS减1。等到所有子线程都执行完后(即state=0),会unpark()主调用线程,然后主调用线程就会从await()函数返回,继续后余动作。
一般来说,自定义同步器要么是独占方法,要么是共享方式,他们也只需实现tryAcquire-tryRelease、tryAcquireShared-tryReleaseShared中的一种即可。但AQS也支持自定义同步器同时实现独占和共享两种方式,如ReentrantReadWriteLock。
理解了这些,可以很容易的自定义同步器类,如下例子:
class Mutex implements Lock , Serializable{
private static class Syn extends AbstractQueuedSynchronizer{
@Override
protected boolean isHeldExclusively() {
return getState()==1;
}
@Override
protected boolean tryAcquire(int arg) {
assert arg==1;
if(compareAndSetState(0,1)) {
setExclusiveOwnerThread(Thread.currentThread());
return true;
}
return false;
}
@Override
protected boolean tryRelease(int arg) {
assert arg==1;
if(getState()==0)
throw new IllegalMonitorStateException();
setExclusiveOwnerThread(null);
setState(0);
return true;
}
}
private Syn syn=new Syn();
public void lock() {
syn.acquire(1);
}
public void unlock() {
syn.release(1);
}
public void lockInterruptibly() throws InterruptedException{
syn.acquireInterruptibly(1);
}
public boolean tryLock(){
return syn.tryAcquire(1);
}
public boolean tryLock(long time, TimeUnit unit) throws InterruptedException{
return syn.tryAcquireNanos(1,unit.toNanos(time));
}
public Condition newCondition(){
return null;
}
}
上面说,AQS对资源申请者的管理已经在顶层实现好了,自定义同步器时仅仅需要重写几种特定的方法,不需要关心队列的实现细节。但这里还是简要的了解下。
先看acquire()方法:acquire()方法是独占模式获取资源的顶级入口。代码如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
流程如下:
1. 调用自定义同步器的tryAcquire()尝试直接去获取资源,如果成功则直接返回;
2. 没成功,则addWaiter()将该线程加入等待队列的尾部,并标记为独占模式;
3. acquireQueued()使线程在等待队列中休息,有机会时(轮到自己,会被unpark())会去尝试获取资源。获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。
再看release()方法,release()方法是独占模式下释放资源的顶级入口。代码如下:
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;//找到头结点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);//唤醒等待队列里的下一个线程
return true;
}
return false;
}
至此,关于AQS的简单介绍就结束了。AQS是许多常用类的基础,比如ReentrantLock,Semaphore,ReentrantReadWriteLock,CountDownLatch等等,都是基于AQS构建的。感兴趣的话可以去看他们的实现。