学习笔记 05 --- JUC锁

学习笔记 05 — JUC锁


LockSupport:

LockSupport是用来创建锁和其他同步类的基本线程阻塞原语。
LockSupport通过unsafe函数中的接口来实现阻塞和解除阻塞的,AQS和其他的lock都会使用到这个基础类。
LockSupport中的park() 和 unpark() 的作用分别是阻塞线程和解除阻塞线程(park()—获取许可,unpark()—释放许可)而且park()和unpark()不会遇到“Thread.suspend 和 Thread.resume所可能引发的死锁”问题。因为park() 和 unpark()有许可的存在;调用 park() 的线程和另一个试图将其 unpark() 的线程之间的竞争将保持活性。




注:
park()/unpark()和wait()/notify()的区别是后者让线程阻塞和解除阻塞前,必须通过synchronized获取对象的同步锁。
unpark函数可以先于park调用。比如线程B调用unpark函数,给线程A发了一个“许可”,那么当线程A调用park时,它发现已经有“许可”了,那么它会马上再继续运行。
LockSupport不可重入,如果一个线程连续2次调用 LockSupport .park(),那么该线程一定会一直阻塞下去。

LockSupport 只能阻塞当前线程,但是可以唤醒任意线程。

LockSupport 的park和Object的wait一样可以相应中断。但是线程如果因为调用park而阻塞的话,能够响应中断请求(中断状态被设置成true),但是不会抛出InterruptedException 。

*******************************************************************************************************************************
Thread.interrupt()方法不会中断一个正在运行的线程。这一方法实际上完成的是,在线程受到阻塞时抛出一个中断信号,这样线程就得以退出阻塞的状态。更确切的说,如果线程被Object.wait, Thread.join和Thread.sleep三种方法之一阻塞,那么,它将接收到一个中断异常(InterruptedException),从而提早地终结被阻塞状态。

*******************************************************************************************************************************


LockSupport的源码如下所示:

/*
 * @(#)LockSupport.java	1.12 06/03/30
 *
 * Copyright 2006 Sun Microsystems, Inc. All rights reserved.
 * SUN PROPRIETARY/CONFIDENTIAL. Use is subject to license terms.
 */

package java.util.concurrent.locks;
import java.util.concurrent.*;
import sun.misc.Unsafe;


/**
 * Basic thread blocking primitives for creating locks and other
 * synchronization classes.
 *
 * <p>This class associates, with each thread that uses it, a permit
 * (in the sense of the {@link java.util.concurrent.Semaphore
 * Semaphore} class). A call to {@code park} will return immediately
 * if the permit is available, consuming it in the process; otherwise
 * it <em>may</em> block.  A call to {@code unpark} makes the permit
 * available, if it was not already available. (Unlike with Semaphores
 * though, permits do not accumulate. There is at most one.)
 *
 * <p>Methods {@code park} and {@code unpark} provide efficient
 * means of blocking and unblocking threads that do not encounter the
 * problems that cause the deprecated methods {@code Thread.suspend}
 * and {@code Thread.resume} to be unusable for such purposes: Races
 * between one thread invoking {@code park} and another thread trying
 * to {@code unpark} it will preserve liveness, due to the
 * permit. Additionally, {@code park} will return if the caller's
 * thread was interrupted, and timeout versions are supported. The
 * {@code park} method may also return at any other time, for "no
 * reason", so in general must be invoked within a loop that rechecks
 * conditions upon return. In this sense {@code park} serves as an
 * optimization of a "busy wait" that does not waste as much time
 * spinning, but must be paired with an {@code unpark} to be
 * effective.
 *
 * <p>The three forms of {@code park} each also support a
 * {@code blocker} object parameter. This object is recorded while
 * the thread is blocked to permit monitoring and diagnostic tools to
 * identify the reasons that threads are blocked. (Such tools may
 * access blockers using method {@link #getBlocker}.) The use of these
 * forms rather than the original forms without this parameter is
 * strongly encouraged. The normal argument to supply as a
 * {@code blocker} within a lock implementation is {@code this}.
 *
 * <p>These methods are designed to be used as tools for creating
 * higher-level synchronization utilities, and are not in themselves
 * useful for most concurrency control applications.  The {@code park}
 * method is designed for use only in constructions of the form:
 * <pre>while (!canProceed()) { ... LockSupport.park(this); }</pre>
 * where neither {@code canProceed} nor any other actions prior to the
 * call to {@code park} entail locking or blocking.  Because only one
 * permit is associated with each thread, any intermediary uses of
 * {@code park} could interfere with its intended effects.
 *
 * <p><b>Sample Usage.</b> Here is a sketch of a first-in-first-out
 * non-reentrant lock class:
 * <pre>{@code
 * class FIFOMutex {
 *   private final AtomicBoolean locked = new AtomicBoolean(false);
 *   private final Queue<Thread> waiters
 *     = new ConcurrentLinkedQueue<Thread>();
 *
 *   public void lock() {
 *     boolean wasInterrupted = false;
 *     Thread current = Thread.currentThread();
 *     waiters.add(current);
 *
 *     // Block while not first in queue or cannot acquire lock
 *     while (waiters.peek() != current ||
 *            !locked.compareAndSet(false, true)) {
 *        LockSupport.park(this);
 *        if (Thread.interrupted()) // ignore interrupts while waiting
 *          wasInterrupted = true;
 *     }
 *
 *     waiters.remove();
 *     if (wasInterrupted)          // reassert interrupt status on exit
 *        current.interrupt();
 *   }
 *
 *   public void unlock() {
 *     locked.set(false);
 *     LockSupport.unpark(waiters.peek());
 *   }
 * }}</pre>
 */

