07.JUC 锁 - AQS - Condition

基本概念

Condition ,即条件(也称为条件队列或条件变量)。它主要是为了在 JUC 框架中提供和 Java 传统的监视器风格的 wait、notify、notifyAll 方法类似的功能。

Condition 自己也维护了一个队列,该队列的作用是维护一个等待 signal 信号的队列,两个队列的作用是不同,事实上,每个线程也仅仅会同时存在以上两个队列中的一个。将设有线程 1、2,下面来具体过程。

线程1:

  • 线程1 调用 reentrantLock.lock时,持有锁。

  • 线程1 调用 await 方法,进入[条件等待队列],同时释放锁。

  • 线程1 获取到线程2 signal 信号,从 [条件等待队列] 进入 [同步等待队列]。

线程2:

  • 线程2 调用 reentrantLock.lock时,由于锁被线程1 持有,进入 [同步等待队列]。

  • 由于线程1 释放锁,线程2 从 [同步等待队列] 移除,获取到锁。线程2 调用 signal 方法,导致线程 1 被唤醒。

  • 线程2 调用 reentrantLock.unlock 。线程1 获取锁,继续循环。

条件等待队列

条件等待队列,指的是 Condition 内部自己维护的一个队列,不同于 AQS 的[同步等待队列]。它具有以下特点:

  • 要加入[条件等待队列]的节点,不能在 [同步等待队列]。
  • 从 [条件等待队列] 移除的节点,会进入[同步等待队列]。
  • 一个锁对象只能有一个[同步等待队列],但可以有多个[条件等待队列]。

这里以 AbstractQueuedSynchronizer 类(AQS)的内部类 ConditionObject 为例(该类是 Condition 的实现类)来分析下它的具体实现过程。

首先来看该类内部定义的几个成员变量:

private transient Node firstWaiter;
private transient Node lastWaiter;

它采用了 AQS 的 Node 节点构造,并定义了两个成员变量:firstWaiter、lastWaiter ,类似同步等待队列中 head、tail。说明在 ConditionObject 内部也维护着一个自己的等待队列。目前可知它的结构如下:

《07.JUC 锁 - AQS - Condition》

不同于 AQS 等待队列的双向链表结构,条件等待队列的结构是单向链表。通过 Node 的 nextWaiter 来指向下一个节点。

1.入队操作

Condition 的入队操作表示将节点添加进[条件等待队列]。该过程通过 AQS.ConditionObject 类 的addConditionWaiter 方法来完成。

具体过程如下:

private Node addConditionWaiter() {

    // 判断[条件等待队列]是否为空
    Node t = lastWaiter;
    if (t != null && t.waitStatus != Node.CONDITION) {
        // 清空[条件等待队列]中节点状态不为 CONDITION 的节点
        unlinkCancelledWaiters();
        t = lastWaiter;
    }

    // 创建节点并添加进条件等待队列
    Node node = new Node(Thread.currentThread(), Node.CONDITION);
    if (t == null) {
        firstWaiter = node;
    } else {
        t.nextWaiter = node;
    }
    lastWaiter = node;
    return node;
}

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;

    // 条件等待队列不为空,则从头节点开始清除
    while (t != null) {
        Node next = t.nextWaiter;

        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;

            // 第一次循环 trail 为空
            if (trail == null) {
                firstWaiter = next;
            } else {
                // 这里 trail 表示上一次循环的节点,即当前 t 的前节点
                trail.nextWaiter = next;
            }

            if (next == null) {
                lastWaiter = trail;
            }
        } else {
            trail = t;
        }

        t = next;
    }
}

分析代码可知,在 ConditionObject 中虽然也是采用链表构造,但是它的节点构造与 AQS 的等待队列存在一些差异,如下图所示:

《07.JUC 锁 - AQS - Condition》

  • t ,即 thread,表示节点存放的线程
  • w,即 waitStatus,表示节点等待状态。[条件等待队列]中的节点[等待状态]都是 CONDITION,否则会被清除。
  • nextWaiter ,表示后指针,与 AQS 的同步等待队列不同,在同步等待队列中表示 mode,即节点的模式。

再来看看它添加过程:

《07.JUC 锁 - AQS - Condition》

2.出队操作

// 设置新的头节点
if ((firstWaiter = first.nextWaiter) == null) {
    lastWaiter = null;
}

// 将旧的头节点从[条件等待队列]中移除
first.nextWaiter = null;

整个过程如下:

《07.JUC 锁 - AQS - Condition》

await

