02.JUC 集合 - ConcurrentHashMap

基本概念

ConcurrentHashMap 采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个 Map 加锁的设计,分段锁大大的提高了高并发环境下的处理能力。

内部构造

ConcurrentHashMap 实际上是由多个 Segment(分段)组成, Segment 的内部结构与 HashMap 一样,都是由哈希表构成。因此也可以理解为由多个 “HashMap” 构成。

当多个线程对 ConcurrentHashMap 操作时,实际就是操作不同的 Segment,并且它具有特性:

  • 访问不同的 Segment 时,线程之间相互不影响;

  • 访问同一个 Segment 时,需要用锁来保证数据的一致性,即与 HashTable 一样,是线程安全的。

正是由于这个特性,所以 ConcurrentHashMap 支持多个线程对其同时操作,大大地提高了读写性能。明白了这些,再来看看它内部的真正结构。

1.HashEntry

即节点,通过不同节点链接构成了一个单向链表。

 // 键、值,即 <K,V>
final K key;
volatile V value; 

// 哈希值
final int hash; 

// 后继指针
final HashEntry<K,V> next; 

HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
    this.key = key;
    this.hash = hash;
    this.next = next;
    this.value = value;
}

根据代码可以绘制节点结构如下图:

《02.JUC 集合 - ConcurrentHashMap》

2.Segment

即段,内部维护着一个由于多个单向链表组成的数组,实际就是一个哈希表 。该类继承自 ReentrantReadWriteLock ,内部维护着一对读写锁

// 继承自 ReentrantReadWriteLock ,内部维护着一对读写锁
static class Segment<K,V> extends ReentrantReadWriteLock ... {
    // 临界值
    transient int threshold;

    // 加载因子
    final float loadFactor;

    // 由于多个单链表组成的数组,即哈希表
    transient volatile HashEntry<K, V>[] table;

    Segment(int initialCapacity, float lf) {
        loadFactor = lf;
        setTable(HashEntry.<K, V> newArray(initialCapacity));
    }

    void setTable(HashEntry<K, V>[] newTable) {
        threshold = (int) (newTable.length * loadFactor);
        table = newTable;
    }
}

根据代码,可以绘制 Segment 结构如下:

《02.JUC 集合 - ConcurrentHashMap》

3.构造函数

最后再来看看 ConcurrentHashMap 的构成,与开头讲的一样, 它是由多个段构成。具体如下图所示:

《02.JUC 集合 - ConcurrentHashMap》

再来看看它的构造函数:

static final int MAX_SEGMENTS = 1 << 16; 

// 由多个段组成的数组
final Segment<K, V>[] segments;

public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {

    // ①入参校验
    if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0){
        throw new IllegalArgumentException();
    }

    // ② 根据并发确定 segment 的数量,并创建 segment 数组

    if (concurrencyLevel > MAX_SEGMENTS){
        concurrencyLevel = MAX_SEGMENTS;
    }

    // 保证 [segmen 数量]大于[并发量],且该数必须是[符合要求]的[最小的 2 的倍数]
    // 假设 concurrencyLevel = 9,则 ssize = 16
    int sshift = 0;
    int ssize = 1;
    while (ssize < concurrencyLevel) {
        ++sshift;
        ssize <<= 1;
    }

    // 用于哈希计算,找节点 key 的存储位置。
    segmentShift = 32 - sshift;
    segmentMask = ssize - 1;

    // 关键 -> 创建 Segment 数组
    this.segments = Segment.newArray(ssize);


    // ③ 计算初始容量确定每个 segment 的大小,并创建 segment 

    if (initialCapacity > MAXIMUM_CAPACITY){
        initialCapacity = MAXIMUM_CAPACITY;
    }
    int c = initialCapacity / ssize;
    if (c * ssize < initialCapacity){
        ++c;
    }

    // 类似 ssize,必须是大于 c 的最小的 2 的倍数
    int cap = 1;
    while (cap < c){
        cap <<= 1;
    }

    // 初始化(创建)所有的 segment ,并加入 segments
    for (int i = 0; i < this.segments.length; ++i){
        this.segments[i] = createSegment(cap, loadFactor);
    }
}

// 创建 Segment 
protected Segment<K, V> createSegment(int initialCapacity, float lf) {
    return new Segment<K, V>(initialCapacity, lf);
}

// 创建 Segments ,即 Segment 数组,在 Segment 类中定义
@(JUC 集合)SuppressWarnings("unchecked")
static final <K,V> Segment<K,V>[] newArray(int i) {
    return new Segment[i];
}

操作方法

上面提到过对 ConcurrentHashMap 的操作,实际上就是不同的 Segment 的操作。因此该过程可以分为两个步骤:①确定 Segment 的位置 ②对 Segment 进行操作。

实际过程也确实如此,下面来看它的实现过程:

  • 查找指定的 segment,通过 key 的哈希值来确定位置
final Segment<K, V> segmentFor(int hash) {
    return segments[(hash >>> segmentShift) & segmentMask];
}
  • 基本操作,包括 put / set / remove,实际都是通过 segment 来操作
// ①
public V put(K key, V value) {
    // 关键 -> value 不能为空
    if (value == null){
        // 抛出异常...
    }
    int hash = hash(key.hashCode());
    return segmentFor(hash).put(key, hash, value, false);
}

// ②
public V get(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).get(key, hash);
}

// ③
public V remove(Object key) {
    int hash = hash(key.hashCode());
    return segmentFor(hash).remove(key, hash, null);
}

观察代码再次验证了对 ConcurrentHashMap 操作实际就是对不同的 Segment 进行操作。下面来看 Segment 操作的具体实现。

1.Segment.put

 transient volatile int count;

