Condition接口 应用场景:一个线程因为某个condition不满足被挂起,直到该Condition被满足了。 类似与Object的wait/notify,因此Condition对象应该是被多线程共享的,需要使用锁保护其状态的一致性 示例代码:
class BoundedBuffer { final Lock lock = new ReentrantLock(); final Condition notFull = lock.newCondition(); final Condition notEmpty = lock.newCondition(); final Object[] items = new Object[100]; int putptr, takeptr, count; public void put(Object x) throws InterruptedException { lock.lock(); try { while (count == items.length) notFull.await(); items[putptr] = x; if (++putptr == items.length) putptr = 0; ++count; notEmpty.signal(); } finally { lock.unlock(); } } public Object take() throws InterruptedException { lock.lock(); try { while (count == 0) notEmpty.await(); Object x = items[takeptr]; if (++takeptr == items.length) takeptr = 0; --count; notFull.signal(); return x; } finally { lock.unlock(); } } }
以上代码可以很清楚的看出Condition是如何使用的,后面的BlockingXXX类型的数据结构都会使用到Condition。 在使用signal(类似于notify)通知的时候需要实现按照什么样的顺序来通知。 三种等待方式:不中断,一定时间间隔,等到某个时间点
Lock和ReadWriteLock 两个接口,后者不是前者的子接口,通过以下ReadWriteLock的代码就可以看出两者的联系了:
public interface ReadWriteLock { /** * Returns the lock used for reading. * * @return the lock used for reading. */ Lock readLock(); /** * Returns the lock used for writing. * * @return the lock used for writing. */ Lock writeLock(); }
两个接口都有一个可重入(ReentrantLock, ReentrantReadWriteLock)的实现,后面分析
LockSupport 工具类,操作对象是线程,基于Unsafe类实现。 基本操作park和unpark。park会把使得当前线程失效(没有提供操作其他线程的,其实是可以实现的),暂时挂起,直到出现以下几种情况中的一种: 1)其他线程调用unpark方法操作该线程 2)该线程被中断 3)park方法立刻返回 关于blocker,线程挂起的同步对象,blocker不是必须的,
作用是什么呢? park有三种时间类别的调用 public static void park()
public static void parkNanos(long nanos)
public static void parkUntil(long deadline) 上面三个方法有对应的重载方法,就是加一个blocker对象作为参数 public static void park(Object blocker)
public static void parkNanos(Object blocker, long nanos)
public static void parkUntil(Object blocker, long deadline) 关于unpark public static void unpark(Thread thread) 开始的时候不明白为什么没有public static void unpark()操作当前线程,后来一想,一个线程park的时候已经被block了,没有可能调用unpark来自救的。
AbstractOwnableSynchronizer, AbstractQueuedSynchronizer, AbstractQueuedLongSynchronizer 后两者是第一个类的子类。 最后一个类是从JDK6才开始出现的,还没有具体实现的子类 中间一个类的子类实现会在可重入锁里面 AbstractOwnableSynchronizer只是实现了被线程独占这些功能的Synchronizer,并不包含如何管理实现多个线程的同步。包含了一个exclusiveOwnerThread,set/get方法。 AbstractQueuedSynchronizer利用Queue的方式来管理线程关于锁的使用和同步,相当于一个锁的管理者。 首先关注四个最核心的方法: protected boolean tryAcquire(int arg)
protected boolean tryRelease(int arg)
protected int tryAcquireShared(int arg)
protected boolean tryReleaseShared(int arg) 前两个用于独占锁,后两者用于共享锁,这四个方法是由子类来实现的,即如何获取和释放锁AbstractQueuedSynchronizer是不参与的,默认实现是不支持,即抛出UnsupportedOperationException。 AbstractQueuedSynchronizer做什么呢? 当 前线程尝试获取锁的时候,AbstractQueuedSynchronizer会先调用tryAcquire或者tryAcquireShared来尝 试获取,如果得到false,那么把当前线程放到等待队列中去,然后再做进一步操作。我们来分析以下6种情况,前三种用于独占锁,后三者用于共享,独占锁 或者共享锁按照等待方式又分为三种:不可中断线程等待,可中断线程等待,尝试限时等待超时放弃。 这6种的方法都含有一个int类型的参数,这个是给上面的tryAcquire这种方法使用的,也就是说它一个自定义的参数,一般用来表示某个自定义的状态。 1) 独占锁,放入队列后,直到成功获取锁,会忽略线程的中断
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
注意:队列中的header节点是个dummy节点,真正等待的第一个节点是header后面的节点。
但是在acquire成功后中,如果发现这是第一个等待节点,dummy的header会被设置成这个节点,但是
prev和thread是null,相当于把原来的dummy的header移走,换成一个新的dummy的Node。 addWaiter()方法把当前线程加入到等待队列中去,返回一个Node对象 acquireQueued()方法会监控Node对象在队列中的变化,如果检测到线程中断,返回true,否则返回false. 如果等待期间检测到中断信号,也就是acquireQueued返回了true,会用selfInterrupt中断当前线程。 acquireQueued源代码
final boolean acquireQueued(final Node node, int arg) { try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); //检测自己是否已经排到第一个了 if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return interrupted; } //shouldParkAfterFailedAcquire的作用检测我是不是需要安心地等,如果是的话, //就调用parkAndCheckInterrupt进入等待,等待结束后会返回线程是否已经中断 if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } }
shouldParkAfterFailedAcquire的源代码
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) { int ws = pred.waitStatus; if (ws == Node.SIGNAL) //SIGNAL的意思就是说前面那个家伙在释放锁以后会告诉我的,我安心等就是了 return true; if (ws > 0) { /* * ws大于0的意思就是这个Node已经被取消了,需要跳过,并且从队列中清除出去 * 这里会清除我前面所有这种类型的Node */ do { node.prev = pred = pred.prev; } while (pred.waitStatus > 0); pred.next = node; } else { /* * 这里ws是0或者PROPAGATE,表示他是傻乎乎的家伙,还不知道SIGNAL规则 * 0是独占锁,PROPAGATE是共享锁,compareAndSetWaitStatus会找人把他设成SIGNAL * 状态,(成功与否未知,所以返回false)参见上面关于SIGNAL的解释 */ compareAndSetWaitStatus(pred, ws, Node.SIGNAL); } return false; }
2) 独占锁,放入队列后,直到成功获取锁或者遇到中断
public final void acquireInterruptibly(long arg) throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); if (!tryAcquire(arg)) doAcquireInterruptibly(arg); }
这里与1)的区别在于调用doAcquireInterruptibly,而实际上doAcquireInterruptibly和acquireQueued区别很小,前者不会中断,仅此而已,参看红色部分,发现中断,直接break,然后取消获取锁的打算。 doAcquireInterruptibly的源代码
private void doAcquireInterruptibly(long arg) throws InterruptedException { final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
3)限时 doAcquireNanos的源代码
private boolean doAcquireNanos(long arg, long nanosTimeout) throws InterruptedException { long lastTime = System.nanoTime(); final Node node = addWaiter(Node.EXCLUSIVE); try { for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC return true; } if (nanosTimeout <= 0) { cancelAcquire(node); return false; } // 多次parkNanos,计算实际耗费的时间才是安全的做法 if (nanosTimeout > spinForTimeoutThreshold && shouldParkAfterFailedAcquire(p, node)) LockSupport.parkNanos(this, nanosTimeout); long now = System.nanoTime(); nanosTimeout -= now - lastTime; lastTime = now; // 如果线程被中断,不好意思,要抛出异常的 if (Thread.interrupted()) break; } } catch (RuntimeException ex) { cancelAcquire(node); throw ex; } // Arrive here only if interrupted cancelAcquire(node); throw new InterruptedException(); }
注意:计算的安全的做法不是一次等待,立刻超时,因为一次等待的时间不一定等于预先设定的值,而是多次等待,累计计算比较安全。
对于4),5),6)的共享锁,做法与独占锁几乎一致。 第一个区别来自于,当排队轮到自己的时候,调用的setHeadAndPropagate方法相对于setHead要复杂一些,这是由于独占锁和共享锁的区别决定的。
setHeadAndPropagate的源代码???
private void setHeadAndPropagate(Node node, long propagate) { Node h = head; // Record old head for check below setHead(node); /* * Try to signal next queued node if: * Propagation was indicated by caller, * or was recorded (as h.waitStatus) by a previous operation * (note: this uses sign-check of waitStatus because * PROPAGATE status may transition to SIGNAL.) * and * The next node is waiting in shared mode, * or we don't know, because it appears null * * The conservatism in both of these checks may cause * unnecessary wake-ups, but only when there are multiple * racing acquires/releases, so most need signals now or soon * anyway. */ if (propagate > 0 || h == null || h.waitStatus < 0) { Node s = node.next; if (s == null || s.isShared()) doReleaseShared(); } }
对于4)来说,doAcquireShared把selfInterrupt()挪到自己的方法内 2) 关于release部分 作用,释放锁,唤醒下一个等待的线程
AbstractQueuedLongSynchronizer AbstractQueuedLongSynchronizer和AbstractQueuedSynchronizer的区别在于acquire和release的arg参数是long而不是int类型。
ReentrantLock 所谓可重入锁,就是当一个thread已经获得一个lock的时候,再次请求该锁的时候,会立即返回。 使用AbstractQueuedSynchronizer的子类(Sync, NonfairSync, FairSync)进行锁获取释放的管理。 state等于0表示当前没有线程占用锁,下面两个获取锁的过程基本类似,共同的过程是 首先检查有没有线程使用该锁,没有的话就占用该并且setState,否则就检查那个占用锁的线程是不是当前线程,如果是的话, 仅仅setState,否则就返回false。 Sync#nonfairTryAcquire
final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
FairSync#tryAcquire
protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (isFirst(current) && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } }
对于FairSync,唯一的不同在于isFirst的调用,而UnfairSync则完全不会检查,谁抢到就是谁的。
final boolean isFirst(Thread current) { Node h, s; return ((h = head) == null || ((s = h.next) != null && s.thread == current) || fullIsFirst(current)); }
isFirst会检查有没有线程排队,如果没有,当前线程就可以获得锁,如果有队列,就看当前线程是不是排第一个。 Sync#tryRelease
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
tryRelease会检查当前线程有没有占有锁,如果不是回抛出异常。接着会从state中减去计数得到新的state,如果state为0表示所有的锁都被释放了。
ReentrantReadWriteLock ReentrantReadWriteLock比较复杂,因为同事管理共享锁(读取锁)和独占锁(写入锁)。 也对应使用AbstractQueuedSynchronizer的子类(Sync, NonfairSync, FairSync)进行锁获取释放的管理。(名字一样但是实现是不同的)。 Sync类 1) Sync的state是32位的,高位的16位是共享锁的状态,低位的16位是独占锁的状态。
/* * Note that tryRelease and tryAcquire can be called by * Conditions. So it is possible that their arguments contain * both read and write holds that are all released during a * condition wait and re-established in tryAcquire. */ protected final boolean tryRelease(int releases) { int nextc = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); // 首先检查独占锁计数,如果是0表示独占锁已经被完全释放,则清除独占锁线程 // 更新状态 if (exclusiveCount(nextc) == 0) { setExclusiveOwnerThread(null); setState(nextc); return true; } else { setState(nextc); return false; } } protected final boolean tryAcquire(int acquires) { /* * Walkthrough: * 1. if read count nonzero or write count nonzero * and owner is a different thread, fail. * 2. If count would saturate, fail. (This can only * happen if count is already nonzero.) * 3. Otherwise, this thread is eligible for lock if * it is either a reentrant acquire or * queue policy allows it. If so, update state * and set owner. */ Thread current = Thread.currentThread(); int c = getState(); int w = exclusiveCount(c); if (c != 0) { // c != 0表示有共享锁或者独占锁存在,w == 0表示没有独占锁 // 那么两个条件同时成立表示有共享锁存在,就无法获得独占锁 // 或者有线程拥有独占锁但不是当前线程,那也无权获得独占锁 // (Note: if c != 0 and w == 0 then shared count != 0) if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w + exclusiveCount(acquires) > MAX_COUNT) throw new Error("Maximum lock count exceeded"); } // 到了这一步,可能有以下几种情况: // 1) c == 0没有任何锁存在(这个时候 w == 0 也成立) // 2) 当前线程拥有独占锁,并且还没到锁的最大限制数 // w == 0是当前线程没有独占锁,属于新申请 // writerShouldBlock是抽象方法,对于FairSync和UnfairSync有不同实现 // 该发现检查当前线程申请独占锁应不应该被阻止 // 对于FairSync,writerShouldBlock会用isFirst检查, // 对于isFirst,如果如果没人排队,或者你是第一个排队的,或者fullIsFirst就返回true // 对于fullIsFirst,不是很理解 // 对于UnfairSync,writerShouldBlock永远返回false,因为没有排队的概念(体现Unfair) if ((w == 0 && writerShouldBlock(current)) || !compareAndSetState(c, c + acquires)) return false; // 获取独占锁成功,设置独占锁线程 setExclusiveOwnerThread(current); return true; } // 这里使用HoldCounter类型的ThreadLocal变量存储当前线程拥有的共享锁的计数 // cachedHoldCounter缓存最近一次成功获取共享锁的线程的ThreadLocal变量 protected final boolean tryReleaseShared(int unused) { HoldCounter rh = cachedHoldCounter; Thread current = Thread.currentThread(); if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); // tryDecrement()返回拥有的共享锁的计数,大于0则并且更新计数(减1)。 if (rh.tryDecrement() <= 0) throw new IllegalMonitorStateException(); // 更新共享锁的计数 for (;;) { int c = getState(); int nextc = c - SHARED_UNIT; // 高位的共享锁计数减一 if (compareAndSetState(c, nextc)) return nextc == 0; } } protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail * 2. If count saturated, throw error * 3. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 4. If step 3 fails either because thread * apparently not eligible or CAS fails, * chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); // 其他线程正在使用独占锁 if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; // 共享锁计数到达最大限制 if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); // 类似于writerShouldBlock,readerShouldBlock是抽象方法,有不同实现, // 检查是不是阻止当前线程共享锁的申请 // 对于UnfairSync,为了防止独占锁饿死的情况,如果发现队列中第一个排队的是独占锁申请, // 就是block当前共享锁的申请 // 对于FairSync,同样使用isFirst检查当前线程 if (!readerShouldBlock(current) && compareAndSetState(c, c + SHARED_UNIT)) { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) cachedHoldCounter = rh = readHolds.get(); rh.count++; return 1; } // 针对CAS失败或者一些不太常见的失败的情况 // 思想:实现常规版本和完整版本(包含所有情况),在常规版本失败的情况下调用完整版本, 提高效率 return fullTryAcquireShared(current); }
fullTryAcquireShared 增加计数缓存,以及红色部分
/** * Full version of acquire for reads, that handles CAS misses * and reentrant reads not dealt with in tryAcquireShared. */ final int fullTryAcquireShared(Thread current) { /* * This code is in part redundant with that in * tryAcquireShared but is simpler overall by not * complicating tryAcquireShared with interactions between * retries and lazily reading hold counts. */ HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != current.getId()) rh = readHolds.get(); for (;;) { int c = getState(); int w = exclusiveCount(c); // 红色部分表示没有占用共享锁,新申请共享锁 if ((w != 0 && getExclusiveOwnerThread() != current) || ((rh.count | w) == 0 && readerShouldBlock(current))) return -1; if (sharedCount(c) == MAX_COUNT) throw new Error("Maximum lock count exceeded"); if (compareAndSetState(c, c + SHARED_UNIT)) { cachedHoldCounter = rh; // cache for release rh.count++; return 1; } } }
关于ReadLock和WriteLock 调用AbstractQueuedSynchronizer对于的共享锁/独占锁的acquire和release的方法来实现 Lock可以看出是没有只有独占锁的锁
Node#nextWaiter null, 表示独占锁的Node SHARED, 表示共享锁的Node 其他, 某个条件下的下一个等待者