5.1 Lock接口
并发编程安全性需要使用到锁,synchronized是一种隐式的获得与释放锁的关键字,除此之外还有Lock接口及其实现类,该接口及实现类提供了显示获取和释放锁的方式。
除了上述编程时的区别外,在使用时Lock与synchronized的区别主要有以下三点:
- 非阻塞的获取锁。一个线程在没有获得到锁的时候不是阻塞被挂起
- 获得锁的线程可以被中断。一个线程在获得锁之后可以响应中断,此时该线程会释放所获得的锁。
- 超时获取锁。线程在获取锁的时候可以指定超时时间
Lock作为一个接口,定义了实现了锁的类应该实现何种方法,它是面向锁的使用者。
5.2 队列同步器
AQS,是并发包下用来构建其他类的一个基础类。一般锁的类是是实现了Lock接口并定义了一个内部类,该内部类继承了AQS。
5.2.1 队列同步器的接口与实例
同步器里提供了很多的方法,从程序员的角度可以分为三个类别:1、原子方法 2、可重写的方法 3、模板方法
原子方法有三个getState setState compareAndSetState ,AQS内部最重要的一个变量volatile state存储了当前队列锁的状态,上述三个操作都分别是获取 修改 和CAS修改state变量。get set方法就是简单的获取state变量,其中没有加任何锁。compareAndSetState调用了UNSAFE包下的CAS去修改state保证了修改的原子性。
第二类是可重写的方法,AQS的模板方法里调用了这些可重写方法。
模板方法就比较多了。比如我要实现了一个独占的锁,我就要使用模板方法去重写tryAcquire和tryRelease。模板方法里的方法也可以分为三类:1、独占式的获取和释放锁的状态 2、共享式的获取和释放锁的状态 3、查看同步队列里的的等待线程的状态
5.2.2 队列同步器的实现分析
1、同步队列
AQS名字里就有个Queue,AQS在内部维护了一个双端链表,每一个尝试获取锁失败的线程都会和一些额外的信息一起构成一个Node然后加入内部的Queue里,同时当前线程会被阻塞。当同步状态释放的时候会把Queue首部节点指向的线程释放并让其尝试再次获取同步状态。
一个双端队列里的Node至少有两个结构:指向前面节点的pre和后面节点的next。在这种情况下还需要一个指向Thread的引用。因为无论是互斥锁还是非互斥锁,还是处于不同状态的锁都是用这个队列来维护的,所以一个节点要包含线程的不同等待状态,因而Node里有一个字段waitStatus用来记录当前线程的等待状态。还要维护一个等待队列,用nextWaiter来表示。
AQS持有队列的头结点和尾节点的引用,所谓持有引用指的是AQS的属性里有head tail两个指针指向双端链表的头和尾。锁的获取、释放都是通过操作这个双端链表来实现的。
2、独占式同步状态的获取与释放
所谓独占式同步状态只只能有一个线程获得锁,是通过acquire方法来实现的。
/** * Acquires in exclusive mode, ignoring interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. */ public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
看到这里的时候我对AQS三种类型的方法有了更深的理解。这个方法里面的tryAcquire在AQS里是一个protected的方法,需要写一个子类继承并重写,但是后面的acquireQueued等方法是AQS重新写好的。回顾上面的AQS三种分类,我们要重写的方法都一些AQS模板方法里需要使用的但是AQS却没有写好的方法,换言之AQS里的模板方法是一些半成品方法。所以我就更理解了为什么书里把AQS类称为框架,作为一个框架AQS要求所有没有获得锁的线程都要被加入一个队列里,这是框架封装的逻辑,但是如何获得锁是由tryAcquire来决定的,AQS并么有实现,这就很有框架的感觉。
if语句里做了三件事情:1、尝试获取锁 2、失败了就用当前线程建立一个Node 3、 把Node加入Queue中
进入addWaiter方法,首先新建一个exclusive的mode代表当前线程是独占获取锁失败的线程,然后尝试把新建的Node用CAS放到Queue里即放到tail的后面。但是没有看出来fast path的优化体现在什么地方。AQS里的队列使用的是尾插法,这也是AQS里持有head tail两个引用的原因,tail提高了尾插的效率,不用每次都遍历链表。enq里面是一个死循环CAS操作,有一点自旋锁的味道。
private Node addWaiter(Node mode) { Node node = new Node(Thread.currentThread(), mode); // Try the fast path of enq; backup to full enq on failure Node pred = tail; if (pred != null) { node.prev = pred; if (compareAndSetTail(pred, node)) { pred.next = node; return node; } } enq(node); return node; }
/** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor */ private Node enq(final Node node) { for (;;) { Node t = tail; if (t == null) { // Must initialize if (compareAndSetHead(new Node())) tail = head; } else { node.prev = t; if (compareAndSetTail(t, node)) { t.next = node; return t; } } } }
点进compareAndAetTail方法,发现只是简单的封装compareAndSwapObject方法,而且注释里写着compareAndAetTail这个CAS更改Queue的tail的方法只有在enq里才会用的到。compareAndSetTail方法的作用是检测Queue的tail的值是否和第一个入参相等,如果相等就换成第二个入参。在加入Queue的方法里一共有两个地方使用了compareAndSetTail,两次的expect都是tail。使用CAS操作维护了设置tail的原子性。
/** * CAS tail field. Used only by enq. */ private final boolean compareAndSetTail(Node expect, Node update) { return unsafe.compareAndSwapObject(this, tailOffset, expect, update); }
不管如何addWaiter方法返回了一个Node节点,作为accquireQueued的入参。
/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting */ final boolean acquireQueued(final Node node, int arg) { boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head && tryAcquire(arg)) { setHead(node); p.next = null; // help GC failed = false; return interrupted; } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
方法的返回值为boolean类型,对应着现场在获取锁的时候是否被中断,如果返回true代表线程被中断,acquire的if判断成立,线程会selfInterrupt。抛开有关中断的内容不看,这就是一个特殊的自旋锁,p指向当前节点的前置节点,只有当p是head即当前节点是Queue的第一个节点的时候才可以尝试获取锁,如果成功把当前节点设置为head节点这样给当前节点的后继节点获取锁的机会。
整个过程的如下。值得注意的是下面图中的线程进入等待状态并不是线程的状态中的waiting,而是一种死循环的状态即自旋锁的状态。
在一个线程成功获得锁并执行完逻辑后要释放锁给别的线程去获取。
/** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} */ public final boolean release(int arg) { if (tryRelease(arg)) { Node h = head; if (h != null && h.waitStatus != 0) unparkSuccessor(h); return true; } return false; }
这里的tryRelease也是一个需要重写的方法。在上面的acquireQueued方法里当一个线程获得锁之后就会被设置成head节点,也即head节点是正在持有锁的线程的节点。然后调用unparkSuccessor释放h节点的锁,
3、共享式同步状态获取与释放
共享式的特点是同一时刻可以有多个线程获得锁。AQS里通过acquireShared方法来获得共享锁,该方法里有个tryAcquireShared方法,根据在互斥锁里的经验带try的是需要我们重写的方法。tryAcquireShared返回值是一个int类型的变量分别尝试获取共享锁的不同结果,为负代表获取失败,为0代表该次获取成功但是不可以继续获取别的共享锁,为正的话其值大小代表还可以获取多少个共享锁。
/** * Acquires in shared mode, ignoring interrupts. Implemented by * first invoking at least once {@link #tryAcquireShared}, * returning on success. Otherwise the thread is queued, possibly * repeatedly blocking and unblocking, invoking {@link * #tryAcquireShared} until success. * * @param arg the acquire argument. This value is conveyed to * {@link #tryAcquireShared} but is otherwise uninterpreted * and can represent anything you like. */ public final void acquireShared(int arg) { if (tryAcquireShared(arg) < 0) doAcquireShared(arg); }
当返回值小于0的时候进入doAcquireShared方法,即失败的时候进入了doAcquireShared方法,该方法让没有获得锁的线程自旋等待,当且仅当一个线程是head节点的后继的时候才有机会再次尝试获取锁,这一点和非非共享获取锁一样。AQS里方法的起名真是非常随意……
首先新建了一个节点并调用addWaiter方法,该方法在非共享获得锁的Acquire里也使用,其作用是把节点加入AQS内部的Queue里。下面是在Queue里的节点在其为头结点的情况下可以尝试获取锁,如果成功就退出Queue并把下一个节点设置为可以获取锁。
/** * Acquires in shared uninterruptible mode. * @param arg the acquire argument */ private void doAcquireShared(int arg) { final Node node = addWaiter(Node.SHARED); boolean failed = true; try { boolean interrupted = false; for (;;) { final Node p = node.predecessor(); if (p == head) { int r = tryAcquireShared(arg); if (r >= 0) { setHeadAndPropagate(node, r); p.next = null; // help GC if (interrupted) selfInterrupt(); failed = false; return; } } if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt()) interrupted = true; } } finally { if (failed) cancelAcquire(node); } }
同样的最后也需要释放资源。由于共享模式下会有多个线程获得锁也会有多个线程释放锁,这个过程也要保证是线程安全的。保证线程安全的逻辑在doReleaseShared里,
public final boolean releaseShared(int arg) { if (tryReleaseShared(arg)) { doReleaseShared(); return true; } return false; }
private void doReleaseShared() { /* * Ensure that a release propagates, even if there are other * in-progress acquires/releases. This proceeds in the usual * way of trying to unparkSuccessor of head if it needs * signal. But if it does not, status is set to PROPAGATE to * ensure that upon release, propagation continues. * Additionally, we must loop in case a new node is added * while we are doing this. Also, unlike other uses of * unparkSuccessor, we need to know if CAS to reset status * fails, if so rechecking. */ for (;;) { Node h = head; if (h != null && h != tail) { int ws = h.waitStatus; if (ws == Node.SIGNAL) { if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0)) continue; // loop to recheck cases unparkSuccessor(h); } else if (ws == 0 && !compareAndSetWaitStatus(h, 0, Node.PROPAGATE)) continue; // loop on failed CAS } if (h == head) // loop if head changed break; } }
4、 独占式超时获取同步状态
和独占式获取同步状态整体上是十分相似的,区别之处在于没有获取到锁的时候回等待一个指定的时间,如果没有成功获得锁就会返回false
5、 自定义同步组件 TwinsLock
5.3 重入锁
锁的重入指的是获得锁的线程再次申请获得锁的时候能够成功获得锁,而非阻塞。synchronized关键字是隐式支持锁的重入的,使用synchronized关键字修饰一个递归方法的时候,可以成功执行就可以说明该锁是支持重入的,而对于AQS框架下的锁,其重入性是通过tryAcquire来实现的。
ReentrantLock内部有三个继承AQS的类,Sync直接继承于AQS,NonfairSync和FairSyns继承于Sync即简介继承于AQS,分别用于实现公平锁和非公平锁。
ReentrantLock默认的无参构造器是新建的是非公平锁,所以ReentrantLock默认的逻辑都是非公平锁的逻辑。
1、实现重进入
先不看具体的实现,要实现锁的重入应该维护一个计数器,同一个线程多次获得一个锁的时候要对计数器累加,然后释放的时候要减少计数器的值,当计数器为零的时候代表锁可以释放。ReentrantLock默认是非公平锁,首先从非公平锁的重入性开始分析。
lock方法里首先是一个CAS操作,该操作尝试用0作为state的期待值去把state修改成1,如果成功的话把该锁设置为独占模式,即Lock方法首先假设的是当先锁是无锁状态,如果失败了则调用模板方法acquire。
/** * Sync object for non-fair locks */ static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. */ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } }
该方法继承自AQS,需要重写的是tryAcquire方法,tryAcquire方法已经写好直接调用了nonfairTryAcquire方法。
public final void acquire(int arg) { if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg)) selfInterrupt(); }
protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); }
nonfairTryAcquire里封装了可重入的主要逻辑,当一个线程获取锁走到这一步的时候,一定的CAS(0,1)失败,但是这个方法开始还是对state是否为0做了一个判断,这里应该是两个考虑:1、一开始直接CAS操作是一种性能上的优化 2、CAS失败了走到这里仍然对state是否为0做一次判断是考虑到这一执行过程中由于并发可能造成了共享变量的修改。
剩下的逻辑很简单,如果state为0,直接CAS设置成需要改的值;如果state大于0且当前锁的线程是试图获取锁的线程,那么把state增加,在增加的过程中要考虑到溢出即最大共享数目的情况。除去这两种情况一律返回失败。
/** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
这是学习AQS依赖遇到的第一个完整使用AQS框架的锁类,可以看出AQS的使用方法:实现Lock接口并继承AQS类,在Lock接口里调用AQS的acquire方法进行加锁。acquire方法是一个模板方法,AQS帮我们实现了获得锁失败后维护内部队列的逻辑,我们只要重写tryAcquire逻辑即获得锁的逻辑,所以五花八门的锁不同之处就在于获得锁的逻辑不同。
借着这个思路继续分析,释放锁的过程中调用了sync的release方法,该方法继承自AQS类,AQS帮我们写好了释放锁失败后加入队列的逻辑,我们只要写tryRelease方法。在该方法中需要判断释放了相应的次数后state是否为0,如果为0就要把锁的状态设置为不加锁。
protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; }
2、公平与非公平获取锁的区别
所谓公平即一个在为线程分配锁的时候是否考虑线程的等待时间,如果tryAcquire里只考虑CAS成功而不考虑线程的等待时间那么这个锁就是非公平的,上面的NonfaireSync里的tryAcquire就是这样的逻辑。下面的tryAcquire是FairSync里的tryAcquire。
与非公平锁的获取锁相比,区别主要在state为0即一个线程获取锁的时候当前锁的状态为0,当state为零的时候有可能是一个线程刚刚释放锁,此时Queue中还有一堆线程在等待锁,此时的情况是新获得锁的线程和Queue里的线程一起竞争锁,为了保证公平性,只有Queue里没有线程的时候才可以让新来的线程尝试获取锁,这就保证了公平性。
当然在c不为零的情况下,不存在新的线程去竞争锁,也就不需要考虑Queue里是否有别的线程在等待。
/** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; }
非公平锁是默认的实现,根据实验结果显示一个刚刚释放锁的线程再次获取锁的几率是非常大的。所以非公平锁很有可能是一个线程连续获得锁,这样线程切换的开销没有了,效率更高,但是会带来线程的饥饿。
5.4 读写锁
ReentrantReadWriteLock,多个线程可以读,一个线程去写。读写锁是一对锁,一个读锁一个写锁。读写锁中的读锁是共享锁的一种实现。
5.4.1 读写锁的接口与示例
5.4.2 读写锁的实现分析
1、读写状态的设计
读写锁也是实现Lock接口并维护一个继承AQS的内部类来实现的,所以本质上也要通过state来维护锁的状态。state按位切割使用来维护状态。
左边高位是读状态,右边低位是写状态,读写,左右,很好记忆。要想获取读状态,就是不看右边的低16位,只要把S>>16即无符号右移16位并补0。要想看写状态,只要把左边的高16位全部不看,即和0x0000FFFF相与。当读状态加一的时候S+(1<<16),当写状态加一的时候直接加一就行。
2、写锁的获取与释放
根据读写锁名字ReentrantReadWriteLock可以看出无论是读锁还是写锁都是支持重入的,写锁是一个排他锁,只支持一个线程获得锁。
ReentrantReadWriteLock一如既往的实现了Lock接口并有一个Sync内部类,和可重入锁一样有两个类分别对应公平和非公平内部类,这两个类都是private的,他暴露出来的两个public的类就是我们使用的ReadLock和WriteLock。最终模板方法里还是调用了下面的tryWriteLock方法。
该方法里有两处return false其余都是return true,false代表着获取写锁的失败。
/** * Performs tryLock for write, enabling barging in both modes. * This is identical in effect to tryAcquire except for lack * of calls to writerShouldBlock. */ final boolean tryWriteLock() { Thread current = Thread.currentThread(); int c = getState(); if (c != 0) { int w = exclusiveCount(c); if (w == 0 || current != getExclusiveOwnerThread()) return false; if (w == MAX_COUNT) throw new Error("Maximum lock count exceeded"); } if (!compareAndSetState(c, c + 1)) return false; setExclusiveOwnerThread(current); return true; }
第一个条件是w==0,w是从exclusiveCount(c)得到的,c是当前锁的状态。exclusiveCount代码就不贴了,就是一个与运算根据State的低16位的值去判断当前是否有线程持有写锁,如果没有那么w的值为0。结合上面的对state的判断,这个意思就是如果state!=0且没有线程获得了写锁,那么此时一定存在线程获取了读锁,那么此时就不可以获得写锁。总结下来就是不可以在读锁存在的情况下获得写锁。这种设计的考虑是为了保证可见性,确保写的线程写的内容能够被读的线程读到。
第二个条件是当前线程不是锁持有的线程。这个是保证了写锁的互斥性。
第三个条件是在当前锁的state=0的条件下执行,即没有线程获得锁,此时尝试CAS操作如果失败就返回false。
写锁是非共享锁,所以释放起来很简单没有考虑线程安全的问题。由于是可重入锁需要考虑计数器为0的情况修改其状态。和上面的锁的释放没有太大的区别。
3、读锁的获取与释放
读锁,首先可以共享,其次也可以重入。经过类似的调用分析,读锁的获取核心方法如下
protected final int tryAcquireShared(int unused) { /* * Walkthrough: * 1. If write lock held by another thread, fail. * 2. Otherwise, this thread is eligible for * lock wrt state, so ask if it should block * because of queue policy. If not, try * to grant by CASing state and updating count. * Note that step does not check for reentrant * acquires, which is postponed to full version * to avoid having to check hold count in * the more typical non-reentrant case. * 3. If step 2 fails either because thread * apparently not eligible or CAS fails or count * saturated, chain to version with full retry loop. */ Thread current = Thread.currentThread(); int c = getState(); if (exclusiveCount(c) != 0 && getExclusiveOwnerThread() != current) return -1; int r = sharedCount(c); if (!readerShouldBlock() && r < MAX_COUNT && compareAndSetState(c, c + SHARED_UNIT)) { if (r == 0) { firstReader = current; firstReaderHoldCount = 1; } else if (firstReader == current) { firstReaderHoldCount++; } else { HoldCounter rh = cachedHoldCounter; if (rh == null || rh.tid != getThreadId(current)) cachedHoldCounter = rh = readHolds.get(); else if (rh.count == 0) readHolds.set(rh); rh.count++; } return 1; } return fullTryAcquireShared(current); }
首先是第一个return -1的地方,看到了熟悉的方法exclusiveCount,该方法是用来判断是否有线程获取了写锁,这个判断是意思是如果存在写锁且当前不是线程不是写锁的持有线程,返回失败。之所以这么设计是为了考虑到锁降级的问题,一个持有写锁的线程是可以持有读锁的,但是当一个线程持有写锁,其余非持有写锁的线程不可以持有读锁。这也就是代码里面注释的标记为1的部分:如果写锁被其他的线程持有那么当前线程就可以获得读锁。
接下来有一个return1的判断,这个判断就是尝试进行一次CAS操作获取写锁,如果成功就返回。
最后一个return代表上面尝试CAS操作失败的情况下,进入一个循环取试图获取写锁。这个之前的叙述是类似的。我记得这个操作是封装在模板方法里的,即这个尝试CAS如果失败就进入一个循环的CAS,不明白为什么这个操作要转移到这里。
4、锁降级
如果根据state的状态判断出此时有线程持有读锁,那么此时任何线程不可以获得写锁。如果根据state状态判断出此时有线程持有写锁,当且仅当持有写锁的试图获得读锁是可以允许的,其余任何没有持有写锁的线程是不可以获得读锁的。这种考虑是为了考虑到可见性。
AQS框架下所有操作的可见性都是通过state变量来保证的,state变量是volatile的,根据happens-before原则对state写操作happens-before所有对其读操作。根据上面对源码的分析,无论是读锁还是写锁的释放都涉及到对state的写操作,所有试图获取锁的操作无论是读锁还是写锁都有一个读锁的操作,即写操作之前所有的对变量的修改对读操作之后的操作是可见的,就可见根据锁的获取与释放来保证可见性。
首先在不考虑锁降级的情况下,读写锁可以理解成“互斥”的,即读锁和写锁不能同时分发出去,不允许在有线程获得读or写锁的情况下,别的线程获得写or读锁。这就是为了保证可见性,根据上面的分析可见性是基于锁的释放这个操作来实现的,假设A线程获得读锁,B线程获得写锁,那么就会存在这么一个现象:B在写的同时A在读,由于A读的时候B并没释放锁,所以B的写操作A看不到,这就带来了可见性问题。
现在回到锁降级上,一个线程在获得写锁的时候可以获得读锁。这里为什么没有可见性问题?因为他是一个线程,线程的读写是针对线程私有内存空间的,你爱读就读,爱写就写。那按照这个理论,我获得写锁,写完之后写锁一释放直接读不就行了?我还要获得读锁吗?答案是需要的。考虑这样一个情况,A线程获得写锁,A线程写了内容,A线程释放写锁,然后A线程开始在没有获得读锁的前提下就读。假设在A线程释放写锁和在A线程在没有获得读锁的情况下就读数据之间另一个B线程写了一些数据,由于A线程在没有读锁的情况下就读数据那么由于volatile state保证的可见性就无法成立,就带来了新的可见性的问题。
那为什么没有锁升级?A线程在获得读线程的时候也可以获得写线程?因为读锁是共享的,A线程有读锁的时候,有可能BCD线程也有读锁,当A线程又拿到写锁的时候,此时就不满足“互斥”性了。
5.5 LockSupport工具
阻塞和唤醒线程
5.6 Condition接口
类似于前面的等待通知机制,前面的等待通知机制是基于synchronized和Object上定义的监视器方法来实现的,Condition接口实现的等待通知机制也要结合Lock接口的实现类。
5.6.1 Condition接口与示例
Condition接口基于Lock来实现的。
5.6.2 Condition接口实现分析
ConditionObject是AQS的一个内部类,ConditionObject维护了一个等待队列。
1、等待队列
等待队列也是一个FIFO队列,队列里的节点和AQS里的同步队列的Node都是一样的,但是一些属性在等待队列里是用不到的。当一个线程调用await方法的时候,会生成一个对应的节点并加入等待队列的末尾,这个加入节点的过程没有使用CAS保证线程安全性,因为在调用await的时候已经获得了锁,所以这是一个线程安全的操作。但是对于非互斥的锁不就存在多个线程可以并发的调用await方法了吗?这不也存在线程不安全的情况吗?
2、等待
首先能够调用await的方法一定是获取了锁的线程,即一定是在同步队列的首节点,当他调用await的时候就会被放到等待队列里去。这就是底层的实现,从同步队列的首节点去除然后放到等待队列里去,相当于放弃了锁。
/** * Implements interruptible condition wait. * <ol> * <li> If current thread is interrupted, throw InterruptedException. * <li> Save lock state returned by {@link #getState}. * <li> Invoke {@link #release} with saved state as argument, * throwing IllegalMonitorStateException if it fails. * <li> Block until signalled or interrupted. * <li> Reacquire by invoking specialized version of * {@link #acquire} with saved state as argument. * <li> If interrupted while blocked in step 4, throw InterruptedException. * </ol> */ public final void await() throws InterruptedException { if (Thread.interrupted()) throw new InterruptedException(); Node node = addConditionWaiter(); int savedState = fullyRelease(node); int interruptMode = 0; while (!isOnSyncQueue(node)) { LockSupport.park(this); if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) break; } if (acquireQueued(node, savedState) && interruptMode != THROW_IE) interruptMode = REINTERRUPT; if (node.nextWaiter != null) // clean up if cancelled unlinkCancelledWaiters(); if (interruptMode != 0) reportInterruptAfterWait(interruptMode); }
3、通知
通知的时候是把等待最长时间的线程从同步队列里取出来,任务队列的头部的节点是等待时间最长的节点即firstWaiter是等待时间最长的节点。删除的方法是把该节点的nextWaiter置为null,然后把下一个节点放到等待队列的队首。
/** * Moves the longest-waiting thread, if one exists, from the * wait queue for this condition to the wait queue for the * owning lock. * * @throws IllegalMonitorStateException if {@link #isHeldExclusively} * returns {@code false} */ public final void signal() { if (!isHeldExclusively()) throw new IllegalMonitorStateException(); Node first = firstWaiter; if (first != null) doSignal(first); }