AQS初体验
AQS是AbstractQueuedSynchronizer的简称。AQS提供了一种实现阻塞锁和一系列依赖FIFO等待队列的同步器的框架。所谓框架,AQS使用了模板方法的设计模式,为我们屏蔽了诸如内部队列等一系列复杂的操作,让我们专注于对锁相关功能的实现。
获取锁
既然涉及到锁竞争的问题,必然需要一个标志位来表示锁的状态,AQS中提供了state这样一个成员变量,为了安全的操作state,我们需要使用原子操作。将state从0修改为1就代表这个线程已经持有了这把锁。
但竞争锁的线程绝对不会只是一个,其他未竞争到锁的线程该如何进行处理?
第一个答案可能是重试,重试虽好,但是可不能贪杯,如果竞争很严重,无数的线程在不断的重新尝试获取锁,我们的CPU早晚会吃不消。
第二个比较好的方式就是排队,持有锁的线程释放锁之后,通知下一个线程去获取锁,避免了不必要的CPU损失。但是值得注意的是,即使是从队列中被唤醒的线程去获取锁也依旧可能获取不到的,因为无时无刻都有新加入的线程来竞争锁。
AQS实际上就是使用了双端队列来解决了这个问题的。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
tryAcquire()如果失败将执行acquireQueued()中的addWaiter()方法,即尝试加入等待队列。这个等待队列使用了双端队列进行实现,在AQS中定义了一个Node的数据结构,AQS中维护着head和tail两个成员变量。
在单线程中插入队列尾部很简单,只需要将原来的tail的next指向新插入的节点,并且将tail重新设置为新插入的节点。但是在多线程环境中,很有可能发生多个线程同时插入尾部的现象,而上述的插入过程不具有原子性,同时插入的过程必将出现多个操作顺序的混乱,最终导致等待队列的tail节点
AQS在插入tail节点时使用原子操作来保证了插入的可靠。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
插入成功的直接返回了node,而没有插入成功的则执行了enq()函数,在enq()中使用了CAS进行插入。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
经历这个CAS插入,最后全部的节点都将被插入到队列尾部。
现在,没有获取到锁的线程已经被放进队列了,但是放入队列也代表着我们可以忘了初心。我们的目标是获取锁,而不是进入队列。acquireQueued()就在尝试为我们获取锁。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
简单说就是,检查自己是不是head节点的下一个节点,如果是的话,尝试获取获取锁;如果不是的话,将使用LockSupport的park方法阻塞当前线程,避免造成CPU的浪费。
释放锁
释放锁的过程可以分成两大部分:
- 恢复AQS的状态为无锁状态
- 唤醒等待队列中下一个等待的节点
在第一个过程中,没有排在队头的节点都已经被阻塞了,而唤醒的时机就是前一个节点已经释放锁,所以可以说这个等待队列,实际上是一个唤醒链。
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
// 使用unpark唤醒下一个线程
unparkSuccessor(h);
return true;
}
return false;
}
总结
AQS为我们提供了:
- status 状态同步标示
- Node双端队列 存储竞争锁线程
- 基于Node双端队列的线程唤醒机制
我觉得AQS精华在于,将原来N个线程并发竞争锁降低为1+M(新加入)个。在我们自己实现类似的资源竞争算法中,也可以通过加入队列来降低竞争的并发度,降低CPU的负载压力。