Java-JUC包中的锁-ReentrantLock(二)

一、ReentrantLock的UML类图

《Java-JUC包中的锁-ReentrantLock(二)》
  
(01) ReentrantLock实现了Lock接口。
(02) ReentrantLock中有一个成员变量sync,sync是Sync类型;Sync是一个抽象类,而且它继承于AQS。
(03) ReentrantLock中有”公平锁类”FairSync和”非公平锁类”NonfairSync,它们都是Sync的子类。ReentrantLock中sync对象,是FairSync与NonfairSync中的一种,这也意味着ReentrantLock是”公平锁”或”非公平锁”中的一种,ReentrantLock默认是非公平锁。

二、ReentrantLock介绍

ReentrantLock是一个可重入(reentrant)的互斥锁,又被称为“独占锁”。
顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。

三、ReentrantLock构造函数

构造函数

  /** * Creates an instance of {@code ReentrantLock}. * This is equivalent to using {@code ReentrantLock(false)}. * 默认的使用非公平的方式创建一个ReentrantLock */
     */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /** * Creates an instance of {@code ReentrantLock} with the * given fairness policy. * * @param fair {@code true} if this lock should use a fair ordering policy * 创建 ReentrantLock 通过 fair 指定是否使用公平模式 */
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();
    }

ReentrantLock的 lock 获取释放都是通过内部类 Sync 的子类 FairSync, NonfairSync 来实现, 而且两者都是继承 Sync, 而Sync是继承 AQS, 接下来我们看 FairSync 与 NonfairSync

FairSync

/** * Sync object for fair locks */
/** * 继承 Sync的公平的方式获取锁 */
static final class FairSync extends Sync {

    private static final long serialVersionUID = -3000897897090466540L;

    @Override
    final  void lock() {
        acquire(1);
    }

    /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first */
    /** * 公平的方式获取 锁 */
    protected final boolean tryAcquire(int acquires){
        // 1. 获取当前的 线程
        final Thread current = Thread.currentThread();          
        int c = getState();
        // 2. c == 0 -> 现在还没有线程获取锁
        if(c == 0){   
            // 3. 判断 AQS Sync Queue 里面是否有线程等待获取 锁,若没有 直接 CAS 获取lock 
            if(!hasQueuedPredecessors() && compareAndSetState(0, acquires)){    
                // 4. 获取 lock 成功 设置 exclusiveOwnerThread
                setExclusiveOwnerThread(current);               
                return true;
            }
        }
        // 5. 已经有线程获取锁, 判断是否是当前的线程
        else if(current == getExclusiveOwnerThread()){   
        // 6. 下面是进行lock 的重入, 就是计数器加 1 
            int nextc = c + acquires;                        
            if(nextc < 0){
                throw new Error("Maximum lock count exceeded");
            }
            setState(nextc);
            return true;
        }
        return false;
    }
}

NonfairSync

/** * Sync object for non-fair locks */
/** * 继承 Sync 实现非公平 * 公不公平的获取锁的区别: * 1. 非公平-> 在获取时先cas改变一下 AQS 的state值, 改变成功就获取, 不然就加入到 AQS 的 Sync Queue 里面 * 2. 每次获取lock之前判断是否 AQS 里面的 Sync Queue 是否有等待获取的线程 */
static final class NonfairSync extends Sync{
    private static final long serialVersionUID = 7316153563782823691L;

    /** * Perform lock. Try immediate barge, backing up to normal * acquire on failure */
    @Override
    /** * 获取 lock */
    void lock() {
        // 先cas改变一下 state 成功就表示获取
        if(compareAndSetState(0, 1)){   
            // 获取成功设置 exclusiveOwnerThread
            setExclusiveOwnerThread(Thread.currentThread()); 
        }else{
            // 获取不成功, 调用 AQS 的 acquire 进行获取
            acquire(1); 
        }
    }

    /** * 尝试获取锁 */
    protected final boolean tryAcquire(int acquires){
        return nonfairTryAcquire(acquires);
    }
}

