AbstractQueuedSynchronizer
先分析下同步器AbstractQueuedSynchronizer
,这个是用于锁实现的类,ReentrantLock就用到了它
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
ReentrantLock
的lock()
方法就是调用acquire(int arg)
去做事情的。
其中protected boolean tryAcquire(int arg)
是留给子类去实现的,所以这里是采用了模板设计模式。这个方法简单来说就是去获取锁。
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
addWaiter
方法简单来说是构造节点,然后用CAS
的方式把这个节点加入同步器中节点链表的尾巴。
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
//如果前驱结点是头节点的话,那么尝试获取锁(也就是同步状态)
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//如果失败的话
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
acquireQueued()
方法中,当前线程在“死循环”中尝试获取同步状态,并且只有前驱节点是头节点才能够尝试获取同步状态。
/*
* 查询是否有线程比当前线程更早地请求获取锁
*/
public final boolean hasQueuedPredecessors() {
// The correctness of this depends on head being initialized
// before tail and on head.next being accurate if the current
// thread is first in queue.
Node t = tail; // Read fields in reverse initialization order
Node h = head;
Node s;
//如果h==t,返回false,表示没有。h==t的时候要么没有等待者,要么只有一个。当只有一个的时候,
//那么这一个节点肯定是首节点,而首节点中的线程必定是获取了锁的。
// h!=t&&((s = h.next) == null || s.thread != Thread.currentThread())
//如果h!=t,则进入后面的判断
//当头节点的后继节点为null,但是这个时候tail为null(head和tail节点都是懒初始
//化),或者头节点的后继节点不为null,但是头节点的后继节点中的线程不是当前线程,则返回true
return h != t &&
((s = h.next) == null || s.thread != Thread.currentThread());
}
如果等待队列为空,或者只有首节点,或者首节点的后继节点中的线程是当前线程,那么当前线程就可以去获取同步状态
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
/*
*如果这个节点的waitStatus已经被标记为SIGNAL(-1)了的话,那么
*这个节点就需要park,所以方法返回true
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
/*
* 该节点的前驱节点如果被取消了(waitStatus为1时表示取消状态,目前只有这个状态的值大于
* 0),那么跳过前驱节点(通过死循环的方式把前驱结点的前驱结点设置为自己的前驱结点)。然
* 后退出if条件语句,调到末尾,返回false,表示自己还可以抢救一下,可以进行获取的锁的尝
* 试,而不是park。
*
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
/*
* 如果以上情况都不成立的话,那么就把自己的waitStatus标记为SIGNAL,返回true,表示自己要
* park,战略性投降。
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
ReentrantLock
public class ReentrantLock implements Lock, java.io.Serializable {
.....
abstract static class Sync extends AbstractQueuedSynchronizer
/**
public ReentrantLock() {
sync = new NonfairSync();
}
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
.....
}
ConditionObject
await()
/**
* Implements interruptible condition wait.
* <ol>
* <li> If current thread is interrupted, throw InterruptedException.
* <li> Save lock state returned by {@link #getState}.
* <li> Invoke {@link #release} with saved state as argument,
* throwing IllegalMonitorStateException if it fails.
* <li> Block until signalled or interrupted.
* <li> Reacquire by invoking specialized version of
* {@link #acquire} with saved state as argument.
* <li> If interrupted while blocked in step 4, throw InterruptedException.
* </ol>
*/
public final void await() throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
long savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
//线程将会在调用park方法后阻塞,直到被重新唤醒,从condition队列加入同步队列,从await()方
//法中的while循环中退出(isOnSyncQueue(Node node)方法返回true,节点已经在同步队列中),
//进而调用同步器的acquireQueued()方法加入到获取同步状态的竞争中。
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
//成功获取同步状态(或者说锁)之后,被唤醒的线程将从先前调用的await()方法返回,此时该线程已
//经成功地获取了锁。
}
/**
* Invokes release with current state value; returns saved state.
* Cancels node and throws exception on failure.
* @param node the condition node for this wait
* @return previous sync state
*/
final long fullyRelease(Node node) {
boolean failed = true;
try {
long savedState = getState();
if (release(savedState)) {
failed = false;
return savedState;
} else {
//如果释放同步状态失败,就throws exception,在finally那里还会Cancels node
throw new IllegalMonitorStateException();
}
} finally {
//如果释放同步状态失败,就Cancels node
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
signal()
public final void signal() {
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
- 调用
Condition
的signal()
方法,将会唤醒在等待队列中等待时间最长的节点(首节点),在唤醒节点之前,会将节点移到同步队列中。 - 调用该方法的前置条件是当前线程必须获取了锁,可以看到
signal()
方法进行了isHeldExclusively()
检查,也就是当前线程必须是获取了锁的线程。接着获取等待队列的首节点,将其移动到同步队列并使用LockSupport
唤醒节点中的线程。
/**
* Removes and transfers nodes until hit non-cancelled one or
* null. Split out from signal in part to encourage compilers
* to inline the case of no waiters.
* @param first (non-null) the first node on condition queue
*/
private void doSignal(Node first) {
do {
//当没有waiters时进行的一些优化,以便编译器进行方法内联,transferForSignal方法才是重点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
/**
* Transfers a node from a condition queue onto sync queue.
* Returns true if successful.
* @param node the node
* @return true if successfully transferred (else the node was
* cancelled before signal)
把节点从condition队列“传输”到同步队列去。”传输“成功或者在唤醒前节点已经取消,就返回true。
*/
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;
/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
* 通过调用同步器的enq(Node node)方法,等待队列中的头节点线程安全地移动到同步队列。
*/
Node p = enq(node);
int ws = p.waitStatus;
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//当节点移动到同步队列后,当前线程再使用LockSupport唤醒该节点的线程。
LockSupport.unpark(node.thread);
return true;
}