lesson2:java阻塞队列的demo及源码分析

本文向大家展示了java阻塞队列的使用场景、源码分析及特定场景下的使用方式。java的阻塞队列是jdk1.5之后在并发包中提供的一组队列,主要的使用场景是在需要使用生产者消费者模式时,用户不必再通过多线程自己实现,可以通过阻塞队列直接实现消息的分发和消费,方便简单,降低了开发难度,在本章的最后,我们在分析阻塞队列源码时,也会有demo展示因为对代码的不了解而错误的使用阻塞队列时的灾难情况。下面列举出了所有实现BlockingQueue接口的队列:ArrayBlockingQueue、LinkedBlockingQueue、PriorityBlockingQueue、DelayQueue、LinkedTransferQueue、LinkedBlockingDeque和SynchronousQueue。本文主要介绍LinkedBlockingQueue相关的使用场景及源码分析,关于其它的阻塞队列,后面我会在额外的章节做详细的介绍。

demo源码:https://github.com/mantuliu/javaAdvance 中的类Lesson2BlockingQueueDemo

我们先看一下BlockingQueue接口的几个主要方法在LinkedBlockingQueue:add(),offer(),put(),take(),poll();

a.首先来看add(E e)方法,此方法是在LinkedBlockingQueue的父类AbstractQueue中实现的,下面的代码展示了add的实现方法,就是直接调用offer()方法:

 

    public boolean add(E e) {
        if (offer(e))
            return true;
        else
            throw new IllegalStateException("Queue full");
    }

