JUC源码阅读之AbstractQueueSynchronizer(二)

1. node节点

node用于组成一个等待队列,每个node在其前驱节点被release后被通知。队列中的第一个线程会尝试去获取锁,但是第一不一定代表能获取成功。

ConditonObject也使用了同样node,但是他们单独使用了一条链接。在await操作后,一个节点被加入到condition队列。在signal操作之后,这个节点会被移动到正常的AQS队列。

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode ,共享模式*/
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode独占模式 */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled waitStatus表示线程已经被取消*/
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking 表示后继节点需要unpark */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition 表示节点在codition队列中*/
        static final int CONDITION = -2;
        /**
         * waitStatus value to indicate the next acquireShared should
         * unconditionally propagate表示下个acquireShared应该无条件的传播。(??)
         */
        static final int PROPAGATE = -3;

        /**
         * Status field, taking on only the values:
         *   SIGNAL:     The successor of this node is (or will soon be)
         *               blocked (via park), so the current node must
         *               unpark its successor when it releases or
         *               cancels. To avoid races, acquire methods must
         *               first indicate they need a signal,
         *               then retry the atomic acquire, and then,
         *               on failure, block.
         *   CANCELLED:  This node is cancelled due to timeout or interrupt.
         *               Nodes never leave this state. In particular,
         *               a thread with cancelled node never again blocks.
         *   CONDITION:  This node is currently on a condition queue.
         *               It will not be used as a sync queue node
         *               until transferred, at which time the status
         *               will be set to 0. (Use of this value here has
         *               nothing to do with the other uses of the
         *               field, but simplifies mechanics.)如果节点被移动到AQS队列,status会从CODITION变成0,
         *   PROPAGATE:  A releaseShared should be propagated to other
         *               nodes. This is set (for head node only) in
         *               doReleaseShared to ensure propagation
         *               continues, even if other operations have
         *               since intervened.在共享模式下释放应该被传播到其他节点
         *   0:          None of the above
         *
         * The values are arranged numerically to simplify use.
         * Non-negative values mean that a node doesn't need to
         * signal. So, most code doesn't need to check for particular
         * values, just for sign.
         *
         * The field is initialized to 0 for normal sync nodes, and
         * CONDITION for condition nodes.  It is modified using CAS
         * (or when possible, unconditional volatile writes).
         */
        volatile int waitStatus;//标记节点状态

        /**
         * Link to predecessor node that current node/thread relies on
         * for checking waitStatus. Assigned during enqueuing, and nulled
         * out (for sake of GC) only upon dequeuing.  Also, upon
         * cancellation of a predecessor, we short-circuit while
         * finding a non-cancelled one, which will always exist
         * because the head node is never cancelled: A node becomes
         * head only as a result of successful acquire. A
         * cancelled thread never succeeds in acquiring, and a thread only
         * cancels itself, not any other node.
         */
        volatile Node prev;//前驱节点

        /**
         * Link to the successor node that the current node/thread
         * unparks upon release. Assigned during enqueuing, adjusted
         * when bypassing cancelled predecessors, and nulled out (for
         * sake of GC) when dequeued.  The enq operation does not
         * assign next field of a predecessor until after attachment,
         * so seeing a null next field does not necessarily mean that
         * node is at end of queue. However, if a next field appears
         * to be null, we can scan prev's from the tail to
         * double-check.  The next field of cancelled nodes is set to
         * point to the node itself instead of null, to make life
         * easier for isOnSyncQueue.
         */
        volatile Node next;

        /**
         * The thread that enqueued this node.  Initialized on
         * construction and nulled out after use.
         */
        volatile Thread thread;

        /**
         * Link to next node waiting on condition, or the special
         * value SHARED.  Because condition queues are accessed only
         * when holding in exclusive mode, we just need a simple
         * linked queue to hold nodes while they are waiting on
         * conditions. They are then transferred to the queue to
         * re-acquire. And because conditions can only be exclusive,
         * we save a field by using special value to indicate shared
         * mode.
         */
        Node nextWaiter;//在condition队列中指向下一个node,因codition只在独占模式用,它可以指向shared表示共享模式

        /**
         * Returns true if node is waiting in shared mode.用nextWaiter是否指向SHARED来表明现在工作的模式
         */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /**
         * Returns previous node, or throws NullPointerException if null.
         * Use when predecessor cannot be null.  The null check could
         * be elided, but is present to help the VM.
         *返回前置节点或抛出异常
         * @return the predecessor of this node
         */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() {    // Used to establish initial head or SHARED marker
        }
