JUC包学习所得

JUC包学习所得

1.LOCK

java.util.concurrent.locks.Lock 是一个类似于 synchronized 块的线程同步机制。但是 Lock 比 synchronized 块更加灵活、精细。

Java Lock 例子

Lock lock = new ReentrantLock();

try{
    lock.lock();
    //核心代码编写
}finally{
   lock.unlock();
}

Lock接口主要方法

lock()

用来获取锁

lockInterruptibly()

当通过这个方法获取锁时,如果该线程正在等待获取锁,则它能够响应中断。也就是说,当两个线程同时通过lockInterruptibly()获取某个锁时,假如线程A获得了锁,而线程B仍在等待获取锁,那么对线程B调用interrupt()方法可以中断B的等待过程。

tryLock()

尝试获取锁,如果成功则返回true,失败返回false(其他线程已占有锁)。这个方法会立即返回,在拿不到锁时也不会等待

tryLock(long timeout, TimeUnit timeUnit)

和tryLock()方法类似,只不过在拿不到锁时等待一定的时间,如果超过等待时间还拿不到锁就返回false

unlock()

释放锁

与synchronized的比较

synchronized是Java的一个关键字,当我们使用synchronized来修饰方法或代码块时,线程必须先获得对应的锁才能执行该段代码。而其他线程只能一直等待,直到当前线程释放锁并获得对应的锁才能进入该段代码。这里获取锁的线程释放锁只会有两种情况:
获取锁的线程执行完该段代码,线程会释放占有的锁;
线程执行发生异常,此时JVM会让线程自动释放锁。
那么如果这个占有锁的线程由于等待IO或其他原因(比如调用sleep方法)被阻塞,但是还没有释放锁,那么其他线程只能干巴巴的等着,试想这多么影响程序的执行效率。
当多个线程同时读写文件是,我们知道读操作和写操作会发生冲突,写操作和写操作也会发生冲突,但是读操作和读操作之间不会冲突。synchronized关键字对一段代码加锁,所有的线程必须先获得对应的锁才有该代码段的执行权限。如果多个线程同时进行读操作时,使用synchronized关键字会导致在任何时刻只有一个线程读,其他线程等待,大大降低执行效率。
Lock可以对以上种种情况作优化,提供更好的执行效率。另外,Lock方便了对锁的管理,可以自由的加锁和释放锁,还可以判断有没有成功获取锁。但是在使用Lock时要注意,Lock需要开发者手动去释放锁,如果没有主动释放锁,就要可能导致死锁出现。建议在finally语句块中释放Lock锁。

读写锁 ReadWriteLock

java.util.concurrent.locks.ReadWriteLock 读写锁是一种先进的线程锁机制。它能够允许多个线程在同一时间对某特定资源进行读取,但同一时间内只能有一个线程对其进行写入。
读写锁的理念在于多个线程能够对一个共享资源进行读取,而不会导致并发问题。并发问题的发生场景在于对一个共享资源的读和写操作的同时进行,或者多个写操作并发进行。

一个线程在对受保护资源在读或者写之前对 ReadWriteLock 锁定的规则如下:

读锁:如果没有任何写操作线程锁定 ReadWriteLock,并且没有任何写操作线程要求一个写锁(但还没有获得该锁)。因此,可以有多个读操作线程对该锁进行锁定。
写锁:如果没有任何读操作或者写操作。因此,在写操作的时候,只能有一个线程对该锁进行锁定。


ReadWriteLock readWriteLock = new ReentrantReadWriteLock();


readWriteLock.readLock().lock();

    // multiple readers can enter this section
    // if not locked for writing, and not writers waiting
    // to lock for writing.

readWriteLock.readLock().unlock();


readWriteLock.writeLock().lock();

    // only one writer can enter this section,
    // and only if no threads are currently reading.

2.同步器

AQS的核心思想是基于volatile int state这样的volatile变量,配合Unsafe工具对其原子性的操作来实现对当前锁状态进行修改。同步器内部依赖一个FIFO的双向队列来完成资源获取线程的排队工作。

同步器的应用

同步器主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状态,对同步状态的修改或者访问主要通过同步器提供的3个方法:

1.getState() 获取当前的同步状态
2.setState(int newState) 设置当前同步状态
3.compareAndSetState(int expect,int update) 使用CAS设置当前状态,该方法能够保证状态设置的原子性。

同步器可以支持独占式的获取同步状态,也可以支持共享式的获取同步状态,这样可以方便实现不同类型的同步组件。同步器也是实现锁的关键,在锁的实现中聚合同步器,利用同步器实现锁的语义。
AQS支持共享模式和独占模式,在独占模式下,其他线程试图获取该锁将无法获取成功;在共享模式下,多个线程获取某个锁可能(但不是一定)会获取成功

AQS同步队列

同步器AQS内部的实现是依赖同步队列(一个FIFO的双向队列,其实就是数据结构双向链表)来完成同步状态的管理。
当前线程获取同步状态失败时,同步器AQS会将当前线程和等待状态等信息构造成为一个节点(node)加入到同步队列,同时会阻塞当前线程;
当同步状态释放的时候,会把首节点中的线程唤醒,使首节点的线程再次尝试获取同步状态。AQS是独占锁和共享锁的实现的父类。

AQS锁的类别:分为独占锁和共享锁两种

