PriorityBlockingQueue源码-JUC阻塞队列3
它是我们要说的第三个BlockingQueue,我们依然把BlockingQueue的几个操作放到这里。
操作 | 抛出异常 | 特殊值 | 阻塞 | 超时 |
---|---|---|---|---|
插入 | add(e) | offer(e) | put(e) | offer(e, time, unit) |
移除 | remove() | poll() | take() | poll(time, unit) |
检查 | element() | peek() | – | – |
使用
public static void main(String[] args) throws InterruptedException {
BlockingQueue<User> blockQueue = new PriorityBlockingQueue<>(1);
blockQueue.put(new User(1,1,"张三"));
blockQueue.put(new User(2,5,"李四"));
blockQueue.put(new User(3,2,"赵六"));
blockQueue.put(new User(3,7,"是是是"));
System.out.println("大小是:"+blockQueue.size());
for(int i = 0;i<blockQueue.size();i++) {
User user = blockQueue.take();
System.out.println(user);
}
}
@Data
public class User implements Comparable<User>{
private Integer id;
private int priority;
private String name;
public User(Integer id,int priority,String name) {
this.id = id;
this.priority = priority;
this.name = name;
}
@Override
public int compareTo(User o) {
return this.priority - o.priority;
}
@Override
public String toString() {
return "User [id=" + id + ", priority=" + priority + ", name=" + name + "]";
}
}
构造方法
public PriorityBlockingQueue(int initialCapacity,
Comparator<? super E> comparator) {
if (initialCapacity < 1)
throw new IllegalArgumentException();
this.lock = new ReentrantLock();
this.notEmpty = lock.newCondition();
this.comparator = comparator;
this.queue = new Object[initialCapacity];
}
可以有初始容量,但是没有容量上线 最大是Integer.MAX_VALUE-8
一把锁ReentrantLock
一个notEmpty的condition
底层是数组(默认初始容量是11),可扩容,无容量上限
正式上面的容量无上限,导致put方法看起来跟ArrayBlockingQueue和LinkedBlockingQueue上了判断容量上线条件阻塞的代码逻辑。
while (count == items.length)
notFull.await();
put()方法
public void put(E e) {
offer(e); // 永远不会因为容量导致条件阻塞
}
public boolean offer(E e) {
if (e == null)
throw new NullPointerException();
final ReentrantLock lock = this.lock;
lock.lock();
int n, cap;
Object[] array;
while ((n = size) >= (cap = (array = queue).length))
tryGrow(array, cap);
try {
Comparator<? super E> cmp = comparator;
if (cmp == null)
siftUpComparable(n, e, array);
else
siftUpUsingComparator(n, e, array, cmp);
size = n + 1;
notEmpty.signal();
} finally {
lock.unlock();
}
return true;
}
整体流程跟之前讲的两个BlockingQueue还是很相似的 但有一些区别:
1)直接调用了offer 获取锁直接添加,没有容量限制
2)没有容量限制,所以会自动扩容 tryGrow()
3)如何进入容器并保证有序
4)唤醒notEmpty.await()的线程