public class LockSupport {
    private LockSupport() {} // Cannot be instantiated.

    // Hotspot implementation via intrinsics API
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long parkBlockerOffset;

    static {
        try {
            parkBlockerOffset = unsafe.objectFieldOffset
                (java.lang.Thread.class.getDeclaredField("parkBlocker"));
        } catch (Exception ex) { throw new Error(ex); }
    }

    private static void setBlocker(Thread t, Object arg) {
        // Even though volatile, hotspot doesn't need a write barrier here.
        unsafe.putObject(t, parkBlockerOffset, arg);
    }

    /**
     * Makes available the permit for the given thread, if it
     * was not already available.  If the thread was blocked on
     * {@code park} then it will unblock.  Otherwise, its next call
     * to {@code park} is guaranteed not to block. This operation
     * is not guaranteed to have any effect at all if the given
     * thread has not been started.
     *
     * @param thread the thread to unpark, or {@code null}, in which case
     *        this operation has no effect
     */
    // 如果给定线程的许可尚不可用,则使其可用。
    public static void unpark(Thread thread) {
        if (thread != null)
            unsafe.unpark(thread);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call returns
     * immediately; otherwise
     * the current thread becomes disabled for thread scheduling
     * purposes and lies dormant until one of three things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @since 1.6
     */
    // 为了线程调度,在许可可用之前禁用当前线程。
    public static void park(Object blocker) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(false, 0L);
        setBlocker(t, null);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the current
     * thread; or
     *
     * <li>The specified waiting time elapses; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the elapsed time
     * upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param nanos the maximum number of nanoseconds to wait
     * @since 1.6
     */
    // 为了线程调度,在许可可用前禁用当前线程,并最多等待指定的等待时间。
    public static void parkNanos(Object blocker, long nanos) {
        if (nanos > 0) {
            Thread t = Thread.currentThread();
            setBlocker(t, blocker);
            unsafe.park(false, nanos);
            setBlocker(t, null);
        }
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts} the
     * current thread; or
     *
     * <li>The specified deadline passes; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the current time
     * upon return.
     *
     * @param blocker the synchronization object responsible for this
     *        thread parking
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     * @since 1.6
     */
    // 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
    public static void parkUntil(Object blocker, long deadline) {
        Thread t = Thread.currentThread();
        setBlocker(t, blocker);
        unsafe.park(true, deadline);
        setBlocker(t, null);
    }

    /**
     * Returns the blocker object supplied to the most recent
     * invocation of a park method that has not yet unblocked, or null
     * if not blocked.  The value returned is just a momentary
     * snapshot -- the thread may have since unblocked or blocked on a
     * different blocker object.
     *
     * @return the blocker
     * @since 1.6
     */
    // 返回提供给最近一次尚未解除阻塞的 park 方法调用的 blocker 对象,如果该调用不受阻塞,则返回 null。
    public static Object getBlocker(Thread t) {
        return unsafe.getObjectVolatile(t, parkBlockerOffset);
    }

    /**
     * Disables the current thread for thread scheduling purposes unless the
     * permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of three
     * things happens:
     *
     * <ul>
     *
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread upon return.
     */
    // 为了线程调度,禁用当前线程,除非许可可用。
    public static void park() {
        unsafe.park(false, 0L);
    }

    /**
     * Disables the current thread for thread scheduling purposes, for up to
     * the specified waiting time, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified waiting time elapses; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the elapsed time
     * upon return.
     *
     * @param nanos the maximum number of nanoseconds to wait
     */
    // 为了线程调度禁用当前线程,最多等待指定的等待时间,除非许可可用。
    public static void parkNanos(long nanos) {
        if (nanos > 0)
            unsafe.park(false, nanos);
    }

    /**
     * Disables the current thread for thread scheduling purposes, until
     * the specified deadline, unless the permit is available.
     *
     * <p>If the permit is available then it is consumed and the call
     * returns immediately; otherwise the current thread becomes disabled
     * for thread scheduling purposes and lies dormant until one of four
     * things happens:
     *
     * <ul>
     * <li>Some other thread invokes {@link #unpark unpark} with the
     * current thread as the target; or
     *
     * <li>Some other thread {@linkplain Thread#interrupt interrupts}
     * the current thread; or
     *
     * <li>The specified deadline passes; or
     *
     * <li>The call spuriously (that is, for no reason) returns.
     * </ul>
     *
     * <p>This method does <em>not</em> report which of these caused the
     * method to return. Callers should re-check the conditions which caused
     * the thread to park in the first place. Callers may also determine,
     * for example, the interrupt status of the thread, or the current time
     * upon return.
     *
     * @param deadline the absolute time, in milliseconds from the Epoch,
     *        to wait until
     */
    // 为了线程调度,在指定的时限前禁用当前线程,除非许可可用。
    public static void parkUntil(long deadline) {
        unsafe.park(true, deadline);
    }
}

ReentrantLock—互斥锁:

ReentrantLock是一个可重入的互斥锁,又被称为“独占锁”。ReentrantLock锁在同一个时间点只能被一个线程锁持有;而可重入的意思是,ReentrantLock锁,可以被单个线程多次获取。
ReentrantLock分为“公平锁”和“非公平锁”。它们的区别体现在获取锁的机制上是否公平。“锁”是为了保护竞争资源,防止多个线程同时操作线程而出错,ReentrantLock在同一个时间点只能被一个线程获取(当某线程获取到“锁”时,其它线程就必须等待);ReentraantLock是通过一个FIFO的等待队列来管理获取该锁所有线程的。在“公平锁”的机制下,线程依次排队获取锁;而“非公平锁”在锁是可获取状态时,不管自己是不是在队列的开头都会获取锁。


ReentrantLock 与 synchronized的区别:
1、与synchronized相比,ReentrantLock提供了更多,更加全面的功能,具备更强的扩展性。例如:时间锁等候,可中断锁等候,锁投票。
2、ReentrantLock还提供了条件Condition,对线程的等待、唤醒操作更加详细和灵活,所以在多个条件变量和高度竞争锁的地方,ReentrantLock更加适合。
3、ReentrantLock提供了可轮询的锁请求。它会尝试着去获取锁,如果成功则继续,否则可以等到下次运行时处理,而synchronized则一旦进入锁请求要么成功要么阻塞,所以相比synchronized而言,ReentrantLock会不容易产生死锁些。
4、ReentrantLock支持更加灵活的同步代码块,但是使用synchronized时,只能在同一个synchronized块结构中获取和释放。注:ReentrantLock的锁释放一定要在finally中处理,否则可能会产生严重的后果。
5、ReentrantLock支持中断处理,且性能较synchronized会好些。

6、ReentrantLock持有的锁是需要手动通过unlock去关闭的


ReentrantLock实现了Lock接口,内部使用static类继承AQS实现独占式的api来实现具体功能,使用AQS的state来表示锁可重入次数。对于公平和非公平的定义是通过对AbstractQueuedSynchronizer的扩展加以实现的,也就是在tryAcquire的实现上做了不同的控制,ReentrantLock中有3个内部类,分别是Sync、FairSync和NonfairSync,类结构如下所示:

  • ReentrantLock–>Lock
  • NonfairSync/FairSync–>Sync–>AbstractQueuedSynchronizer–>AbstractOwnableSynchronizer
  • NonfairSync/FairSync–>Sync是ReentrantLock的三个内部类
  • Node是AbstractQueuedSynchronizer的内部类

内部源码部分如下:

<span style="font-size:18px;">abstract static class Sync extends AbstractQueuedSynchronizer { private static final long serialVersionUID = -5179523762034025860L; /** * Performs {@link Lock#lock}. The main reason for subclassing * is to allow fast path for nonfair version. */ abstract void lock(); /** 非公平锁的acquire */ final boolean nonfairTryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); //判断state是否被占用 if (c == 0) { //没有被占用,直接cas占用,成功的话就设置当前线程为占用线程 if (compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } //如果state不为0,因为是可重入锁,需要判断是不是自己占用的,如果是累加state值 else if (current == getExclusiveOwnerThread()) { int nextc = c + acquires; if (nextc < 0) // overflow throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } //acquire失败,AQS等待队列排队 return false; } //release的时候也需要判断是不是当前线程。因为可重入,所以可以lock多次,release的时候就要release多次 protected final boolean tryRelease(int releases) { int c = getState() - releases; if (Thread.currentThread() != getExclusiveOwnerThread()) throw new IllegalMonitorStateException(); boolean free = false; if (c == 0) { free = true; setExclusiveOwnerThread(null); } setState(c); return free; } /**AbstractOwnableSynchronizer.exclusiveOwnerThread 判断是否为当前占用lock的线程*/ protected final boolean isHeldExclusively() { // While we must in general read state before owner, // we don't need to do so to check if current thread is owner return getExclusiveOwnerThread() == Thread.currentThread(); } /**lock.newCondition每次直接new一个AQS的conditionObject维护一个条件队列*/ final ConditionObject newCondition() { return new ConditionObject(); } // Methods relayed from outer class final Thread getOwner() { return getState() == 0 ? null : getExclusiveOwnerThread(); } final int getHoldCount() { return isHeldExclusively() ? getState() : 0; } final boolean isLocked() { return getState() != 0; } /** * Reconstitutes this lock instance from a stream. * @param s the stream */ private void readObject(java.io.ObjectInputStream s) throws java.io.IOException, ClassNotFoundException { s.defaultReadObject(); setState(0); // reset to unlocked state } }</span> 
static final class NonfairSync extends Sync { private static final long serialVersionUID = 7316153563782823691L; /**非公平锁进来就cas,成功就设置独占线程,不成功再去Acquire排队,这就是公平不公平的区分*/ final void lock() { if (compareAndSetState(0, 1)) setExclusiveOwnerThread(Thread.currentThread()); else acquire(1); } /**直接使用父类中notFairAcquire*/ protected final boolean tryAcquire(int acquires) { return nonfairTryAcquire(acquires); } } static final class FairSync extends Sync { private static final long serialVersionUID = -3000897897090466540L; final void lock() { acquire(1); } /**公平锁的tryAcquire*/ protected final boolean tryAcquire(int acquires) { final Thread current = Thread.currentThread(); int c = getState(); if (c == 0) { //锁还在并且AQS没有其他等待节点,cas设置,然后再设置独占线程 if (!hasQueuedPredecessors() && compareAndSetState(0, acquires)) { setExclusiveOwnerThread(current); return true; } } else if (current == getExclusiveOwnerThread()) { //因为是可重入锁,state不为0看是不是自己占用了,如果是更新state值 int nextc = c + acquires; if (nextc < 0) throw new Error("Maximum lock count exceeded"); setState(nextc); return true; } return false; } } //判断队列没有其他等待节点 public final boolean hasQueuedPredecessors() { // The correctness of this depends on head being initialized // before tail and on head.next being accurate if the current // thread is first in queue. Node t = tail; // Read fields in reverse initialization order Node h = head; Node s; return h != t && ((s = h.next) == null || s.thread != Thread.currentThread()); }  

// 创建一个 ReentrantLock ,默认是“非公平锁”。 ReentrantLock() // 创建策略是fair的 ReentrantLock。fair为true表示是公平锁,fair为false表示是非公平锁。 ReentrantLock(boolean fair) // 查询当前线程保持此锁的次数。 int getHoldCount() // 返回目前拥有此锁的线程,如果此锁不被任何线程拥有,则返回 null。 protected Thread getOwner() // 返回一个 collection,它包含可能正等待获取此锁的线程。 protected Collection<Thread> getQueuedThreads() // 返回正等待获取此锁的线程估计数。 int getQueueLength() // 返回一个 collection,它包含可能正在等待与此锁相关给定条件的那些线程。 protected Collection<Thread> getWaitingThreads(Condition condition) // 返回等待与此锁相关的给定条件的线程估计数。 int getWaitQueueLength(Condition condition) // 查询给定线程是否正在等待获取此锁。 boolean hasQueuedThread(Thread thread) // 查询是否有些线程正在等待获取此锁。 boolean hasQueuedThreads() // 查询是否有些线程正在等待与此锁有关的给定条件。 boolean hasWaiters(Condition condition) // 如果是“公平锁”返回true,否则返回false。 boolean isFair() // 查询当前线程是否保持此锁。 boolean isHeldByCurrentThread() // 查询此锁是否由任意线程保持。 boolean isLocked() // 获取锁。 void lock() // 如果当前线程未被中断,则获取锁。 void lockInterruptibly() // 返回用来与此 Lock 实例一起使用的 Condition 实例。 Condition newCondition() // 仅在调用时锁未被另一个线程保持的情况下,才获取该锁。 boolean tryLock() // 如果锁在给定等待时间内没有被另一个线程保持,且当前线程未被中断,则获取该锁。 boolean tryLock(long timeout, TimeUnit unit) // 试图释放此锁。 void unlock() 

Sync是一个继承AQS的抽象类,使用独占锁,复写了tryRelease方法。tryAcquire方法由它的两个FairSync(公平锁)和NonfairSync(非公平锁)实现。


ReentrantLock根据传入构造方法的布尔型参数实例化出Sync的实现类FairSync和NonfairSync,分别表示公平的Sync和非公平的Sync,非公平会先直接尝试cas修改,不成功再去排队,就是插队,而公平锁就是老老实实请求排队操作


ReentrantLock非公平锁:


假设线程1调用了ReentrantLock的lock()方法,那么线程1将会独占锁,整个调用链十分简单:


《学习笔记 05 --- JUC锁》

第一个获取锁的线程就做了两件事情:

1、设置AbstractQueuedSynchronizer的state为1

2、设置AbstractOwnableSynchronizer的thread为当前线程

那么这两步做完之后就表示线程1独占了锁。然后线程2也要尝试获取同一个锁,在线程1没有释放锁的情况下必然是行不通的,所以线程2就要阻塞。那么,线程2如何被阻塞?看下线程2的方法调用链,这就比较复杂了:

《学习笔记 05 --- JUC锁》

lock()

lock()在ReentrantLock.java的NonfairSync类中实现,它的源码如下:

 1 final void lock() {
 2     if (compareAndSetState(0, 1))
 3         setExclusiveOwnerThread(Thread.currentThread());
 4     else
 5         acquire(1);
 6 }


说明
lock()会先通过compareAndSet(0, 1)来判断“锁”是不是空闲状态。是的话,“当前线程”直接获取“锁”;否则的话,调用acquire(1)获取锁。
(01) compareAndSetState()是CAS函数,它的作用是比较并设置当前锁的状态。若锁的状态值为0,则设置锁的状态值为1。
(02) setExclusiveOwnerThread(Thread.currentThread())的作用是,设置“当前线程”为“锁”的持有者。

“公平锁”和“非公平锁”关于lock()的对比

公平锁   -- 公平锁的lock()函数,会直接调用acquire(1)。 非公平锁 -- 非公平锁会先判断当前锁的状态是不是空闲,是的话,就不排队,而是直接获取锁。

acquire()

acquire()在AQS中实现的,它的源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列依次排序,然后获取锁。
(02) “当前线程”尝试失败的情况下,会先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到”CLH队列(非阻塞的FIFO队列)”末尾。
(03) 然后,调用acquireQueued()获取锁。在acquireQueued()中,当前线程会等待它在“CLH队列”中前面的所有线程执行并释放锁之后,才能获取锁并返回。如果“当前线程”在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。

非公平锁的tryAcquire()在ReentrantLock.java的NonfairSync类中实现,源码如下:

protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

nonfairTryAcquire()在ReentrantLock.java的Sync类中实现,源码如下:

final boolean nonfairTryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,则通过CAS函数设置“锁”的状态为acquires。
        // 同时,设置“当前线程”为锁的持有者。
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“锁”的持有者已经是“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

说明
根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。
(01) 如果“锁”没有被任何线程拥有,则通过CAS函数设置“锁”的状态为acquires,同时,设置“当前线程”为锁的持有者,然后返回true。
(02) 如果“锁”的持有者已经是当前线程,则将更新锁的状态即可。
(03) 如果不术语上面的两种情况,则认为尝试失败。

“公平锁”和“非公平锁”关于tryAcquire()的对比

公平锁和非公平锁,它们尝试获取锁的方式不同。 公平锁在尝试获取锁时,即使“锁”没有被任何线程锁持有,它也会判断自己是不是CLH等待队列的表头;是的话,才获取锁。 而非公平锁在尝试获取锁时,如果“锁”没有被任何线程持有,则不管它在CLH队列的何处,它都直接获取锁。


private Node addWaiter(Node mode) {
    Node node = new Node(Thread.currentThread(), mode);
    // Try the fast path of enq; backup to full enq on failure
    Node pred = tail;
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    enq(node);
    return node;
}

先创建一个当前线程的Node,模式为独占模式(因为传入的mode是一个NULL),再判断一下队列上有没有节点,没有就创建一个队列,因此走enq方法:

 1 private Node enq(final Node node) {
 2     for (;;) {
 3         Node t = tail;
 4         if (t == null) { // Must initialize
 5             Node h = new Node(); // Dummy header
 6             h.next = node;
 7             node.prev = h;
 8             if (compareAndSetHead(h)) {
 9                 tail = node;
10                 return h;
11             }
12         }
13         else {
14             node.prev = t;
15             if (compareAndSetTail(t, node)) {
16                 t.next = node;
17                 return t;
18             }
19         }
20     }
21 }

上面这段代码可以用下图很好的标示出来:
《学习笔记 05 --- JUC锁》

每一步都用图表示出来了,由于线程2所在的Node是第一个要等待的Node,因此FIFO队列上肯定没有内容,tail为null,走的就是第4行~第10行的代码逻辑。这里用了CAS设置头Node,当然有可能线程2设置头Node的时候CPU切换了,线程3已经把头Node设置好了形成了上图所示的一个队列,这时线程2再循环一次获取tail,由于tail是volatile的,所以对线程2可见,线程2看见tail不为null,就走到了13行的else里面去往尾Node后面添加自身。整个过程下来,形成了一个双向队列。最后走AQS的acquireQueued(node, 1):

 1 final boolean acquireQueued(final Node node, int arg) {
 2     try {
 3         boolean interrupted = false;
 4         for (;;) {
 5             final Node p = node.predecessor();
 6             if (p == head && tryAcquire(arg)) {
 7                 setHead(node);
 8                 p.next = null; // help GC
 9                 return interrupted;
10             }
11             if (shouldParkAfterFailedAcquire(p, node) &&
12                 parkAndCheckInterrupt())
13                 interrupted = true;
14         }
15     } catch (RuntimeException ex) {
16         cancelAcquire(node);
17         throw ex;
18     }
19 }

此时再做判断,由于线程2是双向队列的真正的第一个Node(前面还有一个h),所以第5行~第10行再次判断一下线程2能不能获取锁(可能这段时间内线程1已经执行完了把锁释放了,state从1变为了0),如果还是不行,先调用AQS的shouldParkAfterFailedAcquire(p, node)方法:

