读锁的调用,最终委派给其内部类 Sync extends AbstractQueuedSynchronizer
/** * 获取读锁,如果写锁不是由其他线程持有,则获取并立即返回; * 如果写锁被其他线程持有,阻塞,直到读锁被获得。 */ public void lock() { sync.acquireShared(1); } /** * 以共享模式获取对象,忽略中断。通过至少先调用一次 tryAcquireShared(int) 来实现此方法,并在成功时返回。 * 否则将线程加入队列,在阻塞和运行之间切换,重复调用tryAcquireShared直到成功 *( possibly repeatedly blocking and unblocking, invoking tryAcquireShared until success) */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg);
tryAcquireShared方法的逻辑:
整体思路如下,具体实现细节,请参考下面的源码分析
一 写锁被占用的情况
1 写锁被其他线程占用,获取失败
2 写锁被自己占用,表示“锁降级”,进入【循环cas取读锁】流程
二 写锁没有被占用的情况
1 根据读锁获取策略判断是否阻塞(readerShouldBlock方法)
(1) 公平锁策略:如果当前线程不是同步队列中的第一个节点,则阻塞
(2) 非公平锁策略:为了防止写线程饥饿,如果同步队列中的第一个节点是写线程,则阻塞当前线程。
2 如果需要阻塞,分以下2种情况:
(1)当前线程是读锁重入,则不需要阻塞,进入【循环cas取读锁】流程
(2)当前线程不是读锁重入,则获取失败
3 如果不需要阻塞,进入【循环cas取读锁】流程
三 【循环cas取读锁】
1 tryAcquireShared方法先进行了一次cas取锁操作,如果获取失败,则调用fullTryAcquireShared方法,循环获取。
2 什么是cas?请参考CAS
protected final int tryAcquireShared(int unused) { Thread current = Thread.currentThread(); //c是锁状态为:高位16位表示共享锁的数量,低位16位表示独占锁的数量
int c = getState(); //exclusiveCount(c) 取低16位的值,也就是写锁状态位:不等于0表示写锁被占用
if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) //写锁被其他线程占用,获取读锁失败
return -1; //取高16位的值,读锁状态位
int r = sharedCount(c); //readerShouldBlock()根据读锁获取策略,返回是否阻塞当前读锁获取操作。后面会详细说明此方法
if (!readerShouldBlock() && r < MAX_COUNT &&
//cas修改高16位的读锁状态,即获取读锁
compareAndSetState(c, c + SHARED_UNIT)) { //首次获取读锁
if (r == 0) { //缓存首次获取读锁的线程,及其读锁重入次数
firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { //cachedHoldCounter是最后获取锁的线程的读锁重入次数
HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) //readHolds是缓存了当前线程的读锁重入次数的ThreadLocal //当前线程自然是最后获取锁的线程,故将当前线程的holdCounter赋给cachedHoldCounter
cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) //缓存当前线程的holdCounter //fullTryAcquireShared()方法中,
//获取读锁失败的线程会执行:readHolds.remove(),故此时需要重新设置
readHolds.set(rh); rh.count++; } return 1; } //首次获取读锁失败后,重试获取
return fullTryAcquireShared(current); }
/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */
final int fullTryAcquireShared(Thread current) { //rh表示当前线程的锁计数器
HoldCounter rh = null; for (;;) { int c = getState(); //写锁被占用
if (exclusiveCount(c) != 0) { //如果其他线程占用,读锁获取失败。如果当前线程占用,表示“锁降级”。
if (getExclusiveOwnerThread() != current) return -1; } else if (readerShouldBlock()) { //重入锁不需要阻塞。 // Make sure we're not acquiring read lock reentrantly
if (firstReader == current) { // assert firstReaderHoldCount > 0; //当前线程就是第一个获取读锁的线程,那么此时当然是重入锁。
} else { if (rh == null) { rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) { rh = readHolds.get(); if (rh.count == 0) //线程阻塞之前,清空readHolds
readHolds.remove(); } } if (rh.count == 0) //当前线程的锁计数器为0,非重入锁,需要阻塞。
return -1; } } if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); //cas设置读锁状态位
if (compareAndSetState(c, c + SHARED_UNIT)) { if (sharedCount(c) == 0) { //缓存首次获取读锁的线程,以及锁计数器
firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { if (rh == null) rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); else if (rh.count == 0) //锁计数器放入ThreadLocal
readHolds.set(rh); rh.count++; //此时rh就是最后获取读锁的线程的锁计数器
cachedHoldCounter = rh; // cache for release
} return 1; } } }
/** * 非公平锁的读锁获取策略 */ final boolean readerShouldBlock() { //为了防止写线程饥饿 //如果同步队列中的第一个线程是以独占模式获取锁(写锁), //那么当前获取读锁的线程需要阻塞,让队列中的第一个线程先执行 return apparentlyFirstQueuedIsExclusive(); } /** * 公平锁的读锁获取策略 */ final boolean readerShouldBlock() { //如果当前线程不是同步队列头结点的next节点(head.next) //则阻塞当前线程 return hasQueuedPredecessors(); }