Java集合, LinkedBlockingQueue源码解析(常用于并发编程)

LinkedBlockingQueue是一个基于链表实现的可选容量的阻塞队列。队头的元素是插入时间最长的,队尾的元素是最新插入的。新的元素将会被插入到队列的尾部。
LinkedBlockingQueue的容量限制是可选的,如果在初始化时没有指定容量,那么默认使用int的最大值作为队列容量。

概述

类继承关系

LinkedBlockingQueue的继承关系如下图:

public class LinkedBlockingQueue<E> 
        extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

底层数据结构

LinkedBlockingQueue内部是使用链表实现一个队列的,但是却有别于一般的队列,在于该队列至少有一个节点,头节点不含有元素。结构图如下:

《Java集合, LinkedBlockingQueue源码解析(常用于并发编程)》

可以发现head.item=null,last.next=null。

原理

LinkedBlockingQueue中维持两把锁,一把锁用于入队,一把锁用于出队,这也就意味着,同一时刻,只能有一个线程执行入队,其余执行入队的线程将会被阻塞;同时,可以有另一个线程执行出队,其余执行出队的线程将会被阻塞。换句话说,虽然入队和出队两个操作同时均只能有一个线程操作,但是可以一个入队线程和一个出队线程共同执行,也就意味着可能同时有两个线程在操作队列,那么为了维持线程安全,LinkedBlockingQueue使用一个AtomicInterger类型的变量表示当前队列中含有的元素个数,所以可以确保两个线程之间操作底层队列是线程安全的,这个在后面源码分析的时候会说明。

源码分析

重要字段

LinkedBlockingQueue可以指定容量,内部维持一个队列,所以有一个头节点head和一个尾节点last,内部维持两把锁,一个用于入队,一个用于出队,还有锁关联的Condition对象。主要对象的定义如下:

    //容量,如果没有指定,该值为Integer.MAX_VALUE;
    private final int capacity;

    //当前队列中的元素
    private final AtomicInteger count = new AtomicInteger();

    //队列头节点,始终满足head.item==null
    transient Node<E> head;

    //队列的尾节点,始终满足last.next==null
    private transient Node<E> last;

    //用于出队的锁
    private final ReentrantLock takeLock = new ReentrantLock();

    //当队列为空时,保存执行出队的线程
    private final Condition notEmpty = takeLock.newCondition();

    //用于入队的锁
    private final ReentrantLock putLock = new ReentrantLock();

    //当队列满时,保存执行入队的线程
    private final Condition notFull = putLock.newCondition();

构造方法

LinkedBlockingQueue的构造方法有三个,分别如下:

 public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }


    public LinkedBlockingQueue(int capacity) {
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);//last和head在队列为空时都存在,所以队列中至少有一个节点
    }


    public LinkedBlockingQueue(Collection<? extends E> c) {
        this(Integer.MAX_VALUE);
        final ReentrantLock putLock = this.putLock;
        putLock.lock(); // Never contended, but necessary for visibility
        try {
            int n = 0;
            for (E e : c) {
                if (e == null)
                    throw new NullPointerException();
                if (n == capacity)
                    throw new IllegalStateException("Queue full");
                enqueue(new Node<E>(e));
                ++n;
            }
            count.set(n);
        } finally {
            putLock.unlock();
        }
    }

从上面的构造方法中可以得出3点结论:
1. 当调用无参的构造方法时,容量是int的最大值
2. 队列中至少包含一个节点,哪怕队列对外表现为空
3. LinkedBlockingQueue不支持null元素

put(E e)方法

put(E e)方法用于将一个元素插入到队列的尾部,其实现如下:

