JUC源码分析(二)-ReentrantLock源码分析

背景介绍

ReentrantLock是JUC中的重要的类,其静态内部类Sync继承AQS,关于AQS的分析可见JUC-AbstractQueuedSynchronizer(AQS)源码分析,Sync重写了tryAcquire()和tryRelease()完成加锁功能,静态内部类NonfairSyncFairSync分别实现非公平锁和公平锁.

ReentrantLock语义

我们知道AQS的语义是由子类重写tryAcquire()和tryRelease()完成的,在Sync中,

  1. state=0,表示锁资源可用
  2. state=1,表示锁资源已被另一个线程获取,当前线程获取锁资源失败
  3. 由于ReentrantLock支持可重入锁,所以state>1时,state数值即为锁被重入的次数
    /** * 获取锁 * 1.若没有另一线程占有锁,则设置锁持有数为1并直接返回 * 2.若当前线程已经占有锁,则将锁持有数加1并直接返回 * 3.若锁被另一线程占有,则当前线程挂起直到可以获得锁 */
    public void lock() {
        //调用sync.lock(),公平锁与非公平锁的实现不同
        sync.lock();
    }
    /** * 尝试释放锁 * 1.若当前线程占有锁,则将锁持有数减1,若锁持有数为0则释放锁 * 2.若当前线程没有占有锁,则抛出IllegalMonitorStateException */
    public void unlock() {
        //公平锁与非公平锁实现一致
        sync.release(1);
    }

其lock()和unlock()均调用其内部类Sync完成

公平锁与非公平锁

Sync

Sync继承AQS,释放锁的逻辑由公平锁和非公平锁共用,如下:

        /** * tryRelease()较为简单,将state设置为state - releases;若state==0,再设置exclusiveOwnerThread即可 */
        @ReservedStackAccess
        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

NonfairSync

        /** * 尝试直接获取锁,若失败则调用acquire(1) */
        final void lock() {
            if (compareAndSetState(0, 1))
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

NonfairSync继承Sync,tryAcquire()简单调用Sync.nonfairTryAcquire()

        /** * Performs non-fair tryLock. tryAcquire is implemented in * subclasses, but both need nonfair try for trylock method. */
        @ReservedStackAccess
        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //锁资源仍可用
                if (compareAndSetState(0, acquires)) {
                    //若使用CAS修改state值成功,即成功获取锁
                    //设置独占线程
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            //即使锁资源不可用,但current即为独占线程,依然可以修改state值(实现"可重入"功能)
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                //由于此段代码实现可重入功能,即进入此段代码运行的线程都已获得锁,此时setState(nextc)无需原子操作
                setState(nextc);
                return true;
            }
            return false;
        }

FairSync

        /** * 由于CAS不能保证公平性,因此直接调用acquire(1) */
        final void lock() {
            acquire(1);
        }

FairSync继承Sync,其tryAcquire()如下

        /** * Fair version of tryAcquire. Don't grant access unless * recursive call or no waiters or is first. */
        @ReservedStackAccess
        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                //仅在此处增加了!hasQueuedPredecessors()的条件以实现公平锁,如何实现见"辨析"
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

辨析

上文介绍了公平锁和非公平锁的代码,现在请大家思考一个问题:

  1. AQS的release()会唤醒AQS队列的第一个线程还是唤醒AQS队列的所有线程?

查看源码发现,答案是只会唤醒AQS队列的第一个线程.

既然只会唤醒AQS队列的第一个线程,那么

  1. 是否说明AQS队列中只会有一个被唤醒的线程去尝试获取锁?
  2. 是否说明被唤醒的线程一定可以无竞争的获取锁?
  3. 是否说明获取锁的顺序一定是按照的被挂起在AQS队列中的顺序,即获取锁的顺序一定是按照请求获取锁的先后顺序,即获取锁的顺序天生就是公平的?

问题有点多,先思考一下.

  1. 我们看到第三个问题:获取锁的顺序天生就是公平的吗?–>利用反证法可知,答案是不是,否则在ReentrantLock中就不需要有FairSync了.

  2. 再看第一个问题:是否说明AQS队列中只会有一个被唤醒的线程去尝试获取锁?–>答案是是的,因为确实AQS队列中只有第一个线程被唤醒了

  3. 再看一下第二个问题:是否说明被唤醒的线程一定可以无竞争的获取锁?–>答案是不是,即使AQS队列中只会有一个被唤醒的线程去尝试获取锁,但有可能有一个未被挂起在AQS队列上的线程与刚被唤醒的线程争夺锁.