 1 private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
 2     int s = pred.waitStatus;
 3     if (s < 0)
 4         /*
 5          * This node has already set status asking a release
 6          * to signal it, so it can safely park
 7          */
 8         return true;
 9     if (s > 0) {
10         /*
11          * Predecessor was cancelled. Skip over predecessors and
12          * indicate retry.
13          */
14     do {
15     node.prev = pred = pred.prev;
16     } while (pred.waitStatus > 0);
17     pred.next = node;
18 }
19     else
20         /*
21          * Indicate that we need a signal, but don't park yet. Caller
22          * will need to retry to make sure it cannot acquire before
23          * parking.
24          */
25          compareAndSetWaitStatus(pred, 0, Node.SIGNAL);
26     return false;
27 }

这个waitStatus是h的waitStatus,很明显是0,所以此时把h的waitStatus设置为Noed.SIGNAL即-1并返回false。既然返回了false,上面的acquireQueued的11行if自然不成立,再走一次for循环,还是先尝试获取锁,不成功,继续走shouldParkAfterFailedAcquire,此时waitStatus为-1,小于0,走第三行的判断,返回true。然后走acquireQueued的11行if的第二个判断条件parkAndCheckInterrupt:

private final boolean parkAndCheckInterrupt() {
    LockSupport.park(this);
    return Thread.interrupted();
}
public static void park(Object blocker) {
    Thread t = Thread.currentThread();
    setBlocker(t, blocker);
    unsafe.park(false, 0L);
    setBlocker(t, null);
}

最后一步,调用LockSupport的park方法阻塞住了当前的线程。



简化版的步骤:(非公平锁的核心)

基于CAS尝试将state(锁数量)从0设置为1

A、如果设置成功,设置当前线程为独占锁的线程;

B、如果设置失败,还会再获取一次锁数量,

B1、如果锁数量为0,再基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;

B2、如果锁数量不为0或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。

ReentrantLock公平锁:

lock()

lock()在ReentrantLock.java的FairSync类中实现,它的源码如下:

final void lock() {
    acquire(1);
}

说明:“当前线程”实际上是通过acquire(1)获取锁的。
        这里说明一下“1”的含义,它是设置“锁的状态”的参数。对于“独占锁”而言,锁处于可获取状态时,它的状态值是0;锁被线程初次获取到了,它的状态值就变成了1。
        由于ReentrantLock(公平锁/非公平锁)是可重入锁,所以“独占锁”可以被单个线程多此获取,每获取1次就将锁的状态+1。也就是说,初次获取锁时,通过acquire(1)将锁的状态值设为1;再次获取锁时,将锁的状态值设为2;依次类推...这就是为什么获取锁时,传入的参数是1的原因了。
        可重入就是指锁可以被单个线程多次获取。

acquire()

acquire()在AQS中实现的,它的源码如下:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) “当前线程”首先通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,进入到等待队列排序等待(前面还有可能有需要线程在等待该锁)。
(02) “当前线程”尝试失败的情况下,先通过addWaiter(Node.EXCLUSIVE)来将“当前线程”加入到"CLH队列(非阻塞的FIFO队列)"末尾。CLH队列就是线程等待队列。
(03) 再执行完addWaiter(Node.EXCLUSIVE)之后,会调用acquireQueued()来获取锁。由于此时ReentrantLock是公平锁,它会根据公平性原则来获取锁。
(04) “当前线程”在执行acquireQueued()时,会进入到CLH队列中休眠等待,直到获取锁了才返回!如果“当前线程”在休眠等待过程中被中断过,acquireQueued会返回true,此时"当前线程"会调用selfInterrupt()来自己给自己产生一个中断。

tryAcquire()

公平锁的tryAcquire()在ReentrantLock.java的FairSync类中实现,源码如下:

《学习笔记 05 --- JUC锁》

protected final boolean tryAcquire(int acquires) {
    // 获取“当前线程”
    final Thread current = Thread.currentThread();
    // 获取“独占锁”的状态
    int c = getState();
    // c=0意味着“锁没有被任何线程锁拥有”,
    if (c == 0) {
        // 若“锁没有被任何线程锁拥有”,
        // 则判断“当前线程”是不是CLH队列中的第一个线程线程,
        // 若是的话,则获取该锁,设置锁的状态,并切设置锁的拥有者为“当前线程”。
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        // 如果“独占锁”的拥有者已经为“当前线程”,
        // 则将更新锁的状态。
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

《学习笔记 05 --- JUC锁》

说明:根据代码,我们可以分析出,tryAcquire()的作用就是尝试去获取锁。注意,这里只是尝试!
         尝试成功的话,返回true;尝试失败的话,返回false,后续再通过其它办法来获取该锁。

addWaiter()

addWaiter()在AQS中实现,源码如下:

《学习笔记 05 --- JUC锁》

private Node addWaiter(Node mode) {
    // 新建一个Node节点,节点对应的线程是“当前线程”,“当前线程”的锁的模型是mode。
    Node node = new Node(Thread.currentThread(), mode);
    Node pred = tail;
    // 若CLH队列不为空,则将“当前线程”添加到CLH队列末尾
    if (pred != null) {
        node.prev = pred;
        if (compareAndSetTail(pred, node)) {
            pred.next = node;
            return node;
        }
    }
    // 若CLH队列为空,则调用enq()新建CLH队列,然后再将“当前线程”添加到CLH队列中。
    enq(node);
    return node;
}

《学习笔记 05 --- JUC锁》

说明:对于“公平锁”而言,addWaiter(Node.EXCLUSIVE)会首先创建一个Node节点,节点的类型是“独占锁”(Node.EXCLUSIVE)类型。然后,再将该节点添加到CLH队列的末尾。

acquireQueued()

acquireQueued()在AQS中实现,源码如下:

《学习笔记 05 --- JUC锁》

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        // interrupted表示在CLH队列的调度中,
        // “当前线程”在休眠时,有没有被中断过。
        boolean interrupted = false;
        for (;;) {
            // 获取上一个节点。
            // node是“当前线程”对应的节点,这里就意味着“获取上一个等待锁的线程”。
            final Node p = node.predecessor();
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            cancelAcquire(node);
    }
}

《学习笔记 05 --- JUC锁》

说明:acquireQueued()的目的是从队列中获取锁。

selfInterrupt()

selfInterrupt()是AQS中实现,源码如下:

private static void selfInterrupt() {
    Thread.currentThread().interrupt();
}

说明:selfInterrupt()的代码很简单,就是“当前线程”自己产生一个中断。但是,为什么需要这么做呢?
这必须结合acquireQueued()进行分析。如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。

在acquireQueued()中,即使是线程在阻塞状态被中断唤醒而获取到cpu执行权利;但是,如果该线程的前面还有其它等待锁的线程,根据公平性原则,该线程依然无法获取到锁。它会再次阻塞! 该线程再次阻塞,直到该线程被它的前面等待锁的线程锁唤醒;线程才会获取锁,然后“真正执行起来”!
也就是说,在该线程“成功获取锁并真正执行起来”之前,它的中断会被忽略并且中断标记会被清除! 因为在parkAndCheckInterrupt()中,我们线程的中断状态时调用了Thread.interrupted()。该函数不同于Thread的isInterrupted()函数,isInterrupted()仅仅返回中断状态,而interrupted()在返回当前中断状态之后,还会清除中断状态。 正因为之前的中断状态被清除了,所以这里需要调用selfInterrupt()重新产生一个中断!

总结

再回过头看看acquire()函数,它最终的目的是获取锁!

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

(01) 先是通过tryAcquire()尝试获取锁。获取成功的话,直接返回;尝试失败的话,再通过acquireQueued()获取锁。
(02) 尝试失败的情况下,会先通过addWaiter()来将“当前线程”加入到"CLH队列"末尾;然后调用acquireQueued(),在CLH队列中排序等待获取锁,在此过程中,线程处于休眠状态。直到获取锁了才返回。 如果在休眠等待过程中被中断过,则调用selfInterrupt()来自己产生一个中断。

简化版的步骤:(公平锁的核心)

获取一次锁数量,

B1、如果锁数量为0,如果当前线程是等待队列中的头节点,基于CAS尝试将state(锁数量)从0设置为1一次,如果设置成功,设置当前线程为独占锁的线程;

B2、如果锁数量不为0或者当前线程不是等待队列中的头节点或者上边的尝试又失败了,查看当前线程是不是已经是独占锁的线程了,如果是,则将当前的锁数量+1;如果不是,则将该线程封装在一个Node内,并加入到等待队列中去。等待被其前一个线程节点唤醒。



不会尝试CAS操作去插队,而是获取不到锁就去排队。方法和非公平锁类似,不同的地方是FairSync内部类中的tryAcquire方法:

protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();             int c = getState();             if (c == 0) {                 if (isFirst(current) &&                     compareAndSetState(0, acquires)) {                     setExclusiveOwnerThread(current);                     return true;                 }             }             else if (current == getExclusiveOwnerThread()) {                 int nextc = c + acquires;                 if (nextc < 0)                     throw new Error("Maximum lock count exceeded");                 setState(nextc);                 return true;             }             return false;         } 

从源码中可以看出,上面红色代码的地方是公平锁多出来的代码,通过对公平锁的了解和代码的分析,我认为这个判断是判断当前线程是否为等待时间最长的线程,也就是说队列里面是否有其他线程再等待,如果有其他线程再等待则当前线程不获取锁,返回继续阻塞。这也就是FIFO原理先进来的进程先执行------这个理解不知道是否准确,如果不准确希望大家可以指正出来,在评论中知会我。谢谢!
基于JDK1.7的话  红色部分代码是:!hasQueuedPredecessors()



ReentrantLock释放锁:


unlock()在ReentrantLock.java中实现的,源码如下:

public void unlock() {
    sync.release(1);
}

说明
unlock()是解锁函数,它是通过AQS的release()函数来实现的。
在这里,“1”的含义和“获取锁的函数acquire(1)的含义”一样,它是设置“释放锁的状态”的参数。由于“公平锁”是可重入的,所以对于同一个线程,每释放锁一次,锁的状态-1。




release()

release()在AQS中实现的,源码如下:

1 public final boolean release(int arg) {
2     if (tryRelease(arg)) {
3         Node h = head;
4         if (h != null && h.waitStatus != 0)
5            unparkSuccessor(h);
6         return true;
7     }
8     return false;
9 }

说明
release()会先调用tryRelease()来尝试释放当前线程锁持有的锁。成功的话,则唤醒后继等待线程,并返回true。否则,直接返回false。

tryRelease()

tryRelease()在ReentrantLock.java的Sync类中实现,源码如下:

protected final boolean tryRelease(int releases) {
    // c是本次释放锁之后的状态
    int c = getState() - releases;
    // 如果“当前线程”不是“锁的持有者”,则抛出异常!
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();

    boolean free = false;
    // 如果“锁”已经被当前线程彻底释放,则设置“锁”的持有者为null,即锁是可获取状态。
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 设置当前线程的锁的状态。
    setState(c);
    return free;
}

首先,只有当c==0的时候才会让free=true,这和上面一个线程多次调用lock方法累加state是对应的,调用了多少次的lock()方法自然必须调用同样次数的unlock()方法才行,这样才把一个锁给全部解开。

当一条线程对同一个ReentrantLock全部解锁之后,AQS的state自然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread将被设置为null,这样就表示没有线程占有锁,方法返回true。代码继续往下走,上面的release方法的第四行,h不为null成立,h的waitStatus为-1,不等于0也成立,所以走第5行的unparkSuccessor方法:

unparkSuccessor()

在release()中“当前线程”释放锁成功的话,会唤醒当前线程的后继线程。
根据CLH队列的FIFO规则,“当前线程”(即已经获取锁的线程)肯定是head;如果CLH队列非空的话,则唤醒锁的下一个等待线程。
下面看看unparkSuccessor()的源码,它在AQS中实现。

private void unparkSuccessor(Node node) {
    // 获取当前线程的状态
    int ws = node.waitStatus;
    // 如果状态<0,则设置状态=0
    if (ws < 0)
        compareAndSetWaitStatus(node, ws, 0);

    //获取当前节点的“有效的后继节点”,无效的话,则通过for循环进行获取。
    // 这里的有效,是指“后继节点对应的线程状态<=0”
    Node s = node.next;
    if (s == null || s.waitStatus > 0) {
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    // 唤醒“后继节点对应的线程”
    if (s != null)
        LockSupport.unpark(s.thread);
}

s即h的下一个Node,这个Node里面的线程就是线程2,由于这个Node不等于null,所以走21行,线程2被unPark了,得以运行。有一个很重要的问题是:锁被解了怎样保证整个FIFO队列减少一个Node呢?这是一个很巧妙的设计,又回到了AQS的acquireQueued方法了:

 1 final boolean acquireQueued(final Node node, int arg) {
 2     try {
 3         boolean interrupted = false;
 4         for (;;) {
 5             final Node p = node.predecessor();
 6             if (p == head && tryAcquire(arg)) {
 7                 setHead(node);
 8                 p.next = null; // help GC
 9                 return interrupted;
10             }
11             if (shouldParkAfterFailedAcquire(p, node) &&
12                 parkAndCheckInterrupt())
13                 interrupted = true;
14         }
15     } catch (RuntimeException ex) {
16         cancelAcquire(node);
17         throw ex;
18     }
19 }

被阻塞的线程2是被阻塞在第12行,注意这里并没有return语句,也就是说,阻塞完成线程2依然会进行for循环。然后,阻塞完成了,线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,然后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束,后面的Node也是一样的原理。

这里有一个细节,看一下setHead方法:

private void setHead(Node node) {
    head = node;
    node.thread = null;
    node.prev = null;
}

setHead方法里面的前驱Node是Null,也没有线程,那么为什么不用一个在等待的线程作为Head Node呢?

因为一个线程随时有可能因为中断而取消,而取消的话,Node自然就要被GC了,那GC前必然要把头Node的后继Node变为一个新的头而且要应对多种情况,这样就很麻烦。用一个没有thread的Node作为头,相当于起了一个引导作用,因为head没有线程,自然也不会被取消。

再看一下上面unparkSuccessor的14行~20行,就是为了防止head的下一个node被取消的情况,这样,就从尾到头遍历,找出离head最近的一个node,对这个node进行unPark操作。

简化版的步骤:(释放锁的步骤)

1)获取当前的锁数量,然后用这个锁数量减去解锁的数量(这里为1),最后得出结果c

2)判断当前线程是不是独占锁的线程,如果不是,抛出异常

3)如果c==0,说明锁被成功释放,将当前的独占线程置为null,锁数量置为0,返回true

