Java8-显示的Condition对象

先来看下例子

public class ConditionTest implements Runnable {

    private Lock lock;
    private Condition con;

    public ConditionTest(Lock lock, Condition condition) {
        this.lock = lock;
        this.con = condition;
    }

    public void run() {
        if ("thread1".equals(Thread.currentThread().getName()))
            testThread1Waiter();
        if ("thread2".equals(Thread.currentThread().getName()))
            testThread2Signal();
    }

    public void testThread1Waiter() {
        lock.lock();
        try {
            try {
                System.out.println("thead1被阻塞");
                con.await();
                System.out.println("thead1被唤醒");
            } catch (InterruptedException e) {
            }
        } finally {
            lock.unlock();
        }
    }

    public void testThread2Signal() {
        lock.lock();
        try {
            con.signal();
            System.out.println("thead2唤醒等待线程");
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        Lock lock = new ReentrantLock();
        Condition condition = lock.newCondition();
        new Thread(new ConditionTest(lock, condition), "thread1").start();
        new Thread(new ConditionTest(lock, condition), "thread2").start();
    }
}

运行结果

thead1被阻塞
thead2唤醒等待线程
thead1被唤醒

Condition的await()实现原理

        /**
         * Implements interruptible condition wait.
         * <ol>
         * <li> If current thread is interrupted, throw InterruptedException.
         * <li> Save lock state returned by {@link #getState}.
         * <li> Invoke {@link #release} with saved state as argument,
         *      throwing IllegalMonitorStateException if it fails.
         * <li> Block until signalled or interrupted.
         * <li> Reacquire by invoking specialized version of
         *      {@link #acquire} with saved state as argument.
         * <li> If interrupted while blocked in step 4, throw InterruptedException.
         * </ol>
         */
        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

①首先判断当前线程是否被中断了,如果已经被中断了,则直接抛InterruptedException给上层调用者,否则进入②

if (Thread.interrupted())
                throw new InterruptedException();

②把当前线程所对应的节点放入condition队列中

       Node node = addConditionWaiter();
       /**
         * Adds a new waiter to wait queue.
         * @return its new wait node
         */
        private Node addConditionWaiter() {
            Node t = lastWaiter;
            // If lastWaiter is cancelled, clean out.
            if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }
            Node node = new Node(Thread.currentThread(), Node.CONDITION);
            if (t == null)
                firstWaiter = node;
            else
                t.nextWaiter = node;
            lastWaiter = node;
            return node;
        }

首先拿到队列(注意数据结构,Condition构建出来的也是一个队列)中最后一个等待者,紧接着判断,判断最后一个等待者的waitStatus不是CONDITION的话,

 if (t != null && t.waitStatus != Node.CONDITION) {
                unlinkCancelledWaiters();
                t = lastWaiter;
            }

解绑取消的等待者,因为通过这句代码

 Node node = new Node(Thread.currentThread(), Node.CONDITION);

我们看到,new出来的Node的状态都是CONDITION的。
那么unlinkCancelledWaiters做了什么?就是一些指针遍历并判断状态的操作,总结一下就是:从头到尾遍历每一个Node,遇到Node的waitStatus不是CONDITION的就从队列中踢掉,该节点的前后节点相连。

        private void unlinkCancelledWaiters() {
            Node t = firstWaiter;
            Node trail = null;
            while (t != null) {
                Node next = t.nextWaiter;
                //把不是condition的都踢掉
               //1、如果首节点不是condition
                if (t.waitStatus != Node.CONDITION) {
                    //2、因为上面 Node next = t.nextWaiter;这里已经记录了t.nextWaiter;的值,所以可以把t.nextWaiter设置为null;
                    t.nextWaiter = null;
                    //3、一开始trail是NULL的,所以进入4,否则进入5
                    if (trail == null)
                        //4、把下个节点当作首节点,进入6
                        firstWaiter = next;
                    else
                        //5、把首节点指向trail.nextWaiter,进入6
                        trail.nextWaiter = next;
                  //6、如果next为NULL,说明conition队列就一个节点,trail指向lastWaiter ,然后进入9
                    if (next == null)
                          lastWaiter = trail;
                }
                else
                    //8、如果首节点是condition状态,首节点指向trail,然后进入9
                    trail = t;
                //9、把下个节点设置为首节点,继续循环判断是否为condition节点,当然如果下个节点为null。也就是等待队列只有一个节点的话,那就退出循环了
                t = next;
            }
        }

等待队列的基本结构如下图所示:
《Java8-显示的Condition对象》 等待队列

插入节点只需要将原有尾节点的nextWaiter指向当前节点,并且更新尾节点。更新节点并没有像AQS更新同步队列使用CAS是因为调用await()方法的线程必定是获取了锁的线程,锁保证了操作的线程安全。

AQS实质上拥有一个同步队列和多个等待队列,具体对应关系如下图所示:

《Java8-显示的Condition对象》 AQS同步队列与等待队列

③完全释放Node的状态

    int savedState = fullyRelease(node);

    final int fullyRelease(Node node) {
        boolean failed = true;
        try {
            int savedState = getState();
            if (release(savedState)) {
                failed = false;
                return savedState;
            } else {
                throw new IllegalMonitorStateException();
            }
        } finally {
            if (failed)
                node.waitStatus = Node.CANCELLED;
        }
    }

首先获取state,release的时候将整个state传过去,理由是某线程可能多次调用了lock()方法,比如调用了10次lock,那么此线程就将state加到了10,所以这里要将10传过去,将状态全部释放,这样后面的线程才能重新从state=0开始竞争锁,这也是方法被命名为fullyRelease的原因,因为要完全释放锁,释放锁之后,如果有竞争锁的线程,那么就唤醒第一个,这都是release方法的逻辑了

④判断Node是否在AbstractQueuedSynchronizer构建的队列中而不是Node是否在Condition构建的队列(waitstatus == condition状态)中,如果Node不在AbstractQueuedSynchronizer构建的队列中,那么调用LockSupport的park方法阻塞。

 while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
  }

    final boolean isOnSyncQueue(Node node) {
        if (node.waitStatus == Node.CONDITION || node.prev == null)
            return false;
        if (node.next != null) // If has successor, it must be on queue
            return true;
        /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
        return findNodeFromTail(node);
    }

    private boolean findNodeFromTail(Node node) {
        Node t = tail;
        for (;;) {
            //循环当前节点是否是AQS尾节点
            if (t == node)
                return true;
            if (t == null)
                return false;
            t = t.prev;
        }
    }

⑤唤醒后重新参与竞争,如果获取不到锁,将再次睡眠等待唤醒

  if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

⑥如果节点被取消了,清除condition队列中被取消的节点

if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);

Condition的signal()实现原理

①要能signal(),当前线程必须持有独占锁,否则抛出异常IllegalMonitorStateException。
那么真正操作的时候,获取第一个waiter,如果有waiter,调用doSignal方法:

    /**
         * Moves the longest-waiting thread, if one exists, from the
         * wait queue for this condition to the wait queue for the
         * owning lock.
         *
         * @throws IllegalMonitorStateException if {@link #isHeldExclusively}
         *         returns {@code false}
         */
        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

② 1. 重新设置firstWaiter,指向第一个waiter的nextWaiter,也就是把首节点下个节点当作首节点

  1. 如果第一个waiter的nextWaiter为null,说明当前队列中只有一个waiter,lastWaiter置空
  2. 因为firstWaiter = first.nextWaiter,已经赋值给firstWaiter了,所以把first.nextWaiter置NULL,方便gc回收
        /**
         * Removes and transfers nodes until hit non-cancelled one or
         * null. Split out from signal in part to encourage compilers
         * to inline the case of no waiters.
         * @param first (non-null) the first node on condition queue
         */
        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

③方法本意是将一个节点从Condition队列转换为AbstractQueuedSynchronizer队列,

/**
     * Transfers a node from a condition queue onto sync queue.
     * Returns true if successful.
     * @param node the node
     * @return true if successfully transferred (else the node was
     * cancelled before signal)
     */
    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

总结一下方法的实现:

  1. 尝试将Node的waitStatus从CONDITION置为0,这一步失败直接返回false
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;
  1. 当前节点进入调用enq方法进入AbstractQueuedSynchronizer队列
     Node p = enq(node);

    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }
  1. 当前节点通过CAS机制将waitStatus置为SIGNAL
 int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);
return true;

最后返回true。再次回到这里的代码

       private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        

while循环里的!transferForSignal(first) 变为false了,直接退出循环,所以signal()是唤醒其中一个等待唤醒的线程

从唤醒的代码我们可以得出一个重要结论:某个await()的节点被唤醒之后并不意味着它后面的代码会立即执行,它会被加入到AbstractQueuedSynchronizer队列的尾部,只有前面等待的节点获取锁全部完毕才能轮到它。
不过正常情况 ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL) 这个判断是不会为true的,所以,不会在这个时候唤醒该线程。
只有到发送signal信号的线程调用reentrantLock.unlock()后因为它已经被加到AQS的等待队列中,所以才会被唤醒。

signalAll方法的作用就是将Condition队列中所有等待的节点逐一队列中从移除,由CONDITION状态变为SIGNAL状态并加入AbstractQueuedSynchronizer队列的尾部。signal()则是唤醒其中一个等待唤醒的线程

signalAll实现,唤醒等待队列中所有的等待节点

      private void doSignalAll(Node first) {
            lastWaiter = firstWaiter = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }
    原文作者:多喝水JS
    原文地址: https://www.jianshu.com/p/e663ee970676
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