b.我们来看看第二个方法,offer(E e)的实现:

 

    public boolean offer(E e) {
        if (e == null) throw new NullPointerException();//判断新增的元素如果是空,则直接抛出异常
        final AtomicInteger count = this.count;//自增整数
        if (count.get() == capacity)//判断容量是否已经到达最大值,已经达到变直接返回,后面我们会看到put方法的不一样地方
            return false;
        int c = -1;
        Node<E> node = new Node(e);//将新增的节点e包装成节点Node
        final ReentrantLock putLock = this.putLock;//获取阻塞队列的入队列的锁,可以想的到,此队列还有一个出队列的锁
        putLock.lock();//将入队列的锁上锁
        try {
            if (count.get() < capacity) {//上锁之后再次判断容量是否达到最大值
                enqueue(node);//将元素入队列
                c = count.getAndIncrement();//c的值是之前队列元素的数量
                if (c + 1 < capacity)//此元素存入到队列后队列所有元素的数量和依然小于容量
                    notFull.signal();//Condition notFull发出信号通知给关注此信号量的线程
            }
        } finally {
            putLock.unlock();//释放锁
        }
        if (c == 0)
            signalNotEmpty();//信号通知,具体见下面的分析
        return c >= 0;
    }

    private void enqueue(Node<E> node) {
        // assert putLock.isHeldByCurrentThread();
        // assert last.next == null;
        last = last.next = node;//入队列,链表操作
    }


    private void signalNotEmpty() {
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            notEmpty.signal();//Condition notEmpty发出信号通知给关注此信号量的线程,主要是当队列元素为空时,take()方法已经处于等待状态,这时有元素进入到队列需要唤醒
        } finally {
            takeLock.unlock();
        }
    }

 c.我们再来分析一下offer(E e, long timeout, TimeUnit unit),通过下面的分析,我们可以看出offer(E e, long timeout, TimeUnit unit)与offer(E e)方法的区别是当队列元素的数量已经达到容量上限时,offer(E e, long timeout, TimeUnit unit)会等待timeout的时间,再这个过程中会循环判断元素是否可以进入队列,最后在超时后,还没有进入队列,则丢弃此元素,返回入队列失败。

    public boolean offer(E e, long timeout, TimeUnit unit)
        throws InterruptedException {

        if (e == null) throw new NullPointerException();//存储的元素为null,直接抛出异常
        long nanos = unit.toNanos(timeout);//计算超时时间
        int c = -1;
        final ReentrantLock putLock = this.putLock;//拿到put锁
        final AtomicInteger count = this.count;//拿到已有的元素数量
        putLock.lockInterruptibly();//上锁,准备存入元素
        try {
            while (count.get() == capacity) {//循环判断是否达到容量的上限,如果没到容量上限,则不进入while循环
                if (nanos <= 0)//如果剩余等待时间已经小于0,则直接返回添加元素失败
                    return false;
                nanos = notFull.awaitNanos(nanos);//线程终止,释放put锁,等待notFull的condition的通知
            }
            enqueue(new Node<E>(e));//入队列,下面与offer(E e)方法相同
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
        return true;
    }

d.put(E e)方法,put(E e)方法与offer(E e)方法的主要区别就在于,如果队列元素已满,则put()方法的线程一直处于等待状态,对于put()方法的使用,如果我们的业务系统每有设计好,很可能会带来灾难性的后果,后面我会有一个demo代码来分析解释。

   public void put(E e) throws InterruptedException {
        if (e == null) throw new NullPointerException();
        // Note: convention in all put/take/etc is to preset local var
        // holding count negative to indicate failure unless set.
        int c = -1;
        Node<E> node = new Node(e);
        final ReentrantLock putLock = this.putLock;
        final AtomicInteger count = this.count;
        putLock.lockInterruptibly();
        try {
            /*
             * Note that count is used in wait guard even though it is
             * not protected by lock. This works because count can
             * only decrease at this point (all other puts are shut
             * out by lock), and we (or some other waiting put) are
             * signalled if it ever changes from capacity. Similarly
             * for all other uses of count in other wait guards.
             */
            while (count.get() == capacity) {//put方法与offer方法的主要区别就在这里,如果队列元素已经满了,线程处于一直等待状态
                notFull.await();
            }
            enqueue(node);
            c = count.getAndIncrement();
            if (c + 1 < capacity)
                notFull.signal();
        } finally {
            putLock.unlock();
        }
        if (c == 0)
            signalNotEmpty();
    }

e.E take()方法,此处注意,我们用到了读锁,由于take()和offer()、put()用的锁不是同一把锁,所以他们之间互不干扰,唯一的交集是队列元素的数量,这也是队列元素的数量用AtomicInteger来记录的原因,因为AtomicInteger的加减操作都是原子操作。

    public E take() throws InterruptedException {
        E x;
        int c = -1;
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;//注意这里获取的是读锁
        takeLock.lockInterruptibly();//加读锁
        try {
            while (count.get() == 0) {//循环判断队列元素是否为空
                notEmpty.await();//等待非空信号
            }
            x = dequeue();//出队列
            c = count.getAndDecrement();//元素个数减一操作
            if (c > 1)
                notEmpty.signal();//如果剩余元素数大于0,发出notEmpty信号
        } finally {
            takeLock.unlock();//释放锁
        }
        if (c == capacity)
            signalNotFull();//发出未满信号
        return x;
    }

f.E poll()方法,poll()是一个非阻塞的取元素的方法

    public E poll() {
        final AtomicInteger count = this.count;
        if (count.get() == 0)
            return null;
        E x = null;
        int c = -1;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lock();
        try {
            if (count.get() > 0) {
                x = dequeue();
                c = count.getAndDecrement();
                if (c > 1)
                    notEmpty.signal();
            }
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

 g.E poll(long timeout, TimeUnit unit),当队列元素为空时,阻塞timeout,循环取元素,如果超时后未取到,则直接返回

    public E poll(long timeout, TimeUnit unit) throws InterruptedException {
        E x = null;
        int c = -1;
        long nanos = unit.toNanos(timeout);
        final AtomicInteger count = this.count;
        final ReentrantLock takeLock = this.takeLock;
        takeLock.lockInterruptibly();
        try {
            while (count.get() == 0) {
                if (nanos <= 0)
                    return null;
                nanos = notEmpty.awaitNanos(nanos);
            }
            x = dequeue();
            c = count.getAndDecrement();
            if (c > 1)
                notEmpty.signal();
        } finally {
            takeLock.unlock();
        }
        if (c == capacity)
            signalNotFull();
        return x;
    }

h.构造方法源码分析

    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);//默认容量是整数最大值
    }

    /**
     * Creates a {@code LinkedBlockingQueue} with the given (fixed) capacity.
     *
     * @param capacity the capacity of this queue
     * @throws IllegalArgumentException if {@code capacity} is not greater
     *         than zero
     */
    public LinkedBlockingQueue(int capacity) {//可以自行设置队列容量
        if (capacity <= 0) throw new IllegalArgumentException();
        this.capacity = capacity;
        last = head = new Node<E>(null);
    }

以我的经验,一般我们在使用生产者-消费者模式时,所使用的消费方法,都是take(),比较方便简单,poll()方法在特殊情况下才会使用到。因为阻塞队列有容量的限制,一旦发生容量已满(可能是消费者线程挂了或者消费速度太慢),并且入队列的方法是put或offer(timeout)的方法,线程就会一直等待,目前我们的业务系统大多数都是在多线程的环境下运行,就会造成线程被耗光导致整个服务停服,就算使用了线程池,也会造成线程池内的工作线程全部被耗光,线程池不能再提供服务,下面的demo模拟展示了线程被快速耗光停止服务的情况:

package com.mantu.advance;

import java.util.concurrent.*;

/**
 * blog http://www.cnblogs.com/mantu/
 * github https://github.com/mantuliu/
 * @author mantu
 *
 */
public class Lesson2BlockingQueueDemo {
    public static LinkedBlockingQueue queue = new LinkedBlockingQueue(5);//声明阻塞队列的数量为5,
    public static void main(String [] args){
        Receiver receiver  = new Receiver();//消费者
        
        for(int i = 0;i<20;i++){
            new Thread(new SenderPut()).start();//发送者,20个线程同时发送,在生产环境,可能会有成千上万的线程同时发送
        }
        /*
        for(int i = 0;i<20;i++){
            new Thread(new SenderOffer()).start();//大家可以试一下offer方法与put方法完全不一样
        }
        */
        try {
            Thread.currentThread().sleep(3000L);//停顿3秒钟
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
        new Thread(receiver).start();//消费者线程启动
    }
}

class SenderPut implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            System.out.println("已经进入线程id:"+Thread.currentThread().getId()+"的内部");//标识此线程已经被执行
            Lesson2BlockingQueueDemo.queue.put(Thread.currentThread().getId());
            System.out.println("当前发送的线程id为:"+Thread.currentThread().getId());//标识此线程已经发送消息到队列完毕
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

class SenderOffer implements Runnable {

    @Override
    public void run() {
        // TODO Auto-generated method stub
        try {
            System.out.println("已经进入线程id:"+Thread.currentThread().getId()+"的内部");
            Lesson2BlockingQueueDemo.queue.offer(Thread.currentThread().getId());
            System.out.println("当前发送的线程id为:"+Thread.currentThread().getId());
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }
}

class Receiver implements Runnable {
    @Override
    public void run() {
        // TODO Auto-generated method stub
        while(true){
            try {
                System.out.println("当前取出的线程id为:"+Lesson2BlockingQueueDemo.queue.take());
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
    }
    
}

 执行结果如下,执行结果证明了我们之前的结论:线程全部被耗光。

已经进入线程id:10的内部
已经进入线程id:11的内部
已经进入线程id:9的内部
已经进入线程id:12的内部
已经进入线程id:15的内部
已经进入线程id:16的内部
已经进入线程id:19的内部
已经进入线程id:20的内部
已经进入线程id:21的内部
已经进入线程id:18的内部
已经进入线程id:24的内部
已经进入线程id:17的内部
已经进入线程id:28的内部
当前发送的线程id为:15
已经进入线程id:14的内部
已经进入线程id:22的内部
当前发送的线程id为:9
当前发送的线程id为:12
当前发送的线程id为:10
当前发送的线程id为:11
已经进入线程id:25的内部
已经进入线程id:13的内部
已经进入线程id:27的内部
已经进入线程id:23的内部
已经进入线程id:26的内部
当前取出的线程id为:11
当前发送的线程id为:16
当前发送的线程id为:19
当前取出的线程id为:10
当前取出的线程id为:12
当前发送的线程id为:20
当前取出的线程id为:9
当前发送的线程id为:21
当前发送的线程id为:18
当前取出的线程id为:15
当前取出的线程id为:16
当前取出的线程id为:19
当前发送的线程id为:17
当前发送的线程id为:24
当前发送的线程id为:28
当前取出的线程id为:20
当前取出的线程id为:21
当前发送的线程id为:14
当前发送的线程id为:22
当前取出的线程id为:18
当前取出的线程id为:24
当前发送的线程id为:25
当前发送的线程id为:13
当前取出的线程id为:17
当前取出的线程id为:28
当前发送的线程id为:27
当前发送的线程id为:23
当前取出的线程id为:14
当前取出的线程id为:22
当前发送的线程id为:26
当前取出的线程id为:25
当前取出的线程id为:13
当前取出的线程id为:27
当前取出的线程id为:23
当前取出的线程id为:26

 

    原文作者:mantu
    原文地址: http://www.cnblogs.com/mantu/p/5761417.html
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