独占锁:锁在一个时间点只能被一个线程占有。根据锁的获取机制,又分为“公平锁”和“非公平锁”。等待队列中按照FIFO的原则获取锁,等待时间越长的线程越先获取到锁,这就是公平的获取锁,即公平锁。而非公平锁,线程获取的锁的时候,无视等待队列直接获取锁。ReentrantLock和ReentrantReadWriteLock.Writelock是独占锁。

共享锁:同一个时候能够被多个线程获取的锁,能被共享的锁。JUC包中ReentrantReadWriteLock.ReadLock,CyclicBarrier,CountDownLatch和Semaphore都是共享锁。

AQS同步器的结构

《JUC包学习所得》

同步队列的基本结构,同步器拥有首节点(head)和尾节点(tail)

《JUC包学习所得》

同步队列设置尾节点(未获取到锁的线程加入同步队列): 同步器AQS中包含两个节点类型的引用:一个指向头结点的引用(head),一个指向尾节点的引用(tail),当一个线程成功的获取到锁(同步状态),其他线程无法获取到锁,而是被构造成节点(包含当前线程,等待状态)加入到同步队列中等待获取到锁的线程释放锁。这个加入队列的过程,必须要保证线程安全。否则如果多个线程的环境下,可能造成添加到队列等待的节点顺序错误,或者数量不对。因此同步器提供了CAS原子的设置尾节点的方法(保证一个未获取到同步状态的线程加入到同步队列后,下一个未获取的线程才能够加入)

《JUC包学习所得》

同步队列设置首节点(原头节点释放锁,唤醒后继节点):同步队列遵循FIFO,头节点是获取锁(同步状态)成功的节点,头节点在释放同步状态的时候,会唤醒后继节点,而后继节点将会在获取锁(同步状态)成功时候将自己设置为头节点。设置头节点是由获取锁(同步状态)成功的线程来完成的,由于只有一个线程能够获取同步状态,则设置头节点的方法不需要CAS保证,只需要将头节点设置成为原首节点的后继节点 ,并断开原头结点的next引用

* 独占式的锁的获取 *

《JUC包学习所得》
调用同步器的acquire(int arg)方法可以获取同步状态,该方法对中断不敏感,即线程获取同步状态失败后进入同步队列,后续对线程进行中断操作时,线程不会从同步队列中移除。

(1) 当前线程实现通过tryAcquire()方法尝试获取锁,获取成功的话直接返回,如果尝试失败的话,进入等待队列排队等待,可以保证线程安全(CAS)的获取同步状态。
(2) 如果尝试获取锁失败的话,构造同步节点(独占式的Node.EXCLUSIVE),通过addWaiter(Node node,int args)方法,将节点加入到同步队列的队列尾部。
(3) 最后调用acquireQueued(final Node node, int args)方法,使该节点以死循环的方式获取同步状态,如果获取不到,则阻塞节点中的线程。acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点的时候才能尝试获取锁(同步状态)( p == head && tryAcquire(arg))。

原因是:

1.头结点是成功获取同步状态的节点,而头结点的线程释放锁以后,将唤醒后继节点,后继节点线程被唤醒后要检查自己的前驱节点是否为头结点。

2.维护同步队列的FIFO原则,节点进入同步队列以后,就进入了一个自旋的过程,每个节点(后者说每个线程)都在自省的观察。

独占式的获取同步状态的流程如下
《JUC包学习所得》

AQS部分为参考JUC回顾之-AQS同步器的实现原理所得

public class AbstractQueuedSynchronizerTest {

    /** * * (AQS节点的定义,同步队列的节点定义) * * <p> * 修改历史: <br> * 修改日期 修改人员 版本 修改内容<br> * -------------------------------------------------<br> * 2016年7月4日 上午10:26:38 user 1.0 初始化创建<br> * </p> * * @author Peng.Li * @version 1.0 * @since JDK1.7 */
    static final class Node {

        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode * * */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled * 在同步队列中等待的线程等待超时或者被中断,需要从同步队列中取消等待 * */
        static final int CANCELLED = 1;

        /** waitStatus value to indicate successor's thread needs unparking(唤醒) * 后继节点的线程处于等待状态,而当前的节点如果释放了同步状态或者被取消,将会通知后继节点,使后继节点的线程得以运行。 **/
        static final int SIGNAL = -1;

        /** waitStatus value to indicate thread is waiting on condition * 节点在等待队列中,节点的线程等待在Condition上,当其他线程对Condition调用了signal()方法后,该节点会从等待队列中转移到同步队列中,加入到同步状态的获取中 **/
        static final int CONDITION = -2;
        /** * waitStatus value to indicate the next acquireShared should * unconditionally(无条件的) propagate(传播) * * 表示下一次共享式同步状态获取将会被无条件地传播下去 */
        static final int PROPAGATE = -3;

        /** * Status field, taking on only the values: * SIGNAL: The successor of this node is (or will soon be) * blocked (via park), so the current node must * unpark its successor when it releases or * cancels. To avoid races, acquire methods must * first indicate they need a signal, * then retry the atomic acquire, and then, * on failure, block. * CANCELLED: This node is cancelled due to timeout or interrupt. * Nodes never leave this state. In particular, * a thread with cancelled node never again blocks. * CONDITION: This node is currently on a condition queue. * It will not be used as a sync queue node * until transferred, at which time the status * will be set to 0. (Use of this value here has * nothing to do with the other uses of the * field, but simplifies mechanics.) * PROPAGATE: A releaseShared should be propagated(传播) to other * nodes. This is set (for head node only) in * doReleaseShared to ensure propagation * continues, even if other operations have * since intervened(干涉). * 0: None of the above * * The values are arranged numerically to simplify use. * Non-negative values mean that a node doesn't need to * signal. So, most code doesn't need to check for particular * values, just for sign. * * The field is initialized to 0 for normal sync nodes, and * CONDITION for condition nodes. It is modified using CAS * (or when possible, unconditional volatile writes). * * 使用CAS更改状态,volatile保证线程可见性,即被一个线程修改后,状态会立马让其他线程可见。 * */
        volatile int waitStatus;