三、基本概念

  • AQS – 指AbstractQueuedSynchronizer类。

    AQS是java中管理“锁”的抽象类,锁的许多公共方法都是在这个类中实现。AQS是独占锁(例如,ReentrantLock)和共享锁(例如,Semaphore)的公共父类。

  • AQS锁的类别 – 分为“独占锁”和“共享锁”两种

    (01) 独占锁 – 锁在一个时间点只能被一个线程锁占有。根据锁的获取机制,它又划分为“公平锁”和“非公平锁”。公平锁,是按照通过CLH等待线程按照先来先得的规则,公平的获取锁;而非公平锁,则当线程要获取锁时,它会无视CLH等待队列而直接获取锁。独占锁的典型实例子是ReentrantLock,此外,ReentrantReadWriteLock.WriteLock也是独占锁。

    (02) 共享锁 – 能被多个线程同时拥有,能被共享的锁。JUC包中的ReentrantReadWriteLock.ReadLock,CyclicBarrier, CountDownLatch和Semaphore都是共享锁。

  • CLH队列 – Craig, Landin, and Hagersten lock queue

    CLH队列是AQS中“等待锁”的线程队列。在多线程中,为了保护竞争资源不被多个线程同时操作而起来错误,我们常常需要通过锁来保护这些资源。在独占锁中,竞争资源在一个时间点只能被一个线程锁访问;而其它线程则需要等待。CLH就是管理这些“等待锁”的线程的队列。
    CLH是一个非阻塞的 FIFO 队列。也就是说往里面插入或移除一个节点的时候,在并发条件下不会阻塞,而是通过自旋锁和 CAS 保证节点插入和移除的原子性。

  • CAS函数 – Compare And Swap

    CAS函数,是比较并交换函数,它是原子操作函数;即,通过CAS操作的数据都是以原子方式进行的。例如,compareAndSetHead(), compareAndSetTail(), compareAndSetNext()等函数。它们共同的特点是,这些函数所执行的动作是以原子的方式进行的。

四、获取非公平锁的流程

【1. 在ReentrantLock类NonfairSync内部类中lock方法代码】


         /** * Performs lock. Try immediate barge, backing up to normal * acquire on failure. * 获取锁 */
        final void lock() {
            // 先cas改变一下 state 成功就表示获取锁
            if (compareAndSetState(0, 1))
            //设置当前线各为获取锁的线程
                setExclusiveOwnerThread(Thread.currentThread());
            else
                //调用父类AbstractQueuedSynchronizer的acquire方法获取锁
                acquire(1);
        }

【2. 在AbstractQueuedSynchronizer类中acquire方法代码】

 /**
     * Acquires in exclusive mode, ignoring interrupts.  Implemented
     * by invoking at least once {@link #tryAcquire},
     * returning on success.  Otherwise the thread is queued, possibly
     * repeatedly blocking and unblocking, invoking {@link
     * #tryAcquire} until success. This method can be used
     * to implement method {@link Lock#lock}.
     *
     * @param arg the acquire argument.  This value is conveyed to
     *        {@link #tryAcquire} but is otherwise uninterpreted and
     *        can represent anything you like.
     */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

步骤:
1. 调用 tryAcquire 尝试性的获取锁(一般都是又子类实现), 成功的话直接返回
2. tryAcquire 调用获取失败, 将当前的线程封装成 Node 加入到 Sync Queue 里面(调用addWaiter), 等待获取 signal 信号
3. 调用 acquireQueued 进行自旋的方式获取锁(有可能会 repeatedly blocking and unblocking)
4. 根据acquireQueued的返回值判断在获取lock的过程中是否被中断, 若被中断, 则自己再中断一下(selfInterrupt)

【3. 在ReentrantLock类NonfairSync内部类tryAcquire方法代码】

  protected final boolean tryAcquire(int acquires) {
            return nonfairTryAcquire(acquires);
  }

