ConcurrentHashMap 1.8 源码分析

         前两已经分析过几篇关于CHM的源码,本篇分析下1.8中的实现,已经弃用之前 segment 双桶的机制。但是本质还是将锁细化达到性能的提升,但是不是之前版本中定义的segment 上锁,而是用了synchronized关键字锁住 table (1.8中只维护了和hashmap中类似的数据结构。一个数据,数组内的结构是链表或者红黑树)中本次所操作对象的链表头。同样大量使用了UnSafe的 本地方法。如CAS,putOr并且最大的改进在于实现了并发的扩容。同时内部数据结构与Java8中HashMap一样,增加了红黑树。当链表中的长度大于一定值后,转化为红黑树(红黑树结构的同时也仍然保持了链表的结构,下面会详细介绍)。

其实对于1.8的源码分析 我所引用 的两篇分章已经分析的十分详细了,本篇本章就不对每个方法或参数在详细说明了,只对其中我认为难以理解的对方加以概括,重点对如何实现并发扩容进行了分析。如有不对的地方,欢迎大家一起来交流。

 

       先简单分析下put操作。我们可以猜想,在之前的实现中,首先定位segment,上锁。而后操作在对segment中的map进一步处理。现在的实现中并没有用segment,而是延用 hashMap中单桶,定位到链表后,直接 上锁,而后对链表就行操作,同样是将锁细化。简化了很多的计算操作。 从put的源码可以看出总体与hashmap的put操作相差不大,除了加锁,另外就是增加了判断当前结点是否是ForwardingNode。表示当前正在扩容,且该结点已经扩容完毕。而后通过helpTransfer 判断是否参与扩容的过程。

 

红黑树

与1.8中的HashMap一样,当链表长度超过一定长度时,会转换成红黑树。但是 两者之间有一点细微的区别。如下代码,表示HashMap的树节点的数据结构。

HashMap
/**
 * Entry for Tree bins. Extends LinkedHashMap.Entry (which in turn
 * extends Node) so can be used as extension of either regular or
 * linked node.
 */   也是继承于HashMap中的Node,可以将该节点做为数组中的链表节点。
static final class TreeNode<K,V> extends LinkedHashMap.Entry<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;   。。。}

HashMap中TreeNode的实现了红黑树功能的方法。

ConcurrentHashMap中的TreeNode相对来说很简单,只定义了基本的数据结构和一定查找的方法。如下

/**
 * Nodes for use in TreeBins
 */
ConcurrentHashMap  树的成员与上面类似,但是
static final class TreeNode<K,V> extends Node<K,V> {
    TreeNode<K,V> parent;  // red-black tree links
    TreeNode<K,V> left;
    TreeNode<K,V> right;
    TreeNode<K,V> prev;    // needed to unlink next upon deletion
    boolean red;
《ConcurrentHashMap 1.8 源码分析》  }

并没有实现红黑树应实现的方法。但是两者有一个共通点,就是与单纯的红黑树的数据结构多了一个变量,就是prev,增加这个变量的意义在于删除红黑树的node时,需要找到被删node的上一个结点,因为如果只是单向链表与红黑树的结构。删除红黑树的结点时,单向链表就不能维护了,因为找不到被册结点的上一个结点。因此这里增加了一个prev的结构,形成双向链表。

而这样做是其增加了一个TreeBin来包装TreeNode,而这个容器不直接保存用户的key,value信息。hash值为定值-2,在遍历时可通过hash值判断是当前Node是哪种结构。-1表示正在ForwardingNode,-2为TreeBin,大于0为链表。

ConcurrentHashMap  
/**
 * TreeNodes used at the heads of bins. TreeBins do not hold user
 * keys or values, but instead point to list of TreeNodes and
 * their root. They also maintain a parasitic read-write lock
 * forcing writers (who hold bin lock) to wait for readers (who do
 * not) to complete before tree restructuring operations.
 */
static final class TreeBin<K,V> extends Node<K,V> {
    TreeNode<K,V> root;     //红黑树的根结点。
    volatile TreeNode<K,V> first;   // 指向链表的头部,虽然为红黑树,但保持了链表的结构。在unTree化时简化。
    volatile Thread waiter;
    volatile int lockState; 。。。}

 

《ConcurrentHashMap 1.8 源码分析》

 

接下来介绍下由链表是怎么转化为红黑树的。首先通过如下的方法将原来的Node转换成TreeNode,从且从单身链表转成双向链表。此时还没有发生红黑树的操作。

/* ---------------- Conversion from/to TreeBins -------------- */

