JUC 之阻塞队列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析

阻塞队列继承结构说明:

《JUC 之阻塞队列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析》

根据继承结构可知:

1.Array 和 Linked 实现了统一的继承结构,不同的地方在于内部实现不用阻塞机制和队列数据存放结构

2.它们都实现了Iterable 接口,表示它们都是可迭代的,即实现了iterator() 方法,iterator()方法会返回一个Iterator ,Iterator为迭代器 其中主要的方法为next hasNext 等, 那么队列中对迭代器的实现使用内部类实现,在ArrayBlockingQueue 中即为Itr 这个私有的内部类,我们发现在每次调用iterator() 方法时都会生成一个Itr , 在ArrayBlockingQueue 中还有一个Itrs内部类 Itrs使用单向链表的方式维护生成的多个迭代器,迭代器的实现原理会在其它的博客中进行剖析,这里就不多说了

3. ArrayBlockingQueue与ReentrantLock是组合关系,ArrayBlockingQueue中包含一个ReentrantLock对象(lock)。ReentrantLock是可重入的互斥锁,ArrayBlockingQueue就是根据该互斥锁实现“多线程对竞争资源的互斥访问”。而且,ReentrantLock分为公平锁和非公平锁,关于具体使用公平锁还是非公平锁,在创建ArrayBlockingQueue时可以指定;而且,ArrayBlockingQueue默认会使用非公平锁。   

 

主要方法:

要想了解这两个阻塞队列,先分析一下队列提供的主要方法 BlockingQueue


 // 将指定元素插入到队列尾部 如果成功返回true
 // 如果队列已满 抛出IllegalStateException  其内部调用的offer
 boolean add(E e);

 //将指定元素插入队列尾部,如果成功返回true 如果队列已满返回false
 boolean offer(E e);

 //将指定元素插入队列尾部,如果队列已满进行阻塞 直到notFull条件唤醒
 void put(E e) throws InterruptedException;

 //将指定元素插入队列尾部,如果队列已满则等待指定时间
 boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException;

 //移除并获取队列头部元素,如果队列为空则阻塞 直到notEmpty条件唤醒
 E take() throws InterruptedException;

 //移除并获取队列头部元素,如果队列为空则等待指定时间
 E poll(long timeout, TimeUnit unit)
        throws InterruptedException;

 //队列剩余可使用容量 items.length - count
 int remainingCapacity();

 //移除队列中指定元素 改方法调用removeAt 两个方法之后会详细说明,因为remove移除的可能不是头部元素所以需要队列中元素重新排列和修改putIndex值
 boolean remove(Object o);
 
 //是否包含指定元素
 public boolean contains(Object o);
 
 //移除队列中所有的可用元素,并添加到Collection中
 int drainTo(Collection<? super E> c);
 //移除指定个数的可用元素,并添加到Collection中,从takeIndex开始
 int drainTo(Collection<? super E> c, int maxElements);
 
 //移除并获取队列头部元素,如果为空则抛NoSuchElementException 内部调用poll
 E remove();

 //移除并获取队列头元素,如果为空则返回null
 E poll();
 //只获取不移除队列头部元素,如果为空则抛NoSuchElementException 内部调用peek
 E element();
 //只获取不移除队列头部元素,如果队列为空则返回null 内部调用itemAt
 E peek();


 

ArrayBlockingQueue 构造方法和成员表量说明:

//存放队列元素的数组
final Object[] items;

//下次take, poll, peek or remove 元素的位置
int takeIndex;

//下次 put, offer, or add 元素的位置
int putIndex;

//队列元素的数量
int count;

//进行队列同步操作的重入锁
final ReentrantLock lock;

//不为空条件 当调用take队列为空时进行wait
private final Condition notEmpty;

//不满条件 当调用put队列已满时进行wait
private final Condition notFull;

//维护队列迭代器 Itrs 内部使用单向链表维护多个迭代器
transient Itrs itrs = null;

 提供三个构造方法:

//指定数组容量
public ArrayBlockingQueue(int capacity) {
        //默认ReentrantLock 为非公平模式
        this(capacity, false);
    }

//指定数组容量并设置ReentrantLock 公平非公平模式
public ArrayBlockingQueue(int capacity, boolean fair) {
        if (capacity <= 0)
            throw new IllegalArgumentException();
        this.items = new Object[capacity];
        lock = new ReentrantLock(fair);
        notEmpty = lock.newCondition();
        notFull =  lock.newCondition();
    }

//除了上面两个设置,还设置队列初始元素
public ArrayBlockingQueue(int capacity, boolean fair,
                              Collection<? extends E> c) {
        this(capacity, fair);

        final ReentrantLock lock = this.lock;
        lock.lock(); // Lock only for visibility, not mutual exclusion
        try {
            int i = 0;
            try {
                for (E e : c) {
                    checkNotNull(e);
                    items[i++] = e;
                }
            } catch (ArrayIndexOutOfBoundsException ex) {
                throw new IllegalArgumentException();
            }
            count = i;
            putIndex = (i == capacity) ? 0 : i;
        } finally {
            lock.unlock();
        }
    }

《JUC 之阻塞队列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析》

ArrayBlockingQueue中的元素存在公平访问与非公平访问的区别,对于公平访问队列,被阻塞的线程可以按照阻塞的先后顺序访问队列,即先阻塞的线程先访问队列。而非公平队列,当队列可用时,阻塞的线程将进入争夺访问资源的竞争中,也就是说谁先抢到谁就执行,没有固定的先后顺序。