设想这样一种场景

  1. 线程1获取锁成功,而后线程2~5依次尝试获取锁失败,被挂到AQS队列上.
  2. 线程1释放锁,则此时挂在AQS队列第一位的线程2则被唤醒,并且尝试获取锁资源.但与此同时线程6也尝试获取锁,便会出现线程2与线程6竞争锁资源.

如果是非公平锁,线程6可能会获取锁资源,此时线程6后申请锁但却先获取锁,不公平.
如果是公平锁,我们看下hasQueuedPredecessors()的代码

    public final boolean hasQueuedPredecessors() {
        Node t = tail;
        Node h = head;
        Node s;
        return h != t &&
            ((s = h.next) == null || s.thread != Thread.currentThread());
    }
  • h==t,此时AQS队列尚未初始化,hasQueuedPredecessors()返回false,任意线程都可尝试获取锁
  • h!=t,此时AQS已初始化完毕
    • 对线程6而言,此时便属于此种情况,并且h.next!=null&&s.thread != Thread.currentThread(),此时hasQueuedPredecessors()返回true,线程6不能尝试获取锁,将被挂起
    • 对线程2而言,h.next!=null&&s.thread == Thread.currentThread(),此时hasQueuedPredecessors()返回false,线程2可以尝试获取锁

因此线程2在线程6前获取锁,保证了公平性.

可中断与不可中断

Java 线程中的中断

在计算机领域,中断在不同场景有着不同的含义.在此讨论的中断仅为java.lang.Thread中与中断相关的interrupt(),interrupted(),isInterrupted()等方法.
在我看来,Thread的是否中断就是一个标志位,只不过是一个内置的统一的标志位机制.
思考这样一种场景,有一个线程A执行循环,循环终止的条件由B线程控制,那么线程的代码可能如下:

while(!stop){
    ...
}

当B线程需要通知A线程停止循环时,只需要设置A.stop=true即可.这种实现方式有一个致命的缺点–>破坏了封装性,如果B线程要通知A线程,就一定要让A.stop对可见,当然可以修改为通过一个公有方法通知A线程,但是线程间通知中断信息是Thread中非常普遍的场景,不如让java.lang.Thread就实现这个功能.另外,若A线程执行一个native方法呢?此时如何让这个native方法停止执行呢?总不能还要为native方法设置控制方法是否执行的公共方法供其他线程调用吧.因此这就是Java 线程中断机制的来由.

其实中断机制也可以当做是一种IPC方式,只是传递的信息已经确定(即停止线程执行),因此收到此信息的线程需要尽快停止执行,那收到中断信息的线程如何停止执行呢?Java中有哪些可以让程序立马停止执行的方式呢?
答案是returnException,但是因为return无法区分线程是正常执行结束退出还是因为接收中断信息而停止执行的,所以在接收中断信息后使用Exception(这也是Java引入异常机制的原因,学习的知识又交叉了),也就是InterruptedException,是我们非常熟悉的异常.

当B线程调用A.interrupt()通知A线程中断信息后,A线程有两个选择:

  1. 在线程执行的代码中有判断当前线程是否中断的代码,若A线程检测到中断,则throw new InterruptException,此时我们称之为可中断.
  2. 在线程执行的代码中没有判断当前线程是否中断的代码,这样即使B线程通知A线程中断信息,A线程也会继续执行,此时我们称之为不可中断.

lockInterruptibly()

其实上文说的可中断不可中断往往是针对方法的.刚好ReentrantLock中有一个可中断的方法lockInterruptibly(),我们一起看下:

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

方法签名与我们上文介绍的一致,该方法可能throws InterruptedException

    public final void acquireInterruptibly(int arg)
            throws InterruptedException {
        //首先判断是否被中断,若是,抛出异常
        if (Thread.interrupted())
            throw new InterruptedException();
        if (!tryAcquire(arg))
            doAcquireInterruptibly(arg);
    }
    /** * Acquires in exclusive interruptible mode. * @param arg the acquire argument */
    private void doAcquireInterruptibly(int arg)
        throws InterruptedException {
        final Node node = addWaiter(Node.EXCLUSIVE);
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    //抛出异常
                    throw new InterruptedException();
            }
        } catch (Throwable t) {
            cancelAcquire(node);
            throw t;
        }
    }

