JUC锁-互斥锁ReentrantLock(二)

ReentrantLock介绍

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。

顾名思义,ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都可能获取锁。

ReentrantLock的UML类图

《JUC锁-互斥锁ReentrantLock(二)》

(01) ReentrantLock实现了Lock接口。

(02) ReentrantLock与sync是组合关系。ReentrantLock中,包含了Sync对象;而且,Sync是AQS的子类;更重要的是,Sync有两个子类FairSync(公平锁)和NonFairSync(非公平锁)。ReentrantLock是一个独占锁,至于它到底是公平锁还是非公平锁,就取决于sync对象是”FairSync的实例”还是”NonFairSync的实例”。

ReentantLock的构造方法

final ReentrantLock lock = new ReentrantLock();
final ReentrantLock lock = new ReentrantLock(true)
/** 同步器:内部类Sync的一个引用 */
    private final Sync sync;

    /** * 创建一个非公平锁 */
    public ReentrantLock() {
        sync = new NonfairSync();
    }

    /** * 创建一个锁 * @param fair true-->公平锁 false-->非公平锁 */
    public ReentrantLock(boolean fair) {
        sync = (fair)? new FairSync() : new NonfairSync();
    }

三个内部类Sync/NonfairSync/FairSync,这里只列出类的定义

/** * 该锁同步控制的一个基类.下边有两个子类:非公平机制和公平机制.使用了AbstractQueuedSynchronizer类的 */
    static abstract class Sync extends AbstractQueuedSynchronizer

    /** * 非公平锁同步器 */
    final static class NonfairSync extends Sync

    /** * 公平锁同步器 */
    final static class FairSync extends Sync

ReentantLock的FairSync的lock方法:

ReetantLock调用的lock方法调用了他的组件Sync的lock方法,先看一下FairSync的lock方法:

1.lock()
final void lock() {
    acquire(1);
}
2.acquire()

acquire是在AQS实现的

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。

(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到”CLH队列(非阻塞的FIFO队列)”末尾。CLH队列就是线程等待队列。

(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。

(04) “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时”当前线程”会调用selfInterrupt()来自己给自己产生一个中断。至于为什么要自己给自己产生一个中断,后面再介绍。

我们分成四个部分去解释这个acquire过程:

  • 一. tryAcquire()
  • 二. addWaiter()
  • 三. acquireQueued()
  • 四. selfInterrupt()

tryAcquire()

protected final boolean tryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“独占锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”,
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,
        // 则判断“当前线程”是不是CLH队列中的第一个线程线程,
        // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }

    else if (current == getExclusiveOwnerThread()) {
        // 如果“独占锁”的拥有者已经为“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

注意,有两个比较简单的函数:hasQueuedPredecessors() 是通过判断”当前线程”是不是在CLH队列的队首,setExclusiveOwnerThread()的作用就是,设置线程t为当前拥有“独占锁”的线程。

tryAcquire()的作用就是尝试去获取锁。当 当前线程是队首或者是正在执行的线程 则尝试成功,返回true;否则尝试失败,返回false,后续再通过其它办法来获取该锁。后面我们会说明,在尝试失败的情况下,是如何一步步获取锁的。

addWaiter()

private Node addWaiter(Node mode) {
    // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
    enq(node);
    return node;
}

enq()

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;
        if (t == null) { // Must initialize
            if (compareAndSetHead(new Node()))
                tail = head;
        } else {
            node.prev = t;
            if (compareAndSetTail(t, node)) {
                t.next = node;
                return t;
            }
        }
    }
}

enq()的作用很简单。如果CLH队列为空,则新建一个CLH表头;然后将node添加到CLH末尾。否则,直接将node添加到CLH末尾。

addWaiter()的作用,就是将当前线程添加到CLH队列中。这就意味着将当前线程添加到等待获取“锁”的等待线程队列中了。

acquireQueued()


//acquireQueued()的目的是从队列中获取锁。
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // interrupted表示在CLH队列的调度中,
        // “当前线程”在休眠时,有没有被中断过。
        boolean interrupted = false;
        for (;;) {
            // 获取上一个节点。
            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

先把shouldParkAfterFailedAcquire(p, node)和parkAndCheckInterrupt()两个函数弄懂

// 返回“当前线程是否应该阻塞”
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    // 前继节点的状态
    int ws = pred.waitStatus;
    // 如果前继节点是SIGNAL状态,则意味这当前线程需要被unpark唤醒。此时,返回true。
    if (ws == Node.SIGNAL)
        return true;
    // 如果前继节点是“取消”状态,则设置 “当前节点”的 “当前前继节点” 为 “‘原前继节点’的前继节点”。
    if (ws > 0) {
        do {
            node.prev = pred = pred.prev;
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        // 如果前继节点为“0”或者“共享锁”状态,则设置前继节点为SIGNAL状态。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
    }
    return false;
}

(01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。

CANCELLED[1]  -- 当前线程已被取消
SIGNAL[-1]    -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。
CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒
PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁”
[0]           -- 当前线程不属于上面的任何一种状态。

(02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。

规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。
规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。
规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false

如果“规则1”发生,即“前继节点是SIGNAL”状态,则意味着“当前线程”需要被阻塞。接下来会调用parkAndCheckInterrupt()阻塞当前线程,直到当前先被唤醒才从parkAndCheckInterrupt()中返回。

parkAndCheckInterrupt方法

private final boolean parkAndCheckInterrupt() {
    // 通过LockSupport的park()阻塞“当前线程”。
    LockSupport.park(this);
    // 返回线程的中断状态。
    return Thread.interrupted();
}

parkAndCheckInterrupt()的作用是阻塞当前线程,并且返回“线程被唤醒之后”的中断状态。
它会先通过LockSupport.park()阻塞“当前线程”,然后通过Thread.interrupted()返回线程的中断状态。

了解了shouldParkAfterFailedAcquire()和parkAndCheckInterrupt()函数之后。我们接着分析acquireQueued()的for循环部分。

for (;;) {
            // 获取上一个节点。
            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }

说明:

(01) 通过node.predecessor()获取前继节点。predecessor()就是返回node的前继节点,若对此有疑惑可以查看下面关于Node类的介绍。

(02) p == head && tryAcquire(arg)

  • 首先,判断“前继节点”是不是CHL表头。如果是的话,则通过tryAcquire()尝试获取锁。

  • 如果不是,那么我们需要shouldParkAfterFailedAcquire()我们判断“当前线程”是否需要阻塞;为什么呢?

    (waitstate的状态SIGNAL[-1] – “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。)

  • 接着,“当前线程”阻塞的话,会调用parkAndCheckInterrupt()来阻塞线程。

  • 当线程被解除阻塞的时候,我们会返回线程的中断状态。而线程被解决阻塞,可能是由于“线程被中断”,也可能是由于“其它线程调用了该线程的unpark()函数”。

(03) 再回到p==head这里。如果当前线程是因为其它线程调用了unpark()函数而被唤醒,那么唤醒它的线程,应该是它的前继节点所对应的线程(关于这一点,后面在“释放锁”的过程中会看到)。 OK,是前继节点调用unpark()唤醒了当前线程!

小结:acquireQueued()的作用就是“当前线程”会根据公平性原则进行阻塞等待,直到获取锁为止;并且返回当前线程在等待过程中有没有并中断过。

释放公平锁Unlock

1.unlock()

unlock()在ReentrantLock.java中实现的,而公平锁和非公平锁没有重写unlock方法,这说明释放锁的方法一样

public void unlock() {
    sync.release(1);
}

2. release()

public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

3.tryRelease()

复制代码

protected final boolean tryRelease(int releases) {
    // c是本次释放锁之后的状态
    int c = getState() - releases;
    // 如果“当前线程”不是“锁的持有者”,则抛出异常!
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置当前线程的锁的状态。
    setState(c);
    return free;
}

tryRelease()的作用是尝试释放锁。

(01) 如果“当前线程”不是“锁的持有者”,则抛出异常。

(02) 如果“当前线程”在本次释放锁操作之后,对锁的拥有状态是0(即,当前线程彻底释放该“锁”),则设置“锁”的持有者为null,即锁是可获取状态。同时,更新当前线程的锁的状态为0。

4.unparkSuccessor()

在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。

private void unparkSuccessor(Node node) {
    // 获取当前线程的状态
    int ws = node.waitStatus;
    // 如果状态<0,则设置状态=0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
    // 这里的有效,是指“后继节点对应的线程状态<=0”
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒“后继节点对应的线程”
    if (s != null)
        LockSupport.unpark(s.thread);
}

unparkSuccessor()的作用是“唤醒当前线程的后继线程”。后继线程被唤醒之后,就可以获取该锁并恢复运行了。

ReentantLock的NonFairSync的lock方法:

非公平锁和公平锁在获取锁的方法上,流程是一样的;它们的区别主要表现在“尝试获取锁的机制不同”。简单点说,“公平锁”在每次尝试获取锁时,都是采用公平策略(根据等待队列依次排序等待);而“非公平锁”在每次尝试获取锁时,都是采用的非公平策略(无视等待队列,直接尝试获取锁,如果锁是空闲的,即可获取状态,则获取锁)。

我们已经详细介绍了获取公平锁的流程和机制;下面,通过代码分析以下获取非公平锁的流程。当染,我们主要强调他们不一样的地方,

lock方法

final void lock() {
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}

lock()会先通过compareAndSet(0, 1)来判断“锁”是不是空闲状态。是的话,“当前线程”直接获取“锁”;否则的话,调用acquire(1)获取锁。

“公平锁”和“非公平锁”关于lock()的对比

公平锁   -- 公平锁的lock()函数,会直接调用acquire(1)。
非公平锁 -- 非公平锁会先判断当前锁的状态是不是空闲,是的话,就不排队,而是直接获取锁。
也就是完成了我们说的插队

acquire()

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

“公平锁”和“非公平锁”关于acquire()的对比

公平锁和非公平锁,只有tryAcquire()函数的实现不同;即它们尝试获取锁的机制不同。
这就是我们所说的“它们获取锁策略的不同所在之处”!

我们已经详细介绍了acquire()涉及到的各个函数。
这里仅对它们有差异的函数tryAcquire()进行说明。

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,则通过CAS函数设置“锁”的状态为acquires。
        // 同时,设置“当前线程”为锁的持有者。
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“锁”的持有者已经是“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

“公平锁”和“非公平锁”关于tryAcquire()的对比

公平锁和非公平锁,它们尝试获取锁的方式不同。
公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。
而非公平锁在尝试获取锁时,如果“锁”没有被任何线程持有,则不管它在CLH队列的何处,它都直接获取锁。

也就是“第二次插队”
    原文作者:JUC
    原文地址: https://blog.csdn.net/qq_33394088/article/details/79051249
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