Java多线程 -- JUC包源码分析5 -- Condition/ArrayBlockingQueue/LinkedBlockingQueue/Deque/PriorityBlockingQueue

await – signal – signalAll

以下代码,分别展示了wait/notify, 和Condition的await/signal的用法

Object o = new Object();
synchronized(o)    //线程1
{
   ...
   o.wait();   //内部,会先释放锁。被其他线程notify之后,会再次拿锁。
   ...
}

synchronized(o)   //线程2
{
   ...
   o.notify();
   ...
}
ReentrantLock l = new ReentrantLock();
Condition c1 = l.newCondition();
Condition c2 = l.newCondition();


l.lock;               //线程1
try
{
  ...
  c1.await();  //内部,会先释放锁。被其他线程signal之后,会再次拿锁。
  ...
  c2.signal();
  ...
}finally
{
  l.unlock();
}


l.lock;               //线程2
try
{
  ...
  c2.await();  //内部,会先释放锁。被其他线程notify之后,会再次拿锁。
  ...
  c1.signal();
  ...
}finally
{
  l.unlock();
}

通过以上代码,明确3点:
(1)Condition必须与锁协同使用:对应synchronized来说,wait()的object必须是synchronized对应的同步对象;对应ReentrantLock来说,Condition是通过ReentrantLock.newCondition()得到的。

(2)wait()/await()的时候,会先释放锁,然后进入阻塞,然后被notify/signal唤醒之后,会再去拿锁!也就是其内部有3个环节:
//释放锁
//进入阻塞
//被唤醒,拿锁,执行后续代码

(3)await/signal在使用上,比wait/notify更加灵活:
wait/notify只能附属在一个条件上,所有的阻塞线程都在这1个条件上;
而Lock可以创建多个condition,每个condition都有wait/notify,每个condition都有一个自己的阻塞线程队列。
后面所讲的BlockingQueue,将很好的展示condition的这个优点。

Condition源码分析

从上面的第(2)条可以看出,await()的时候,线程要进入阻塞。所以每个Condition内部,都维护了一个链表,或者说队列,存储所有阻塞在这个条件上的线程。
以下代码,展示了Condition的内部结构:

//ConditionObject是AQS的一个内部类,实现了Condition接口

    public class ConditionObject implements Condition, java.io.Serializable {
        。。。
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;     //阻塞队列

       。。。

下面看一下Condition.await()的源码:

    //ReentrantLock
    public Condition newCondition() {
        return sync.newCondition();
    }

   //ReentrantLock的Sync内部类
   final ConditionObject newCondition() {
            return new ConditionObject();
   }


   //AQS的ConditionObject内部类
  public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();  //有人置了中段标志位,先响应中断
            Node node = addConditionWaiter();  //把线程加入该condition的阻塞队列
            int savedState = fullyRelease(node); //释放锁
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);   //开始阻塞
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)  //被中断唤醒,跳出阻塞
                    break;
            }
            if (acquireQueued(node, savedState) &&  interruptMode != THROW_IE)  //被唤醒之后,重新拿锁
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null) // clean up if cancelled
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

由上述代码可以看出,await()是会响应中断的。下面看一下屏蔽中断的await(),即awaitUninterruptibly()

        public final void awaitUninterruptibly() {
            Node node = addConditionWaiter();  //加入阻塞队列
            int savedState = fullyRelease(node); //释放锁
            boolean interrupted = false;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);   //进入阻塞
                if (Thread.interrupted())  //被中断唤醒,没有break,继续循环, 再次进入阻塞
                    interrupted = true;
            }
            if (acquireQueued(node, savedState) || interrupted)  //阻塞出来,拿锁
                selfInterrupt();  //此时,再响应中断
        }

下面看一下signal()的源码:

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);  //唤醒队列里面第1个
        }

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
       }

    final boolean transferForSignal(Node node) {

        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        Node p = enq(node);
        int ws = p.waitStatus;
        if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
            LockSupport.unpark(node.thread);  //唤醒
        return true;
    }

关键点:无论await(), 还是signal(),都是在拿到锁之后执行的,所以其内部的入队/出队,都不需要加锁!

ArrayBlockingQueue

通常的Queue,一边是生产者,一边是消费者。一边进,一边出,有一个判空函数,一个判满函数。
而所谓的BlockingQueue,就是指当为空的时候,阻塞消费者线程;当为满的时候,阻塞生产者线程。

以下是ArrayBlockingQueue的核心结构:

public class ArrayBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    。。。
    /** The queued items */
    final Object[] items;

    /** items index for next take, poll, peek or remove */
    int takeIndex;

    /** items index for next put, offer, or add */
    int putIndex;

    /** Number of elements in the queue */
    int count;

    //其核心就是1把锁 + 2个条件
    final ReentrantLock lock;
    private final Condition notEmpty;
    private final Condition notFull;

    。。。
}

以下为其主要的构造函数:

    public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

