Jdk1.6 JUC源码解析(25)-ConcurrentHashMap
作者:大飞
功能简介:
- ConcurrentHashMap是一种线程安全的HashMap。相对于HashTable和Collections.synchronizedMap(),ConcurrentHashMap具有更好的性能和伸缩性,是由于其使用了分段锁的策略,将内部数据分为多个段,每个段单独加锁,而不是整个HashMap加锁,这样能减少很多不必要的锁争用。
源码分析:
- ConcurrentHashMap实现了ConcurrentMap接口,先简单了解下这个接口:
public interface ConcurrentMap<K, V> extends Map<K, V> {
/**
* 如果map中已经存在给定的key,返回map中key对应的value;
* 如果不存在给定的key,插入给定的key和value。
* 这个是一个原子操作,逻辑相当于:
* if (!map.containsKey(key))
* return map.put(key, value);
* else
* return map.get(key);
*/
V putIfAbsent(K key, V value);
/**
* 如果map中存在给定的key,并且map中对应的value也等于给定的value,
* 那么删除这个key和value。
* 这是一个原子操作,逻辑相当于:
* if (map.containsKey(key) && map.get(key).equals(value)) {
* map.remove(key);
* return true;
* } else return false;
*/
boolean remove(Object key, Object value);
/**
* 如果map中存在给定的key,并且map中对应的value也等于给定的oldValue,
* 那么将这个key对应的value替换成newValue。
* 这是一个原子操作,逻辑相当于:
* if (map.containsKey(key) && map.get(key).equals(oldValue)) {
* map.put(key, newValue);
* return true;
* } else return false;
*/
boolean replace(K key, V oldValue, V newValue);
/**
* 如果map中已经存在给定的key,
* 那么将这个key对应的value替换成给定的value。
* 这是一个原子操作,逻辑相当于:
* if (map.containsKey(key)) {
* return map.put(key, value);
* } else return null;
*/
V replace(K key, V value);
}
ConcurrentMap扩展了Map接口,定义了上面4个原子操作方法。
- 接下来看下ConcurrentHashMap的内部结构:
/**
* segment数组, 每一个segment都是一个hash table。
*/
final Segment<K,V>[] segments;
重点看下segment的实现吧,首先看数据结构:
static final class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
/**
* 记录segment(哈希表)中的元素数量。
* 另一个重要角色就是其他操作会利用count的volatile读写来保证可见性,避免使用锁。
*/
transient volatile int count;
/**
* 统计跟踪修改,用来保证一些批量操作的一致性。
* 比如统计所有segment元素个数时,如果统计过程发现modCount变化
* 那么需要重试。
*/
transient int modCount;
/**
* 当哈希表的容量超过了这个阀值,表会扩容,里面的元素会重新散列。
* 这个值一般是:capacity * loadFactor
*/
transient int threshold;
/**
* 存放数组的哈希表。
*/
transient volatile HashEntry<K,V>[] table;
/**
* 哈希表的加载因子。
* @serial
*/
final float loadFactor;
再看下segment的构造方法:
Segment(int initialCapacity, float lf) {
loadFactor = lf;
setTable(HashEntry.<K,V>newArray(initialCapacity));
}
void setTable(HashEntry<K,V>[] newTable) {
threshold = (int)(newTable.length * loadFactor);
table = newTable;
}
static final class HashEntry<K,V> {
final K key;
final int hash;
volatile V value;
final HashEntry<K,V> next;
HashEntry(K key, int hash, HashEntry<K,V> next, V value) {
this.key = key;
this.hash = hash;
this.next = next;
this.value = value;
}
@SuppressWarnings("unchecked")
static final <K,V> HashEntry<K,V>[] newArray(int i) {
return new HashEntry[i];
}
}
可见,构造segment时需要一个初始容量和一个加载因子,segment内部会创建一个长度为初始容量大小的HashEntry数组。
如果对哈希表数据结构比较熟悉的话会知道,哈希表内部一般会有初始容量ic和加载因子lf,当哈希表中的元素数量达到(ic * lf)的时候,就会触发哈希表进行rehash。这有什么影响呢?假设哈希表使用链表法来解决哈希冲突,那么如果加载因子太大,会导致哈希表中每个桶里面的链表平均长度过长,这样会影响查询性能;但如果加载因子过小,又会浪费太多内存空间。所以也是一种时间和空间的权衡,需要按实际情况来选择合适的加载因子。 最后看下ConcurrentHashMap的构造方法:
/* ---------------- Constants -------------- */
//默认segment中hashTable长度。
static final int DEFAULT_INITIAL_CAPACITY = 16;
//默认加载因子。
static final float DEFAULT_LOAD_FACTOR = 0.75f;
//默认table的并发级别,其实就是segment数组长度。
static final int DEFAULT_CONCURRENCY_LEVEL = 16;
//table的最大容量。
static final int MAXIMUM_CAPACITY = 1 << 30;
//允许的最大的segment数组长度。
static final int MAX_SEGMENTS = 1 << 16;
/**
* 在size和containsValue方法中,加锁之前的尝试操作次数。
*/
static final int RETRIES_BEFORE_LOCK = 2;
/* ---------------- Fields -------------- */
/**
* 计算segment下标的掩码。一个key的hash code高位(由segmentShift确定)用来确定segment下标。
*/
final int segmentMask;
/**
* segment下标的位移值。
*/
final int segmentShift;
public ConcurrentHashMap(int initialCapacity,
float loadFactor, int concurrencyLevel) {
if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
throw new IllegalArgumentException();
if (concurrencyLevel > MAX_SEGMENTS)
concurrencyLevel = MAX_SEGMENTS; //concurrencyLevel不能超过最大值
// Find power-of-two sizes best matching arguments
int sshift = 0;
int ssize = 1;
while (ssize < concurrencyLevel) {
++sshift;
ssize <<= 1; //ssize最后是比concurrencyLevel大的最小的2的幂。
}
/*
* 假设传入的concurrencyLevel是50,
* 那么ssize就是64,sshift就是6,segmentMask就是 00000000 00000000 00000000 00111111*/
segmentShift = 32 - sshift;
segmentMask = ssize - 1;
this.segments = Segment.newArray(ssize);
if (initialCapacity > MAXIMUM_CAPACITY)
initialCapacity = MAXIMUM_CAPACITY;
int c = initialCapacity / ssize;
if (c * ssize < initialCapacity)
++c;
int cap = 1;
while (cap < c)
cap <<= 1; //cap其实就是比总体容量平均分到每个segment的数量大的最小的2的幂...有点绕,
for (int i = 0; i < this.segments.length; ++i)
this.segments[i] = new Segment<K,V>(cap, loadFactor); //把segment都初始化一下。
}
public ConcurrentHashMap(int initialCapacity, float loadFactor) {
this(initialCapacity, loadFactor, DEFAULT_CONCURRENCY_LEVEL);
}
public ConcurrentHashMap(int initialCapacity) {
this(initialCapacity, DEFAULT_LOAD_FACTOR, DEFAULT_CONCURRENCY_LEVEL);
}
可以看到构造方法中计算了几个重要的数:segment掩码、segment位移值、segment数组长度和segment内部哈希表容量,注意后两个都是2的幂,想到了什么? a & (b – 1)吧哈哈。 这里做个小结:ConcurrentMap内部包含一个segment的数组;而segment本身又是一个哈希表,并且自带锁;内部哈希表使用链表法解决哈希冲突,每个数组元素是一个单链表。
- 现在从插入和获取操作切入,来理解源码。先看下插入操作:
public V put(K key, V value) {
if (value == null)
throw new NullPointerException();
int hash = hash(key.hashCode());
return segmentFor(hash).put(key, hash, value, false);
}
注意到,put过程中,首先要根据key的hashCode,再次算一个hash值出来;其次是要根据这个hash值来确定一个segment,然后把key-value存到这个segment里面。
/**
* Applies a supplemental hash function to a given hashCode, which
* defends against poor quality hash functions. This is critical
* because ConcurrentHashMap uses power-of-two length hash tables,
* that otherwise encounter collisions for hashCodes that do not
* differ in lower or upper bits.
*/
private static int hash(int h) {
// Spread bits to regularize both segment and index locations,
// using variant of single-word Wang/Jenkins hash.
h += (h << 15) ^ 0xffffcd7d;
h ^= (h >>> 10);
h += (h << 3);
h ^= (h >>> 6);
h += (h << 2) + (h << 14);
return h ^ (h >>> 16);
}
/**
* Returns the segment that should be used for key with given hash
* @param hash the hash code for the key
* @return the segment
*/
final Segment<K,V> segmentFor(int hash) {
return segments[(hash >>> segmentShift) & segmentMask];
}
首先看下这个hash算法,它相当于在key本身的hashCode上做了加强,再次hash一次,使得hash值更加散列。这样做的原因是因为ConcurrentHashMap中哈希表的长度都是2的幂,会增加一些冲突几率,比如两个hashCode高位不同但低位相同,对哈希表长度取模时正好忽略了这些高位,造成冲突。这里是采用了Wang/Jenkins哈希算法的一个变种,更多相关信息可以google之。
接下来是确定segment的步骤,在上面ConcurrentHashMap的构造方法中我们看到,sshift和segmentMask有个关系,如果sshift=6,那么segmentMask后面就有6个为1的bit。 其实这里是用hash值除去低sshift位剩余的高位,来确定segment的下标。 定位到了segment,继续看segment中怎么put元素的:
V put(K key, int hash, V value, boolean onlyIfAbsent) {
lock();// 加锁
try {
int c = count;
if (c++ > threshold) // ensure capacity
rehash(); // 如果添加一个元素后,超过扩容阀值,那么进行rehash。
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1); // 对hash取模算出key对应的哈希表的桶的下标。
HashEntry<K,V> first = tab[index]; // 找出桶的第一个节点
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next; // 遍历一下桶里的单链表,看看有没有相同的。
V oldValue;
if (e != null) {
// 如果找到了相同的,记录旧值
oldValue = e.value;
if (!onlyIfAbsent) // 并且有覆盖标识
e.value = value; // 那么覆盖这个值
}
else {
// 如果没找到相同的。
oldValue = null;
++modCount; // 因为会改变哈希表元素个数,所以modCount累加。
// 将元素设置为桶内新的第一个节点。
tab[index] = new HashEntry<K,V>(key, hash, first, value);
count = c; // 注意这里做了一个volatile写。
}
return oldValue; // 返回旧值。
} finally {
unlock(); // 解锁。
}
}
代码也比较容易理解,注意有rehash的情况,看下rehash方法:
void rehash() {
HashEntry<K,V>[] oldTable = table;
int oldCapacity = oldTable.length;
if (oldCapacity >= MAXIMUM_CAPACITY)
return; //不能超过最大容量。
/*
* 将哈希表中所有桶里的节点重新分配到新哈希表中。
* 由于使用的容量是2的幂,所有一部分节点会分配到新哈希表中相同
* 下标的桶里,这样我们就可以重用这些节点,而无需重新创建。
* 按照统计数据,在默认的加载因子下,大约只有六分之一的节点在
* 哈希表扩容的时候需要拷贝(重新创建对象)。
*/
HashEntry<K,V>[] newTable = HashEntry.newArray(oldCapacity<<1);
threshold = (int)(newTable.length * loadFactor);
int sizeMask = newTable.length - 1;
for (int i = 0; i < oldCapacity ; i++) {
HashEntry<K,V> e = oldTable[i];
if (e != null) {
HashEntry<K,V> next = e.next;
int idx = e.hash & sizeMask;
if (next == null)
newTable[idx] = e; // 链表上唯一的节点,直接复制到新table。
else {
// 重用从尾部往前能定位到新table中同一个桶的,最长的连续节点。
HashEntry<K,V> lastRun = e;
int lastIdx = idx;
for (HashEntry<K,V> last = next;
last != null;
last = last.next) {
int k = last.hash & sizeMask;
if (k != lastIdx) {
lastIdx = k;
lastRun = last;
}
}
newTable[lastIdx] = lastRun;
// 其他的节点就copy过去了。
for (HashEntry<K,V> p = e; p != lastRun; p = p.next) {
int k = p.hash & sizeMask;
HashEntry<K,V> n = newTable[k];
newTable[k] = new HashEntry<K,V>(p.key, p.hash,
n, p.value);
}
}
}
}
table = newTable;
}
put的实现看完了,继续看下从ConcurrentHashMap中get的实现:
public V get(Object key) {
int hash = hash(key.hashCode());
return segmentFor(hash).get(key, hash);
}
流程还是先算hash值,然后确定到segment,然后调用segment的get方法:
V get(Object key, int hash) {
if (count != 0) { // 这里做一个volatile读
HashEntry<K,V> e = getFirst(hash); //获取相应桶的第一个节点。
while (e != null) {
if (e.hash == hash && key.equals(e.key)) {
V v = e.value;
if (v != null)
return v; //如果是相同的key,返回value。
return readValueUnderLock(e); // recheck
}
e = e.next;
}
}
return null;
}
HashEntry<K,V> getFirst(int hash) {
HashEntry<K,V>[] tab = table;
return tab[hash & (tab.length - 1)];
}
/**
* 加锁读value。如果value为nul的情况下调用这个方法。
* 只有在编译器将HashEntry的初始化和其赋值给table的指令重排才会
* 出现这种情况,这在内存模型下是合法的,但从没发生过。
*/
V readValueUnderLock(HashEntry<K,V> e) {
lock();
try {
return e.value;
} finally {
unlock();
}
}
- 理解了put和get过程,其他方法也很好理解了:
boolean containsKey(Object key, int hash) {
if (count != 0) { // read-volatile
HashEntry<K,V> e = getFirst(hash);
while (e != null) {
if (e.hash == hash && key.equals(e.key))
return true;
e = e.next;
}
}
return false;
}
V remove(Object key, int hash, Object value) {
lock();
try {
int c = count - 1;
HashEntry<K,V>[] tab = table;
int index = hash & (tab.length - 1);
HashEntry<K,V> first = tab[index];
HashEntry<K,V> e = first;
while (e != null && (e.hash != hash || !key.equals(e.key)))
e = e.next;
V oldValue = null;
if (e != null) {
V v = e.value;
if (value == null || value.equals(v)) {
oldValue = v;
// All entries following removed node can stay
// in list, but all preceding ones need to be
// cloned.
++modCount;
HashEntry<K,V> newFirst = e.next;
for (HashEntry<K,V> p = first; p != e; p = p.next)
newFirst = new HashEntry<K,V>(p.key, p.hash,
newFirst, p.value);
tab[index] = newFirst;
count = c; // write-volatile
}
}
return oldValue;
} finally {
unlock();
}
}
我们会发现所有的写操作最后都会写一下count,而且所有的读操作最前面都会读一下count,由于count是volatile修饰的,所以这样相当于加了内存屏障(volatile写和后面的volatile读不能重排),保证了读操作能够看到最新的写的变化。
以上理解有偏差,感谢@不待人亲指正。
仔细看源码注释发现:
All (synchronized) write operations should write to the “count” field after structurally changing any bin.
也就是说只有bin(HashEntry链)的结构变化之后才会写count(覆盖的情况不会写count)。
所以这里纠正一下:所有改变bin结构的写操作都会写一下count,可以保证HashEntry的可见性(因为无论是添加还是删除,bin起始的HashEntry都会发生变化,由于HashEntry的next域是不变的,所以删除时需要将目标HashEntry之前的Entry都拷贝一下)。
而覆盖旧值的情况下不会写count,因为HashEntry的value本身也是volatile的,可以保证自身的可见性。
- 我们上面还看到了有一个RETRIES_BEFORE_LOCK值,看看这个值起什么作用:
public int size() {
final Segment<K,V>[] segments = this.segments;
long sum = 0;
long check = 0;
int[] mc = new int[segments.length];
// Try a few times to get accurate count. On failure due to
// continuous async changes in table, resort to locking.
for (int k = 0; k < RETRIES_BEFORE_LOCK; ++k) {
check = 0;
sum = 0;
int mcsum = 0;
for (int i = 0; i < segments.length; ++i) {
sum += segments[i].count;
mcsum += mc[i] = segments[i].modCount;
}
if (mcsum != 0) {
for (int i = 0; i < segments.length; ++i) {
check += segments[i].count;
if (mc[i] != segments[i].modCount) {
check = -1; // force retry
break;
}
}
}
if (check == sum)
break;
}
if (check != sum) { // Resort to locking all segments
sum = 0;
for (int i = 0; i < segments.length; ++i)
segments[i].lock();
for (int i = 0; i < segments.length; ++i)
sum += segments[i].count;
for (int i = 0; i < segments.length; ++i)
segments[i].unlock();
}
if (sum > Integer.MAX_VALUE)
return Integer.MAX_VALUE;
else
return (int)sum;
}
上面size的过程就是,累加所有segment中的count,如果过程中segment中元素数量发生了变化,那么重试。如果重试了RETRIES_BEFORE_LOCK次(默认是2)都不行,那么将所有segment加锁,然后累加count,然后再解锁。在containsValue里面也是这么玩儿的,代码就不贴了。
- 其他代码也很容易看懂了,就分析到这里。最后注意下,ConcurrentHashMap也提供了Key和Value的集合视图,它们和ConcurrentHashMap共享一份数据,它们的迭代器是弱一致的。
ConcurrentHashMap的代码解析完毕! 参见:
Jdk1.6 JUC源码解析(7)-locks-ReentrantLock