【4. 在ReentrantLock类Sync 内存类中nonfairTryAcquire方法代码】

       /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */
        final boolean nonfairTryAcquire(int acquires) {
            // 1. 获取当前的线程
            final Thread current = Thread.currentThread();
            // 2. 获取 AQS中的 state值(>0锁最线程获取,=0 锁没有被获取)
            int c = getState();
            // 3. c == 0 独占锁 没有被人获取
            if (c == 0) { 
                 // 4. CAS 改变 state 获取锁(这里有可能有竞争, 有可能失败)
                if (compareAndSetState(0, acquires)) {
                 // 5. 获取 lock 成功, 设置获取锁的独占线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //7. 如果不等于0,说明锁已经被线程获取到,判断获取独占锁的线程是不是当前线程
            else if (current == getExclusiveOwnerThread()) {
               // 8. 在 state 计数加1(重入获取锁)
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                // 9. 这里因为是已经获取 lock 所以不考虑并发 
                setState(nextc);
                return true;
            }
            return false;
        }

【5. 在回到AbstractQueuedSynchronizer中acquire方法,此时如果tryAcquir方法获取锁失败,程序会执行addWaiter(Node.EXCLUSIVE)方法】

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

【6. AbstractQueuedSynchronizer类里 addWaiter方法代码】

/** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node */
/** * 将当前的线程封装成 Node 加入到 Sync Queue 里面 */
private Node addWaiter(Node mode){
    // 1. 封装 Node
    Node node = new Node(Thread.currentThread(), mode);     
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    // 2. pred != null -> 队列中已经有节点, 直接 CAS 到尾节点
    if(pred != null){ 
       // 3. 先设置 Node.pre = pred 
       //则当一个 node在Sync Queue里面时 node.prev 一定 != null(除 dummy node)
       //但是 node.prev != null 不能说明其在 Sync Queue 里面, 因为现在的CAS可能失败 
        node.prev = pred; 
        // 4. CAS node 到 tail 
        if(compareAndSetTail(pred, node)){      
            pred.next = node;                  
            // 5. CAS 成功, 将 pred.next = node 
            //说明 node.next != null -> 则 node 一定在 Sync Queue
            //但若 node 在Sync Queue 里面不一定 node.next != null
            return node;
        }
    }
    // 6. 队列为空, 调用 enq 入队列
    enq(node);                                 
    return node;
}

【7. AbstractQueuedSynchronizer类里 enq方法代码】

/** * * Insert node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor 返回的是前继节点 */
/** * 将节点 node 加入队列 * 这里有个注意点 * 情况: * 1. 首先 queue是空的 * 2. 初始化一个 dummy 节点 * 3. 这时再在tail后面添加节点(这一步可能失败, 可能发生竞争被其他的线程抢占) * 这里为什么要加入一个 dummy 节点呢? * 这里的 Sync Queue 是CLH lock的一个变种, 线程节点 node 能否获取lock的判断通过其前继节点 * 而且这里在当前节点想获取lock时通常给前继节点打上 signal 的标识(表示当前继节点释放lock需要通知我来获取lock) * 若这里不清楚的同学, 请先看看 CLH lock的资料 (这是理解 AQS 的基础) */
private Node enq(final Node node){
    for(;;){
        Node t = tail;
        // 1. 队列为空 初始化一个 dummy 节点 其实和 ConcurrentLinkedQueue 一样
        if(t == null){ // Must initialize 
        // 2. 初始化 head 与 tail 
            if(compareAndSetHead(new Node())){  
                tail = head;
            }
        }else{
            // 3. 先设置 Node.pre = pred 
            //则当一个 node在Sync Queue里面时 node.prev 一定 != null
            //但是 node.prev != null 不能说明其在 Sync Queue 里面
            //因为现在的CAS可能失败 
            node.prev = t;    
            // 4. CAS node 到 tail,新建节点放到队列尾部 
            if(compareAndSetTail(t, node)){
            // 5. CAS 成功, 将 pred.next = node 
            //说明 node.next != null -> 则 node 一定在 Sync Queue
            //但若 node 在Sync Queue 里面不一定 node.next != null 
                t.next = node;                  
                return t;
            }
        }
    }
}

【8. 在回到AbstractQueuedSynchronizer中acquire方法,此时如果tryAcquir方法获取锁失败,程序会执行addWaiter(Node.EXCLUSIVE)方法将当前的线程封装成 Node 加入到 Sync Queue 里面,此时程序继续执行acquireQueued方法】

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

【9. AbstractQueuedSynchronizer类里 acquireQueued方法代码】

/** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire * * @param node the node * @param arg the acquire argument * @return {@code} if interrupted while waiting */
/** * 不支持中断的获取锁 */
final boolean acquireQueued(final Node node, int arg){
    boolean failed = true;
    try {
        boolean interrupted = false;
        for(;;){
             // 1. 获取当前节点的前继节点 ,当一个node在 Sync Queue 里面
             //并且没有获取 lock ,此时node的前继节点不可能是 null
            final Node p = node.predecessor(); 
             // 2. 判断前继节点是否是head节点(前继节点是head, 存在两种情况) 
             //(1) 前继节点现在占用lock 
             //(2)前继节点是个空节点, 已经释放lock, node现在有机会获lock;
             //则再次调用 tryAcquire尝试获取一下 
            if(p == head && tryAcquire(arg)){ 
             // 3. 获取 lock 成功, 直接设置 新head(原来的head可能就直接被回收) 
                setHead(node);  
                // help GC 
                p.next = null;          
                failed = false;
                 // 4. 返回在整个获取的过程中是否被中断过 ; 这样做有什么用? 
                 //若整个过程中被中断过, 则最后在自我中断(selfInterrupt), 
                 //因为外面的函数可能需要知道整个过程是否被中断过
                return interrupted;               
            }
            // 5. 调用 shouldParkAfterFailedAcquire 判断是否需要中断
            //这里可能会一开始返回false,但在此进去后直接返回true
            //主要和前继节点的状态是否是 signal
            if(shouldParkAfterFailedAcquire(p, node) && 
                    parkAndCheckInterrupt()){      // 6. 现在lock还是被其他线程占用 那就睡一会, 返回值判断是否这次线程的唤醒是被中断唤醒
                interrupted = true;
            }
        }
    }finally {
        // 7. 在整个获取中出错
        if(failed){  
            // 8. 清除 node 节点
            //清除的过程是先给 node 打上 CANCELLED标志, 然后再删除 
            cancelAcquire(node);                
        }
    }
}

【10.AbstractQueuedSynchronizer类里shouldParkAfterFailedAcquire方法】

本节点在进行 sleep 前一定需要给前继节点打上 SIGNAL 标识(
因为前继节点在 release lock 时会根据这个标识决定是否需要唤醒后继节点来获取 lock,若释放时标识是0, 则说明 Sync queue 里面没有等待获取lock的线程, 或Sync queue里面的节点正在获取 lock)

/** * Checks and update status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev. * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block */
/** */
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node){
    int ws = pred.waitStatus;
    if(ws == Node.SIGNAL){                                      // 1. 判断是否已经给前继节点打上标识SIGNAL, 为前继节点释放 lock 时唤醒自己做准备
        /** * This node has already set status asking a release * to signal it, so it can safely park */
        return true;
    }

    if(ws > 0){                                                 // 2. 遇到个 CANCELLED 的节点 (ws > 0 只可能是 CANCELLED 的节点, 也就是 获取中被中断, 或超时的节点)
        /** // 这里我们帮助删除 * Predecessor was cancelled. Skip over predecessors and * indicate retry */
        do{
            node.prev = pred = pred.prev;                    // 3. 跳过所有 CANCELLED 的节点
        }while(pred.waitStatus > 0);
        pred.next = node;                                    // 跳过 CANCELLED 节点
    }
    else{
        /** * waitStatus must be 0 or PROPAGATE. Indicate that we * need a signal, but don't park yet. Caller will need to * retry to make sure it cannot acquire before parking */
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);     // 4. 到这里 ws 只可能是 0 或 PROPAGATE (用于 共享模式的, 所以在共享模式中的提醒前继节点唤醒自己的方式,
                                                            // 也是给前继节点打上 SIGNAL标识 见 方法 "doReleaseShared" -> "!compareAndSetWaitStatus(h, Node.SIGNAL, 0)" -> unparkSuccessor)
    }

    return false;
}