可以将上述代码与JUC-AbstractQueuedSynchronizer(AQS)源码分析中的acquire()acquireQueued()做个对比

  1. acquireInterruptibly()相比acquire()多了判断是否中断的逻辑
  2. doAcquireInterruptibly()就相当于addWaiter()+acquireQueued(),只不过在检测到被中断时抛出异常而不是返回是否被中断的信息
  3. acquire()相比acquireInterruptibly()多了selfInterrupt(),是因为在acquireQueued()中检测是否被中断后将中断标记清除了,因此若已被中断需要执行selfInterrupt()设置中断标记;这也是acquireQueued()返回一个布尔值表示当前线程是否被中断的原因

另:虽然根据上述分析acquireQueued()中的parkAndCheckInterrupt()可以修改为park()而不再检测是否被中断,因为即使检测到被中断之后,在acquire()中还需要再次设置中断标记.但是acquireQueued()不仅仅被acquire()调用,在JUC–ReentrantLock及Condition源码分析中的waitThread:condition.await()被阻塞的后半部分我们可以看到acquireQueued()还会被condition.await()调用,此时可以根据acquireQueued()的返回值实现更丰富的中断处理策略.

定时模式

ReentrantLock提供了一种可中断的可设置等待时间的获取锁的方法,即tryLock(long timeout, TimeUnit unit),如何实现可中断的获取锁的方法已在上文分析,接下来分析如何实现可设置等待时间的获取锁的方法.

    /** * 可中断的可设置等待时间的获取锁的方法 */
    public boolean tryLock(long timeout, TimeUnit unit)
            throws InterruptedException {
        //简单调用AQS的相关方法(ReentrantLock的Sync类未重写此方法)
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
    /** * 尝试获取独占锁,可中断也可定时 */
    public final boolean tryAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        //检查中断标记,实现可中断的方法
        if (Thread.interrupted())
            throw new InterruptedException();
        //1.执行tryAcquire()尝试获取锁
        //2.调用doAcquireNanos(arg, nanosTimeout)完成定时功能
        return tryAcquire(arg) ||
            doAcquireNanos(arg, nanosTimeout);
    }

可以看出,实现可定时是通过调用doAcquireNanos(arg, nanosTimeout)完成的

    /** * 以独占定时模式获取锁 */
    private boolean doAcquireNanos(int arg, long nanosTimeout)
            throws InterruptedException {
        if (nanosTimeout <= 0L)
            return false;
        //根据nanosTimeout计算deadline
        final long deadline = System.nanoTime() + nanosTimeout;
        final Node node = addWaiter(Node.EXCLUSIVE);
        boolean failed = true;
        try {
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return true;
                }
                nanosTimeout = deadline - System.nanoTime();
                //1.若执行一次tryAcquire()后就已超时,返回false
                if (nanosTimeout <= 0L)
                    return false;
                if (shouldParkAfterFailedAcquire(p, node) &&
                    nanosTimeout > spinForTimeoutThreshold)
                    //2.调用定时的挂起方法
                    LockSupport.parkNanos(this, nanosTimeout);
                if (Thread.interrupted())
                    throw new InterruptedException();
            }
        } finally {
            //若未成功获取锁,则将生成的Node从AQS队列中取下
            if (failed)
                cancelAcquire(node);
        }
    }

由上可知,通过

  1. 调用定时的挂起方法(LockSupport.parkNanos(this, nanosTimeout);)
  2. 若剩余时间不足则返回fasle

来实现定时模式.

总结

  • ReentrantLock通过将加锁操作委托给内部类Sync完成加锁语义
  • 公平性:由于每次只能唤醒AQS队列中的第一个线程,所以锁的竞争只会存在于未在AQS队列的线程和AQS队列的第一个线程之间,公平锁相比非公平锁增加了hasQueuedPredecessors()的判断,确保只有AQS队列的第一个线程才可以尝试获取锁资源,保证了公平性
  • 可中断:AQS的中断方法仅仅在方法内部增加了判断当前线程是否中断的逻辑,若当前线程已被中断,抛出InterruptedException
  • 定时模式:通过调用定时的挂起方法和在代码中检测剩余时间实现定时模式

参考

    原文作者:JUC
    原文地址: https://blog.csdn.net/jpf254/article/details/79450331
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