        /** * Link to predecessor node that current node/thread relies on * for checking waitStatus. Assigned during enqueing(入队), and nulled * out (for sake of GC) only upon dequeuing. Also, upon * cancellation of a predecessor, we short-circuit while * finding a non-cancelled one, which will always exist * because the head node is never cancelled: A node becomes * head only as a result of successful acquire. A * cancelled thread never succeeds in acquiring, and a thread only * cancels itself, not any other node. * * 前驱节点,当前节点加入到同步队列中被设置 */
        volatile Node prev;

        /** * Link to the successor node that the current node/thread * unparks upon release. Assigned during enqueuing, adjusted * when bypassing cancelled predecessors, and nulled out (for * sake of GC) when dequeued. The enq operation does not * assign next field of a predecessor until after attachment, * so seeing a null next field does not necessarily mean that * node is at end of queue. However, if a next field appears * to be null, we can scan prev's from the tail to * double-check. The next field of cancelled nodes is set to * point to the node itself instead of null, to make life * easier for isOnSyncQueue. * * 后继节点 */
        volatile Node next;

        /** * The thread that enqueued this node. Initialized on * construction and nulled out after use. * * 获取同步状态的线程 */
        volatile Thread thread;

        /** * Link to next node waiting on condition, or the special * value SHARED. Because condition queues are accessed only * when holding in exclusive(独有的) mode, we just need a simple * linked queue to hold nodes while they are waiting on * conditions. They are then transferred(移动到) to the queue(同步队列) to * re-acquire. And because conditions can only be exclusive, * we save a field by using special value to indicate shared * mode. * * 等待队列中的后继节点,如果当前节点是共享的,那么这个字段是一个SHARED常量, * 也就是说节点类型(独占和共享)和等待队列中的后继节点共用同一个字段。 */
        Node nextWaiter;

        /** * Returns true if node is waiting in shared mode */
        final boolean isShared() {
            return nextWaiter == SHARED;
        }

        /** * Returns previous node, or throws NullPointerException if null. * Use when predecessor cannot be null. The null check could * be elided, but is present to help the VM. * * @return the predecessor of this node */
        final Node predecessor() throws NullPointerException {
            Node p = prev;
            if (p == null)
                throw new NullPointerException();
            else
                return p;
        }

        Node() { // Used to establish initial head or SHARED marker
        }

        Node(Thread thread, Node mode) { // Used by addWaiter
            this.nextWaiter = mode;
            this.thread = thread;
        }

        Node(Thread thread, int waitStatus) { // Used by Condition
            this.waitStatus = waitStatus;
            this.thread = thread;
        }
    }

    /** * Head of the wait queue, lazily initialized. Except for (除...以外) * initialization(初始化), it is modified only via method setHead. Note: * If head exists, its waitStatus is guaranteed not to be * CANCELLED.(如果head引用已经存在,等待状态保证不会被取消) */
    private transient volatile Node head;

    /** * Tail of the wait queue(等待队列), lazily initialized. Modified only via * method enq to add new wait node. */
    private transient volatile Node tail;

    /** * The synchronization state. * 同步状态,线程可见的,共享内存里面保存 * */
    private volatile int state;

    /** * Returns the current value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> read. * @return current state value * * 得到同步状态的值 * */
    protected final int getState() {
        return state;
    }

    /** * Sets the value of synchronization state. * This operation has memory semantics of a <tt>volatile</tt> write. * @param newState the new state value */
    protected final void setState(int newState) {
        state = newState;
    }

