Jdk1.5中包含了并发大神Doug Lea写的并发工具包java.util.concurrent,这个工具包中包含了显示锁和其他的实用同步组件。Doug Lea在构建锁和组件的时候,大多是以队列同步器(AbstractQueuedSynchronizer)为基础的,因此AbstractQueuedSynchronizer可以看作是并发包的基础框架。因此掌握了AbstractQueuedSynchronizer的实现原理,也就掌握了大多数并发组件的实现原理。
AbstractQueuedSynchronizer使用一个int变量state表示同步状态,使用一个隐式的FIFO队列(隐式队列就是并没有声明这样一个队列,只是通过每个节点记录它的上个节点和下个节点来从逻辑上产生一个队列)来完成阻塞线程的排队。
AQS是一个抽象类,当我们要构建一个同步组件的时候,需要定义一个子类继承AQS,这里应用了模板方法设计模式,我们简单复习一下模板模式:
模板模式由一个抽象类和一个实现类组成,抽象类中主要有三类方法:
模板方法:实现了算法主体框架,供外部调用。里面会调用原语操作和钩子操作。
原语操作:即定义的抽象方法,子类必须重写。
钩子操作:和原语操作类似,也是供子类重写的,区别是钩子操作子类可以选择重写也可以选择不重写,如果不重写则使用抽象类默认操作,通常是一个空操作或抛出异常。
在AQS中没有原语操作,也就是说自定义的子类继承AQS后,不会强制子类重写任何方法。AQS只提供了若干钩子操作,这些钩子操作的默认实现都是直接抛出异常。子类不需要重写所有的钩子操作,只需要根据要构建的同步组件的类型来决定要调用AQS中的哪些模板方法,再实现这些模板方法中用到了的钩子操作即可。
AQS中可供子类重写的钩子操作有:
方法名称 | 描述 |
boolean tryAcquire(int arg) | 独占式获取同步状态,成功返回true,失败返回false。 |
boolean tryRelease(int arg) | 独占式释放同步状态,成功返回true,失败返回false。 |
int tryAcquireShared(int arg) | 共享式获取同步状态,获取成功则返回值>=0 |
boolean tryReleaseShared(int arg) | 共享式释放同步状态,成功返回true,失败返回false。 |
boolean isHeldExclusively() | 判断同步器是否在独占模式下被占用,一般用来表示同步器是否被当前线程占用 |
可以看到可以重写的钩子操作既有独占式同步状态的获取与释放,也有共享式同步状态的获取与释放,这样就能支持构建不同类型的同步组件,如ReentrantLock使用时同一时刻只有一个线程可以获得锁,因此就可以通过重写tryAcquire和tryRelease实现;而Semaphore在同一时刻可以有多个线程获得许可,因此就可以通过重写tryAcquireShared(int arg)和tryReleaseShared(int arg)实现,事实上,这两个同步组件正是这么实现的。
在子类重写钩子操作的时候,可以调用AQS中的以下方法来获取/设置AQS的同步状态state:
方法 | 描述 |
int getState() | 获取当前同步状态 |
void setState(int newState) | 设置当前同步状态 |
boolean compareAndSetState(int expect, int update) | 使用CAS设置当前状态,保证状态更新的原子性 |
子类重写相关钩子操作后,AQS中提供的模板方法才能正常调用(如果模板方法中使用的钩子方法没有被子类重写,将抛出异常),AQS中提供的模板方法有(这里列出了所有的模板方法,只挑了比较常用了写了描述,其他的可以自行查看源码注释):
方法 | 描述 |
void acquire(int arg) | 独占式获取同步状态,该方法会调用子类重写的tryAcquire(int arg),如果tryAcquire返回true则该方法直接返回,否则先将当前线程加入同步队列的尾部,然后阻塞当前线程 |
void acquireInterruptibly(int arg) | 和acquire类似,只是当线程获取同步状态失败被阻塞后,可以响应中断,收到中断后将会取消获取同步状态 |
boolean tryAcquireNanos(int arg, long nanosTimeout) | 在acquireInterruptibly的基础上加了超时限制,如果在超时时间内获取到同步状态返回true,否则返回false |
boolean release(int arg) | 独占式释放同步状态,该方法会在释放同步状态后将第一个节点(对应刚刚释放同步状态的线程)的后继节点对应的线程唤醒。 |
void acquireShared(int arg) | 共享式获取同步状态,该方法会调用子类重写的tryAcquireShared(int arg),如果tryAcquireShared返回true则该方法直接返回,否则先将当前线程加入同步队列的尾部,然后阻塞当前线程 |
void acquireSharedInterruptibly(int arg) | 和acquireShared类似,只是当线程获取同步状态失败被阻塞后,可以响应中断,收到中断后将会取消获取同步状态 |
boolean tryAcquireSharedNanos(int arg, long nanosTimeout) | 在acquireSharedInterruptibly的基础上加了超时限制,如果在超时时间内获取到同步状态返回true,否则返回false |
boolean releaseShared(int arg) | 共享式的释放同步状态 |
boolean hasQueuedThreads() | |
boolean hasContended() | |
Thread getFirstQueuedThread() | |
boolean isQueued(Thread thread) | |
boolean hasQueuedPredecessors() | |
int getQueueLength() | |
Collection<Thread> getQueuedThreads() | |
Collection<Thread> getExclusiveQueuedThreads() | |
Collection<Thread> getSharedQueuedThreads() | |
boolean owns(ConditionObject condition) | |
boolean hasWaiters(ConditionObject condition) | |
int getWaitQueueLength(ConditionObject condition) | |
Collection<Thread> getWaitingThreads(ConditionObject condition) |
这里我们借助一个例子来加深对AQS的理解,我们用AQS来自定义一个独占锁MutexLock(独占锁就是同一时刻只有一个线程可以获得锁,而其他线程只能被阻塞,一直到当前线程释放了锁,其他线程才有机会再获取锁):
package com.gome; import java.util.concurrent.TimeUnit; import java.util.concurrent.locks.AbstractQueuedSynchronizer; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; public class MutexLock implements Lock{ private MutexSynchronizer synchronizer=new MutexSynchronizer(); private static class MutexSynchronizer extends AbstractQueuedSynchronizer{ /** * @param unused 这个参数是用来传同步状态的累加值的,因为我们实现的是独占锁, * 因此这个参数实际用不到,我们在方法里累加值恒为1 */ @Override protected boolean tryAcquire(int unused) { /** * 用CAS来更新AQS的同步状态,如果原值是0则更新为1代表已经有线程获取了独占锁 */ if (compareAndSetState(0, 1)) { setExclusiveOwnerThread(Thread.currentThread()); //设置当前独占锁的所有者线程 return true; } return false; } /** * @param unused 这个参数是用来传同步状态的递减值的,因为我们实现的是独占锁, * 因此这个参数实际用不到,我们在方法里递减值恒为1 */ @Override protected boolean tryRelease(int unused) { //如果当前AQS同步状态是0,说明试图在没有获得同步状态的情况下释放同步状态,直接抛异常 if (getState()==0) throw new IllegalMonitorStateException(); //这里不需要CAS而是直接把同步状态设置为0,因为我们实现的是独占锁,正常情况下会执行释放操作的线程只有同步状态的所有者线程 setState(0); setExclusiveOwnerThread(null); return true; } protected Condition newCondition() { return new ConditionObject(); } } @Override public void lock() { synchronizer.acquire(1); } @Override public void lockInterruptibly() throws InterruptedException { synchronizer.acquireInterruptibly(1); } @Override public boolean tryLock() { return synchronizer.tryAcquire(1); } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return synchronizer.tryAcquireNanos(1, unit.toNanos(time)); } @Override public void unlock() { synchronizer.release(1); } @Override public Condition newCondition() { return synchronizer.newCondition(); } }
解释:
我们让MutexLock实现了Lock接口,因此MutexLock就必须实现以下方法:
方法名称 | 描述 |
void lock() | 获取锁 |
void lockInterruptibly() throws InterruptedException | 可中断地获取锁,在线程获取锁的过程中可以响应中断 |
boolean tryLock() | 尝试非阻塞获取锁,调用方法后立即返回,成功返回true,失败返回false |
boolean tryLock(long time, TimeUnit unit) throws InterruptedException | 在超时时间内获取锁,到达超时时间将返回false,也可以响应中断 |
void unlock(); | 释放锁 |
Condition newCondition(); | 获取等待组件,等待组件实现类似于Object.wait()方法的功能 |
然后我们在MutexLock中定义内部类MutexSynchronizer继承AQS类,可以看到MutexLock中的方法本身都没有做任何操作,都是把请求委托给MutexSynchronizer的实例。
为了实现MutexLock中的方法,我们需要调用AQS的acquire、acquireInterruptibly、tryAcquire、tryAcquireNanos、release方法,这几个方法都是独占式获取、释放同步状态的方法,因此子类MutexSynchronizer需要重写和独占同步状态获取、释放相关的钩子操作:tryAcquire、tryRelease。
只需要以上的代码,我们就拥有了一个可以使用的独占锁。可以看到,需要我们自己写的主要就是tryAcquire()和tryRelease()这两个方法,其他的操作,如对获取锁失败线程的阻塞、唤醒,都是AQS替我们实现的。
这一篇就到这里,这篇只介绍作为一个同步组件的构建者,如何使用AQS来构建我们自己的同步组件。
下一篇我们将介绍AQS的底层实现,探究AQS是如何利用内部的同步队列帮我们实现同步组件的。