/**
 * Replaces all linked nodes in bin at given index unless table is
 * too small, in which case resizes instead.
 */
private final void treeifyBin(Node<K,V>[] tab, int index) {
    Node<K,V> b; int n, sc;
    if (tab != null) {
        if ((n = tab.length) < MIN_TREEIFY_CAPACITY)
            tryPresize(n << 1);
        else if ((b = tabAt(tab, index)) != null && b.hash >= 0) {
            synchronized (b) {
                if (tabAt(tab, index) == b) {
                    TreeNode<K,V> hd = null, tl = null;
                    for (Node<K,V> e = b; e != null; e = e.next) {
                        TreeNode<K,V> p =
                            new TreeNode<K,V>(e.hash, e.key, e.val,
                                              null, null);
                        if ((p.prev = tl) == null)
                            hd = p;
                        else
                            tl.next = p;
                        tl = p;
                    }
                    setTabAt(tab, index, new TreeBin<K,V>(hd));
                }
            }
        }
    }
}
setTabAt(tab, index, new TreeBin<K,V>(hd));

然后new封装转换的TreeNode,在构造 方法里转换成红黑树。hd为原链表的头部。现在做为红黑树的first。在构造方法里执行的过程就和hashMap中转红黑树的过程类似 ,根据key的 systemCode的大小,决定在树中的位置,形成一个颗二叉树。最后通过r = balanceInsertion(r, x);  由二叉树转为红黑树。r为root指针。

而在HashMap中就没有first指针的概念,虽然其内部同样还是个双向链表(通过prev实现)。它是在转换成红黑树之后 ,通过将红黑树的root结点做为first node。

//   HashMap 中treeify后执行的方法,将root做为链表的first node
/**
 * Ensures that the given root is the first node of its bin.
 */
static <K,V> void moveRootToFront(Node<K,V>[] tab, TreeNode<K,V> root) {
    int n;
    if (root != null && tab != null && (n = tab.length) > 0) {
        int index = (n - 1) & root.hash;
        TreeNode<K,V> first = (TreeNode<K,V>)tab[index];
        if (root != first) {
            Node<K,V> rn;
            tab[index] = root;
            TreeNode<K,V> rp = root.prev;
            if ((rn = root.next) != null)
                ((TreeNode<K,V>)rn).prev = rp;
            if (rp != null)
                rp.next = rn;
            if (first != null)
                first.prev = root;
            root.next = first;
            root.prev = null;
        }
        assert checkInvariants(root);
    }
}

UNSAFE   

如下,这三个操作是实现并发访问的关键。通过UNSAFE的本地方法,1.7中就已经引入过,而在1.8中继续扮演着重要的角色。并且大量使用了CAS的操作。

关于  UNSAFE方法  在之前1.7就已经分析过
@SuppressWarnings("unchecked")  //定位tab中index为i的 Node
static final <K,V> Node<K,V> tabAt(Node<K,V>[] tab, int i) {
    return (Node<K,V>)U.getObjectVolatile(tab, ((long)i << ASHIFT) + ABASE);
}
//CAS设置tab中index为i的结点为node
static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}
//Volatile  写 tab中的 Node。在加锁的情况下调用 
static final <K,V> void setTabAt(Node<K,V>[] tab, int i, Node<K,V> v) {
    U.putObjectVolatile(tab, ((long)i << ASHIFT) + ABASE, v);
}

并发扩容

扩容这一操作相比于之前的实现,1.8中最大的不同在于实现了并发扩容。其核心 是通过了一个特殊的Node。其定义如下 如果遍历时发现node的 hash值是 -1,表示当前正在扩容。且当前table中的该node已经容完成。遍历下一个node进行扩容。

/**
 * A node inserted at head of bins during transfer operations.
 */
static final class ForwardingNode<K,V> extends Node<K,V> {
    final Node<K,V>[] nextTable;
    ForwardingNode(Node<K,V>[] tab) {
        super(MOVED, null, null, null);
        this.nextTable = tab;
    }
    Node<K,V> find(int h, Object k) {
       。。。。
    }
}

而其并发到底又是怎么实现的呢 ? 接下来看,详细代码就不贴了,这里分析关键地方,

private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
    int n = tab.length, stride;
    if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
        stride = MIN_TRANSFER_STRIDE; // subdivide range   
。。。。。。。
}

stride 非常关键,大致的意思 是每个处理器处理的node个数不小于16。而这一步的实现在下面的方法 。

i=0开始,第一步CAS  设置  transferIndex,该值初始为原表的大小,假设n为64,第一个线程将其CAS设为64-16等于48.则第一个线程就先执行64到48之前的node的扩容操作,如果在此期间第二个线程要执行到transfer方法,则transferIndex为 32.其执行48到32之间。但是如果没有其它线程进来执行,则它们就接着往下,争取下一个stride数量的扩容操作。但是当最后transferIndex的值小于等于0时。表示此时已经不需要参与扩容了。如此通过CAS设置transferIndex的值,解决并发的冲突。同时每个node扩容时,要上锁。以防其它操作改变该链表的结构。这样就可以有多个线程并发扩容,且不会产生冲突。

Node<K,V> f; int fh;
while (advance) {
    int nextIndex, nextBound;
    if (--i >= bound || finishing)
        advance = false;
    else if ((nextIndex = transferIndex) <= 0) {
        i = -1;
        advance = false;
    }
    else if (U.compareAndSwapInt
             (this, TRANSFERINDEX, nextIndex,
              nextBound = (nextIndex > stride ?
                           nextIndex - stride : 0))) {
        bound = nextBound;
        i = nextIndex - 1;
        advance = false;
    }
}
if (i < 0 || i >= n || i + n >= nextn) {
    int sc;
    if (finishing) {
        nextTable = null;
        table = nextTab;
        sizeCtl = (n << 1) - (n >>> 1);
        return;
    }
    if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
        if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
            return;
        finishing = advance = true;
        i = n; // recheck before commit
    }
}

如下是在transfer过程中,原node是链表的情况下扩容过程。

if (tabAt(tab, i) == f) {
    Node<K,V> ln, hn;
    if (fh >= 0) {
        int runBit = fh & n;
        Node<K,V> lastRun = f;
        for (Node<K,V> p = f.next; p != null; p = p.next) {
            int b = p.hash & n;
            if (b != runBit) {
                runBit = b;
                lastRun = p;
            }
        }
        if (runBit == 0) {
            ln = lastRun;
            hn = null;
        }
        else {
            hn = lastRun;
            ln = null;
        }
        for (Node<K,V> p = f; p != lastRun; p = p.next) {
            int ph = p.hash; K pk = p.key; V pv = p.val;
            if ((ph & n) == 0)
                ln = new Node<K,V>(ph, pk, pv, ln);
            else
                hn = new Node<K,V>(ph, pk, pv, hn);
        }
        setTabAt(nextTab, i, ln);
        setTabAt(nextTab, i + n, hn);
        setTabAt(tab, i, fwd);
        advance = true;
    }

下图表示遍历到第i个结点,上锁后,判断是链表后,首先是找到lastRun,其表示尾部  hash&n  相同的排在最前面的node。1表示扩容后在高位hn,0表示在低位ln(也就是原来的位置)。而之所以 用hash&n 的 表达式来决定原node在扩容后的位置,而没有通过hash计算新数组的大小 来决定 位置 。这个实现十分的巧妙,直接将hash值与原表的大小& 一下。 这样的做的原因是因为, put操作时决定结点所在位置时通过 hash& n-1。而n我们这里定义的是2的x次方。n-1显然就是 11..11的结点,而n就是 1..00。举个例子n为16,则n-1为 1111。而n为10000。对应扩容后的大小为100000,原结点在新数组的位置就是hash& 11111。现在我们发现 原之前的差别就在于 原数组的&操作。也就是 11111中第一个1。所以扩容时,不需要重新计算。而只需要将原来的hash值与n进行&操作(这样的操作简化了计算的复杂度)。就可以确定在新table的位置。0表示原索引,而1表示 i+n的位置。且不需要考虑并发的问题,nextTable的  i和 i+n的位置 只会由原table的i中的node给重新占据,而我们一开始就对table的 node  i已经上锁了。所以是安全的。

《ConcurrentHashMap 1.8 源码分析》

上面分析了链表的反序处理。红黑树类似,但是需要判读是否unTree。

总结:进一步将锁细化,不在是设置的并发级别,随着扩容之后 ,锁粒度进一步细化,并且提供了并发扩容,且大量使用了UNSAFE的本地方法,性能也进一步提升。
 

本文没有对CHM的所有操作进行分析 ,如get,size,remove等,在下面引用的两篇文章已经很详细的分析,上文只是对部门关键点表明了我的一点点的看法。如有不正之处希望大家指出来。本文后面接着在完善。

 

 

 

 

http://www.importnew.com/22007.html

http://blog.csdn.net/u010723709/article/details/48007881

    原文作者:红黑树
    原文地址: https://my.oschina.net/ovirtKg/blog/777520
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