public final void await() throws InterruptedException {
    if (Thread.interrupted()) {
        // 抛出异常...
    }

    // 创建包含当前线程的节点并添加到[条件等待队列]
    Node node = addConditionWaiter();

    // 释放锁的所有重入计数,失败则中断线程,并将节点的状态置为 CANCELD
    long savedState = fullyRelease(node);
    int interruptMode = 0;

    // 判断该节点是否在[同步等待队列]
    while (!isOnSyncQueue(node)) {
        // 不在的话,则线程进入阻塞状态
        LockSupport.park(this);

        // 表示线程被唤醒的操作:确定中断模式并退出循环
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0) {
            break;
        }
    }

    // 成功获取独占锁后,并判断 interruptMode 的值
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE) {
        interruptMode = REINTERRUPT;
    }


    if (node.nextWaiter != null) {
        // 清除条件等待队列上节点状态不为 CONDITION 的节点
        unlinkCancelledWaiters();
    }

    if (interruptMode != 0) {
        // 根据 interruptMode 作出对应的动作
        // 若为 THROW_IE 则抛出异常中断线程
        // 若为 REINTERRUPT 则设置线程中断标记位
        reportInterruptAfterWait(interruptMode);
    }
}

1.fullyRelease

该方法表示释放独占锁的所有重入计数。

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        // 释放独占锁
        int savedState = getState();
        if (release(savedState)) {
            failed = false;
            return savedState;
        } else {
            // 失败则中断该线程
            throw new IllegalMonitorStateException();
        }
    } finally {
        // 并将该节点的状态置为 CANCELD
        if (failed){
            node.waitStatus = Node.CANCELLED;
        }   
    }
}

2.isOnSyncQueue

该方法表示判断节点是否在同步等待队列上。

final boolean isOnSyncQueue(Node node) {
    // 当 node 的状态为 CONDITION 或同步等待队列为空,则返回 false
    if (node.waitStatus == Node.CONDITION || node.prev == null){
        return false;
    }

    // 后继节点不为空,说明该节点肯定在同步等待队列中
    if (node.next != null){
        return true;
    }

    // 查询同步等待队列的末尾节点
    return findNodeFromTail(node);
}

private boolean findNodeFromTail(Node node) {
    Node t = tail;
    // 从同步等待队列的尾节点往前开始查找
    for (;;) {
        if (t == node){
            return true;
        }
        if (t == null){
            return false;
        }
        t = t.prev;
    }
}

3.checkInterruptWhileWaiting

该过程表示

// 在退出等待时重新中断
private static final int REINTERRUPT =  1;

// 在退出等待时抛出异常
private static final int THROW_IE    = -1;

private int checkInterruptWhileWaiting(Node node) {
    // 判断线程的中断标记位,若为 true,则再选择对应的动作;若为 false 返回 0;
    return Thread.interrupted() ? 
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) : 0;
}

final boolean transferAfterCancelledWait(Node node) {
    // 修改节点的等待状态,并将其加入[同步等待队列]
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {
        enq(node);
        return true;
    }

    // 直到节点在[同步等待队列]后退出
    while (!isOnSyncQueue(node)){
        Thread.yield();
    }   
    return false;
}

4.unlinkCancelledWaiters

该方法用于清除条件等待队列上节点状态不为 CONDITION 的节点

private void unlinkCancelledWaiters() {
    Node t = firstWaiter;
    Node trail = null;
    while (t != null) {
        Node next = t.nextWaiter;
        if (t.waitStatus != Node.CONDITION) {
            t.nextWaiter = null;
            if (trail == null){
                firstWaiter = next;
            }else{
                trail.nextWaiter = next;
            }

            if (next == null){
                lastWaiter = trail;
            }
        } else{
            trail = t;
        }
        t = next;
    }
}

5.reportInterruptAfterWait

该操作是根据中断模式作出相对于的动作

private void reportInterruptAfterWait(int interruptMode) throws InterruptedException {
    if (interruptMode == THROW_IE){
        throw new InterruptedException();
    }else if (interruptMode == REINTERRUPT){
        selfInterrupt();
    }
}

signal

public final void signal() {
    // 该线程是否持有独占锁
    if (!isHeldExclusively()) {
        // 抛出异常...
    }

    // 唤醒[条件等待队列]中的第一个节点
    Node first = firstWaiter;
    if (first != null) {
        doSignal(first);
    }
}
  • doSignal
private void doSignal(Node first) {
    do {
        // 设置新的头节点,并将节点从[条件等待队列]中移除
        if ((firstWaiter = first.nextWaiter) == null) {
            lastWaiter = null;
        }
        first.nextWaiter = null;

        // 直到将该节点加入[同步等待队列]或[条件等待队列]为空时跳出循环
    } while (!transferForSignal(first) && (first = firstWaiter) != null);
}
  • transferForSignal
final boolean transferForSignal(Node node) {

    // 若不能修改节点状态,说明该节点已被取消
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0)){
        return false;
    }

    // 将节点添加进[同步等待队列]
    Node p = enq(node);
    int ws = p.waitStatus;

    //如果该节点的状态为 cancel 或者修改waitStatus失败,则直接唤醒。
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL)){
        LockSupport.unpark(node.thread);
    }

    return true;
}
    原文作者:JUC
    原文地址: https://blog.csdn.net/u012420654/article/details/56496631
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