【11.AbstractQueuedSynchronizer类里parkAndCheckInterrupt方法】

/** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted */
/** * 中断当前线程, 并且返回此次的唤醒是否是通过中断 */
private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    logger.info(Thread.currentThread().getName() + " " + "parkAndCheckInterrupt , ThreadName:" + Thread.currentThread().getName());
    return Thread.interrupted(); // Thread.interrupted() 会清除中断标识, 并返上次的中断标识
}

【12.AbstractQueuedSynchronizer类里parkAndCheckInterrupt方法】

/** * Cancels an ongoing attempt to acquire. * * @param node the node */
/** * 清除因中断/超时而放弃获取lock的线程节点(此时节点在 Sync Queue 里面) */
private void cancelAcquire(Node node) {
    // Ignore if node doesn't exist
    if (node == null)
        return;

    // 1. 线程引用清空
    node.thread = null;                 

    // Skip cancelled predecessors
    Node pred = node.prev;
    // 2. 若前继节点是 CANCELLED 的, 则也一并清除
    while (pred.waitStatus > 0)       
        node.prev = pred = pred.prev;

    // predNext is the apparent node to unsplice. CASes below will
    // fail if not, in which case, we lost race vs another cancel
    // or signal, so no further action is necessary.

    Node predNext = pred.next;         

    // Can use unconditional write instead of CAS here.
    // After this atomic step, other Nodes can skip past us.
    // Before, we are free of interference from other threads.
    // 4. 标识节点需要清除
    node.waitStatus = Node.CANCELLED; 

    // If we are the tail, remove ourselves.
    // 5. 若需要清除额节点是尾节点, 则直接 CAS pred为尾节点
    if (node == tail && compareAndSetTail(node, pred)) { 
        // 6. 删除节点predNext
        compareAndSetNext(pred, predNext, null);    
    } else {
        // If successor needs signal, try to set pred's next-link
        // so it will get one. Otherwise wake it up to propagate.
        int ws;
        // 7. 后继节点需要唤醒(但这里的后继节点predNext已经 CANCELLED 了)
        // 8. 将 pred 标识为 SIGNAL
        if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL || 
                        (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
            Node next = node.next;
            // 8. next.waitStatus <= 0 表示 next 是个一个想要获取lock的节点
            if (next != null && next.waitStatus <= 0) 
                compareAndSetNext(pred, predNext, next);
        } else {
            // 若 pred 是头节点, 则此刻可能有节点刚刚进入 queue ,所以进行一下唤醒
            unparkSuccessor(node); 
        }

        node.next = node; // help GC
    }
}