4)如果c!=0,说明释放锁失败,锁数量置为c,返回false

5)如果锁被释放成功的话,唤醒距离头节点最近的一个非取消的节点

Condition:

Condition的作用是对锁进行更精确的控制。Condition中的await()方法相当于Object的wait()方法,Condition中的signal()方法相当于Object的notify()方法,Condition中的signalAll()相当于Object的notifyAll()方法。不同的是,Object中的wait(),notify(),notifyAll()方法是和"同步锁"(synchronized关键字)捆绑使用的;而Condition是需要与"互斥锁"/"共享锁"捆绑使用的。



Condition的函数列表:

// 造成当前线程在接到信号或被中断之前一直处于等待状态。
void await() // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
boolean await(long time, TimeUnit unit) // 造成当前线程在接到信号、被中断或到达指定等待时间之前一直处于等待状态。
long awaitNanos(long nanosTimeout) // 造成当前线程在接到信号之前一直处于等待状态。
void awaitUninterruptibly() // 造成当前线程在接到信号、被中断或到达指定最后期限之前一直处于等待状态。
boolean awaitUntil(Date deadline) // 唤醒一个等待线程。
void signal() // 唤醒所有等待线程。
void signalAll()

Condition和ReentrantLock联合使用的示例:

import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class BoundedBuffer {
    final Lock lock = new ReentrantLock();
    final Condition notFull  = lock.newCondition(); 
    final Condition notEmpty = lock.newCondition(); 

    final Object[] items = new Object[5];
    int putptr, takeptr, count;

    public void put(Object x) throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲已满”,则等待;直到“缓冲”不是满的,才将x添加到缓冲中。
            while (count == items.length)
                notFull.await();
            // 将x添加到缓冲中
            items[putptr] = x; 
            // 将“put统计数putptr+1”;如果“缓冲已满”,则设putptr为0。
            if (++putptr == items.length) putptr = 0;
            // 将“缓冲”数量+1
            ++count;
            // 唤醒take线程,因为take线程通过notEmpty.await()等待
            notEmpty.signal();

            // 打印写入的数据
            System.out.println(Thread.currentThread().getName() + " put  "+ (Integer)x);
        } finally {
            lock.unlock();    // 释放锁
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();    //获取锁
        try {
            // 如果“缓冲为空”,则等待;直到“缓冲”不为空,才将x从缓冲中取出。
            while (count == 0) 
                notEmpty.await();
            // 将x从缓冲中取出
            Object x = items[takeptr]; 
            // 将“take统计数takeptr+1”;如果“缓冲为空”,则设takeptr为0。
            if (++takeptr == items.length) takeptr = 0;
            // 将“缓冲”数量-1
            --count;
            // 唤醒put线程,因为put线程通过notFull.await()等待
            notFull.signal();

            // 打印取出的数据
            System.out.println(Thread.currentThread().getName() + " take "+ (Integer)x);
            return x;
        } finally {
            lock.unlock();    // 释放锁
        }
    } 
}

