ConcurrentLinkedQueue的设计非常考验设计功底,全程使用cas操作。为了使用无锁的机制,这个队列简化了很多操作,把主力全部放到了作为一个队列的使用方法,offer(),poll(),peek(),isEmpty()等队列常用方法。由于全程无锁,设计复杂程度远远高于直接使用锁,所以读起来比较晦涩难懂。所以面试的时候说出这个是实现过程,就比较容易加分。
public ConcurrentLinkedQueue() {
head = tail = new Node<E>(null);
}
最重要的两个节点,head和tail,初始化的时候都是指向一个空节点的。
public boolean offer(E e) {
checkNotNull(e);
final Node<E> newNode = new Node<E>(e);
for (Node<E> t = tail, p = t;;) {
Node<E> q = p.next;
if (q == null) {
if (p.casNext(null, newNode)) {
if (p != t)
//更新tail
casTail(t, newNode);
return true;
}
}
else if (p == q)
//哨兵重新找
//哨兵节点是直接看别人写的,感觉比较适合,这里也这么称呼
//这个节点的下一个节点是指向自己的
p = (t != (t = tail)) ? t : head;
else
p = (p != t && t != (t = tail)) ? t : q;
}
}
offer操作的代码可以说非常精简。offer先把数据建立成一个节点,然后使用到t,p两个变量同时指向tail的位置,q是p的next,只要q为null就进入cas环节。这里都好理解,但是更新tail的时候需要做p!=t的判定,这个就是比较有讲究的地方了,具体下面说,哨兵节点是在poll产生的,具体在Poll中结合的说。
p = (p != t && t != (t = tail)) ? t : q;
这段代码利用了java的特性,就是解析成指令后,按照指令从左往右执行。
int t=6;
System.out.println(t!=(t=5));
例如上面这个程序,遇到t的时候就把6取出来,然后t=5赋值,此时表达式就变成6!=5。 以上程序,用c++重写后,结果就不是这个样子了。
这一句简单的话,就是在tail更新后使用新的tail的值,tail没有被其他线程更新的话,就直接让p移动到p的下一个节点的位置。以此找到正确的tail。哨兵节点的处理方式同理。
offer方法如果返回只会返回true,所以不能用返回值来做逻辑判断。
很多人都有这个疑问,p和t原来都是指向tail的,p经过查找,表示的是最后一个节点的位置,t经过查找,找到的是tail的位置,也就是说最后一个节点和tail节点不相同的时候,就更新tail。
p!=t的情况发生在两种情况下。
条件1,由于tail是有条件更新的,当tail指向最后最后一个节点的时候,此时添加节点后,t和p都没有变动,不会触发tail的更新,此时tail就是倒数第二个节点,等再次添加的时候,进入p!=t的条件,开始更新tail的位置。
条件2,tail被其他线程更新了,p指向的tail,已经不是tail指向的位置了。
由于这个条件,tail指向的可能是最后一个节点,可能是倒数第二个,可能是倒数第三个(就是在新加了元素后,还没执行更新tail的时候,tail此时就是节点的倒数第三个位置),在并发大的情况,下tail还可能极限到倒数第四个。最后和倒数第二个的情况,大家可以推导出来,这里再说一下倒数第四个的情况。触发条件是,tail是倒数第二的情况,然后同时两个线程一起并发添加,第一个线程使用cas添加成功,此时开始走p!=t的情况,与此同时,另外一个线程虽然cas失败,但是p的节点找到当时的最后一个,此时第一个线程添加节点成功,然后通过重新查找找到下一个节点,开始添加成功,而此时tail的cas开始执行,这个瞬间,tail还保持到原来的位置,但是已经新添加了2个节点。这个状态的时间非常短,但是有可能出现。
为什么说极限是倒数第四个,没有倒数第五个。因为cas操作是原子的,能成功的只有一个其他都是失败。所以以下代码最多容纳两个线程,一个casnext成功,进入castail的流程,另外的一个线程是casnext刚失败,然后往后找了一个节点,执行casNext成功,就算并发量再大,其他casnext都是失败的,一个时间点上只允许一个成功,此时casTail处于刚要成功的位置,加上原来正常的倒数第二个节点,正好是倒数第四个。
if (p.casNext(null, newNode)) {
if (p != t)
casTail(t, newNode);
很多人都问这个问题,如果是正常操作的话,应该让tail成为最后一个节点,这样每次都可以直接取出tail,然后往后cas添加。以上这个思维其实是加锁的一种做法,毕竟casNext和casTail是两个操作,在不加锁的情况下,多线程跑起来,没法保证这个操作。
我在网上找到了类似的算法,那个算法就是保证线程都结束后,tail就是最后一个节点。
p=new node();
while(true){
//获取最后节点
tail=getTail()
if(tail->next==null){
//添加节点
if(casAddNode()){
break;
}
}else{
//更新tail节点
cas(tail,tail->next)
}
}
//更新tail
castail(tail,q)
为了保证tail是最后一个,每个线程都在做tail的维护工作。但是在并发高的情况下,tail也只是保证在当前一个时间点上是最后一个元素,你在更新的时候别人还在加元素,你刚更新完tail,另外的线程就加了一个元素,所以在多线程添加元素的情况下,也只能是依靠现有的tail去查找到最后一个元素。相对于每次添加元素而言,tail并不一定是最后一个元素,而且还加入了频繁的casTail操作。
后面又提出了一种优化方式,就是对tail的定义为接近最后一个元素的位置。
p=new node();
while(true){
//获取最后节点
tail=getTail()
if(tail->next==null){
//添加节点
if(casAddNode()){
break;
}
}else{
//更新tail节点
while(tail->next != null)
tail = tail->next;
}
}
//更新tail
castail(tail,q)
这个是对第一种的改良,用while替换了cas操作,效率上高了很多,存在的问题就是tail的更新在完成之后,那么在并发高的情况下,还没更新tail的时候,可能已经加入了很多节点,每个都是拿着当前变量去查找最后一个节点,就算更新了tail也不会使用tail的值。每个线程找真正最后一个节点的遍历次数可能非常多。
于是新的算法被提出来Michael & Scott的改良。ConcurrentLinkedQueue就是用了这个算法。就是tail本身定义就是接近结尾的元素。相比第二个算法的改良,就是使用了tail的更新,并不是单纯一个局部变量去更新,p = (p != t && t != (t = tail)) ? t : q;再有人更新tail就使用了tail,第二种的while循环则要和加入的速度竞争,效果和一个水缸,一个口在放水,一个口在加水一样。
Michael & Scott的改良本身对tail的定义就不是最后一个节点,所以更新只是为了防止距离太远,如果距离可以接近,那可以省去这个cas操作。只有一种情况会出现不更新,就是tail节点就是原来添加元素的最后一个节点,在并发高的情况下,p!=t的条件很容易满足,只要多线程开始处理,很多情况下都是p!=t的,都会尝试更新tail节点。慢慢的就退化成上述的第二种情况,每次都会去更新tail。
public E poll() {
restartFromHead:
for (;;) {
for (Node<E> h = head, p = h, q;;) {
E item = p.item;
if (item != null && p.casItem(item, null)) {
if (p != h)
updateHead(h, ((q = p.next) != null) ? q : p);
return item;
}
else if ((q = p.next) == null) {
updateHead(h, p);
return null;
}
//哨兵节点
else if (p == q)
continue restartFromHead;
else
p = q;
}
}
}
poll的操作相对offer,要稍微好看一些,直接从head开始遍历,初始化的时候head是一个空节点,head表示的也不一定是第一个元素,起码初始化的时候head是一个空节点,p表示的是一个有数据的节点,h表示的就是head的位置,p!=h只有一种情况下出现,就是head是一个空节点而不是第一个元素节点,在p!=h的情况下,如果p有下一个节点,那么p的下一个节点作为head,否则p本身作为head。p的next作为head的时候,head就是指向了第一个元素,再次删除的时候,就只会把一个元素的值设置为null,并不会更新head,此时head又成为了一个表头,他的下一个元素才是第一个数据节点。
final void updateHead(Node<E> h, Node<E> p) {
if (h != p && casHead(h, p))
h.lazySetNext(h);
}
在更新头节点的时候原来的head就会自己指向自己,成为哨兵。
当添加入第一个元素后,head和tail都指向了头结点(不是第一个元素),此时添加的时候获取tail,poll正好把head做成哨兵节点,此时找head的下一个节点的时候就会无限循环下去。
ConcurrentLinkedQueue通过无锁来做到了更高的并发量,是个高性能的队列,但是使用场景相对不如阻塞队列常见,毕竟取数据也要不停的去循环,不如阻塞的逻辑好设计,但是在并发量特别大的情况下,是个不错的选择,性能上好很多,而且这个队列的设计也是特别费力,尤其的使用的改良算法和对哨兵的处理。整体的思路都是比较严谨的,这个也是使用了无锁造成的,我们自己使用无锁的条件的话,这个队列是个不错的参考。