Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表

为什么是SkipList,不是TreeMap的红黑树?
无锁链表的精髓
ConcurrentSkipListMap
ConcurrentSkipListSet

为什么是SkipList?

大家都知道,在Java集合类中,有一个TreeMap,相对于HashMap,它的一个最大特点是:key是有序的。TreeMap的内部是用红黑树实现的。同HashMap一样,它也是非线程安全的。

然后在JUC包中,提供了一个线程安全的、有序的HashMap,也就是今天所要讲的ConcurrentSkipListMap。(关于什么是SkipList,或者“跳跃表“,此处不再详述,大家自行google之)

那为什么实现这个线程安全的、有序的HashMap,不用红黑树,而用SkipList呢?

借用Doug Lea的原话:

The reason is that there are no known efficient lock-free insertion and deletion algorithms for search trees.

也就是目前计算机领域没找到一种高效的、作用在树上的、无锁的、增加和删除结点的办法。

那为什么SkipList就可以无锁的实现结点的增加、删除呢?那就要从以下的故事说起:

无锁链表

问题的提出

在Doug Lea的注释中,他引用了下面这篇无锁链表的Paper:
“A pragmatic implementation of non-blocking linked lists” (大家自行google之)

咋一看,无锁链表不是很简单嘛,还值得写一篇paper专门论述?我们在前面讲AQS的时候,已反复用到无锁队列,其实现也是链表。那究竟区别在哪呢?

我们前面所讲的无锁队列、栈,都是只在头、尾做cas操作,这个问题不大。而玄机就在,如果你在链表的中间做insert/delete操作,照通常的cas做法,就会出问题!

关于这个问题,那篇论文中有清晰的论述,此处引用如下:
操作1: 在结点10后面插入结点20,没什么问题
《Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表》

操作2:删除结点10,也没什么问题
《Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表》

但如果2个线程同时操作,一个删除结点10,一个要在10后面插入结点20。这2个操作都各自是CAS的,这个时候就会出问题:新插入的结点20会丢失!这个问题不是简单的用CAS就可以解决的。
《Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表》

那为什么会出现这个问题呢?
究其原因:我们在删除结点10的时候,实际操作的是10的前驱H,10本身并没有任何变化。这样,在往10后插入20的这个线程,不知道10已经删除了!!

要解决这个问题,论文中提出了如下的解决办法:
《Java多线程 -- JUC包源码分析18 -- ConcurrentSkipListMap(Set)/TreeMap(Set)/无锁链表》

把10的删除分2步:第1步,把10的next指针,mark成删除,也就是软删;第2步,找机会,物理删除。

这个解决办法有一个关键点:就是“把10的next指针指向20(insert操作)“ 和 “判断10本身是否删除“这2个,必须是原子的,必须在1个cas里面完成!!!

解决办法

办法1: AtomicMarkableReference
要实现上面所说的,一个办法是用前面所提到的AtomicMarkableReference,也就是说,每个next要是AtomicMarkableReference类型。但这个办法,不够高效。Doug Lea在ConcurrentSkipListMap的实现中,用了另外一种办法

办法2:Mark结点
* +——+ +——+ +——+ +——+
* … | b |——>| n |—–>|marker|——>| f | …
* +——+ +——+ +——+ +——+
*
* 3. CAS b’s next pointer over both n and its marker.
* From this point on, no new traversals will encounter n,
* and it can eventually be GCed.
* +——+ +——+
* … | b |———————————–>| f | …
* +——+ +——+

我们的目的,不就是想标记结点n,也就是标记它的next字段嘛?那就新造一个marker结点,让n.next = marker。

这样也达到了,删除n的时候,标记n的next指针的目的。

ConcurrentSkipListMap

SkipList结构

解决了无锁链表的insert/delete问题,也就解决了SkipList的一个关键问题。因为SkipList,通俗点讲,就是多层链表叠起来的:一个大的单向链表,存储所有的

//最底层Node结点。所有的<k,v>对,都是这个单向链表串起来的。
static final class Node<K,V> {
        final K key;
        volatile Object value;
        volatile Node<K,V> next;      
        。。。
}

//上面的Index层的结点
    static class Index<K,V> {
        final Node<K,V> node;  //不存储实际数据,指向Node
        final Index<K,V> down;  //关键点:每个Index结点,必须有个指针,指向其下一个Level对应的结点
        volatile Index<K,V> right; //自己也组成单向链表
        。。。
 }

