Java并发编程(二)

1.并发Queue
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口

2.ConcurrentLinkedQueue
ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。ConcurrentLinkedQueue是非阻塞队列。

ConcurrentLinkedQueue的重要方法:

(1) add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别)
(2) poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。

3.BlockingQueue接口的重要方法

   (1) offer(anObject): 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)

   (2) offer(E o,long timeout,TimeUnit unit):该方法可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。

   (3) put(anObject):该方法把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。

   (4) poll(long time,TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取时,则返回失败。

   (5) task():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。

   (6) drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法可以提升获取数据效率;不需要多次分批次加锁或释放锁。

实例代码:

package com.lhf.collection;

import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;

/**
 * 有界队列
 * 
 * 
 * @author liuhefei
 * 
 */
public class UseBlockingQueue {

	public static void main(String[] args) {
		//1.高性能无阻塞无界队列
		/*ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
		clq.offer("a");   //添加元素
		clq.add("b");   //添加元素
		clq.add("c");
		clq.add("d");
		System.err.println("取出队列的头部元素:"+clq.poll());   //从队列的头部取出一个元素,并且从容器本身移除
		System.out.println("队列长度:"+clq.size());
		System.out.println("从头部取出元素:" + clq.peek());   //从队列的头部取出一个元素,但是不会移除这个元素
		System.out.println("队列长度:"+clq.size());
		*/
		
		//2.有界队列
		/*ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(5);    //这里的5表示队列的容量限制最大为5
		ArrayBlockingQueue<String> abq2 = new ArrayBlockingQueue<>(5);    //这里的5表示队列的容量限制最大为5
		try {
			abq.put("a");
			abq.add("b");
			abq.add("c");
			abq.add("d");
			abq.add("d");
			//abq.add("e");
			System.err.println(abq.offer("f",2, TimeUnit.SECONDS));  //设置等待的时间为2秒,如果在指定的时间内,还不能往队列中加入f,则返回失败
			
			abq.drainTo(abq2, 3);   //将abq队列中的3个元素拷贝到abq2中
			abq2.add("liuhefei");
			abq2.add("xiaoming");
			//遍历abq2
			for (Iterator iterator = abq2.iterator(); iterator.hasNext();) {
				String string = (String) iterator.next();
				System.out.println("元素:" + string);
			}
			System.out.println();
		} catch (InterruptedException e) {
			
			e.printStackTrace();
		}
	    */
		
		//3.无界且基于阻塞队列
		/*LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
		lbq.add("a");
		lbq.add("b");
		lbq.add("c");
		for (String string : lbq) {
			System.out.println("队列元素:" + string);
		}
		*/
		
		//4.不能盛放任何元素的阻塞队列
		SynchronousQueue<String> sq = new SynchronousQueue<>();
		//获取元素
		new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					System.out.println("元素内容:" + sq.take());
					sq.take();   //获取一个元素
				} catch (InterruptedException e) {
					e.printStackTrace();
				}   
			}
		},"t1").start();
		
		//添加元素
	    new Thread(new Runnable() {

			@Override
			public void run() {
				sq.add("a");
			}
	    },"t2").start();
		
	}

}

实现一个阻塞队列,实例代码如下:
MyQueue.java

package com.lhf.collection;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * 实现一个阻塞队列
 * 
 * 
 * @author liuhefei
 *
 */
public class MyQueue {
	
	//1.定义队列的容器
	private final LinkedList<Object> list = new LinkedList<>();
	
	//2.定义计数器int count
	private final AtomicInteger count = new AtomicInteger(0);
	
	//3.定义一个容器的最大容量限制
	private final int maxSize;
	public MyQueue(int maxSize) {
		this.maxSize = maxSize;
	}
	//4.定义容器的最小容量
	private final int minSize = 0;
	
	//5.定义一个锁
	private final Object lock = new Object();
	
	//添加元素
	public void put(Object obj) {
		
		//加锁
		synchronized(lock) {
			//判断容器是否满了
			while(count.get() == this.maxSize) {   //在添加元素时,容器满了,就需要等待
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			//添加新元素进入容器
			list.add(obj);
			count.getAndIncrement();   //count++
			System.out.println("元素:" + obj + " 成功添加进入容器!");
			//进行唤醒可能正在等待的take方法操作中的线程
			lock.notify();    //唤醒线程
			
		}
	}
	
	//获取元素
	public Object take() {
		Object temp = null;
		//加锁
		synchronized (lock) {
			//判断容器是否为空
			while(count.get() == minSize) {   //如果容器为空,在取数据时会进入等待状态,当有新元素加入之后,才会唤醒
				try {
					lock.wait();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
			}
			temp = list.removeFirst();  //取出元素
			count.getAndDecrement();   //count--
			System.out.println("元素: " + temp + " 已经从容器中取走");
		}
		
		return temp;
	}
	
	
	//获取长度
	public int size() {
		return count.get();
	}
	
	//遍历
	public List<Object> getQueueList() {
		return list;
	}
}

MyQueueTest.java

package com.lhf.collection;

/**
 * 阻塞队列
 * 
 * 
 * @author liuhefei
 * 
 */
public class MyQueueTest {

	public static void main(String[] args) {
		MyQueue mq = new MyQueue(10);   //10是容量
		mq.put("a");
		mq.put("b");
		mq.put("c");
		mq.put("d");
		mq.put("e");
		
		System.out.println("当前元素个数: " + mq.size());
		
		Thread t1 = new Thread(new Runnable() {

			@Override
			public void run() {
				mq.put("f");
				mq.put("g");
				mq.put("k");
				mq.put("n");
			}
			
		},"t1");
		
		
		Thread t2 = new Thread(new Runnable() {

			@Override
			public void run() {
				try {
					//休眠
					Thread.sleep(1000);
					Object o1 = mq.take();   //取值
					Thread.sleep(1000);
					Object o2 = mq.take();
				} catch (InterruptedException e) {
					e.printStackTrace();
				}
				
			}
			
		},"t2");
		
		t1.start();   //启动线程
		try {
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		t2.start();
		try {
			Thread.sleep(5000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		System.out.println(mq.getQueueList().toString());
	}

}
点赞