public void put(E e) throws InterruptedException {
        //不允许元素为null
        if (e == null) throw new NullPointerException();
        int c = -1;
        //以当前元素新建一个节点
        Node<E> node = new Node<E>(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        //获得入队的锁
        putLock.lockInterruptibly();
        try {

            //如果队列已满,那么将该线程加入到Condition的等待队列中
            while (count.get() == capacity) {
                notFull.await();
            }
            //将节点入队
            enqueue(node);
            //得到插入之前队列的元素个数
            c = count.getAndIncrement();
            //如果还可以插入元素,那么释放等待的入队线程
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            //解锁
            putLock.unlock();
        }
        //通知出队线程队列非空
        if (c == 0)
            signalNotEmpty();
    }

从上面的代码分析中可以得出6点结论:
1. LinkedBlockingQueue不允许元素为null,这一点在构造方法中也说过了。
2. 同一时刻,只能有一个线程执行入队操作,因为putLock在将元素插入到队列尾部时加锁了
3. 如果队列满了,那么将会调用notFull的await()方法将该线程加入到Condition等待队列中。await()方法就会释放线程占有的锁,这将导致之前由于被锁阻塞的入队线程将会获取到锁,执行到while循环处,不过可能因为由于队列仍旧是满的,也被加入到条件队列中。
4. 一旦一个出队线程取走了一个元素,并通知了入队等待队列中可以释放线程了,那么第一个加入到Condition队列中的将会被释放,那么该线程将会重新获得put锁,继而执行enqueue()方法,将节点插入到队列的尾部
5. 然后得到插入一个节点之前的元素个数,如果队列中还有空间可以插入,那么就通知notFull条件的等待队列中的线程。
6. 通知出队线程队列为空了,因为插入一个元素之前的个数为0,而插入一个之后队列中的元素就从无变成了有,就可以通知因队列为空而阻塞的出队线程了。

signalNotEmpty()方法只会在put/take之类的入队方法中才会被调用,并且是当队列元素从无到有的时候。下面是signalNotEmpty()方法的实现:

 private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock
        takeLock.lock();
        try {
            //释放notEmpty条件队列中的第一个等待线程
            notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
    }

E take()方法

take()方法用于得到队头的元素,在队列为空时会阻塞,知道队列中有元素可取。其实现如下:

public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        //获取takeLock锁
        takeLock.lockInterruptibly();
        try {
            //如果队列为空,那么加入到notEmpty条件的等待队列中
            while (count.get() == 0) {
                notEmpty.await();
            }
            //得到队头元素
            x = dequeue();
            //得到取走一个元素之前队列的元素个数
            c = count.getAndDecrement();
            //如果队列中还有数据可取,释放notEmpty条件等待队列中的第一个线程
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        //如果队列中的元素从满到非满,通知put线程
        if (c == capacity)
            signalNotFull();
        return x;
    }

上面的代码注释已将说明了take()方法的整体路程,大体上与put()相对。
当队列为空时,就加入到notEmpty(的条件等待队列中,当队列不为空时就取走一个元素,当取完发现还有元素可取时,再通知一下自己的伙伴(等待在条件队列中的线程);最后,如果队列从满到非满,通知一下put线程。
下面看一下dequeue()的删除节点操作,其特别之处在于头节点是一个哨兵节点。

private E dequeue() {
        // assert takeLock.isHeldByCurrentThread();
        // assert head.item == null;
        Node<E> h = head;
        Node<E> first = h.next;
        h.next = h; // help GC
        head = first;
        E x = first.item;
        first.item = null;
        return x;
    }

入队、出队总结

LinkedBlockingQueue中除了上面的put()方法之外,还有另外几个入队的方法,比如offer(E)、offer(E,long,TimeUnit);也有另外几个出队的方法,比如poll()、poll(long,TimeUnit),但实现和put()以及take()都大同小异,这儿就不再一一分析了。下面就两个方法总结一下:
LinkedBlockingQueue是允许两个线程同时在两端进行入队或出队的操作的,但一端同时只能有一个线程进行操作,这是通过两把锁来区分的;为了维持底部数据的统一,引入了AtomicInteger的一个count变量,表示队列中元素的个数。count只能在两个地方变化,一个是入队的方法(可以+1),另一个是出队的方法(可以-1),而AtomicInteger是原子安全的,所以也就确保了底层队列的数据同步。
另外,入队、出队线程之间还存在合作的关系,这个以入队为例:当一群线程执行入队操作时,一个线程A幸运地占有了putLock锁,然后也成功的插入了一个元素,但是插完这个元素就达到了队列的容量了,当这个线程A释放了锁之后,前面一群线程中一个线程B又获得了putLock锁,但是由于队列已经满了,那么线程B释放了putLock锁后被加入到了notEmpty条件的等待队列中;由于释放了锁,线程C也抢到了锁,但是很不幸,它也被加入到了等待队列中,并且被加在了线程B的尾部;这时一个出队线程出现了,它成功地取走了一个元素,使得队列从满变为了非满状态,并且调用signalNotFull()方法通知了notFull的等待队列,这时线程B又重新获得了锁,插入了一个元素,插完一个元素,它发现还有容量可以插元素,它也没有忘记了和它一起被困在条件队列中的线程C,就调用了notFull.await()通知了线程C,这样线程C也执行了插入元素的操作。出队的过程与这个基本相同,就不再介绍了。由此可以看到,入队的线程不止和出队的线程协作,还和自己的难兄难弟,在条件队列中等待的入队线程协作;出队的线程同样不止和入队的线程协作,还和另外的出队线程协作。

remove()方法

remove()方法用于删除队列中一个元素,如果队列中不含有该元素,那么返回false;有的话则删除并返回true。入队和出队都是只获取一个锁,而remove()方法需要同时获得两把锁,其实现如下:

 public boolean remove(Object o) {
        //因为队列不包含null元素,返回false
        if (o == null) return false;
        //获取两把锁
        fullyLock();
        try {
            //从头的下一个节点开始遍历
            for (Node<E> trail = head, p = trail.next;
                 p != null;
                 trail = p, p = p.next) {
                 //如果匹配,那么将节点从队列中移除,trail表示前驱节点
                if (o.equals(p.item)) {
                    unlink(p, trail);
                    return true;
                }
            }
            return false;
        } finally {
            //释放两把锁
            fullyUnlock();
        }
    }

可以看到remove()方法中首先获取两把锁,然后再执行遍历删除操作,最后释放两把锁。下面先看一下是如何获取和释放两把锁的,其实现如下:

    /**
     * Locks to prevent both puts and takes.
     */
    void fullyLock() {
        putLock.lock();
        takeLock.lock();
    }

    /**
     * Unlocks to allow both puts and takes.
     */
    void fullyUnlock() {
        takeLock.unlock();
        putLock.unlock();
    }

那么问题来了,为什么remove()方法同时需要两把锁?
remove()操作会从队列的头遍历到尾,用到了队列的两端,所以需要对两端加锁,而对两端加锁就需要获取两把锁;入队和出队均只在队列的一端操作,所以只需获取一把锁。

size()方法

size()方法用于返回队列中元素的个数,其实现如下:

public int size() {
        return count.get();
    }

由于count是一个AtomicInteger的变量,所以该方法是一个原子性的操作,是线程安全的。

总结

在上面分析LinkedBlockingQueue的源码之后,可以与ArrayBlockingQueue做一个比较。
相同点有如下2点:
1. 不允许元素为null
2. 线程安全的队列

不同点有如下几点:
1. ArrayBlockingQueue底层基于定长的数组,所以容量限制了;LinkedBlockingQueue底层基于链表实现队列,所以容量可选,如果不设置,那么容量是int的最大值
2. ArrayBlockingQueue内部维持一把锁和两个条件,同一时刻只能有一个线程队列的一端操作;LinkedBlockingQueue内部维持两把锁和两个条件,同一时刻可以有两个线程在队列的两端操作,但同一时刻只能有一个线程在一端操作。
3. LinkedBlockingQueue的remove()类似方法时,由于需要对整个队列链表实现遍历,所以需要获取两把锁,对两端加锁。

    原文作者:红黑树
    原文地址: https://my.oschina.net/90888/blog/1624758
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