//整个ConcurrentSkipListMap就只用记录最顶层的head结点
public class ConcurrentSkipListMap<K,V> extends AbstractMap<K,V> implements ConcurrentNavigableMap<K,V>, Cloneable, java.io.Serializable {
    。。。
    private transient volatile HeadIndex<K,V> head;
}

    final void initialize() {
        keySet = null;
        entrySet = null;
        values = null;
        descendingMap = null;
        randomSeed = seedGenerator.nextInt() | 0x0100; // ensure nonzero
        head = new HeadIndex<K,V>(new Node<K,V>(null, BASE_HEADER, null),
                                  null, null, 1);
    }

put操作

    public V put(K key, V value) {
        if (value == null)
            throw new NullPointerException();
        return doPut(key, value, false);
    }

    private V doPut(K kkey, V value, boolean onlyIfAbsent) {
        Comparable<? super K> key = comparable(kkey);
        for (;;) {
            Node<K,V> b = findPredecessor(key);  //先找到结点的前驱(一定在base level上)
            Node<K,V> n = b.next;
            for (;;) {
                if (n != null) {
                    Node<K,V> f = n.next;
                    if (n != b.next)               // inconsistent read
                        break;
                    Object v = n.value;
                    if (v == null) {               // n is deleted
                        n.helpDelete(b, f);
                        break;
                    }
                    if (v == n || b.value == null) // b is deleted
                        break;
                    int c = key.compareTo(n.key);
                    if (c > 0) {
                        b = n;
                        n = f;
                        continue;
                    }
                    if (c == 0) {     //结点存在,直接改值
                        if (onlyIfAbsent || n.casValue(v, value))
                            return (V)v;
                        else
                            break; // restart if lost race to replace value
                    }
                    // else c < 0; fall through
                }

                Node<K,V> z = new Node<K,V>(kkey, value, n);
                if (!b.casNext(n, z))  //结点不存在,新建一个,插入进入
                    break;         

                int level = randomLevel();  //关键点:判断是否要给此结点加上Index层。randomLevel有50%概率会返回0,也就是说,50%的结点,不会有Index层,只会在最底层的链表上面

                if (level > 0)
                    insertIndex(z, level);   //level > 0,为其建索引
                return null;
            }
        }
    }

//关键函数:查找一个结点的前驱,header开始,从左往右、从上往下遍历
    private Node<K,V> findPredecessor(Comparable<? super K> key) {
        if (key == null)
            throw new NullPointerException(); // don't postpone errors
        for (;;) {
            Index<K,V> q = head;
            Index<K,V> r = q.right;
            for (;;) {
                if (r != null) {
                    Node<K,V> n = r.node;
                    K k = n.key;
                    if (n.value == null) {
                        if (!q.unlink(r))
                            break;           // restart
                        r = q.right;         // reread r
                        continue;
                    }
                    if (key.compareTo(k) > 0) {
                        q = r;
                        r = r.right;
                        continue;
                    }
                }
                Index<K,V> d = q.down;   //跳到下一层
                if (d != null) {
                    q = d;
                    r = d.right;
                } else
                    return q.node;
            }
        }
    }

get操作

    public V get(Object key) {
        return doGet(key);
    }

    private V doGet(Object okey) {
        Comparable<? super K> key = comparable(okey);

        for (;;) {
            Node<K,V> n = findNode(key);
            if (n == null)
                return null;
            Object v = n.value;
            if (v != null)
                return (V)v;
        }
    }

    private Node<K,V> findNode(Comparable<? super K> key) {
        for (;;) {
            Node<K,V> b = findPredecessor(key); //再次用的此函数
            Node<K,V> n = b.next;
            for (;;) {
                if (n == null)
                    return null;
                Node<K,V> f = n.next;
                if (n != b.next)                // inconsistent read
                    break;
                Object v = n.value;
                if (v == null) {                // n is deleted
                    n.helpDelete(b, f);
                    break;
                }
                if (v == n || b.value == null)  // b is deleted
                    break;
                int c = key.compareTo(n.key);
                if (c == 0)
                    return n;
                if (c < 0)
                    return null;
                b = n;
                n = f;
            }
        }
    }

ConcurrentSkipListMap中,关于链表的操作,细节非常多,上面只是分析了其整体实现思路。关于insert/delete, insertIndex的具体实现细节,本文不在详述,留待大家对照代码仔细分析。

ConcurrentSkipListSet

我们知道,TreeSet是基于TreeMap实现的,就是对TreeMap的一个简单封装。

同样, ConcurrentSkipListSet也就是对ConcurrentSkipListMap的一个简单封装,具体不再详述。

    public ConcurrentSkipListSet() {
        m = new ConcurrentSkipListMap<E,Object>();
    }
    原文作者:JUC
    原文地址: https://blog.csdn.net/chunlongyu/article/details/52507059
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