    /** * Acquires in exclusive(互斥) mode, ignoring(忽视) interrupts. Implemented * by invoking at least once {@link #tryAcquire}, * returning on success. Otherwise the thread is queued(排队), possibly * repeatedly(反复的) blocking and unblocking, invoking {@link * #tryAcquire} until success. This method can be used * to implement method {@link Lock#lock}. * * @param arg the acquire argument. This value is conveyed(传达) to * {@link #tryAcquire} but is otherwise uninterpreted and * can represent anything you like. * * 独占式的获取同步状态 * */
    public final void acquire(int arg) {
        if (!tryAcquire(arg) && acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }

    protected boolean tryAcquire(int arg) {
        throw new UnsupportedOperationException();
    }

    /** * Creates and enqueues node for current thread and given mode. * * @param mode Node.EXCLUSIVE for exclusive, Node.SHARED for shared * @return the new node * * * 如果尝试获取同步状态失败的话,则构造同步节点(独占式的Node.EXCLUSIVE),通过 addWaiter(Node node,int args)方法将该节点加入到同步队列的队尾。 * */
    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            // 确保节点能够被安全的添加
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

    /** * Convenience method to interrupt current thread. * 分析:如果在acquireQueued()中,当前线程被中断过,则执行selfInterrupt();否则不会执行。 * 线程在阻塞状态被“中断唤醒”而获取CPU的执行权;但是该线程前面还有其他等待锁的线程,根据公平性原则,该线程仍然无法获取到锁,他会再次阻塞。 * 直到该线程被他前面等待锁的线程唤醒;线程才会获取锁。该线程“成功获取锁并真正执行起来之前”,他的中断会被忽略并且中断标记会被清除,因为在parkAndCheckInterrupt()中, * 我们线程的中断状态时调用了Thread.interrupted(),这个函数在返回中断状态之后,还会清除中断状态,正因为清除了中断状态,所以在selfInterrupt重新产生一个中断。 * * * 当前线程自己产生一个中断 */
    private static void selfInterrupt() {
        Thread.currentThread().interrupt();
    }

    /** * Acquires in exclusive uninterruptible mode for thread already in * queue. Used by condition wait methods as well as acquire. * * @param node the node * @param arg the acquire argument * @return {@code true} if interrupted while waiting * * acquireQueued方法当前线程在死循环中获取同步状态,而只有前驱节点是头节点才能尝试获取同步状态( p == head && tryAcquire(arg)) * 原因是:1.头结点是成功获取同步状态的节点,而头节点的线程释放了同步状态以后,将会唤醒其后继节点,后继节点的线程被唤醒后要检查自己的前驱节点是否为头结点。 * 2.维护同步队列的FIFO原则,节点进入同步队列之后,就进入了一个自旋的过程,每个节点(或者说是每个线程)都在自省的观察。 * */
    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    failed = false;
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) && parkAndCheckInterrupt())
                    interrupted = true;
            }
        } finally {
            if (failed)
                cancelAcquire(node);
        }
    }

    /** * Inserts node into queue, initializing if necessary. See picture above. * @param node the node to insert * @return node's predecessor * * 同步器通过死循环的方式来保证节点的正确添加,在“死循环” 中通过CAS将节点设置成为尾节点之后,当前线程才能从该方法中返回,否则当前线程不断的尝试设置。 * enq方法将并发添加节点的请求通过CAS变得“串行化”了。 * */
    private Node enq(final Node node) {
        for (;;) {
            Node t = tail;
            if (t == null) { // Must initialize
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

    /** * Convenience method to park and then check if interrupted * * @return {@code true} if interrupted * * 阻塞当前线程 * */
    private final boolean parkAndCheckInterrupt() {
        // 阻塞当前线程
        LockSupport.park(this);
        // 线程被唤醒之后的中断状态
        return Thread.interrupted();
    }

    /** * Releases in exclusive mode. Implemented by unblocking one or * more threads if {@link #tryRelease} returns true. * This method can be used to implement method {@link Lock#unlock}. * * @param arg the release argument. This value is conveyed to * {@link #tryRelease} but is otherwise uninterpreted and * can represent anything you like. * @return the value returned from {@link #tryRelease} * * 释放公平锁 * */
    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

    /** * Attempts to set the state to reflect a release in exclusive * mode. * * <p>This method is always invoked by the thread performing release. * * <p>The default implementation throws * {@link UnsupportedOperationException}. * * @param arg the release argument. This value is always the one * passed to a release method, or the current state value upon * entry to a condition wait. The value is otherwise * uninterpreted and can represent anything you like. * @return {@code true} if this object is now in a fully released * state, so that any waiting threads may attempt to acquire; * and {@code false} otherwise. * @throws IllegalMonitorStateException if releasing would place this * synchronizer in an illegal state. This exception must be * thrown in a consistent fashion for synchronization to work * correctly. * @throws UnsupportedOperationException if exclusive mode is not supported */
    protected boolean tryRelease(int arg) {
        throw new UnsupportedOperationException();
    }

    /** * Checks and updates status for a node that failed to acquire. * Returns true if thread should block. This is the main signal * control in all acquire loops. Requires that pred == node.prev * * @param pred node's predecessor holding status * @param node the node * @return {@code true} if thread should block * 返回当前线程是否应该阻塞 * * 说明: (01) 关于waitStatus请参考下表(中扩号内为waitStatus的值),更多关于waitStatus的内容,可以参考前面的Node类的介绍。 CANCELLED[1] -- 当前线程已被取消 SIGNAL[-1] -- “当前线程的后继线程需要被unpark(唤醒)”。一般发生情况是:当前线程的后继线程处于阻塞状态,而当前线程被release或cancel掉,因此需要唤醒当前线程的后继线程。 CONDITION[-2] -- 当前线程(处在Condition休眠状态)在等待Condition唤醒 PROPAGATE[-3] -- (共享锁)其它线程获取到“共享锁” [0] -- 当前线程不属于上面的任何一种状态。 (02) shouldParkAfterFailedAcquire()通过以下规则,判断“当前线程”是否需要被阻塞。 规则1:如果前继节点状态为SIGNAL,表明当前节点需要被unpark(唤醒),此时则返回true。 规则2:如果前继节点状态为CANCELLED(ws>0),说明前继节点已经被取消,则通过先前回溯找到一个有效(非CANCELLED状态)的节点,并返回false。 规则3:如果前继节点状态为非SIGNAL、非CANCELLED,则设置前继的状态为SIGNAL,并返回false。 * */
    private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
        // 前驱节点的状态
        int ws = pred.waitStatus;
        // 如果前驱节点是SIGNAL状态,则意味着当前线程需要unpark唤醒,此时返回true
        if (ws == Node.SIGNAL)
            /* * This node has already set status asking a release to signal it, so it can safely park. */
            return true;
        // 如果前继节点是取消的状态,则设置当前节点的“当前前继节点为”原节点的前继节点
        if (ws > 0) {
            /* * Predecessor was cancelled. Skip over predecessors and indicate retry. */
            do {
                node.prev = pred = pred.prev;
            } while (pred.waitStatus > 0);
            pred.next = node;
        } else {
            /* * waitStatus must be 0 or PROPAGATE. Indicate that we need a signal, but don't park yet. Caller will need to retry to make sure * it cannot acquire before parking. */
            compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
        }
        return false;
    }

    /** * Cancels an ongoing attempt to acquire. * * @param node the node */
    private void cancelAcquire(Node node) {
        // Ignore if node doesn't exist
        if (node == null)
            return;

        node.thread = null;

        // Skip cancelled predecessors
        Node pred = node.prev;
        while (pred.waitStatus > 0)
            node.prev = pred = pred.prev;

        // predNext is the apparent node to unsplice. CASes below will
        // fail if not, in which case, we lost race vs another cancel
        // or signal, so no further action is necessary.
        Node predNext = pred.next;

        // Can use unconditional write instead of CAS here.
        // After this atomic step, other Nodes can skip past us.
        // Before, we are free of interference from other threads.
        node.waitStatus = Node.CANCELLED;

        // If we are the tail, remove ourselves.
        if (node == tail && compareAndSetTail(node, pred)) {
            compareAndSetNext(pred, predNext, null);
        } else {
            // If successor needs signal, try to set pred's next-link
            // so it will get one. Otherwise wake it up to propagate.
            int ws;
            if (pred != head && ((ws = pred.waitStatus) == Node.SIGNAL || (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL)))
                    && pred.thread != null) {
                Node next = node.next;
                if (next != null && next.waitStatus <= 0)
                    compareAndSetNext(pred, predNext, next);
            } else {
                unparkSuccessor(node);
            }

            node.next = node; // help GC
        }
    }

    /** * Wakes up node's successor, if one exists. * * @param node the node */
    private void unparkSuccessor(Node node) {
        /* * If status is negative (i.e., possibly needing signal) try to clear in anticipation of signalling. It is OK if this fails or if * status is changed by waiting thread. */
        int ws = node.waitStatus;
        if (ws < 0)
            compareAndSetWaitStatus(node, ws, 0);

        /* * Thread to unpark is held in successor, which is normally just the next node. But if cancelled or apparently null, traverse * backwards from tail to find the actual non-cancelled successor. */
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

    /** * Sets head of queue to be node, thus dequeuing. Called only by * acquire methods. Also nulls out unused fields for sake of GC * and to suppress unnecessary signals and traversals. * * @param node the node */
    private void setHead(Node node) {
        head = node;
        node.thread = null;
        node.prev = null;
    }

    /** * Atomically sets synchronization state to the given updated * value if the current state value equals the expected value. * This operation has memory semantics of a <tt>volatile</tt> read * and write. * * @param expect the expected value * @param update the new value * @return true if successful. False return indicates that the actual * value was not equal to the expected value. */
    protected final boolean compareAndSetState(int expect, int update) {
        // See below for intrinsics setup to support this
        return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
    }

    /** * CAS waitStatus field of a node. */
    private static final boolean compareAndSetWaitStatus(Node node, int expect, int update) {
        return unsafe.compareAndSwapInt(node, waitStatusOffset, expect, update);
    }

    /** * CAS next field of a node. */
    private static final boolean compareAndSetNext(Node node, Node expect, Node update) {
        return unsafe.compareAndSwapObject(node, nextOffset, expect, update);
    }

    /** * CAS tail field. Used only by enq. */
    private final boolean compareAndSetTail(Node expect, Node update) {
        return unsafe.compareAndSwapObject(this, tailOffset, expect, update);
    }

    /** * CAS head field. Used only by enq. */
    private final boolean compareAndSetHead(Node update) {
        return unsafe.compareAndSwapObject(this, headOffset, null, update);
    }

    /** * Setup to support compareAndSet. We need to natively implement * this here: For the sake of permitting future enhancements, we * cannot explicitly subclass AtomicInteger, which would be * efficient and useful otherwise. So, as the lesser of evils, we * natively implement using hotspot intrinsics(编译器内部函数) API. And while we * are at it, we do the same for other CASable fields (which could * otherwise be done with atomic field updaters). */
    private static final Unsafe unsafe = Unsafe.getUnsafe();
    private static final long stateOffset;
    private static final long headOffset;
    private static final long tailOffset;
    private static final long waitStatusOffset;
    private static final long nextOffset;

    static {
        try {
            stateOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("state"));
            headOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("head"));
            tailOffset = unsafe.objectFieldOffset(AbstractQueuedSynchronizer.class.getDeclaredField("tail"));
            waitStatusOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("waitStatus"));
            nextOffset = unsafe.objectFieldOffset(Node.class.getDeclaredField("next"));

        } catch (Exception ex) {
            throw new Error(ex);
        }
    }

}

