JUC简笔2-AQS源码分析

1、什么是AQS?

AQS全称,AbstractQueuedSynchronizer,意思大概就是“抽象队列同步器”,这个名词有点抽象,我们来先看看源码中的介绍,lalalalala…..,一大段英文。

/**
 * Provides a framework for implementing blocking locks and related
 * synchronizers (semaphores, events, etc) that rely on
 * first-in-first-out (FIFO) wait queues.  This class is designed to
 * be a useful basis for most kinds of synchronizers that rely on a
 * single atomic {@code int} value to represent state. Subclasses
 * must define the protected methods that change this state, and which
 * define what that state means in terms of this object being acquired
 * or released.  Given these, the other methods in this class carry
 * out all queuing and blocking mechanics. Subclasses can maintain
 * other state fields, but only the atomically updated {@code int}
 * value manipulated using methods {@link #getState}, {@link
 * #setState} and {@link #compareAndSetState} is tracked with respect
 * to synchronization.
 * */

翻译过来大概就是:
1、AQS是一个用于实现阻塞锁和相关同步器 (semaphores, events等等)的框架,它是基于FIFO等待队列的;
2、AQS是“以单个原子值(int)表示状态的同步器”为基础的;
3、子类必须定义protected的方法来改变状态,还必须根据这个对象被acquired或released来定义状态的含义,且只能通过getState()、setState()、compareAndSetState()来更新状态值。

2、AQS的组成

这里没也什么好说的,与其说一大堆不如直接上源码看组成

public abstract class AbstractQueuedSynchronizer extends AbstractOwnableSynchronizer implements java.io.Serializable {
    private static final long serialVersionUID = 7373984972572414691L;
    //构造方法
    protected AbstractQueuedSynchronizer() { }
    //定义了一个静态内部类Node
    //AQS内部维护了一个CLH队列,Node是这个队列的节点
    static final class Node {...}
    //队列头指针
    private transient volatile Node head;
    //队列尾指针
    private transient volatile Node tail;
    //状态位
    private volatile int state;

    //获取状态位state
    protected final int getState() {
        return state;
    }

    //设置状态位state
    protected final void setState(int newState) {
        state = newState;
    }

    //通过CAS设置位state
    protected final boolean compareAndSetState(int expect, int update) {
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    //一大堆方法.....
}

3、AQS中的CLH队列

关于CLH队列,有个很有趣的地方,我以为CLH是三个英文单词的缩写,但说明里是这样写的——”CLH” (Craig, Landin, and Hagersten),应该是三个大佬的名字。
源码中的说明翻译过来就是: AQS中的等待队列是CLH阻塞队列的变种,CLH的锁常用于自旋锁。相反,我们使用它们来阻塞同步器,但使用相同的基本策略来hold住前节点(prev)的控制信息。每个节点中的“状态”字段跟踪线程是否应该被阻塞。

对于这里,我的理解是AQS中的CLH队列并没有使用自旋锁,而是用状态值来判断线程是否应该被阻塞。(如果理解错了希望广大网友可以纠正)

下面我们来分析CLH的节点类:

static final class Node {
            /* AQS定义两种资源共享方式:Exclusive(独占,只有一个线程能执行,如ReentrantLock)和 Share(共享,多个线程可同时执行,如Semaphore/CountDownLatch) */
            /** Marker to indicate a node is waiting in shared mode * 表示结点处于分享模式 */
            static final Node SHARED = new Node();
            /** Marker to indicate a node is waiting in exclusive mode * 表示结点处于独占模式(也有翻译成排他模式 ) */
            static final Node EXCLUSIVE = null;


            //下面是几个表示状态的常量:
            /** waitStatus value to indicate thread has cancelled */
            static final int CANCELLED =  1;
            /** waitStatus value to indicate successor's thread needs unparking */
            static final int SIGNAL    = -1;
            /** waitStatus value to indicate thread is waiting on condition */
            static final int CONDITION = -2;
            /** * waitStatus value to indicate the next acquireShared should unconditionally propagate */
            static final int PROPAGATE = -3;