V put(K key, int hash, V value, boolean onlyIfAbsent) {
    // 加锁
    writeLock().lock();
    try {

        // 判断容量是否达到临界值,count 表示 segment 存储的所有节点数量
        int c = count;
        if (c++ > threshold){
            // segment 扩容操作,最后再分析
            rehash();
        }

        // 计算节点在 segment 内部哈希表的位置 
        HashEntry<K, V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K, V> first = tab[index];
        HashEntry<K, V> e = first;

        // 跳出循环情况:
        // ①e = null ,未找到匹配节点
        // ②匹配到不为 null 的节点时 
        while (e != null && (e.hash != hash || !key.equals(e.key))){
            e = e.next;
        }

        V oldValue;

        // 存在匹配节点替换值
        if (e != null) {
            oldValue = e.value;
            if (!onlyIfAbsent){
                e.value = value;
            }
        } else { // 没有匹配,添加新节点到哈希表指定位置的链表头
            oldValue = null;
            ++modCount;
            tab[index] = new HashEntry<K, V>(key, hash, first, value);
            count = c; 
        }
        return oldValue;
    } finally {
        writeLock().unlock();
    }
}

2.Segment.get

V get(Object key, int hash) {
    readLock().lock();
    try {
        // 判断 segment 内部哈希表是否为空
        if (count != 0) { 
            // 遍历哈希表指定位置的链表
            HashEntry<K, V> e = getFirst(hash);
            while (e != null) {
                if (e.hash == hash && key.equals(e.key)) {
                    return e.value;
                }
                e = e.next;
            }
        }
        return null;
    } finally {
        readLock().unlock();
    }
}

// 找到哈希表指定位置的链表头
HashEntry<K, V> getFirst(int hash) {
    HashEntry<K, V>[] tab = table;
    return tab[hash & (tab.length - 1)];
}

3.Segment.remove

V remove(Object key, int hash, Object value) {
    writeLock().lock();
    try {
        int c = count - 1;

        // 遍历 segment 所有节点,寻找匹配的节点
        HashEntry<K,V>[] tab = table;
        int index = hash & (tab.length - 1);
        HashEntry<K,V> first = tab[index];
        HashEntry<K,V> e = first;
        while (e != null && (e.hash != hash || !key.equals(e.key))){
            e = e.next;
        }

        V oldValue = null;

        // 不为 null,说明存在匹配节点
        if (e != null) {
            V v = e.value;
            if (value == null || value.equals(v)) {
                oldValue = v;
                ++modCount;

                // 关键 -> [头节点] 到 e ,逐个修改连接到 e.next
                // 假设链表为 1->2->3->4->5,删除 3 后,链表为 2->1->4->5
                HashEntry<K,V> newFirst = e.next;
                for (HashEntry<K,V> p = first; p != e; p = p.next){
                    newFirst = relinkHashEntry(p, newFirst);
                }
                tab[index] = newFirst;
                count = c; 
            }
        }
        return oldValue;
    } finally {
        writeLock().unlock();
    }
}

// 重新链接节点
protected HashEntry<K, V> relinkHashEntry(HashEntry<K, V> e, HashEntry<K, V> next) {
    return new HashEntry<K, V>(e.key, e.hash, next, e.value);
}

4.Segment.rehash

void rehash() {
    HashEntry<K, V>[] oldTable = table;
    int oldCapacity = oldTable.length;
    if (oldCapacity >= MAXIMUM_CAPACITY){
        return;
    }

    // 扩容 table 容量(2 倍扩充)
    HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);
    threshold = (int) (newTable.length * loadFactor);

    int sizeMask = newTable.length - 1;

    // 遍历 segment 内哈希表上的所有链表
    for (int i = 0; i < oldCapacity; i++) {
        HashEntry<K, V> e = oldTable[i];

        // 表示该位置存在链表
        if (e != null) {
            HashEntry<K, V> next = e.next;

            // 关键 -> 重新计算在新哈希表的位置
            int idx = e.hash & sizeMask;

            // 判断链表是不是只有一个节点
            if (next == null){
                // 是的话,设置新哈希表指定位置的链表头
                newTable[idx] = e;

            }else { // 关键 -> 不是,则重用部分链表节点,提高效率 
                HashEntry<K, V> lastRun = e;
                int lastIdx = idx;
                for (HashEntry<K, V> last = next; last != null; last = last.next) {
                    int k = last.hash & sizeMask;
                    if (k != lastIdx) {
                        lastIdx = k;
                        lastRun = last;
                    }
                }

                // 找到可重用部分的链表节点,从该节点开始链表结束的部分就直接转移过来了。 
                newTable[lastIdx] = lastRun;

                // 从链表头节点 - 该节点的部分需要重新添加到新哈希表的位置
                for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
                    int k = p.hash & sizeMask;
                    HashEntry<K, V> n = newTable[k];
                    newTable[k] = relinkHashEntry(p, n);
                }
            }
        }
    }
    table = newTable;
}

关于重用部分链表节点的实现原理。

  • 假设现在存在旧哈希表的指定位置上存在链表为 1->2->3->4->5;

  • 现在执行扩容操作后,重新计算节点位置。节点 1 的位置为新哈希表的[index1];节点2 的新位置为 [index2];节点 3、4、5 的新位置为[lastIndx]。

  • 那么可重用的部分为节点 3、4、5 ,只需将节点 3 转移到新位置即可。然后再对 1、2 节点重新链接。

可以看到rehash过程的主要思路就是找到重用点,即节点 3,也就是图中的红色节点,途中虚线框中的节点构成了可重用的节点链表。

《02.JUC 集合 - ConcurrentHashMap》

    原文作者:JUC
    原文地址: https://blog.csdn.net/u012420654/article/details/56669282
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