3.阻塞队列

*阻塞队列 *

BlockingQueue 通常用于一个线程生产对象,而另外一个线程消费这些对象的场景。
《JUC包学习所得》
一个线程往里边放,另外一个线程从里边取的一个 BlockingQueue。
一个线程将会持续生产新对象并将其插入到队列之中,直到队列达到它所能容纳的临界点。也就是说,它是有限的。如果该阻塞队列到达了其临界点,负责生产的线程将会在往里边插入新对象时发生阻塞。它会一直处于阻塞之中,直到负责消费的线程从队列中拿走一个对象。
负责消费的线程将会一直从该阻塞队列中拿出对象。如果消费线程尝试去从一个空的队列中提取对象的话,这个消费线程将会处于阻塞之中,直到一个生产线程把一个对象丢进队列。

类型抛异常特定值阻塞超时
插入add(o)offer(o)put(o)offer(o, timeout, timeunit)
移除remove(o)poll(o)take(o)poll(timeout, timeunit)
检查element(o)peek(o)

四组不同的行为方式解释:

抛异常:如果试图的操作无法立即执行,抛一个异常。
特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

无法向一个 BlockingQueue 中插入 null。如果你试图插入 null,BlockingQueue 将会抛出一个 NullPointerException。
可以访问到 BlockingQueue 中的所有元素,而不仅仅是开始和结束的元素。比如说,你将一个对象放入队列之中以等待处理,但你的应用想要将其取消掉。那么你可以调用诸如 remove(o) 方法来将队列之中的特定对象进行移除。但是这么干效率并不高(译者注:基于队列的数据结构,获取除开始或结束位置的其他对象的效率不会太高),因此你尽量不要用这一类的方法,除非你确实不得不那么做。
BlockingQueue的实现