            /** * Status field, taking on only the values: * SIGNAL: 当前线程的后继线程需要被unpark(唤醒)”,对应的Status值 * 当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉, * 因此需要唤醒当前线程的后继线程。 * * CANCELLED: 结点被由于超时或中断被撤销,且再也无法离开这个状态; * * CONDITION: 结点在条件队列中,因为等待某个条件而被阻塞;它将不被用作同步队列节 * 点,直到状态值改变为0; * * * PROPAGATE: 大概意思是这是保证下一次可以直接传播的状态 * 0: 除以上状态外的状态 * * * 结点初始化时,waitStatus位为0, 要使用CAS进行修改。 */
             volatile int waitStatus;
             //前指针 
             volatile Node prev;
             //后指针
             volatile Node next;
             //进入队列的线程
             volatile Thread thread;
             //条件队列中的下一个结点
             Node nextWaiter;

             /** *下面是方法 *返回结点是否是share模式 */
            final boolean isShared() {
                return nextWaiter == SHARED;
            }

            /** * 返回该结点的前结点, 如果前结点为空,则抛出空指针异常 */
            final Node predecessor() throws NullPointerException {
                Node p = prev;
                if (p == null)
                    throw new NullPointerException();
                else
                    return p;
            }

            //空参构造方法
            Node() {   
            }

            Node(Thread thread, Node mode) {     // Used by addWaiter
                this.nextWaiter = mode;
                this.thread = thread;
            }

            Node(Thread thread, int waitStatus) { // Used by Condition
                this.waitStatus = waitStatus;
                this.thread = thread;
            }
    }

}

由源码可知,CLH队列的形状大概是像下面这个样子
《JUC简笔2-AQS源码分析》

4、AQS的主要方法分析

1) acquire() 获取锁

public final void acquire(int arg) {
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
 }

方法流程:
1. tryAcquire():尝试获取锁;
2. addWaiter(Node.EXCLUSIVE):获取锁失败,将该线程加入CLH列尾部,且标记为独占模式;
3. acquireQueued(Node,int):当前线程会根据公平性原则来进行阻塞等待,直到获取锁为止;

2) tryAcquire() 尝试获取锁

protected boolean tryAcquire(int arg) {  
        throw new UnsupportedOperationException();  
    }  

该方法并没有具体实现,需要子类重写,在该类中只是抛出了一个异常。

3) addWaiter()

private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//从尾部插入CLH队列
        return node;
}

作用:会首先创建一个Node节点,然后再将该节点添加到CLH队列的末尾。
该方法结构较简单,核心为enq(node); 接下来分析enq()方法;

private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
 }

作用:将当前线程添加到CLH队列中
方法流程:
1. 如果tail为空,说明CLH为空,初始化CLH,创建一个空节点为CLH的head,当前结点为CLH的tail
2. 设当前节点的前任节点为之前的tail,再设当前节点为尾节点;
3. 不断循环,直到设置成功;

4) acquireQueued()

final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }

    }

方法用途原文为Acquires in exclusive uninterruptible mode for thread already in queue.,不太好理解;
直接翻译过来就是“为已经在等待队列中且处于不可中断的独占模式下的线程获取锁”;

作用:从CLH队列中获取锁
方法流程:
1. node.predecessor():获取当前结点的前结点;
2. 获取成功后设置当前结点为头结点,获取锁,如果成功获取锁,回收原来的头结点,返回false;否则执行3;
3. shouldParkAfterFailedAcquire() :如果前结点被阻塞(waitState为SINGAL)则返回true,如果前结点被取消(waitState为CANCEL)则跳过前节点,设前前结点的后继为当前结点;
4. parkAndCheckInterrupt():阻塞并判断是否被中断,被中断则返回true;
5. 继续循环,直到获取锁

5) shouldParkAfterFailedAcquire()

// 
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;
    if (ws == Node.SIGNAL)
        return true;

    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

作用: 返回“当前线程是否应该阻塞”
方法流程:
1.获取前继节点的状态;
2. 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true;
3. 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”(跳过状态为CANCEL的结点);

6) release() 、tryRelease()

public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
}

protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

两个方法较简单,不作分析,下面直接分析unparkSuccessor();

7) unparkSuccessor()

private void unparkSuccessor(Node node) {
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
、}

作用: 唤醒后继结点
方法流程:
1. 如果当前节点的状态小于0,则设它的waitStatus为0;
2. 获得当前节点的后节点,如果为空或者被在被撤销状态,则从CLH的尾部开始,找到一个不为空且waitStatus小于0的结点。
3. 唤醒满足条件的节点;

AQS的源码分析就到这里为止,由于本人目前功力尚浅,很多地方只能翻译原文来理解,对AQS的理解停留在代码级别,如有不对和遗漏欢迎各位补充。

    原文作者:JUC
    原文地址: https://blog.csdn.net/weixin_38278878/article/details/80040321
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