//初始化node,如果是独占模式,初始化时nextWaiter会指向null,共享模式就指向SHARED
        Node(Thread thread, Node mode) {     // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }
//Codition队列使用
        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

在AQS内部,定义了Node head 和Node tail。

2.state

state 类型为 volatile int,用来表示当前的同步状态。除了正常的get/set方法,还可以原子性的设置state,方法如下

    protected final boolean compareAndSetState(int expect, int update) {
        // 利用unsafe类CAS的设置state值,如果当前state等于expect则设置为update。
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

3.队列工具

	//入队操作,如果当前tail为null则证明队列还没有初始化,则进行初始化
	//初始化用一个new Node()充当head,且tail和head相同
	//再把入队的node的前置节点设置为tail,再把入队的node设置为新tail
	//再把旧tail的next指向新入队的node。完成入队。返回队列中node的前驱节点
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

addWaiter(Node mode)

	 //根据工作(独占/gongxiang)模式,把当前线程入队
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }
		//设置头结点,并且把头结点的thread置null便于以后GC,
		//头结点表示已经获取锁了,也就不需要记录thread了
		private void setHead(Node node) {
			head = node;
			node.thread = null;
			node.prev = null;
		}
	 //唤醒一个节点的后继节点
    private void unparkSuccessor(Node node) {
        /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
		 *如果status为负的(SIGNAL、CONDITION、PROPAGATE)在得到期望的信号时尝试清除这个status
		 *如果修改失败或者status被等待线程改变,也没有影响
         */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);//如果node的waitStatus为负的,则清0

        /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
		 *需要被唤醒的Thread存放在这个节点的后继节点,
		 *如果这个节点被取消或者为null,则从tail向前遍历来找到一个没有被取消的后继节点。
         */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

共享模式的释放doReleaseShared(),通知后继节点,并且确保可以传播下去。

private void doReleaseShared() {
        /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
		 *确保释放的传播,即使有其他正在进行的acquire和释放
		 *这个操作通常在在头结点的未释放后继节点上进行,如果这个节点需要一个通知的话。
		 *但是如果他不需要,会把状态设置为PROPAGTE来确保释放的状态会继续传播。
		 *此外,我们必须循环执行来防止在执行这个操作时,添加了一个新的节点。
		 *并且,和其他使用unparkSuccessor不同,我们需要知道如果CAS复位状态失败
		 *那么做重复检查。
         */
        for (;;) {
            Node h = head;
            if (h != null && h != tail) {
                int ws = h.waitStatus;
                if (ws == Node.SIGNAL) {//如果头结点的waitStatus为SIGNAL则清空,
                    if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
                        continue;            // loop to recheck cases若清空失败则继续循环尝试清空
                    unparkSuccessor(h);//唤醒头结点的后继节点
                }
                else if (ws == 0 &&
                         !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
                    continue;                // loop on failed CAS
            }//如果waitStatus已经被清空,且尝试把头结点的状态设置为PROPAGATE失败则继续循环
            if (h == head)                   // loop if head changed,如果头结点在过程中变化了,继续循环
                break;
        }
    }

setHeadAndPropagate(Node node, int propagate),设置新节点为头节点,在共享模式中使用。从调用它的函数看,应该是当一个线程以共享模式入队之后,如果发现自己的前驱节点是head,并且自己获取锁成功,则把自己设置为head,然后继续传播release。

    private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below记录原来的头节点并且在下面检查
        setHead(node);//新节点设置为头节点
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *在如下条件的情况下通知队列中的下一个节点。
         *调用者明确的指明需要传播(入参progagate大于0),或者之前有相关记录
         *记录的方式包括setHead之前或者之后的设置h.waitStatus(原来head的ws)
         *并且下一个节点在shared模式中等待,或者我们不知道因为他表现为空
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
		 *这些检查中的保守性可能导致不必要的唤醒,但是这只发生在多个竞争获取或者释放
		 *所以大部分现在或者很快就需要signals
         */
		 //如果入参大于0,或者原来的头节点为null或者原来原来头节点的状态小于0
		 //或者现在新的head为null,或者新的头节点状态状态小于0
		 //获取入参节点的下一个节点,如果下一个节点为null或者处于共享模式,则doReleaseShared()
        if (propagate > 0 || h == null || h.waitStatus < 0 ||
            (h = head) == null || h.waitStatus < 0) {
            Node s = node.next;
            if (s == null || s.isShared())
                doReleaseShared();
        }
    }

4.各种版本的获取锁的工具(Utilities for various versions of acquire)

cancelAcquire(Node node)取消正在进行的获取锁的操作,一般是获取锁失败抛出异常后使用。基本流程就是取消当前这个node,找到这个node前置节点中,离他最近的不是取消状态的节点。然后把这个前置节点(如果不是head)的后继节点,设置为这个node的后继节点(也就是将node出队)。如果离他最近的非取消状态的及节点是head,则唤醒node的后继节点。

    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;//node不再绑定线程

        // Skip cancelled predecessors把node的前置节点置为他前面的第一个没有被取消的节点
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
		//predNext是明显的没有连接的节点,下面的CAS会失败
		//如果没有,我们在和另一个取消操作或者signal操作的竞争中失败,其他操作也就是没有必要的了
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
		//这里可以用无条件的写代替CAS。在这个原子操作的步骤后。别的节点可以跳过我们(因为状态是取消)
		//在这个操作之前,我们是不被其他线程干扰的
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.node出队
		//如果node是队尾,把队尾设为刚才找到的node的第一个前置非取消状态的点
		//并且把他的next设置为null
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
			//如果后继节点需要signal,试图设置他前置的下一个连接
			//这样他将会得到一个sigal。如果他不需要signal就唤醒他来传播
			//这里的操作如果pred(node的第一个有效的前置节点)不是head,
			//且他的状态等于SIGNAL(如不等于,只要他没被取消就设置为SIGNAL),且他的线程不为null
			//把node的前置节点的next设置为node的next节点(将node出队)
			//如果node的前置节点为head,则调用unparkSuccessor(node);
            int ws;
            if (pred != head &&
                ((ws = pred.waitStatus) == Node.SIGNAL ||
                 (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&
                pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

shouldParkAfterFailedAcquire(Node pred, Node node)当获取锁失败时,判断线程是否需要park。如果node的前驱节点的waitStatus(以后简称ws)为SIGNAL,则返回ture,

如果前驱节点的状态为取消,则将node找到他前驱节点中,离他最近且不是取消状态的节点。node成为这个节点的后继节点,然后返回false。如果ws为0或者PROPAGATE,则返回false并且把前驱节点的ws设置为SIGNAL。

在acquireQueued()中调用,因为是在一个死循环中调用的,所以第二次进入这个函数基本肯定会返回ture,从而把线程park住

    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        int ws = pred.waitStatus;
        if (ws == Node.SIGNAL)
            /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
	     *这个节点已经被设置为要求release信号通知他,所以他可以被安全的park
             */
            return true;
        if (ws > 0) {
            /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.找到第一个非取消的前置节点
             */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
	     *状态是0或者PROPAGATE。表明我们需要一个signal但是不能park。
	     *调用者需要再次尝试来确保它不会在park之前acquire成功
             */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

中断当前线程的便捷方法。

    static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

park当前线程,如果被unpark或者中断就会继续进行,并且返回当前线程的中断状态(返回前清除中断状态)

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);
        return Thread.interrupted();
    }

5.各种获取锁的方式

acquireQueued(final Node node, int arg),在独占模式中不断的循环尝试获取锁,可能中间会被park,不处理中断(中断后清空状态继续执行)。

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;//线程中断标志
            for (;;) {
                final Node p = node.predecessor();
				//如果节点的前置节点为head,则试图获取锁
                if (p == head && tryAcquire(arg)) {
                    setHead(node);//当前节点设置为head
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&//先通过前置节点查看他是不是能park,如果能就park住
                    parkAndCheckInterrupt())//因为这是在一个for(;;)死循环中,所以等待node被唤醒,变成了head才跳出。
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireInterruptibly(int arg) 在独占模式中,对中断进行响应,然后获取锁。基本和acquireQueued一样,只是如果线程被中断,就抛出异常。而抛出异常会导致cancelAcquire()(也就是把node出队)。

    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);//入队一个独占模式的节点
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireNanos(int arg, long nanosTimeout) 在独占模式中,尝试指定时间内获得锁,基本和
doAcquireInterruptibly一样。

private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node   = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)//如果前驱节点为signal
                    LockSupport.parkNanos(this, nanosTimeout);//利用LockSupport暂停线程
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

doAcquireShared(int arg)在共享模式中尝试获取锁,不处理中断。和acquireQueued()基本一样。

    private void doAcquireShared(int arg) {
        final Node node = addWaiter(Node.SHARED);
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head) {
                    int r = tryAcquireShared(arg);
                    if (r >= 0) {
                        setHeadAndPropagate(node, r);//把自己设置为head,并且向后传播(因为向后传播起到共享作用)
                        p.next = null; // help GC
                        if (interrupted)
                            selfInterrupt();
                        failed = false;
                        return;
                    }
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

private void doAcquireSharedInterruptibly(int arg)

private boolean doAcquireSharedNanos(int arg, long nanosTimeout)这两个函数和他们的独占模式版本基本一样,就不单独介绍了。

6.主要的输出的方法
首先是几个模板方法,供子类实现来提供功能,这几个方法一般通过操作state来实现。

protected boolean tryAcquire(int arg) {
	throw new UnsupportedOperationException();
}

protected boolean tryRelease(int arg) {
	throw new UnsupportedOperationException();
}

 
protected int tryAcquireShared(int arg) {
	throw new UnsupportedOperationException();
}

protected boolean tryReleaseShared(int arg) {
	throw new UnsupportedOperationException();
}

protected boolean isHeldExclusively() {
	throw new UnsupportedOperationException();
}

acquire(int arg) 这是一个public方法,用于在独占模式下获取锁,不响应中断。获取锁成功就直接返回,否则就用acquireQueued()来入队试图获取锁。

    public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

acquireInterruptibly(int arg)这是一个public方法,如果线程被中断就抛出异常。其他和acquire一样。

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }

在指定时间内尝试获取锁

    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

独占模式中释放锁,并且unpark后继节点,public方法

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

共享模式中尝试获取锁,不处理中断。public方法

    public final void acquireShared(int arg) {
        if (tryAcquireShared(arg) < 0)
            doAcquireShared(arg);
    }

共享模式中获取锁,中断抛出异常,public方法

    public final void acquireSharedInterruptibly(int arg)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        if (tryAcquireShared(arg) < 0)
            doAcquireSharedInterruptibly(arg);
    }

在指定时间内,在共享模式中获取锁,public方法

    public final boolean tryAcquireSharedNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (Thread.interrupted())
            throw new InterruptedException();
        return tryAcquireShared(arg) >= 0 ||
            doAcquireSharedNanos(arg, nanosTimeout);
    }

