基本概念
首先我们需要知道两个概念:AQS、CHL 队列。
AQS:即 AbstractQueuedSynchronizer 类,它是整个 J.U.C 框架的核心所在。其中之一的功能就是将线程封装在一个节点里面,不同的节点通过连接形成了一个 CHL 队列。
CHL 队列:它是一个非阻塞的 FIFO 队列,也就是说在并发条件下往此队列做插入或移除操作不会阻塞,它通过自旋锁和 CAS 保证节点插入和移除的原子性,实现无锁快速插入。
CHL 队列比起原来的 CLH 锁已经做了很大的改造,主要从两方面进行了改造:节点的结构与节点等待机制。
在结构上引入了头结点和尾节点,他们分别指向队列的头和尾,尝试获取锁、入队列、释放锁等实现都与头尾节点相关,并且每个节点都引入前驱节点和后后续节点的引用。
在等待机制上由原来的自旋改成阻塞唤醒。
节点元素
在 AQS 中,CHL 队列实际是由节点组成的链表。那么就需要知道它的节点构成。首先来看它的一个静态内部类:
static final class Node {
// 省略部分代码...
volatile Node prev; // 前节点
volatile Node next; // 后节点
volatile int waitStatus;// 等待状态
volatile Thread thread; // 节点线程
Node nextWaiter; // 节点模式
Node() { }
Node(Thread thread, Node mode) {
this.nextWaiter = mode;
this.thread = thread;
}
Node(Thread thread, int waitStatus) {
this.waitStatus = waitStatus;
this.thread = thread;
}
}
Node,即节点,它是组成链表的基本元素。构成一个节点需要指定线程、节点等待状态(模式)。一个完整的节点如下图所示:
m :即 nextWaiter(mode),表示节点模式。
t :即 thread,表示节点存放的线程。
w :即 waitStatus,表示节点的等待状态。
在 AQS 中,Node 的模式有共享和独占两种,它们在 Node 类中的定义如下:
// 共享模式,表示线程要获取的是共享锁,即一个锁可以被不同的线程拥有
static final Node SHARED = new Node();
// 独占模式,表示线程要获取的独占锁,即一个锁只能被一个线程拥有
static final Node EXCLUSIVE = null;
而 Node 的等待状态则有五种,同样在 Node 类中定义:
// 表示当前节点的后续节点中的线程通过 park 被阻塞了,需要通过unpark解除它的阻塞
static final int SIGNAL = -1;
// 表示当前节点在 condition 队列中
static final int CONDITION = -2;
// 共享模式的头结点可能处于此状态,表示无条件往下传播
// 引入此状态是为了优化锁竞争,使队列中线程有序地一个一个唤醒
static final int PROPAGATE = -3;
//表示当前节点的线程因为超时或中断被取消了
static final int CANCELLED = 1;
// 关键 -> 0 表示初始化状态
Node 的等待状态,因为大于 0 的只有 CANCELLED 一种状态。因此在很多地方也用 waitStatus > 0 表示该状态。
Sync 等待队列
Sync 等待队列也称同步等待队列,该队列是 CHL 队列。
- 在 JUC 框架中,若有线程尝试获取锁时失败时,该线程会被包装成节点添加进此队列,也称入队。
- 在 JUC 框架中,若有线程释放锁时,等待队列的头节点会从队列中移除,表示不用再等待,也称出队。
1.入队操作
在 AQS 中通过 addWaiter 方法将节点加入等待队列:
// 头节点
private transient volatile Node head;
// 尾节点
private transient volatile Node tail;
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
Node pred = tail;
// 为空表示等待队列为空
if (pred != null) {
node.prev = pred;
// 通过 CAS 操作设置 tail 为 node
// 关键 ->失败表示在这期间有其他线程的节点被设置为新的尾节点
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
// 关键 -> 当[等待队列]为空,或者新节点入队失败时(说明存在并发),代码才会执行到这
enq(node);
return node;
}
// 往等待队列中(尾部)插入一个节点
private Node enq(final Node node) {
// 关键 -> 自旋直至成功
for (;;) {
Node t = tail;
// t 为空表示等待队列为空
if (t == null) {
// 构建等待队列的头节点,实质是创建一个空的循环双链表
if (compareAndSetHead(new Node())){
tail = head;
}
} else {
// 设置该节点为新的尾节点
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
分析以上代码可知,入队操作只与队列的尾节点有关,通过原子操作将新节点设置成新的尾节点。若操作失败则通过不断自旋直至成功。
下图表示一个刚初始化的等待队列:
添加节点到等待队列的过程如下:
2.出队操作
出队操作跟头节点有关,将要执行出队的节点设置为新的头节点,并置空旧的头节点从等待队列移除即可。
if (p == head && tryAcquire(arg)) {
// 设置新的头节点
setHead(node);
// 将旧头节点后指针置空,表示从等待队列移除
p.next = null;
failed = false;
return interrupted;
}
private void setHead(Node node) {
head = node;
node.thread = null;
node.prev = null;
}
整个过程如下:
- 设置新的 head
- 清除节点上的线程
- 将旧的头节点从等待队列移除( 断开前指针和后指针)