数组阻塞队列 ArrayBlockingQueue

ArrayBlockingQueue 是一个有界的阻塞队列,其内部实现是将对象放到一个数组里。有界也就意味着,它不能够存储无限多数量的元素。它有一个同一时间能够存储元素数量的上限。你可以在对其初始化的时候设定这个上限,但之后就无法对这个上限进行修改了(译者注:因为它是基于数组实现的,也就具有数组的特性:一旦初始化,大小就无法修改)。
ArrayBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

延迟队列 DelayQueue

DelayQueue 实现了 BlockingQueue 接口。
DelayQueue 对元素进行持有直到一个特定的延迟到期。注入其中的元素必须实现 java.util.concurrent.Delayed 接口,该接口定义:

public interface Delayed extends Comparable<Delayed< {

 public long getDelay(TimeUnit timeUnit);

}

DelayQueue 将会在每个元素的 getDelay() 方法返回的值的时间段之后才释放掉该元素。如果返回的是 0 或者负值,延迟将被认为过期,该元素将会在 DelayQueue 的下一次 take 被调用的时候被释放掉。
传递给 getDelay 方法的 getDelay 实例是一个枚举类型,它表明了将要延迟的时间段。
Delayed 接口也继承了 java.lang.Comparable 接口,这也就意味着 Delayed 对象之间可以进行对比。这个可能在对 DelayQueue 队列中的元素进行排序时有用,因此它们可以根据过期时间进行有序释放。

链阻塞队列 LinkedBlockingQueue

LinkedBlockingQueue 内部以一个链式结构(链接节点)对其元素进行存储。如果需要的话,这一链式结构可以选择一个上限。如果没有定义上限,将使用 Integer.MAX_VALUE 作为上限。
LinkedBlockingQueue 内部以 FIFO(先进先出)的顺序对元素进行存储。队列中的头元素在所有元素之中是放入时间最久的那个,而尾元素则是最短的那个。

BlockingQueue<String> unbounded = new LinkedBlockingQueue<String>();
BlockingQueue<String> bounded   = new LinkedBlockingQueue<String>(1024);

bounded.put("Value");

String value = bounded.take();

具有优先级的阻塞队列 PriorityBlockingQueue

PriorityBlockingQueue 是一个无界的并发队列。它使用了和类 java.util.PriorityQueue 一样的排序规则。你无法向这个队列中插入 null 值。
所有插入到 PriorityBlockingQueue 的元素必须实现 java.lang.Comparable 接口。因此该队列中元素的排序就取决于你自己的 Comparable 实现。
注意 PriorityBlockingQueue 对于具有相等优先级(compare() == 0)的元素并不强制任何特定行为。
同时注意,如果你从一个 PriorityBlockingQueue 获得一个 Iterator 的话,该 Iterator 并不能保证它对元素的遍历是以优先级为序的。

同步队列 SynchronousQueue

SynchronousQueue 是一个特殊的队列,它的内部同时只能够容纳单个元素。如果该队列已有一元素的话,试图向队列中插入一个新元素的线程将会阻塞,直到另一个线程将该元素从队列中抽走。同样,如果该队列为空,试图向队列中抽取一个元素的线程将会阻塞,直到另一个线程向队列中插入了一条新的元素。

*阻塞双端队列 BlockingDeque*

BlockingDeque 接口继承自 BlockingQueue 接口。这就意味着你可以像使用一个 BlockingQueue 那样使用 BlockingDeque。如果你这么干的话,各种插入方法将会把新元素添加到双端队列的尾端,而移除方法将会把双端队列的首端的元素移除。正如 BlockingQueue 接口的插入和移除方法一样。

抛异常:如果试图的操作无法立即执行,抛一个异常。
特定值:如果试图的操作无法立即执行,返回一个特定的值(常常是 true / false)。
阻塞:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行。
超时:如果试图的操作无法立即执行,该方法调用将会发生阻塞,直到能够执行,但等待时间不会超过给定值。返回一个特定值以告知该操作是否成功(典型的是 true / false)。