五、获取公平锁的流程

获取锁是通过lock()函数。下面,我们以lock()对获取公平锁的过程进行展开。

lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:

final void lock() {
    acquire(1);
}

说明:“当前线程”实际上是通过acquire(1)获取锁的。
这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁,处于可获取状态时,它的状态值是0;锁,被线程初次获取到了,它的状态值就变成了1。
由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推…这就是为什么获取锁时,传入的参数是1的原因了。
可重入就是指锁可以被单个线程多次获取。

acquire()在AQS中实现的,它的源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到”CLH队列(非阻塞的FIFO队列)”末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时”当前线程”会调用selfInterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。
上面是对acquire()的概括性说明。和获取非公平锁差不多,下面我将该函数分为4部分来逐步解析。
一. tryAcquire()
二. addWaiter()
三. acquireQueued()
四. selfInterrupt()

一、tryAcquire()

1. tryAcquire()

公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:

protected final boolean tryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“独占锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”,
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,
        // 则判断“当前线程”是不是CLH队列中的第一个线程线程,
        // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“独占锁”的拥有者已经为“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!
尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。后面我们会说明,在尝试失败的情况下,是如何一步步获取锁的。

2.hasQueuedPredecessors()

hasQueuedPredecessors()在AQS中实现,源码如下:

public final boolean hasQueuedPredecessors() {
    Node t = tail; 
    Node h = head;
    Node s;
    return h != t &&
        ((s = h.next) == null || s.thread != Thread.currentThread());
}

说明: 通过代码,能分析出,hasQueuedPredecessors() 是通过判断”当前线程”是不是在CLH队列的队首,来返回AQS中是不是有比“当前线程”等待更久的线程。下面对head、tail和Node进行说明。

3. Node的源码

Node就是CLH队列的节点。Node在AQS中实现,它的数据结构如下:

private transient volatile Node head;    // CLH队列的队首
private transient volatile Node tail;    // CLH队列的队尾

// CLH队列的节点
static final class Node {
    static final Node SHARED = new Node();
    static final Node EXCLUSIVE = null;

