1.并发Queue
在并发队列上JDK提供了两套实现,一个是以ConcurrentLinkedQueue为代表的高性能队列,一个是以BlockingQueue接口为代表的阻塞队列,无论哪种都继承自Queue接口
2.ConcurrentLinkedQueue
ConcurrentLinkedQueue:是一个适用于高并发场景下的队列,通过无锁的方式,实现了高并发状态下的高性能,通常ConcurrentLinkedQueue性能好于BlockingQueue。它是一个基于链接节点的无界线程安全队列。ConcurrentLinkedQueue是非阻塞队列。
ConcurrentLinkedQueue的重要方法:
(1) add()和offer()都是加入元素的方法(在ConcurrentLinkedQueue中,这两个方法没有任何区别)
(2) poll()和peek()都是取头元素节点,区别在于前者会删除元素,后者不会。
3.BlockingQueue接口的重要方法
(1) offer(anObject): 表示如果可能的话,将anObject加到BlockingQueue里,即如果BlockingQueue可以容纳,则返回true,否则返回false(本方法不阻塞当前执行方法的线程)
(2) offer(E o,long timeout,TimeUnit unit):该方法可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
(3) put(anObject):该方法把anObject加到BlockingQueue里,如果BlockQueue没有空间,则调用此方法的线程被阻断直到BlockingQueue里面有空间再继续。
(4) poll(long time,TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则直到时间超时还没有数据可取时,则返回失败。
(5) task():取走BlockingQueue里排在首位的对象,若BlockingQueue为空,阻断进入等待状态直到BlockingQueue有新的数据被加入。
(6) drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法可以提升获取数据效率;不需要多次分批次加锁或释放锁。
实例代码:
package com.lhf.collection;
import java.util.Iterator;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.TimeUnit;
/**
* 有界队列
*
*
* @author liuhefei
*
*/
public class UseBlockingQueue {
public static void main(String[] args) {
//1.高性能无阻塞无界队列
/*ConcurrentLinkedQueue<String> clq = new ConcurrentLinkedQueue<>();
clq.offer("a"); //添加元素
clq.add("b"); //添加元素
clq.add("c");
clq.add("d");
System.err.println("取出队列的头部元素:"+clq.poll()); //从队列的头部取出一个元素,并且从容器本身移除
System.out.println("队列长度:"+clq.size());
System.out.println("从头部取出元素:" + clq.peek()); //从队列的头部取出一个元素,但是不会移除这个元素
System.out.println("队列长度:"+clq.size());
*/
//2.有界队列
/*ArrayBlockingQueue<String> abq = new ArrayBlockingQueue<>(5); //这里的5表示队列的容量限制最大为5
ArrayBlockingQueue<String> abq2 = new ArrayBlockingQueue<>(5); //这里的5表示队列的容量限制最大为5
try {
abq.put("a");
abq.add("b");
abq.add("c");
abq.add("d");
abq.add("d");
//abq.add("e");
System.err.println(abq.offer("f",2, TimeUnit.SECONDS)); //设置等待的时间为2秒,如果在指定的时间内,还不能往队列中加入f,则返回失败
abq.drainTo(abq2, 3); //将abq队列中的3个元素拷贝到abq2中
abq2.add("liuhefei");
abq2.add("xiaoming");
//遍历abq2
for (Iterator iterator = abq2.iterator(); iterator.hasNext();) {
String string = (String) iterator.next();
System.out.println("元素:" + string);
}
System.out.println();
} catch (InterruptedException e) {
e.printStackTrace();
}
*/
//3.无界且基于阻塞队列
/*LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<>();
lbq.add("a");
lbq.add("b");
lbq.add("c");
for (String string : lbq) {
System.out.println("队列元素:" + string);
}
*/
//4.不能盛放任何元素的阻塞队列
SynchronousQueue<String> sq = new SynchronousQueue<>();
//获取元素
new Thread(new Runnable() {
@Override
public void run() {
try {
System.out.println("元素内容:" + sq.take());
sq.take(); //获取一个元素
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t1").start();
//添加元素
new Thread(new Runnable() {
@Override
public void run() {
sq.add("a");
}
},"t2").start();
}
}
实现一个阻塞队列,实例代码如下:
MyQueue.java
package com.lhf.collection;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 实现一个阻塞队列
*
*
* @author liuhefei
*
*/
public class MyQueue {
//1.定义队列的容器
private final LinkedList<Object> list = new LinkedList<>();
//2.定义计数器int count
private final AtomicInteger count = new AtomicInteger(0);
//3.定义一个容器的最大容量限制
private final int maxSize;
public MyQueue(int maxSize) {
this.maxSize = maxSize;
}
//4.定义容器的最小容量
private final int minSize = 0;
//5.定义一个锁
private final Object lock = new Object();
//添加元素
public void put(Object obj) {
//加锁
synchronized(lock) {
//判断容器是否满了
while(count.get() == this.maxSize) { //在添加元素时,容器满了,就需要等待
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//添加新元素进入容器
list.add(obj);
count.getAndIncrement(); //count++
System.out.println("元素:" + obj + " 成功添加进入容器!");
//进行唤醒可能正在等待的take方法操作中的线程
lock.notify(); //唤醒线程
}
}
//获取元素
public Object take() {
Object temp = null;
//加锁
synchronized (lock) {
//判断容器是否为空
while(count.get() == minSize) { //如果容器为空,在取数据时会进入等待状态,当有新元素加入之后,才会唤醒
try {
lock.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
temp = list.removeFirst(); //取出元素
count.getAndDecrement(); //count--
System.out.println("元素: " + temp + " 已经从容器中取走");
}
return temp;
}
//获取长度
public int size() {
return count.get();
}
//遍历
public List<Object> getQueueList() {
return list;
}
}
MyQueueTest.java
package com.lhf.collection;
/**
* 阻塞队列
*
*
* @author liuhefei
*
*/
public class MyQueueTest {
public static void main(String[] args) {
MyQueue mq = new MyQueue(10); //10是容量
mq.put("a");
mq.put("b");
mq.put("c");
mq.put("d");
mq.put("e");
System.out.println("当前元素个数: " + mq.size());
Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
mq.put("f");
mq.put("g");
mq.put("k");
mq.put("n");
}
},"t1");
Thread t2 = new Thread(new Runnable() {
@Override
public void run() {
try {
//休眠
Thread.sleep(1000);
Object o1 = mq.take(); //取值
Thread.sleep(1000);
Object o2 = mq.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
},"t2");
t1.start(); //启动线程
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
t2.start();
try {
Thread.sleep(5000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(mq.getQueueList().toString());
}
}