JUC中多线程的集合——ConcurrentHashMap

部分摘自

http://www.importnew.com/22007.html
https://www.cnblogs.com/everSeeker/p/5601861.html

一些过时的知识

ConcurrentHashMap是一个代码超过6000行的类。绝对跟简单无缘。也绝对不是一片博客能讲的清楚的。

下面使一些过时的知识:
ConcurrentHashMap采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个Map加锁的设计,分段锁大大的提高了高并发环境下的处理能力。但同时,由于不是对整个Map加锁,导致一些需要扫描整个Map的方法(如size(), containsValue())需要使用特殊的实现,另外一些方法(如clear())甚至放弃了对一致性的要求(ConcurrentHashMap是弱一致性的,意味着你put进一个元素,一段事件内你get不到这个元素)。
ConcurrentHashMap中的分段锁称为Segment,Segment[]的数组长度是能够同时更新ConccurentHashMap且不产生锁竞争的最大线程数。它即类似于HashMap(JDK7与JDK8中HashMap的实现)的结构,即内部拥有一个Entry数组,数组中的每个元素又是一个链表;同时又是一个ReentrantLock(Segment继承了ReentrantLock)。ConcurrentHashMap中的HashEntry相对于HashMap中的Entry有一定的差异性:HashEntry中的value以及next都被volatile修饰,这样在多线程读写过程中能够保持它们的可见性
《JUC中多线程的集合——ConcurrentHashMap》
在这个实现中,该map只满足弱一致性。刚刚put进去的元素有可能不能被get立刻看到。
不过上面这些都不重要了,因为JDK1.8中这个实现已经配抛弃了。JDK1.8主要进行了下面两方面改动

  1. 取消segments字段,直接采用transient volatile HashEntry[] table保存数据,采用table数组元素作为锁,从而实现了对每一hash位置数据进行加锁,进一步减少并发冲突的概率。

  2. 将原先table数组+单向链表的数据结构,变更为table数组+单向链表+红黑树的结构。对于hash表来说,最核心的能力在于将key hash之后能均匀的分布在数组中。如果hash之后散列的很均匀,那么table数组中的每个队列长度主要为0或者1。但实际情况并非总是如此理想,虽然ConcurrentHashMap类默认的加载因子为0.75,但是在数据量过大或者运气不佳的情况下,还是会存在一些队列长度过长的情况,如果还是采用单向列表方式,那么查询某个节点的时间复杂度为O(n);因此,对于个数超过8(默认值)的列表,jdk1.8中采用了红黑树的结构,那么查询的时间复杂度可以降低到O(logN),可以改进性能。

JDK1.8中的实现

1.8中, ConcurrentHashMap的底层数据结构是成员变量 transient volatile Node<K,V>[] table;(注意,这里写成transient是因为该类另外实现了序列化和反序列化方法以保证向前兼容性)

使用下面的函数原子性的操作该数据结构

    static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                        Node<K,V> c, Node<K,V> v) {
        return U.compareAndSwapObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
    }

Unsafe类的compareAndSwapObject,tab是要更新的数组,第二个参数时偏移量(对应值为field),第三个参数为expect,最后一参数时update。逻辑是field == expect,则将field置为update。在类初始化时,首先要获取Unsafe单例对象。Unsafe对象和操作所需要的偏移量在类初始化时准备。

    static {
        try {
            U = sun.misc.Unsafe.getUnsafe();
            Class<?> k = ConcurrentHashMap.class;
            //通过Unsafe对象和class对象拿到对应的偏移量。
            SIZECTL = U.objectFieldOffset
                (k.getDeclaredField("sizeCtl"));//对应实例变量sizeCtl;
            TRANSFERINDEX = U.objectFieldOffset
                (k.getDeclaredField("transferIndex"));
            BASECOUNT = U.objectFieldOffset
                (k.getDeclaredField("baseCount"));
            CELLSBUSY = U.objectFieldOffset
                (k.getDeclaredField("cellsBusy"));
            Class<?> ck = CounterCell.class;
            CELLVALUE = U.objectFieldOffset
                (ck.getDeclaredField("value"));
            Class<?> ak = Node[].class;
            ABASE = U.arrayBaseOffset(ak);
            int scale = U.arrayIndexScale(ak);
            if ((scale & (scale - 1)) != 0)
                throw new Error("data type scale not a power of two");
            ASHIFT = 31 - Integer.numberOfLeadingZeros(scale);
        } catch (Exception e) {
            throw new Error(e);
        }
    }

我们看一下put方法

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        if (key == null || value == null) throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        //for循环就是方便break。相当于一个while死循环
        for (Node<K,V>[] tab = table;;) {
            Node<K,V> f; int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();//如果tab为空则进行初始化
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {//如果数组所在位置还没有被占用,则直接把entry放在那个位置,结束退出
                if (casTabAt(tab, i, null,
                             new Node<K,V>(hash, key, value, null)))
                    break;                   // no lock when adding to empty bin
            }
            else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);//其他线程正在进行resize时的特殊处理
            else {
                V oldVal = null;
                synchronized (f) {//对当前数组位置的头节点加锁
                    if (tabAt(tab, i) == f) {//双重检查
                        if (fh >= 0) {//插入链表
                            binCount = 1;
                            for (Node<K,V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash &&
                                    ((ek = e.key) == key ||
                                     (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K,V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K,V>(hash, key,
                                                              value, null);
                                    break;
                                }
                            }
                        }
                        else if (f instanceof TreeBin) {//插入红黑树
                            Node<K,V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
                                                           value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        addCount(1L, binCount);
        return null;
    }


除了,

        /** * Acquires write lock for tree restructuring. */
        private final void lockRoot() {
            if (!U.compareAndSwapInt(this, LOCKSTATE, 0, WRITER))
                contendedLock(); // offload to separate method
        }

        /** * Releases write lock for tree restructuring. */
        private final void unlockRoot() {
            lockState = 0;
        }

        /** * Possibly blocks awaiting root lock. */
        private final void contendedLock() {
            boolean waiting = false;
            for (int s;;) {
                if (((s = lockState) & ~WAITER) == 0) {
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, WRITER)) {
                        if (waiting)
                            waiter = null;
                        return;
                    }
                }
                else if ((s & WAITER) == 0) {
                    if (U.compareAndSwapInt(this, LOCKSTATE, s, s | WAITER)) {
                        waiting = true;
                        waiter = Thread.currentThread();
                    }
                }
                else if (waiting)
                    LockSupport.park(this);
            }
        }
    原文作者:JUC
    原文地址: https://blog.csdn.net/define_us/article/details/80773370
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