抛异常特定值阻塞超时
插入addFirst(o)offerFirst(o)putFirst(o)
移除removeFirst(o)pollFirst(o)takeFirst(o)
检查getFirst(o)peekFirst(o)
抛异常特定值阻塞超时
插入addLast(o)offerLast(o)putLast(o)
移除removeLast(o)pollLast(o)takeLast(o)
检查getLast(o)peekLast(o)

链阻塞双端队列 LinkedBlockingDeque

LinkedBlockingDeque 类实现了 BlockingDeque 接口。
deque(双端队列) 是 “Double Ended Queue” 的缩写。因此,双端队列是一个你可以从任意一端插入或者抽取元素的队列。
LinkedBlockingDeque 是一个双端队列,在它为空的时候,一个试图从中抽取数据的线程将会阻塞,无论该线程是试图从哪一端抽取数据。

BlockingDeque<String> deque = new LinkedBlockingDeque<String>();

deque.addFirst("1");
deque.addLast("2");

String two = deque.takeLast();
String one = deque.takeFirst();

4.执行器

java.util.concurrent.ExecutorService 接口表示一个异步执行机制,使我们能够在后台执行任务。因此一个 ExecutorService 很类似于一个线程池。实际上,存在于 java.util.concurrent 包里的 ExecutorService 实现就是一个线程池实现。

ExecutorService executorService = Executors.newFixedThreadPool(10);

executorService.execute(new Runnable() {
    public void run() {
        System.out.println("Asynchronous task");
    }
});

executorService.shutdown();

《JUC包学习所得》
有几种不同的方式来将任务委托给 ExecutorService 去执行:

execute(Runnable)

要求一个 java.lang.Runnable 对象,然后对它进行异步执行

submit(Runnable)

要求一个 Runnable 实现类,但它返回一个 Future 对象

submit(Callable)

类似于 submit(Runnable) 方法,除了它所要求的参数类型之外。Callable 实例除了它的 call() 方法能够返回一个结果之外和一个 Runnable 很相像。Runnable.run() 不能够返回一个结果

Future future = executorService.submit(new Callable(){
    public Object call() throws Exception {
        System.out.println("Asynchronous Callable");
        return "Callable Result";
    }
});
 //注意 future.get() 是阻塞的
System.out.println("future.get() = " + future.get());

invokeAny(…)

要求一系列的 Callable 或者其子接口的实例对象。调用这个方法并不会返回一个 Future,但它返回其中一个 Callable 对象的结果。无法保证返回的是哪个 Callable 的结果 – 只能表明其中一个已执行结束。
如果其中一个任务执行结束(或者抛了一个异常),其他 Callable 将被取消。

invokeAll(…)

调用你在集合中传给 ExecutorService 的所有 Callable 对象。invokeAll() 返回一系列的 Future 对象,通过它们你可以获取每个 Callable 的执行结果。
记住,一个任务可能会由于一个异常而结束,因此它可能没有 “成功”。无法通过一个 Future 对象来告知我们是两种结束中的哪一种.

**使用完 ExecutorService 之后你应该将其关闭,以使其中的线程不再运行。
比如,如果你的应用是通过一个 main() 方法启动的,之后 main 方法退出了你的应用,如果你的应用有一个活动的 ExexutorService 它将还会保持运行。ExecutorService 里的活动线程阻止了 JVM 的关闭。
要终止 ExecutorService 里的线程你需要调用 ExecutorService 的 shutdown() 方法。ExecutorService 并不会立即关闭,但它将不再接受新的任务,而且一旦所有线程都完成了当前任务的时候,ExecutorService 将会关闭。在 shutdown() 被调用之前所有提交给 ExecutorService 的任务都被执行。
如果你想要立即关闭 ExecutorService,你可以调用 shutdownNow() 方法。这样会立即尝试停止所有执行中的任务,并忽略掉那些已提交但尚未开始处理的任务。无法担保执行任务的正确执行。可能它们被停止了,也可能已经执行结束。**

线程池执行者 ThreadPoolExecutor

ThreadPoolExecutor 包含的线程池能够包含不同数量的线程。池中线程的数量由以下变量决定:
corePoolSize
maximumPoolSize
当一个任务委托给线程池时,如果池中线程数量低于 corePoolSize,一个新的线程将被创建,即使池中可能尚有空闲线程。
如果内部任务队列已满,而且有至少 corePoolSize 正在运行,但是运行线程的数量低于 maximumPoolSize,一个新的线程将被创建去执行该任务。
《JUC包学习所得》

定时执行者服务 ScheduledExecutorService

java.util.concurrent.ScheduledExecutorService 是一个 ExecutorService, 它能够将任务延后执行,或者间隔固定时间多次执行。 任务由一个工作者线程异步执行,而不是由提交任务给 ScheduledExecutorService 的那个线程执行。
ScheduledExecutorService,你可以通过调用它的以下方法:

schedule (Callable task, long delay, TimeUnit timeunit)

这个方法返回一个 ScheduledFuture,通过它你可以在它被执行之前对它进行取消,或者在它执行之后获取结果。

schedule (Runnable task, long delay, TimeUnit timeunit)

这一方法工作起来就像以一个 Callable 作为一个参数的那个版本的方法一样,因此 ScheduledFuture.get() 在任务执行结束之后返回 null

scheduleAtFixedRate (Runnable, long initialDelay, long period, TimeUnit timeunit)