以下为其put()/take()源代码

    public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == items.length)
                notFull.await();  //put的时候,队列满了,阻塞
            insert(e);
        } finally {
            lock.unlock();
        }
    }

    private void insert(E x) {
        items[putIndex] = x;
        putIndex = inc(putIndex);
        ++count;
        notEmpty.signal(); //put进去之后,通知非空条件
    }

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            while (count == 0)
                notEmpty.await(); //take的时候,队列为空,阻塞
            return extract();
        } finally {
            lock.unlock();
        }
    }

    private E extract() {
        final Object[] items = this.items;
        E x = this.<E>cast(items[takeIndex]);
        items[takeIndex] = null;
        takeIndex = inc(takeIndex);
        --count;
        notFull.signal();  //take完了,通知非满条件
        return x;
    }

顺便说一句:上述2个函数,都是响应中断,并且阻塞的。
另外还有不响应中断的,不阻塞的成员函数,在此就不再详述了。

LinkedBlockingQueue

LinkedBlockingQueue是一种基于单向链表的阻塞队列。因为头和尾是2个指针分开操作的,所以用了2把锁 + 2个条件,同时一个AtomicInteger的原子变量记录count数。

    private final AtomicInteger count = new AtomicInteger(0);  

    /** Head of linked list */  
    private transient Node<E> head;  
    /** Tail of linked list */  
    private transient Node<E> last;  

    /** Lock held by take, poll, etc */  
    private final ReentrantLock takeLock = new ReentrantLock();  
    /** Wait queue for waiting takes */  
    private final Condition notEmpty = takeLock.newCondition();  

    /** Lock held by put, offer, etc */  
    private final ReentrantLock putLock = new ReentrantLock();  
    /** Wait queue for waiting puts */  
    private final Condition notFull = putLock.newCondition();  

LinkedBlockingDeque

其原理和ArrayBlockingQueue是一样的,也是1把锁 + 2个条件。只是其数据结构不是数组,而是一个双向链表。
有一个小细节:链表不是无限长吗,怎么会满呢?这里是人为设置了一个最大长度:

    public LinkedBlockingDeque() {
        this(Integer.MAX_VALUE);   //最大长度是整数的最大值
    }

下面是其主要结构:

public class LinkedBlockingDeque<E> extends AbstractQueue<E> implements BlockingDeque<E>, java.io.Serializable {

    ...

    //双向链表的Node
    static final class Node<E> {
        E item;
        Node<E> prev;
        Node<E> next;

        Node(E x) {
            item = x;
        }
    }

    //队列的头,尾
    transient Node<E> first;
    transient Node<E> last;


    private transient int count;
    private final int capacity;


    //1把锁 + 2个条件 
    final ReentrantLock lock = new ReentrantLock();
    private final Condition notEmpty = lock.newCondition();
    private final Condition notFull = lock.newCondition();

   。。。
}

下面是其put/take函数,其原理和ArrayBlockQueue的put/take类似:

    public void put(E e) throws InterruptedException {
        putLast(e);
    }
    public void putLast(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        Node<E> node = new Node<E>(e);
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            while (!linkLast(node))
                notFull.await();
        } finally {
            lock.unlock();
        }
    }
    private boolean linkLast(Node<E> node) {
        // assert lock.isHeldByCurrentThread();
        if (count >= capacity)
            return false;
        Node<E> l = last;
        node.prev = l;
        last = node;
        if (first == null)
            first = node;
        else
            l.next = node;
        ++count;
        notEmpty.signal();
        return true;
    }


    public E take() throws InterruptedException {
        return takeFirst();
    }
    public E takeFirst() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            E x;
            while ( (x = unlinkFirst()) == null)
                notEmpty.await();
            return x;
        } finally {
            lock.unlock();
        }
    }
    private E unlinkFirst() {
        // assert lock.isHeldByCurrentThread();
        Node<E> f = first;
        if (f == null)
            return null;
        Node<E> n = f.next;
        E item = f.item;
        f.item = null;
        f.next = f; // help GC
        first = n;
        if (n == null)
            last = null;
        else
            n.prev = null;
        --count;
        notFull.signal();
        return item;
    }

PriorityBlockingQueue

和上面的BlockingQueue有2个区别:
(1)是无界的,所以只有notEmpty一个条件。put不会阻塞,只有take会阻塞
(2)通过2叉堆,实现Priority

public class PriorityBlockingQueue<E> extends AbstractQueue<E> implements BlockingQueue<E>, java.io.Serializable {
    ...

    private transient Object[] queue;  //2叉堆实现

    //1把锁 + 1个条件
    private final ReentrantLock lock;
    private final Condition notEmpty;

    ...
 }

SynchronousQueue

SynchronousQueue是一种特殊队列,内部不是用Lock + Condition实现的。后续会单独用一篇专门阐述。

    原文作者:JUC
    原文地址: https://blog.csdn.net/chunlongyu/article/details/52436894
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