《Java源码分析》:ReentrantLock.lock 锁机制
AbstractQueuedSynchronizer这个类真的很难也很复杂,是构建锁以及实现其他相关同步类的基础框架。本来是没有打算来研究下这个类,但是由于想了解锁机制,因此,不可避免的需要研究它。
但是这个类确实很难,自己看了几遍这个类也还是没有过多的看懂,本篇博文只能说是对自己看这个类的一点点理解和记录,若有错,请批评指正。
基本的思想是表现为一个同步器,支持下面两个操作:
1、获取锁:首先判断当前状态是否允许获取锁,如果是就获取锁,否则就阻塞操作或者获取失败,也就是说如果是独占锁就可能阻塞,如果是共享锁就可能失败。另外如果是阻塞线程,那么线程就需要进入阻塞队列。当状态位允许获取锁时就修改状态,并且如果进了队列就从队列中移除。
2、释放锁:这个过程就是修改状态位,如果有线程因为状态位阻塞的话就唤醒队列中的一个或者更多线程。
1、ReentrantLock类的lock()方法
由于锁Lock的实现都是委托给AbstractQueuedSynchronizer来实现的。因此,就将分析ReenterantLock类中如何获取锁和如何释放锁来理解AbstractQueuedSynchronizer的相关知识。
public void java.util.concurrent.locks.ReentrantLock.lock()
API文档对此方法的说明如下:
功能:获取锁。
如果该锁没有被另一个线程保持,则获取该锁并立即返回,并将锁的保持计数器设置为1.
如果当前线程已经保持该锁,则将保持计数加1,并且该方法立即返回。
如果该锁被另一个线程保持,则出于线程调度的目的,禁用该线程,并且在获得锁之前,该线程一直处于休眠状态,此时锁保持计数被设置为1.
从上面的文档可以看出ReentrantLock是可重入锁的实现。而内部是委托java.util.concurrent.locks.ReentrantLock.Sync.lock()实现的。
java.util.concurrent.locks.ReentrantLock.Sync是抽象类,有java.util.concurrent.locks.ReentrantLock.FairSync和java.util.concurrent.locks.ReentrantLock.NonfairSync两个实现,也就是常说的公平锁和不公平锁。
公平锁和非公平锁
如果获取一个锁是按照请求的顺序得到的,那么就是公平锁,否则就是非公平的。
在没有深入了解内部机制及实现之前,先了解下为什么会存在公平锁和非公平锁。
1、公平锁保证一个阻塞的线程最终能够获得锁,因为是有序的,所以总是可以按照请求的顺序获得锁。
2、不公平锁意味着后请求锁的线程可能在其前面排列的休眠线程恢复前拿到锁,这样就有可能提高并发的性能。这是因为通常情况下挂起的线程重新开始与它真正开始运行,二者之间会产生严重的延时。因此非公平锁就可以利用这段时间完成操作。这是非公平锁在某些时候比公平锁性能要好的原因之一。
二者在实现上的区别会在后面介绍,我们先从公平锁(FairSync)开始。ReentrantLock类中公平锁的源码如下。
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;
final void lock() {
acquire(1);//
}
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}
前面说过java.util.concurrent.locks.AbstractQueuedSynchronizer (AQS)是Lock的基础,从上面的源码可以看出,对于一个FairSync而言,lock()就直接调用AQS的acquire(int arg);
AQS类中的acquire(int arg)
API文档对此函数的说明:以独占模式获取对象,忽略中断。通过至少调用一次 tryAcquire(int) 来实现此方法,并在成功时返回。否则在成功之前,一直调用 tryAcquire(int) 将线程加入队列,线程可能重复被阻塞或不被阻塞。
在介绍acquire函数的实现之前,先介绍下AQS的几种方法,这是锁的基础。
对于一个AQS的实现而言,通常情况下需要实现以下方法来描述如何锁定线程。
1、tryAcquire(int) 试图在独占模式下获取对象状态。此方法应该查询是否允许它在独占模式下获取对象状态,如果允许,则获取它。
此方法总是由执行 acquire 的线程来调用。如果此方法报告失败,则 acquire 方法可以将线程加入队列(如果还没有将它加入队列),直到获得其他某个线程释放了该线程的信号。也就是说此方法是一种尝试性方法,如果成功获取锁那最好,如果没有成功也没有关系,直接返回false。
2、tryRelease(int) 试图设置状态来反映独占模式下的一个释放。 此方法总是由正在执行释放的线程调用。释放锁可能失败或者抛出异常,这个在后面会具体分析。
3、tryAcquireShared(int) 试图在共享模式下获取对象状态。
4、tryReleaseShared(int) 试图设置状态来反映共享模式下的一个释放。
5、isHeldExclusively() 如果对于当前(正调用的)线程,同步是以独占方式进行的,则返回 true。
除了tryAcquire(int)外,其它方法会在后面具体介绍。首先对于ReentrantLock而言,不管是公平锁还是非公平锁,都是独占锁,也就是说同时能够有一个线程持有锁。因此对于acquire(int arg)而言,arg==1。在AQS中acquire的实现如下:
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
这个看起来比较复杂,我们分解以下4个步骤。
1、如果tryAcquire(arg)成功,那就没有问题,已经拿到锁,整个lock()过程就结束了。如果失败进行操作2。
2、调用addWaiter方法:将当前线程创建一个独占节点(Node)并且此节点加入CHL队列末尾。进行操作3。
3、自旋尝试获取锁,失败根据前一个节点来决定是否挂起(park()),直到成功获取到锁。进行操作4。
4、如果当前线程已经中断过,那么就中断当前线程(清除中断位)。
下面我们对acquire方法中调用的其它方法一一进行分析。
tryAcquire(acquires)
在AQS类中,这个方法是一个抽象方法。AQS的实现类FairSync,即对于公平锁而言,它的实现源码如下。
此方法思想的大概思想为:首先判断有没有其它线程持有锁,如果没有,则该线程持有锁并立即返回。如果有,则先判断该线程是否为独占线程,如果是则将保持计数以加1并立即返回。否则返回false。
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();//对于AQS存在一个state来描述当前有多少线程持有锁
/* 如果c等于零,则没有线程持有锁,则将锁给当前线程即可 如果c不等于,就先判断此线程有没有拥有锁, */
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {//这里的acquires = 1
setExclusiveOwnerThread(current);//成功后将AQS独占线程设为当前线程
return true;
}
}
else if (current == getExclusiveOwnerThread()) {//判断当前线程是否为AQS的独占线程
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);//修改状态位
return true;
}
return false;
}
分析上面的代码
在这段代码中,前面说明对于AQS存在一个state来描述当前有多少线程持有锁。由于AQS支持共享锁(例如读写锁,后面会继续讲),所以这里state>=0,但是由于ReentrantLock是独占锁,所以这里不妨理解为0<=state,acquires=1。hasQueuedPredecessors是一个很复杂的逻辑,包括踢出无用的节点等复杂过程,这里暂且不提,大体上的意思是说判断AQS是否为空或者当前线程是否在队列头(为了区分公平与非公平锁)。
1、如果当前锁有其它线程持有,c!=0,进行操作2。否则,如果当前线程在AQS队列头部,则尝试将AQS状态state设为acquires(等于1),成功后将AQS独占线程设为当前线程返回true,否则进行2。这里可以看到compareAndSetState就是使用了CAS操作。
2、判断当前线程与AQS的独占线程是否相同,如果相同,那么就将当前状态位加1(这里+1后结果为负数后面会讲,这里暂且不理它),修改状态位,返回true,否则进行3。这里之所以不是将当前状态位设置为1,而是修改为旧值+1呢?这是因为ReentrantLock是可重入锁,同一个线程每持有一次就+1。
3、返回false。
比较非公平锁的tryAcquire实现java.util.concurrent.locks.ReentrantLock.Sync.nonfairTryAcquire(int),公平锁多了一个判断当前节点是否在队列头,这个就保证了是否按照请求锁的顺序来决定获取锁的顺序(同一个线程的多次获取锁除外)。
现在再回头看公平锁和非公平锁的lock()方法。
公平锁只有一句acquire(1);而非公平锁的调用如下:
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
很显然,非公平锁在第一次获取锁,或者其它线程释放锁后(可能等待),优先采用compareAndSetState(0,1)然后设置AQS独占线程而持有锁,这样有时候比acquire(1)顺序检查锁持有而要高效。即使在重入锁上,也就是compareAndSetState(0,1)失败,但是是当前线程持有锁上,非公平锁也没有问题。
addWaiter(mode)
tryAcquire失败就意味着入队列了。此时AQS的队列中节点Node就开始发挥作用了。一般情况下AQS支持独占锁和共享锁,而独占锁在Node中就意味着条件(Condition)队列为空。在java.util.concurrent.locks.AbstractQueuedSynchronizer.Node中有两个常量,
static final Node EXCLUSIVE = null; //独占节点模式
static final Node SHARED = new Node(); //共享节点模式
addWaiter(mode)中的mode就是节点模式,也就是共享锁还是独占锁模式。
前面一再强调ReentrantLock是独占锁模式。
private Node addWaiter(Node mode) {//mode Node.EXCLUSIVE
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
/* 将代表当前线程的节点加入到同步队列的末尾 */
Node pred = tail;
if (pred != null) {//同步队列中有节点
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//同步队列中没有节点,则调用enq方法
enq(node);
return node;
}
上面是节点如队列的一部分。当前仅当队列不为空并且将新节点插入尾部成功后直接返回新节点。否则进入enq(Node)进行操作。
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // Must initialize
/* compareAndSetHead(new Node())方法就是调用的是 unsafe.compareAndSwapObject(this, headOffset, null, update); 即如果头结点为空,则用新节点更新 */
if (compareAndSetHead(new Node()))
tail = head;
} else {//在尾结点不为空之后,则将node节点加入到队列中去
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
enq(Node)去队列操作实现了CHL队列的算法,如果为空就创建头结点,然后同时比较节点尾部是否是改变来决定CAS操作是否成功,当且仅当成功后才将为不节点的下一个节点指向为新节点。可以看到这里仍然是CAS操作。
acquireQueued(node,arg)
自旋请求锁,如果可能的话挂起线程,直到得到锁,返回当前线程是否中断过(如果park()过并且中断过的话有一个interrupted中断位)。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
/* 如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点, 那么第二个节点实际上就是头结点了), 就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点 */
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
下面的分析就需要用到上节节点的状态描述了。acquireQueued过程是这样的:
1、如果当前节点是AQS队列的头结点(如果第一个节点是DUMP节点也就是傀儡节点,那么第二个节点实际上就是头结点了),就尝试在此获取锁tryAcquire(arg)。如果成功就将头结点设置为当前节点(不管第一个结点是否是DUMP节点),返回中断位。否则进行2。
2、检测当前节点是否应该park(),如果应该park()就挂起当前线程并且返回当前线程中断位。进行操作1。
一个节点是否该park()是关键,这是由方法java.util.concurrent.locks.AbstractQueuedSynchronizer.shouldParkAfterFailedAcquire(Node, Node)实现的。
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
代码分析
1、如果前一个节点的等待状态waitStatus==SIGNAL(SIGNAL的含义为表示当前节点的后继节点包含的线程需要运行),即也就是前面的节点还没有获得到锁,那么返回true,表示当前节点(线程)就应该park()了。否则进行2。
2、如果前一个节点的等待状态waitStatus>0,也就是前一个节点被CANCELLED了,那么就将前一个节点去掉,递归此操作直到所有前一个节点的waitStatus<=0,进行4。否则进行3。
3、前一个节点等待状态waitStatus=0 或者是PROPAGATE,修改前一个节点状态位为SINGAL,表示后面有节点等待你处理,需要根据它的等待状态来决定是否该park()。进行4。
4、返回false,表示线程不应该park()。
selfInterrupt()方法
private static void selfInterrupt() {
Thread.currentThread().interrupt();
}
如果线程曾经中断过(或者阻塞过)(比如手动interrupt()或者超时等等,那么就再中断一次,中断两次的意思就是清除中断位)。
参考资料
1、http://www.blogjava.net/xylz/archive/2010/07/07/325410.html (好资料,强烈推荐)