    // 线程已被取消,对应的waitStatus的值
    static final int CANCELLED =  1;
    // “当前线程的后继线程需要被unpark(唤醒)”,对应的waitStatus的值。
    // 一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    static final int SIGNAL    = -1;
    // 线程(处在Condition休眠状态)在等待Condition唤醒,对应的waitStatus的值
    static final int CONDITION = -2;
    // (共享锁)其它线程获取到“共享锁”,对应的waitStatus的值
    static final int PROPAGATE = -3;

    // waitStatus为“CANCELLED, SIGNAL, CONDITION, PROPAGATE”时分别表示不同状态,
    // 若waitStatus=0,则意味着当前线程不属于上面的任何一种状态。
    volatile int waitStatus;

    // 前一节点
    volatile Node prev;

    // 后一节点
    volatile Node next;

    // 节点所对应的线程
    volatile Thread thread;

    // nextWaiter是“区别当前CLH队列是 ‘独占锁’队列 还是 ‘共享锁’队列 的标记”
    // 若nextWaiter=SHARED(即nextWaiter=null),则CLH队列是“共享锁”队列;
    // 若nextWaiter=EXCLUSIVE,则CLH队列是“独占锁”队列。
    Node nextWaiter;

    // “共享锁”则返回true,“独占锁”则返回false。
    final boolean isShared() {
        return nextWaiter == SHARED;
    }

    // 返回前一节点
    final Node predecessor() throws NullPointerException {
        Node p = prev;
        if (p == null)
            throw new NullPointerException();
        else
            return p;
    }

    Node() {    // Used to establish initial head or SHARED marker
    }

    // 构造函数。thread是节点所对应的线程,mode是用来表示thread的锁是“独占锁”还是“共享锁”。
    Node(Thread thread, Node mode) {     // Used by addWaiter
        this.nextWaiter = mode;
        this.thread = thread;
    }

    // 构造函数。thread是节点所对应的线程,waitStatus是线程的等待状态。
    Node(Thread thread, int waitStatus) { // Used by Condition
        this.waitStatus = waitStatus;
        this.thread = thread;
    }
}

说明:
Node是CLH队列的节点,代表“等待锁的线程队列”。
(01) 每个Node都会一个线程对应。
(02) 每个Node会通过prev和next分别指向上一个节点和下一个节点,这分别代表上一个等待线程和下一个等待线程。
(03) Node通过waitStatus保存线程的等待状态。
(04) Node通过nextWaiter来区分线程是“独占锁”线程还是“共享锁”线程。如果是“独占锁”线程,则nextWaiter的值为EXCLUSIVE;如果是“共享锁”线程,则nextWaiter的值是SHARED。

4. compareAndSetState()

compareAndSetState()在AQS中实现。它的源码如下:

protected final boolean compareAndSetState(int expect, int update) {
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

说明: compareAndSwapInt() 是sun.misc.Unsafe类中的一个本地方法。对此,我们需要了解的是 compareAndSetState(expect, update) 是以原子的方式操作当前线程;若当前线程的状态为expect,则设置它的状态为update。

5. setExclusiveOwnerThread()

setExclusiveOwnerThread()在AbstractOwnableSynchronizer.java中实现,它的源码如下:

// exclusiveOwnerThread是当前拥有“独占锁”的线程
private transient Thread exclusiveOwnerThread;
protected final void setExclusiveOwnerThread(Thread t) {
    exclusiveOwnerThread = t;
}

说明:setExclusiveOwnerThread()的作用就是,设置线程t为当前拥有“独占锁”的线程。

6. getState(), setState()

getState()和setState()都在AQS中实现,源码如下:

// 锁的状态
private volatile int state;
// 设置锁的状态
protected final void setState(int newState) {
    state = newState;
}
// 获取锁的状态
protected final int getState() {
    return state;
}

说明:state表示锁的状态,对于“独占锁”而已,state=0表示锁是可获取状态(即,锁没有被任何线程锁持有)。由于java中的独占锁是可重入的,state的值可以>1。

小结:tryAcquire()的作用就是让“当前线程”尝试获取锁。获取成功返回true,失败则返回false。

二. addWaiter(Node.EXCLUSIVE)

addWaiter(Node.EXCLUSIVE)的作用是,创建“当前线程”的Node节点,且Node中记录“当前线程”对应的锁是“独占锁”类型,并且将该节点添加到CLH队列的末尾。

1.addWaiter()

addWaiter()在AQS中实现,源码如下:

private Node addWaiter(Node mode) {
    // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
    Node node = new Node(Thread.currentThread(), mode);
    //tail  队尾
    Node pred = tail;
    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
    enq(node);
    return node;
}

说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。

2. compareAndSetTail()

compareAndSetTail()在AQS中实现,源码如下:
private final boolean compareAndSetTail(Node expect, Node update) {
return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
}
说明:compareAndSetTail也属于CAS函数,也是通过“本地方法”实现的。compareAndSetTail(expect, update)会以原子的方式进行操作,它的作用是判断CLH队列的队尾是不是为expect,是的话,就将队尾设为update。

3. enq()

enq()在AQS中实现,源码如下:

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

说明: enq()的作用很简单。如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾。否则,直接将node添加到CLH末尾。

小结:addWaiter()的作用,就是将当前线程添加到CLH队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。

三. acquireQueued()

前面,我们已经将当前线程添加到CLH队列中了。而acquireQueued()的作用就是逐步的去执行CLH队列的线程,如果当前线程获取到了锁,则返回;否则,当前线程进行休眠,直到唤醒并重新获取锁了才返回。下面,我们看看acquireQueued()的具体流程。

1. acquireQueued()

acquireQueued()在AQS中实现,源码如下:

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // interrupted表示在CLH队列的调度中,“当前线程”在休眠时,有没有被中断过。
        boolean interrupted = false;
        for (;;) {
            // 获取上一个节点。
            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

说明:acquireQueued()的目的是从队列中获取锁。

2. shouldParkAfterFailedAcquire()

shouldParkAfterFailedAcquire()在AQS中实现,源码如下:

// 返回“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前继节点的状态
    int ws = pred.waitStatus;
    // SIGNAL状态描述:一般发生情况是,当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
    // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
    if (ws == Node.SIGNAL)
        return true;
    // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

说明:

(01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。

CANCELLED[1] – 当前线程已被取消
SIGNAL[-1] – “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] – 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] – (共享锁)其它线程获取到“共享锁”
[0] – 当前线程不属于上面的任何一种状态。

(02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。
如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前线程被唤醒才从parkAndCheckInterrupt()中返回。

3. parkAndCheckInterrupt())

parkAndCheckInterrupt()在AQS中实现,源码如下:

private final boolean parkAndCheckInterrupt() {
    // 通过LockSupport的park()阻塞“当前线程”。
    LockSupport.park(this);
    // 返回线程的中断状态。
    return Thread.interrupted();
}

说明:parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。
这里介绍一下线程被阻塞之后如何唤醒。一般有2种情况:
第1种情况:unpark()唤醒。“前继节点对应的线程”使用完锁之后,通过unpark()方式唤醒当前线程。
第2种情况:中断唤醒。其它线程通过interrupt()中断当前线程。
补充:LockSupport()中的park(),unpark()的作用 和 Object中的wait(),notify()作用类似,是阻塞/唤醒。
它们的用法不同,park(),unpark()是轻量级的,而wait(),notify()是必须先通过Synchronized获取同步锁。

4. 再次tryAcquire()

了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函数之后。我们接着分析acquireQueued()的for循环部分。

final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
    setHead(node);
    p.next = null; // help GC
    failed = false;
    return interrupted;
}