这一方法规划一个任务将被定期执行。该任务将会在首个 initialDelay 之后得到执行,然后每个 period 时间之后重复执行。
如果给定任务的执行抛出了异常,该任务将不再执行。如果没有任何异常的话,这个任务将会持续循环执行到 ScheduledExecutorService 被关闭。
如果一个任务占用了比计划的时间间隔更长的时候,下一次执行将在当前执行结束执行才开始。计划任务在同一时间不会有多个线程同时执行。

scheduleWithFixedDelay (Runnable, long initialDelay, long period, TimeUnit timeunit)

除了 period 有不同的解释之外这个方法和 scheduleAtFixedRate() 非常像。
scheduleAtFixedRate() 方法中,period 被解释为前一个执行的开始和下一个执行的开始之间的间隔时间。
而在本方法中,period 则被解释为前一个执行的结束和下一个执行的结束之间的间隔。因此这个延迟是执行结束之间的间隔,而不是执行开始之间的间隔。

ForkJoinPool

ForkJoinPool 在 Java 7 中被引入。它和 ExecutorService 很相似,除了一点不同。ForkJoinPool 让我们可以很方便地把任务分裂成几个更小的任务,这些分裂出来的任务也将会提交给 ForkJoinPool。任务可以继续分割成更小的子任务,只要它还能分割。可能听起来有些抽象,因此本节中我们将会解释 ForkJoinPool 是如何工作的,还有任务分割是如何进行的。ForkJoinPool 是一个特殊的线程池,它的设计是为了更好的配合 分叉-和-合并 任务分割的工作。ForkJoinPool 也在 java.util.concurrent 包中,其完整类名为 java.util.concurrent.ForkJoinPool。

RecursiveAction 是一种没有任何返回值的任务。它只是做一些工作,比如写数据到磁盘,然后就退出了。
RecursiveTask 是一种会返回结果的任务。它可以将自己的工作分割为若干更小任务,并将这些子任务的执行结果合并到一个集体结果。

5.并发容器

JDK5中添加了新的concurrent包,相对同步容器而言,并发容器通过一些机制改进了并发性能。因为同步容器将所有对容器状态的访问都串行化了,这样保证了线程的安全性,所以这种方法的代价就是严重降低了并发性,当多个线程竞争容器时,吞吐量严重降低。因此Java5.0开始针对多线程并发访问设计,提供了并发性能较好的并发容器,引入了java.util.concurrent包。与Vector和Hashtable、Collections.synchronizedXxx()同步容器等相比,util.concurrent中引入的并发容器主要解决了两个问题:
1)根据具体场景进行设计,尽量避免synchronized,提供并发性。
2)定义了一些并发安全的复合操作,并且保证并发环境下的迭代操作不会出错。
ConcurrentHashMap代替同步的Map(Collections.synchronized(new HashMap())),众所周知,HashMap是根据散列值分段存储的,同步Map在同步的时候锁住了所有的段,而ConcurrentHashMap加锁的时候根据散列值锁住了散列值锁对应的那段,因此提高了并发性能。ConcurrentHashMap也增加了对常用复合操作的支持,比如”若没有则添加”:putIfAbsent(),替换:replace()。这2个操作都是原子操作。
CopyOnWriteArrayList和CopyOnWriteArraySet分别代替List和Set,主要是在遍历操作为主的情况下来代替同步的List和同步的Set,这也就是上面所述的思路:迭代过程要保证不出错,除了加锁,另外一种方法就是”克隆”容器对象。
ConcurrentLinkedQuerue是一个先进先出的队列。它是非阻塞队列。
ConcurrentSkipListMap可以在高效并发中替代SoredMap(例如用Collections.synchronzedMap包装的TreeMap)。
ConcurrentSkipListSet可以在高效并发中替代SoredSet(例如用Collections.synchronzedSet包装的TreeMap)。

并发 Map(映射) ConcurrentMap

ConcurrentHashMap

HashMap是非线程安全的,Hashtable是线程安全的,但是由于Hashtable是采用synchronized进行同步,相当于所有线程进行读写时都去竞争一把锁,导致效率非常低下。
ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组

并发导航映射 ConcurrentNavigableMap

能让它的子 map 具备并发访问的能力。所谓的 “子 map” 指的是诸如 headMap(),subMap(),tailMap() 之类的方法返回的 map。

headMap()

headMap(T toKey) 方法返回一个包含了小于给定 toKey 的 key 的子 map。

tailMap()

tailMap(T fromKey) 方法返回一个包含了不小于给定 fromKey 的 key 的子 map。

subMap()

subMap() 方法返回原始 map 中,键介于 from(包含) 和 to (不包含) 之间的子 map。

CopyOnWriteArrayList 与 CopyOnWriteArraySet

ArrayList 在使用iterator的时候会遇到ConcurrentModificationException的异常,就是由于遍历的时候,又进行写操作。
CopyOnWriteArrayList 彻底没有了modCount的概念。自然也没有了之前的异常。
那如果出现了写操作时并行读呢?
因为有一个copy的过程,写操作是在一个新的引用队列上操作的,那么读的时候还会在原引用队列上操作。

它的问题:
很显然,当读写并发执行的时候,读拿到的是旧的数据。
不要使用它的iterator.remove ,会报异常,因为根本就没有实现。

闭锁 CountDownLatch

栅栏 CyclicBarrier

交换机 Exchanger

信号量 Semaphore

原理理解
《JUC包学习所得》

    原文作者:JUC
    原文地址: https://blog.csdn.net/li_yunqi/article/details/81534180
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