基本概念
ConcurrentHashMap 采用了分段锁的设计,只有在同一个分段内才存在竞态关系,不同的分段锁之间没有锁竞争。相比于对整个 Map 加锁的设计,分段锁大大的提高了高并发环境下的处理能力。
内部构造
ConcurrentHashMap 实际上是由多个 Segment(分段)组成, Segment 的内部结构与 HashMap 一样,都是由哈希表构成。因此也可以理解为由多个 “HashMap” 构成。
当多个线程对 ConcurrentHashMap 操作时,实际就是操作不同的 Segment,并且它具有特性:
访问不同的 Segment 时,线程之间相互不影响;
访问同一个 Segment 时,需要用锁来保证数据的一致性,即与 HashTable 一样,是线程安全的。
正是由于这个特性,所以 ConcurrentHashMap 支持多个线程对其同时操作,大大地提高了读写性能。明白了这些,再来看看它内部的真正结构。
1.HashEntry
即节点,通过不同节点链接构成了一个单向链表。
// 键、值,即 <K,V>
final K key;
volatile V value;
// 哈希值
final int hash;
// 后继指针
final HashEntry<K,V> next;
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
根据代码可以绘制节点结构如下图:
2.Segment
即段,内部维护着一个由于多个单向链表组成的数组,实际就是一个哈希表 。该类继承自 ReentrantReadWriteLock ,内部维护着一对读写锁。
// 继承自 ReentrantReadWriteLock ,内部维护着一对读写锁
static class Segment<K,V> extends ReentrantReadWriteLock ... {
// 临界值
transient int threshold;
// 加载因子
final float loadFactor;
// 由于多个单链表组成的数组,即哈希表
transient volatile HashEntry<K, V>[] table;
Segment(int initialCapacity, float lf) {
loadFactor = lf;
setTable(HashEntry.<K, V> newArray(initialCapacity));
}
void setTable(HashEntry<K, V>[] newTable) {
threshold = (int) (newTable.length * loadFactor);
table = newTable;
}
}
根据代码,可以绘制 Segment 结构如下:
3.构造函数
最后再来看看 ConcurrentHashMap 的构成,与开头讲的一样, 它是由多个段构成。具体如下图所示:
再来看看它的构造函数:
static final int MAX_SEGMENTS = 1 << 16;
// 由多个段组成的数组
final Segment<K, V>[] segments;
public ConcurrentHashMap(int initialCapacity, float loadFactor, int concurrencyLevel) {
// ①入参校验
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0){
throw new IllegalArgumentException();
}
// ② 根据并发确定 segment 的数量,并创建 segment 数组
if (concurrencyLevel > MAX_SEGMENTS){
concurrencyLevel = MAX_SEGMENTS;
}
// 保证 [segmen 数量]大于[并发量],且该数必须是[符合要求]的[最小的 2 的倍数]
// 假设 concurrencyLevel = 9,则 ssize = 16
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1;
}
// 用于哈希计算,找节点 key 的存储位置。
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
// 关键 -> 创建 Segment 数组
this.segments = Segment.newArray(ssize);
// ③ 计算初始容量确定每个 segment 的大小,并创建 segment
if (initialCapacity > MAXIMUM_CAPACITY){
initialCapacity = MAXIMUM_CAPACITY;
}
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity){
++c;
}
// 类似 ssize,必须是大于 c 的最小的 2 的倍数
int cap = 1;
while (cap < c){
cap <<= 1;
}
// 初始化(创建)所有的 segment ,并加入 segments
for (int i = 0; i < this.segments.length; ++i){
this.segments[i] = createSegment(cap, loadFactor);
}
}
// 创建 Segment
protected Segment<K, V> createSegment(int initialCapacity, float lf) {
return new Segment<K, V>(initialCapacity, lf);
}
// 创建 Segments ,即 Segment 数组,在 Segment 类中定义
@(JUC 集合)SuppressWarnings("unchecked")
static final <K,V> Segment<K,V>[] newArray(int i) {
return new Segment[i];
}
操作方法
上面提到过对 ConcurrentHashMap 的操作,实际上就是不同的 Segment 的操作。因此该过程可以分为两个步骤:①确定 Segment 的位置 ②对 Segment 进行操作。
实际过程也确实如此,下面来看它的实现过程:
- 查找指定的 segment,通过 key 的哈希值来确定位置
final Segment<K, V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
- 基本操作,包括 put / set / remove,实际都是通过 segment 来操作
// ①
public V put(K key, V value) {
// 关键 -> value 不能为空
if (value == null){
// 抛出异常...
}
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}
// ②
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
// ③
public V remove(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).remove(key, hash, null);
}
观察代码再次验证了对 ConcurrentHashMap 操作实际就是对不同的 Segment 进行操作。下面来看 Segment 操作的具体实现。
1.Segment.put
transient volatile int count;
V put(K key, int hash, V value, boolean onlyIfAbsent) {
// 加锁
writeLock().lock();
try {
// 判断容量是否达到临界值,count 表示 segment 存储的所有节点数量
int c = count;
if (c++ > threshold){
// segment 扩容操作,最后再分析
rehash();
}
// 计算节点在 segment 内部哈希表的位置
HashEntry<K, V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K, V> first = tab[index];
HashEntry<K, V> e = first;
// 跳出循环情况:
// ①e = null ,未找到匹配节点
// ②匹配到不为 null 的节点时
while (e != null && (e.hash != hash || !key.equals(e.key))){
e = e.next;
}
V oldValue;
// 存在匹配节点替换值
if (e != null) {
oldValue = e.value;
if (!onlyIfAbsent){
e.value = value;
}
} else { // 没有匹配,添加新节点到哈希表指定位置的链表头
oldValue = null;
++modCount;
tab[index] = new HashEntry<K, V>(key, hash, first, value);
count = c;
}
return oldValue;
} finally {
writeLock().unlock();
}
}
2.Segment.get
V get(Object key, int hash) {
readLock().lock();
try {
// 判断 segment 内部哈希表是否为空
if (count != 0) {
// 遍历哈希表指定位置的链表
HashEntry<K, V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
return e.value;
}
e = e.next;
}
}
return null;
} finally {
readLock().unlock();
}
}
// 找到哈希表指定位置的链表头
HashEntry<K, V> getFirst(int hash) {
HashEntry<K, V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
3.Segment.remove
V remove(Object key, int hash, Object value) {
writeLock().lock();
try {
int c = count - 1;
// 遍历 segment 所有节点,寻找匹配的节点
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key))){
e = e.next;
}
V oldValue = null;
// 不为 null,说明存在匹配节点
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
++modCount;
// 关键 -> [头节点] 到 e ,逐个修改连接到 e.next
// 假设链表为 1->2->3->4->5,删除 3 后,链表为 2->1->4->5
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next){
newFirst = relinkHashEntry(p, newFirst);
}
tab[index] = newFirst;
count = c;
}
}
return oldValue;
} finally {
writeLock().unlock();
}
}
// 重新链接节点
protected HashEntry<K, V> relinkHashEntry(HashEntry<K, V> e, HashEntry<K, V> next) {
return new HashEntry<K, V>(e.key, e.hash, next, e.value);
}
4.Segment.rehash
void rehash() {
HashEntry<K, V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY){
return;
}
// 扩容 table 容量(2 倍扩充)
HashEntry<K, V>[] newTable = HashEntry.newArray(oldCapacity << 1);
threshold = (int) (newTable.length * loadFactor);
int sizeMask = newTable.length - 1;
// 遍历 segment 内哈希表上的所有链表
for (int i = 0; i < oldCapacity; i++) {
HashEntry<K, V> e = oldTable[i];
// 表示该位置存在链表
if (e != null) {
HashEntry<K, V> next = e.next;
// 关键 -> 重新计算在新哈希表的位置
int idx = e.hash & sizeMask;
// 判断链表是不是只有一个节点
if (next == null){
// 是的话,设置新哈希表指定位置的链表头
newTable[idx] = e;
}else { // 关键 -> 不是,则重用部分链表节点,提高效率
HashEntry<K, V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K, V> last = next; last != null; last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
// 找到可重用部分的链表节点,从该节点开始链表结束的部分就直接转移过来了。
newTable[lastIdx] = lastRun;
// 从链表头节点 - 该节点的部分需要重新添加到新哈希表的位置
for (HashEntry<K, V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K, V> n = newTable[k];
newTable[k] = relinkHashEntry(p, n);
}
}
}
}
table = newTable;
}
关于重用部分链表节点的实现原理。
假设现在存在旧哈希表的指定位置上存在链表为 1->2->3->4->5;
现在执行扩容操作后,重新计算节点位置。节点 1 的位置为新哈希表的[index1];节点2 的新位置为 [index2];节点 3、4、5 的新位置为[lastIndx]。
那么可重用的部分为节点 3、4、5 ,只需将节点 3 转移到新位置即可。然后再对 1、2 节点重新链接。
可以看到rehash过程的主要思路就是找到重用点,即节点 3,也就是图中的红色节点,途中虚线框中的节点构成了可重用的节点链表。