共享模式中释放锁

    public final boolean releaseShared(int arg) {
        if (tryReleaseShared(arg)) {
            doReleaseShared();
            return true;
        }
        return false;
    }

7.队列检查工具

因为AQS是工作在多线程环境,下面这些工具大多不保证绝对正确。

查询是否有线程在等待锁

    public final boolean hasQueuedThreads() {
        return head != tail;
    }

查询是否有线程曾经争夺过这个同步锁,head不为null则证明有node入队过,也就是有线程获取锁失败等待过。如果所有任务执行结束了,通过上面代码知道,head是由刚刚获取锁的线程设置的(把自己设置为head),所以任务执行结束不会修改head,这样head就不会为null了。

    public final boolean hasContended() {
        return head != null;
    }

返回队列中第一个节点的线程,也就是等待时间最长的节点。

    public final Thread getFirstQueuedThread() {
        // handle only fast path, else relay
        return (head == tail) ? null : fullGetFirstQueuedThread();
    }

获取等待的第一个线程

    private Thread fullGetFirstQueuedThread() {
        /*
         * The first node is normally head.next. Try to get its
         * thread field, ensuring consistent reads: If thread
         * field is nulled out or s.prev is no longer head, then
         * some other thread(s) concurrently performed setHead in
         * between some of our reads. We try this twice before
         * resorting to traversal.
		 *通常这个节点就是head的后继节点。在保证竞争性读的情况下获取节点的thread域
		 *如果thread为空,或者s的前置节点不再为头节点,证明一些线程在我们读取时进行了setHead。
		 *在遍历之前,我们尝试了两次
         */
        Node h, s;
        Thread st;
        if (((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null) ||
            ((h = head) != null && (s = h.next) != null &&
             s.prev == head && (st = s.thread) != null))
            return st;

        /*
         * Head's next field might not have been set yet, or may have
         * been unset after setHead. So we must check to see if tail
         * is actually first node. If not, we continue on, safely
         * traversing from tail back to head to find first,
         * guaranteeing termination.
		 *head的next域也可能没有被设置,或者被在设置头结点之后取消了。我们必须检查
		 *是否tail是实际上的第一节点,如果不是,继续从尾向投遍历来找到第一个节点。
         */

        Node t = tail;
        Thread firstThread = null;
        while (t != null && t != head) {
            Thread tt = t.thread;
            if (tt != null)
                firstThread = tt;
            t = t.prev;
        }
        return firstThread;
    }

判断给出的线程是否已经入队。从tail开始遍历

    public final boolean isQueued(Thread thread) {
        if (thread == null)
            throw new NullPointerException();
        for (Node p = tail; p != null; p = p.prev)
            if (p.thread == thread)
                return true;
        return false;
    }

判断队列中等待的第一节点是不是独占模式

    final boolean apparentlyFirstQueuedIsExclusive() {
        Node h, s;
        return (h = head) != null &&
            (s = h.next)  != null &&
            !s.isShared()         &&
            s.thread != null;
    }

通过当前线程是不是在head的后置节点
,来判断是不是有等待更久的线程。

    public final boolean hasQueuedPredecessors() {
        // The correctness of this depends on head being initialized
        // before tail and on head.next being accurate if the current
        // thread is first in queue.
        Node t = tail; // Read fields in reverse initialization order
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }

8.各种检测和监控机制。

获取队列长度,不包含thread为null的节点(head节点thread为null)

    public final int getQueueLength() {
        int n = 0;
        for (Node p = tail; p != null; p = p.prev) {
            if (p.thread != null)
                ++n;
        }
        return n;
    }

返回正在等待的获取锁的线程,

    public final Collection<Thread> getQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            Thread t = p.thread;
            if (t != null)
                list.add(t);
        }
        return list;
    }

获取所有独占模式的线程

    public final Collection<Thread> getExclusiveQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (!p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

获取所有共享模式的线程

    public final Collection<Thread> getSharedQueuedThreads() {
        ArrayList<Thread> list = new ArrayList<Thread>();
        for (Node p = tail; p != null; p = p.prev) {
            if (p.isShared()) {
                Thread t = p.thread;
                if (t != null)
                    list.add(t);
            }
        }
        return list;
    }

toString()方法

    public String toString() {
        int s = getState();
        String q  = hasQueuedThreads() ? "non" : "";
        return super.toString() +
            "[State = " + s + ", " + q + "empty queue]";
    }

关于AQS支持锁的方法就列举在这里了,下面一篇会研究,AQS中Conditon的相关代码。

       

    原文作者:JUC
    原文地址: https://blog.csdn.net/u010250451/article/details/79018696
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