ReentrantLock 是java继synchronized关键字之后新出的线程锁,今天看了看实现源码。主要是通过自旋来实现的。使用自旋的基本思路就是为所有的线程构建一个node,连成一个队列,然后每一个node都轮询前驱节点,如果前驱已经释放锁了,那么当前阶段就可以获取锁,否则就继续循环。
之前了解过使用ThreadLocal机制实现的自旋锁,但是reentrant lock应该是另一种。
/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
这是加锁方法,先进行一次快速加锁,失败了再进行常规加锁。快速加锁的情景指的是当前没有锁,所以直接CAS原子操作看看能不能获取,也就是if块里的操作,如果没有成功,常规获取,也就是acquire操作。
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
首先执行一次tryAcquire()尝试获取,分为公平和非公平,这里就只看非公平的情况吧
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
首先获取当前线程,然后得到锁此时的state,如果state是0,说明可以争取锁,CAS一下,否则说明锁被用了,但是如果用锁的就是当前线程,就把state加1,获取成功,否则就获取失败。
一旦tryAcquire()返回了false,说明获取锁失败了,就必须进入等待队列中,所以会执行后面的acquireQueued(addWaiter)方法。先看addWaiter
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter所做的是把当前线程加入到等待队列中,也就是把锁的tail变量设置为当前线程,也是先快设置一次,也就是一次CAS,如果成功就返回,否则就执行一次常规加入,即enq操作
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq就是典型的自旋+CAS的实现,因为CAS控制并发是非阻塞的,所以如果一定要让操作执行,必须把CAS放入循环内。所以enq就是一个while循环,不断检测CAS是否成功
一旦加入队列了,剩下的就是执行acquireQueued方法,既然进入队列了,为什么还要执行这个方法?因为需要让这个新加入的节点自旋,也就是让其进入等待状态,或者说让它循环等待前驱阶段是否释放锁
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
可以看到,确实有一个循环,不断检测前驱节点,如果前驱是head(这是一个dump头节点),说明自己已经是真正的头节点了,可以互获锁了,就会持续执行tryAcquire去竞争。这个队列的消费是直接把消费的节点从队列删除,而之前博客的CLH是通过节点的一个状态字来检测的。
可以看到,整个重入锁就是通过自旋+CAS来实现的
获取锁的大致过程如下:
执行一次CAS获取,若不成功则
执行一次tryAcquire获取,若不成功则
把当前节点加入到队列,也是先CAS加入一次,不成功再自旋
自旋检测前驱是否释放锁,并尝试获取
与自旋锁相对应的概念是互斥锁,等待时是通过让线程睡眠实现的。自旋锁适用于占用锁时间比较短的场景,这样线程状态转换的开销相比较与cpu循环,代价会变大,但是随着锁获取时间的增长,cpu的代价会越来越大