public class ConditionTest2 {
    private static BoundedBuffer bb = new BoundedBuffer();

    public static void main(String[] args) {
        // 启动10个“写线程”,向BoundedBuffer中不断的写数据(写入0-9);
        // 启动10个“读线程”,从BoundedBuffer中不断的读数据。
        for (int i=0; i<10; i++) {
            new PutThread("p"+i, i).start();
            new TakeThread("t"+i).start();
        }
    }

    static class PutThread extends Thread {
        private int num;
        public PutThread(String name, int num) {
            super(name);
            this.num = num;
        }
        public void run() {
            try {
                Thread.sleep(1);    // 线程休眠1ms
                bb.put(num);        // 向BoundedBuffer中写入数据
            } catch (InterruptedException e) {
            }
        }
    }

    static class TakeThread extends Thread {
        public TakeThread(String name) {
            super(name);
        }
        public void run() {
            try {
                Thread.sleep(10);                    // 线程休眠1ms
                Integer num = (Integer)bb.take();    // 从BoundedBuffer中取出数据
            } catch (InterruptedException e) {
            }
        }
    }
}

Condition更强大的地方在于:能够更加精细的控制多线程的休眠与唤醒。对于同一个锁,我们可以创建多个Condition,在不同的情况下使用不同的Condition。



*******************************未完待续***********************************

该文为本人学习的笔记,方便以后自己查阅。参考网上各大帖子,取其精华整合自己的理解而成。如有不对的地方,请多多指正!自勉!共勉!

************************************************************************

    原文作者:JUC
    原文地址: https://blog.csdn.net/tianya3530/article/details/54315315
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