说明:
(01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点。
(02) p == head && tryAcquire(arg)
首先,判断“前继节点”是不是CHL表头。如果是的话,则通过tryAcquire()尝试获取锁。
其实,这样做的目的是为了“让当前线程获取锁”,但是为什么需要先判断p==head呢?理解这个对理解“公平锁”的机制很重要,因为这么做的原因就是为了保证公平性!
(a) 前面,我们在shouldParkAfterFailedAcquire()我们判断“当前线程”是否需要阻塞;
(b) 接着,“当前线程”阻塞的话,会调用parkAndCheckInterrupt()来阻塞线程。当线程被解除阻塞的时候,我们会返回线程的中断状态。而线程被解除阻塞状态,可能是由于“线程被中断”,也可能是由于“其它线程调用了该线程的unpark()函数”。
(c) 再回到p==head这里。如果当前线程是因为其它线程调用了unpark()函数而被唤醒,那么唤醒它的线程,应该是它的前继节点所对应的线程(关于这一点,后面在“释放锁”的过程中会看到)。 OK,是前继节点调用unpark()唤醒了当前线程!
此时,再来理解p==head就很简单了:当前继节点是CLH队列的头节点,并且它释放锁之后;就轮到当前节点获取锁了。然后,当前节点通过tryAcquire()获取锁;获取成功的话,通过setHead(node)设置当前节点为头节点,并返回。
总之,如果“前继节点调用unpark()唤醒了当前线程”并且“前继节点是CLH表头”,此时就是满足p==head,也就是符合公平性原则的。否则,如果当前线程是因为“线程被中断”而唤醒,那么显然就不是公平了。这就是为什么说p==head就是保证公平性!

小结:acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有被中断过。

四. selfInterrupt()

selfInterrupt()是AQS中实现,源码如下:

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt()方法(调用这个方法的目的是重新产生一个中断);否则不会执行。
在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!

小结:selfInterrupt()的作用就是当前线程自己产生一个中断,让线程进入阻塞状态。
因为当前线程的前继节点不是队列的头,那判断一下当前节点的前继节点需不需要进行阻塞,并且如果中断当前节点的前继节点(调用中断后,线程是由运行状态变成阻塞状态)成功。如果满足这个条件,并用尝试获取锁失败 执行selfInterrupt(); 这个方法的作用是中断当前线程,让当前线程进行到阻塞状态。

总结
再回过头看看acquire()函数,它最终的目的是获取锁!

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
(02) 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到”CLH队列”末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。

六. 释放公平锁

1. unlock()

unlock()在ReentrantLock.java中实现的,源码如下:

public void unlock() {
    sync.release(1);
}

说明:
unlock()是解锁函数,它是通过AQS的release()函数来实现的。
在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。
关于AQS, ReentrantLock 和 sync的关系如下:

public class ReentrantLock implements Lock, java.io.Serializable {

    private final Sync sync;

    abstract static class Sync extends AbstractQueuedSynchronizer {
        ...
    }

    ...
}

从中,我们发现:sync是ReentrantLock.java中的成员对象,而Sync是AQS的子类。

2. release()

release()在AQS中实现的,源码如下:

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

说明:
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。

3. tryRelease()

tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:

protected final boolean tryRelease(int releases) {
    // c是本次释放锁之后的状态
    int c = getState() - releases;
    // 如果“当前线程”不是“锁的持有者”,则抛出异常!
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置当前线程的锁的状态。
    setState(c);
    return free;
}

说明:
tryRelease()的作用是尝试释放锁。
(01) 如果“当前线程”不是“锁的持有者”,则抛出异常。
(02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。
getState(), setState()在前面已经介绍过,这里不再说明。
getExclusiveOwnerThread(), setExclusiveOwnerThread()在AQS的父类AbstractOwnableSynchronizer.java中定义,源码如下:

public abstract class AbstractOwnableSynchronizer implements java.io.Serializable {
    // “锁”的持有线程
    private transient Thread exclusiveOwnerThread;
    // 设置“锁的持有线程”为t
    protected final void setExclusiveOwnerThread(Thread t) {
        exclusiveOwnerThread = t;
    }
    // 获取“锁的持有线程”
    protected final Thread getExclusiveOwnerThread() {
        return exclusiveOwnerThread;
    }
}

4. unparkSuccessor()

在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
下面看看unparkSuccessor()的源码,它在AQS中实现。

private void unparkSuccessor(Node node) {
    // 获取当前线程的状态
    int ws = node.waitStatus;
    // 如果状态<0,则设置状态=0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
    // 这里的有效,是指“后继节点对应的线程状态<=0”
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒“后继节点对应的线程”
    if (s != null)
        LockSupport.unpark(s.thread);
}

说明:
unparkSuccessor()的作用是“唤醒当前线程的后继线程”。后继线程被唤醒之后,就可以获取该锁并恢复运行了。

总结
“释放锁”的过程相对“获取锁”的过程比较简单。释放锁时,主要进行的操作,是更新当前线程对应的锁的状态。如果当前线程对锁已经彻底释放,则设置“锁”的持有线程为null,设置当前线程的状态为空,然后唤醒后继线程。

    原文作者:JUC
    原文地址: https://blog.csdn.net/yjp198713/article/details/78941742
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