非阻塞方法我们先不说明,先详细剖析一下两个阻塞方法put take

   public void put(E e) throws InterruptedException {
        checkNotNull(e);
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果队列已满则进行阻塞等待 即添加到notFull Condition的等待队列中
            while (count == items.length)
                notFull.await();
            //如果没满则直接入队
            enqueue(e);
        } finally {
            lock.unlock();
        }
    }

    
    private void enqueue(E x) {
        // assert lock.getHoldCount() == 1;
        // assert items[putIndex] == null;
        final Object[] items = this.items;
        items[putIndex] = x;
        //索引自增,如果已是最后一个位置,重新设置 putIndex = 0;
        if (++putIndex == items.length)
            putIndex = 0;
        count++;
        //有元素入队之后唤醒阻塞在notEmpty Condition 上的take操作
        notEmpty.signal();
    }

put方法是一个阻塞的方法,如果队列元素已满,那么当前线程将会被notFull条件对象挂起加到等待队列中,直到队列有空档才会唤醒执行添加操作。但如果队列没有满,那么就直接调用enqueue(e)方法将元素加入到数组队列中。说白了就是当队列满时通过条件对象Condtion来阻塞当前调用put方法的线程,直到线程又再次被唤醒执行。总得来说添加线程的执行存在以下两种情况,一是,队列已满,那么新到来的put线程将添加到notFull的条件队列中等待,二是,有移除线程执行移除操作,移除成功同时唤醒put线程,如下图所示 

《JUC 之阻塞队列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析》

上图为队列已满进行put操作的示意图

当有线程进行take操作取出元素,示意图如下:
 

《JUC 之阻塞队列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析》

 

接下来我们看下take操作,take操作在队列为空的时候阻塞,当有put操作时唤醒notEmpty Condition

我们先看下源码:

    public E take() throws InterruptedException {
        final ReentrantLock lock = this.lock;
        lock.lockInterruptibly();
        try {
            //如果队列为空进行等待
            while (count == 0)
                notEmpty.await();
            return dequeue();
        } finally {
            lock.unlock();
        }
    }


    private E dequeue() {
        // assert lock.getHoldCount() == 1;
        // assert items[takeIndex] != null;
        final Object[] items = this.items;
        @SuppressWarnings("unchecked")
        E x = (E) items[takeIndex];
        items[takeIndex] = null;
        //取值索引到头之后重置为0
        if (++takeIndex == items.length)
            takeIndex = 0;
        count--;
        if (itrs != null)
            itrs.elementDequeued();
        //唤醒等待在notFull上的put操作
        notFull.signal();
        return x;
    }

从代码上看逻辑还是很简单的, 这里就不画示意图了, 逻辑跟put相反

到这里主要的方法基本上已经都说明了, 但是最后还要说一下remove方法,这个方法的逻辑稍微有点复杂,现在我们剖析一下remove方法

    public boolean remove(Object o) {
        if (o == null) return false;
        final Object[] items = this.items;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            //不为空才删除,为空则直接返回false
            if (count > 0) {
                final int putIndex = this.putIndex;
                int i = takeIndex;
                //循环队列中的可用元素,如果有指定remove的对象则调用removeAt
                do {
                    if (o.equals(items[i])) {
                        removeAt(i);
                        return true;
                    }
                    if (++i == items.length)
                        i = 0;
                } while (i != putIndex);
            }
            return false;
        } finally {
            lock.unlock();
        }
    }


    void removeAt(final int removeIndex) {
        // assert lock.getHoldCount() == 1;
        // assert items[removeIndex] != null;
        // assert removeIndex >= 0 && removeIndex < items.length;
        final Object[] items = this.items;
        //如果要移除的元素位置正好是takeIndex 即在上面的remove循环中第一次就命中
        //这是一种特殊情况,其实就相当于take操作了
        if (removeIndex == takeIndex) {
            // removing front item; just advance
            items[takeIndex] = null;
            if (++takeIndex == items.length)
                takeIndex = 0;
            count--;
            if (itrs != null)
                //重新设置迭代器元素
                itrs.elementDequeued();
        //如果移除的并非takeIndex元素
        } else {
            // an "interior" remove
            // slide over all others up through putIndex.
            final int putIndex = this.putIndex;
            //此处循环正是移除的逻辑,根据removeIndex 把removeIndex 后面的可用元素统一向前
            //挪动 然后把putIndex之前的值设置为null 重新设置putIndex
            for (int i = removeIndex;;) {
                int next = i + 1;
                if (next == items.length)
                    next = 0;
                if (next != putIndex) {
                    items[i] = items[next];
                    i = next;
                } else {
                    items[i] = null;
                    this.putIndex = i;
                    break;
                }
            }
            count--;
            if (itrs != null)
                //对应更新迭代器数据
                itrs.removedAt(removeIndex);
        }
        //唤醒notFull put 操作
        notFull.signal();
    }

只看代码对于移除可能还不是很清除,那我们就画下图理解一下:

《JUC 之阻塞队列 ArrayBlockingQueue LinkedBlockingQueue 原理剖析》

到此位置ArrayBlockingQueue的实现原理基本上算是说完了,但是关于迭代器的实现,还在在并发操作中迭代器原理,还有在移除时迭代器是如何整理数据的, 这个之后单独说明

    原文作者:JUC
    原文地址: https://blog.csdn.net/zhangman0702/article/details/83472771
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