ConcurrentHashmap源码分析(jdk7)

2019独角兽企业重金招聘Python工程师标准>>> 《ConcurrentHashmap源码分析(jdk7)》

版本说明:jdk1.7.0_79
ConcurrentHashmap使用分段锁的设计。它将数据分为多个段存储,每个段是一个hash表。在不同的分段上不存在竞争关系,而只有在同一段上才会产生竞争。这种锁分离的设计减小了锁的粒度,也就提升了并发能力。

类声明

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable

属性

下面是常量

    /** * 默认的初始容量。如果构造器中没指定初始容量,则使用该默认值 */
    static final int DEFAULT_INITIAL_CAPACITY = 16;

    /** * 默认的负载因子。 */
    static final float DEFAULT_LOAD_FACTOR = 0.75f;

    /** * 默认的并发级别:16。也就是段的数量。该值在初始化后不允许再修改。 */
    static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    /** * 最大容量。必须是2的n次方 */
    static final int MAXIMUM_CAPACITY = 1 << 30;

    /** * The minimum capacity for per-segment tables. Must be a power * of two, at least two to avoid immediate resizing on next use * after lazy construction. * * 一个分段的最小容量,为2。它必须是2的次方。 */
    static final int MIN_SEGMENT_TABLE_CAPACITY = 2;

    /** * The maximum number of segments to allow; used to bound * constructor arguments. Must be power of two less than 1 << 24. * * 允许的段的最大数量,为2的16次方。 */
    static final int MAX_SEGMENTS = 1 << 16; // slightly conservative

    /** * Number of unsynchronized retries in size and containsValue * methods before resorting to locking. This is used to avoid * unbounded retries if tables undergo continuous modification * which would make it impossible to obtain an accurate result. */
    static final int RETRIES_BEFORE_LOCK = 2;

下面是域属性

    //对segment进行定位
    final int segmentMask;

    //段的偏移值
    final int segmentShift;

    //段数组,表示所有的分段
    final Segment<K,V>[] segments;

    //每个分段,包含多个HashEntry
    transient volatile HashEntry<K,V>[] table;

    //每个bucket,由多个节点组成的链表
    static final class HashEntry<K,V> {
        final int hash;
        final K key;
        volatile V value;
        volatile HashEntry<K,V> next;
    }

concurrencyLevel :一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

    /** * Segments are specialized versions of hash tables. * This subclasses from ReentrantLock opportunistically, just to * simplify some locking and avoid separate construction. * * 每个分段都是一个特殊的hash表。继承自ReentrantLock */
    static final class Segment<K,V> extends ReentrantLock implements Serializable {

        static final int MAX_SCAN_RETRIES =
            Runtime.getRuntime().availableProcessors() > 1 ? 64 : 1;

        /** * 每个段的table。元素通过entryAt/setEntryAt来访问,以保证volatile可见性。 */
        transient volatile HashEntry<K,V>[] table;

        transient int count;

        transient int modCount;

        transient int threshold;

        final float loadFactor;

        Segment(float lf, int threshold, HashEntry<K,V>[] tab) {
            this.loadFactor = lf;
            this.threshold = threshold;
            this.table = tab;
        } 

构造器

CurrentHashMap需要三个参数:initialCapacity(初始容量),loadFactor(负载因子),concurrencyLevel(并发级别),这个三个参数都对应有默认值。CurrentHashMap存在几个重载的方法,但最终都是调用下面的指定参数的方法。

    @SuppressWarnings("unchecked")
    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        //参数不合法时抛异常
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
        //保证段的数量不超过最大值MAX_SEGMENTS
        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        //计算出一个最小的次方数sshift,保证2的sshift次方>=并发级别(比如传入15,2的3次方不行,则2的4次方可以)。
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;//ssize的终值为段的总数
        }
        //高位的位数
        this.segmentShift = 32 - sshift;
        //段的掩码(hash码共32位)
        this.segmentMask = ssize - 1;//(2^n-1对应二进制肯定全是1)

        ////保证初始容量不超过最大值
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //计算每个段的容量(不是整除,容量+1)
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;

        //保证每个段的容量是2^cap(比c大的最小2^cap)。
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;//cap的终值即为每个段的容量

        // create segments and segments[0]
        //创建并初始化segments[0]
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0);//ordered write of segments[0]
        this.segments = ss;
    }

