背景介绍
AQS
AQS(AbstractQueuedSynchronizer
)是JUC下提供的一个同步框架.其名字为抽象队列同步器.
同步器
:AQS是一个同步器,可完成同步互斥功能
队列
:使用双端双向链表
完成同步互斥功能
抽象
:AQS是一个抽象类,具体的实现需要子类实现相关方法做支持,JUC中的ReentrantLock
,CountDownLatch
,Semaphore
都是基于AQS来实现
那AQS又是如何使用队列
完成同步器
功能的呢?在我们已有的知识中有什么机制可以完成同步器
功能呢?
答案是:信号量,更准确点说是记录型信号量,AQS就是记录型信号量的Java语言实现
记录型信号量
了解了AQS的基本原理之后,我们就一起回忆下记录型信号量的逻辑,记录型信号量由PV操作组成
P操作
- 将资源数减一
- 若资源数减一后小于0,将当前线程加入队列,挂起当前线程
V操作
- 将资源数加一
- 若资源数加一后小于等于0,说明队列中有线程存在,则将当前线程移出队列,唤醒当前线程
至于AQS如何实现PV操作?AQS实现的PV操作与传统记录型信号量有何异同?在下文继续分析
AbstractOwnableSynchronizer
AQS继承了AbstractOwnableSynchronizer
,AbstractOwnableSynchronizer
非常简单,只有一个exclusiveOwnerThread
属性和相应的get和set方法
exclusiveOwnerThread
:独占模式下持有同步器资源的线程
AbstractQueuedSynchronizer
AQS属性
由背景介绍可知,AQS是记录型信号量的Java语言实现,所以AQS中有一属性state
与记录型信号量的资源数含义基本一致,但state
的具体含义由子类确定;记录型信号量中使用一个队列记录挂起的线程, 所以AQS中有head
和tail
分别指向双端链表的头尾结点.
因此AQS中共有state
,head
,tail
三个属性.
AbstractQueuedSynchronizer.Node
由上文知:AQS维护一个双端双向队列
,那队列中的每个结点表示什么含义呢?
- 每个结点表示一个
等待获取锁资源
的线程
Node就是AQS队列中的结点
- 所以Node的第一个属性就是
thread
,代表被Node封装的线程 - 此外Node中还有一个
waitStatus
代表Node所处的状态,共取5个值- static final int CANCELLED = 1;//
- static final int SIGNAL = -1;//结点状态:该结点下一个结点需要unparking(唤醒)
- static final int CONDITION = -2;//结点状态:在某condition上等待
- static final int PROPAGATE = -3;
- 0//表示Node为初始状态
- 由于Node是双端链表的结点,有
prev
和next
分别表示前一个结点和后一个结点. - Node还有一个属性
nextWaiter
,与next的区别在于
nextWaiter`表示下一个等待在Condition上的结点(分析Condition时再分析)
重要方法
AQS中有多个公共方法,有是否可中断
之分,有是否可设置超时
之分的,主要方法为acquire()
和release()
,acquire()
相当于P操作,release()
相当于V操作,逻辑如下:
acquire()
- 尝试获取资源,获取成功,则将资源数减一
- 若获取失败,则将当前线程加入队列中,挂起当前线程,当V操作唤醒一个线程时,将该线程从队列中移除
releases()
- 尝试释放资源,释放成功,则唤醒一个队列中被阻塞的线程
可以看出acquire()
和release()
和PV操作逻辑还是有一些不同之处:
- AQS有一个尝试获取资源/尝试释放资源的逻辑,而非直接将资源数加一/减一,该方法在AQS中就是
tryAcquire()
/tryRelease()
,由子类实现,子类可以根据不同的实现丰富AQS的语义 - PV操作中V操作除了唤醒队列中被阻塞的线程外,还负责将该线程移出队列,但AQS中将线程加入和移出队列都交由
acquire()
,由acquire()
调用acquireQueued()
(极其重要,后面重点分析)完成
我们首先看一下acquire()
的实现
AQS将尝试获取资源抽象为tryAcquire()
,由子类实现;将当前线程加入等待队列中抽象为addWaiter()
,将挂起当前线程,当V唤醒一个线程时,将该线程从队列中移除抽象为acquireQueued()
;将逻辑1和逻辑2综合抽象为if (!tryAcquire(arg) &&acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
/** * 该方法在独占模式下使用,忽略中断 * 若第一次tryAcquire()成功:则执行一次tryAcquire()后返回 * 若第一次tryAcquire()失败:将当前线程入队,而后重复调用tryAcquire()直到成功,期间当前线程将重复挂起和唤醒 * * @param arg the acquire argument. arg将传递给 tryAcquire(),但是arg的含义由子类实现确定,与state的含义一致 */
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()
由子类实现,分析到具体子类时再详细介绍该函数,现在看下addWaiter()
和acquireQueued()
的实现
/** * 根据参数mode创建结点,并将其入队 * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */
private Node addWaiter(Node mode) {
//根据mode生成新节点,在new Node(mode)时将node.thread设置为当前线程
Node node = new Node(mode);
//死循环+CAS
for (;;) {
Node oldTail = tail;
if (oldTail != null) {
//将当前结点尾插法插入AQS队列中
node.setPrevRelaxed(oldTail);
if (compareAndSetTail(oldTail, node)) {
oldTail.next = node;
return node;
}
} else {
//初始化AQS队列,建立头结点.head.thread为null,head.waitStatus=0
initializeSyncQueue();
}
}
}
到了最重点的acquireQueued()
了,由于AQS是一个同步器,而同步器最起码牵涉到两个线程,我们规定下文分析的场景如下:
- 只有A,B两个线程争抢一个锁资源
- A线程第一次调用tryAcquire()成功,获取锁;B线程第一次调用tryAcquire()失败,进入acquireQueued()
- A线程一直占用锁资源,则B线程调用tryAcquire()会一直失败
/** * 该方法运行在独占非中断模式下 * 1.为已经在AQS队列中的线程执行acquire * 2.为Condition.await()执行acquire(牵涉到Condition,日后分析) * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */
final boolean acquireQueued(final Node node, int arg) {
//此时B线程进入该方法
//根据addWaiter()可知,此时AQS队列中有两个节点,第一个结点为头结点,无意义;第二个结点为封装B线程的Node结点
try {
boolean interrupted = false;
for (;;) {
//此时p即为head
final Node p = node.predecessor();
//完成"当V唤醒一个线程时,将该线程从队列中移除"逻辑
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
return interrupted;
}
//完成"挂起当前队列"逻辑
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
由上文场景可知,B一直无法获取锁资源,则需要挂起B线程,先分析下挂起当前线程是如何实现的.挂起当前线程由shouldParkAfterFailedAcquire()
和parkAndCheckInterrupt()
两个方法完成
-
shouldParkAfterFailedAcquire()
- 方法名字为
判断当tryAcquire()失败时是否应该挂起当前线程,其实该方法的一些逻辑我也尚未完全明白,但可以假想为该方法都返回true
当shouldParkAfterFailedAcquire()
返回true后,由于两个方法由&&
连接,则继续执行parkAndCheckInterrupt()
-
parkAndCheckInterrupt()
- 根据该方法名字可知该方法进行挂起当前线程和检查是否中断的逻辑
/** * Convenience method to park and then check if interrupted. * * @return {@code true} if interrupted */
private final boolean parkAndCheckInterrupt() {
//利用LockSupport挂起当前线程
LockSupport.park(this);
return Thread.interrupted();
}
执行到LockSupport.park(this)
时B线程已经被挂起,此时acquireQueued()
已经完成挂起当前线程,那当V唤醒一个线程时,将该线程从队列中移除又是如何完成的呢?什么时候B线程又会被唤醒呢?
后一个问题比较简单:B线程会在A线程调用release()
时被唤醒
让我们先看下release()
的实现
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
//唤醒后续结点
unparkSuccessor(h);
return true;
}
return false;
}
由于AQS队列有一个无意义的头结点,所以头结点的下一个结点就是第一个被挂在AQS队列上的线程,unparkSuccessor(h)
唤醒AQS队列上的结点.
release()
比较简单,已经分析完毕,让我们回到acqurieQueued()
,分析acquiredQueued()
是如何实现当V唤醒一个线程时,将该线程从队列中移除的呢?
final boolean acquireQueued(final Node node, int arg) {
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
//tryAcquire(arg)成功,将被唤醒的结点移出AQS队列,return
setHead(node);
p.next = null; // help GC
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} catch (Throwable t) {
cancelAcquire(node);
throw t;
}
}
在parkAndCheckInterrupt()
中B线程被挂起,在A线程的release()
中B线程被唤醒,则B线程继续执行,由于是在死循环中,则会继续执行if (p == head && tryAcquire(arg))
,此时由于A线程已经释放锁资源,则B线程tryAcquire()
成功,将被唤醒的结点移出AQS队列后return
总结
至此,AQS源码已大致分析完毕
- AQS就是记录型信号量的Java语言实现
acquire()
和release()
分别对应PV操作,具体语义有子类实现tryAcquire()
,tryRelease()
确定- AQS的
release()
较为简单,大部分逻辑由acquire()
实现- 当前线程加入队列中–>
addWaiter()
- 挂起当前线程–>
acquireQueued()
挂起当前线程前的代码 - 当V唤醒一个线程时,将该线程从队列中移除–>
acquireQueued()
挂起当前线程后的代码
- 当前线程加入队列中–>
不足
- 博客大体介绍AQS整体思路,每个方法的具体细节需要查看源码
- 未分析Node.waitStatue的作用
- 一直将AQS当做一个锁来进行分析,但是AQS语义由子类确定,可以实现
计数器(CountDownLatch)
等语义 - 未分析AQS中公平模式,可中断模式和定时模式的实现方法,三种模式的实现分析见JUC-ReentrantLock源码分析
- 未分析AQS的独占和共享模式.共享模式见JUC-CountDownLatch源码分析