ConcurrentHashMap在初始化时,如果使用自定义的并发级别,则并发级别的最终取值并不是直接取自定义的原值,而是要经过计算的。计算过程是找到一个最小的指数sshift,使得2^sshift>=concurrencyLevel,比如传入了15,则这个数sshift就是4,那么并发级别也就是16。如果按照移位来理解,也就相当于移动了4位,也就是说偏移量为4。

假设initialCapacity为100,concurrencyLevel为15。
现在来计算整个过程。

        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }

由于15不是一个数的n次方,所以找到一个比15大的最小的数16,所以ssize=16,sshift=4。

        //高位的位数(hash码共32位)
        this.segmentShift = 32 - sshift;
        //段的掩码
        this.segmentMask = ssize - 1;//(2^n-1对应二进制肯定全是1)

由于hash码为32位,所以segmentShift=32-4=28,segmentMask=16-1=15。

        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;

100/16=6余4,所以c=6,因为除不尽,所以c++后c=7

        int cap = MIN_SEGMENT_TABLE_CAPACITY;        
        while (cap < c)
            cap <<= 1;

7不是一个数的n次方,所以找到一个比7大的最小的数8,所以cap=8。
到此为止,就已经确定了段的数量为ssize=16,每个段的容量cap=8,段的偏移值segmentShift=28,段的掩码segmentMask=15。
段的数量确定之后就不再发生变化,即使需要扩容,也不会影响段数组segments,而只会对某一个段进行rehash,而不会影响到其它的段。

接下来就开始初始化第一个段。

        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        //创建分段数组
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); // ordered write of segments[0]

要说明的是,初始化时仅仅只初始化第一个段,其它的段则采取延迟初始化的策略。

put操作

    @SuppressWarnings("unchecked")
    public V put(K key, V value) {
        Segment<K,V> s;
        //不允许value为null
        if (value == null)
            throw new NullPointerException();
        //计算key的hash值
        int hash = hash(key);
        //根据hash、段偏移和段掩码,定位段的位置(index)
        int j = (hash >>> segmentShift) & segmentMask;
        //
        if ((s = (Segment<K,V>)UNSAFE.getObject          // nonvolatile; recheck
             (segments, (j << SSHIFT) + SBASE)) == null) // in ensureSegment
            //非volatile操作,重新检查已确保
            s = ensureSegment(j);
        return s.put(key, hash, value, false);
    }

get操作

    public V get(Object key) {
        Segment<K,V> s; // manually integrate access methods to reduce overhead
        HashEntry<K,V>[] tab;
        //计算key的hash值
        int h = hash(key);

        long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
        if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
            (tab = s.table) != null) {
            for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
                     (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
                 e != null; e = e.next) {
                K k;
                if ((k = e.key) == key || (e.hash == h && key.equals(k)))
                    return e.value;
            }
        }
        return null;
    }

总结

1.ConcurrentHashMap是如何初始化的

仅初始化第一个段,其它段采用延迟初始化策略(插入元素时,段不存在才初始化)。

2.ConcurrentHashMap是如何定位到segment和定位到entry的?

3.ConcurrentHashMap是如何扩容的?

并发级别确定后,就不会再发生变化(final),也就是段的数量就不变了。

4.ConcurrentHashMap与HashMap和同步的Map的区别?

同步Map在同步的时候锁住了所有的段,而ConcurrentHashMap加锁的时候根据散列值锁住了散列值锁对应的分段,锁的粒度更细,因此提高了并发性能

未完待续

转载于:https://my.oschina.net/javandroid/blog/878232

    原文作者:weixin_34015566
    原文地址: https://blog.csdn.net/weixin_34015566/article/details/92464737
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