前言 面试中常常问到Hashtable、HashMap和ConcurrentHashMap的区别。大家都知道HashMap是线程不安全的,
Hashtable和
ConcurrentHashMap是线程安全的。
Hashtable保证线程安全的方法,基本都是在操作集合的方法上加synchronized关键字,我们有必要知道
ConcurrentHashMap底层实现和如何保证线程安全性。
ConcurrentHashMap
引入了分段锁的概念,对于不同Bucket的操作不需要全局锁来保证线程安全。
建议学习
ConcurrentHashMap之前,先学习Hashtable和
HashMap,并且要有一些并发的基础。
在看源码的时候一定要注意,本文讲的是JDK1.8的源码,ConcurrentHashMap在JDK1.8相比之前的版本有很大变化,1.8版本的源码达到6000+行,而1.6的源码只有一千多行。
在JDK1.6的实现中,使用的是一个segments数组来存储。
final Segment<K,V>[] segments;
JDK1.8中是使用Node数组来存储。
transient volatile Node<K,V>[] table;
JDK1.8中,Segment类只有在序列化和反序列化时才会被用到。
ConcurrentHashMap数据结构
其底层数据结构实现如下。
可以看到,桶中的结构可能是链表,也可能是红黑树,红黑树是为了提高查找效率。
ConcurrentHashMap的继承关系
public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
implements ConcurrentMap<K,V>, Serializable
其内部框架图如下。
ConcurrentHashMap为什么高效? Hashtable低效主要是因为所有访问Hashtable的线程都争夺一把锁。如果容器有很多把锁,每一把锁控制容器中的一部分数据,那么当多个线程访问容器里的不同部分的数据时,线程之前就不会存在锁的竞争,这样就可以有效的提高并发的访问效率。 这也正是ConcurrentHashMap使用的分段锁技术。将ConcurrentHashMap容器的数据分段存储,每一段数据分配一个Segment(锁),当线程占用其中一个Segment时,其他线程可正常访问其他段数据。
static class Segment<K,V> extends ReentrantLock implements Serializable {
private static final long serialVersionUID = 2249069246763182397L;
final float loadFactor;
Segment(float lf) { this.loadFactor = lf; }
}
ConcurrentHashMap的用到的重要内部类
Node类
该类是此集合最重要的一个内部类。只要是插入ConcurrentHashMap的结点,都会被包装成Node
static class Node<K,V> implements Map.Entry<K,V> {
final int hash;
final K key;
volatile V val;
volatile Node<K,V> next;
Node(int hash, K key, V val, Node<K,V> next) {
this.hash = hash;
this.key = key;
this.val = val;
this.next = next;
}
public final K getKey() { return key; }
public final V getValue() { return val; }
public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
public final String toString(){ return key + "=" + val; }
public final V setValue(V value) {
throw new UnsupportedOperationException(); //不允许被修改val
}
public final boolean equals(Object o) {
Object k, v, u; Map.Entry<?,?> e;
return ((o instanceof Map.Entry) &&
(k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
(v = e.getValue()) != null &&
(k == key || k.equals(key)) &&
(v == (u = val) || v.equals(u)));
}
/**
* 支持map的get方法,通过hashcode和Key来获取一个node结点
*/
Node<K,V> find(int h, Object k) {
Node<K,V> e = this;
if (k != null) {
do {
K ek;
if (e.hash == h &&
((ek = e.key) == k || (ek != null && k.equals(ek))))
return e;
} while ((e = e.next) != null);
}
return null;
}
}
TreeNode类
static final class TreeNode<K,V> extends Node<K,V> {
TreeNode<K,V> parent; // red-black tree links
TreeNode<K,V> left;
TreeNode<K,V> right;
TreeNode<K,V> prev; // needed to unlink next upon deletion
boolean red;
TreeNode(int hash, K key, V val, Node<K,V> next,
TreeNode<K,V> parent) {
super(hash, key, val, next);
this.parent = parent;
}
Node<K,V> find(int h, Object k) {
return findTreeNode(h, k, null);
}
/**
* Returns the TreeNode (or null if not found) for the given key
* starting at given root.
*/
final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
if (k != null) {
TreeNode<K,V> p = this;
do {
int ph, dir; K pk; TreeNode<K,V> q;
TreeNode<K,V> pl = p.left, pr = p.right;
if ((ph = p.hash) > h)
p = pl;
else if (ph < h)
p = pr;
else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
return p;
else if (pl == null)
p = pr;
else if (pr == null)
p = pl;
else if ((kc != null ||
(kc = comparableClassFor(k)) != null) &&
(dir = compareComparables(kc, k, pk)) != 0)
p = (dir < 0) ? pl : pr;
else if ((q = pr.findTreeNode(h, k, kc)) != null)
return q;
else
p = pl;
} while (p != null);
}
return null;
}
}
如果链表的数据过长是会转换为红黑树来处理。当它并不是直接转换,而是将这些链表的节点包装成TreeNode放在TreeBin对象中,然后由TreeBin完成红黑树的转换。TreeBin类的代码很长,其实
TreeBin的构造方法就是一个建立红黑树的过程。
ConcurrentHashMap的初始化
ConcurrentHashMap的table初始化是在putVal
时发生的。
putVal时,如果发现table没有初始化,那么会调用
initTable来初始化table,注意
initTable在扩容时也会被执行
。
private final Node<K,V>[] initTable() {
Node<K,V>[] tab; int sc;
while ((tab = table) == null || tab.length == 0) {
if ((sc = sizeCtl) < 0) //sizeCtl为负数时,说明有其他线程在初始化
Thread.yield(); //让出时间片
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //把
SIZECTL置为-1,接下来本线程对table进行操作try {
if ((tab = table) == null || tab.length == 0) {
int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
table = tab = nt;
sc = n - (n >>> 2); //下一次扩容的阀值,大小为0.75*n
}
} finally {
sizeCtl = sc;
}
break;
}
}
return tab;
}
上面的代码中,sizeCtl是一个比较重要的变量。
/**
* Table initialization and resizing control. When negative, the
* table is being initialized or resized: -1 for initialization,
* else -(1 + the number of active resizing threads). Otherwise,
* when table is null, holds the initial table size to use upon
* creation, or 0 for default. After initialization, holds the
* next element count value upon which to resize the table.
*/
private transient volatile int sizeCtl;
sizeCtl是一个控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同:
负数代表正在进行初始化或扩容操作,-1时表示在初始化中,-N表示
有N-1个线程正在进行扩容操作。 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小。
所以,如果某个线程想要初始化table或者对table扩容,需要去竞争sizeCtl这个共享变量,获得变量的线程才有许可去进行接下来的操作,没能获得的线程将会一直自旋来尝试获得这个共享变量,所以获得sizeCtl这个变量的线程在完成工作之后需要设置回来,使得其他的线程可以走出自旋进行接下来的操作。
initTable方法中我们可以看到,当线程发现sizeCtl小于0的时候,他就会让出CPU时间,而稍后再进行尝试,当发现sizeCtl不再小于0的时候,就会通过调用方法compareAndSwapInt来把sizeCtl共享变量变为-1,以告诉其他试图获得sizeCtl变量的线程,目前正在由本线程在享用该变量,在我完成我的任务之前你可以先休息一会,等会再来试试吧,我完成工作之后会释放掉的。在完成初始化table的任务之后,线程需要将sizeCtl设置成可以使得其他线程获得变量的状态。
这里需要解释一下这句代码的重要性。
else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //把SIZECTL置为-1,接下来本线程对table进行操作
在并发编程的情况下,线程A和线程B都试图初始化table,线程A优先进行了初始化,但是此时初始化还没有完成,所以table为null,但其实已经在初始化当中了,而线程B发现table为null,所以他会继续竞争sizeCtl来尝试初始化table,如果没有上面的第二次检测的代码,线程B也会进行初始化。
ConcurrentHashMap查询记录方法详解,get方法 在ConcurrentHashMap中查询一条记录首先需要知道这条记录存储的table的位置(可以看成卡槽,每个卡槽中都会有一个链表或者一棵红黑树),该位置上可能为null,如果为null,说明想要查询的记录还不存在于ConcurrentHashMap中,否则,就在该位置上的链表或者红黑树中查找记录。
public V get(Object key) {
Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
int h = spread(key.hashCode()); //计算key对应的hashcode
if ((tab = table) != null && (n = tab.length) > 0 &&
(e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
if ((eh = e.hash) == h) { //获取到的val去hash然后和前面的hashcode比较
if ((ek = e.key) == key || (ek != null && key.equals(ek)))
return e.val;
}
else if (eh < 0) //hash值小于0时,在此桶对应的红黑树查找
return (p = e.find(h, key)) != null ? p.val : null;
while ((e = e.next) != null) { //结点hash值大于0的情况,在链表中查找
if (e.hash == h &&
((ek = e.key) == key || (ek != null && key.equals(ek))))
return e.val;
}
}
return null;
}
get
操作可以总结如下: 计算hash值 判断table是否为空,如果为空,直接返回null 根据hash值获取table中的Node节点(tabAt(tab, (n – 1) & h)),然后根据链表或者树形方式找到相对应的节点,返回其value值。
ConcurrentHashMap的put方法
ConcurrentHashMap的put操作与HashMap并没有多大区别,其核心思想依然是根据hash值计算节点插入在table的位置,如果该位置为空,则直接插入,否则插入到链表或者树中。但是ConcurrentHashMap会涉及到多线程情况就会复杂很多。
public V put(K key, V value) {
return putVal(key, value, false);
}
/** Implementation for put and putIfAbsent */
final V putVal(K key, V value, boolean onlyIfAbsent) {
if (key == null || value == null) throw new NullPointerException();
int hash = spread(key.hashCode()); //计算hash
int binCount = 0;
for (Node<K,V>[] tab = table;;) {
Node<K,V> f; int n, i, fh;
if (tab == null || (n = tab.length) == 0)
tab = initTable(); //初始化table
else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //hash到的桶为null,直接保存结点
if (casTabAt(tab, i, null,
new Node<K,V>(hash, key, value, null)))
break; // no lock when adding to empty bin
}
else if ((fh = f.hash) == MOVED)
tab = helpTransfer(tab, f); // 有线程正在进行扩容操作,则先帮助扩容
else {
V oldVal = null;
synchronized (f) {
if (tabAt(tab, i) == f) {
if (fh >= 0) { //fh >= 0 表示为链表,将该节点插入到链表尾部
binCount = 1; //binCount表示每个桶里面的node数
for (Node<K,V> e = f;; ++binCount) {
K ek;
if (e.hash == hash &&
((ek = e.key) == key ||
(ek != null && key.equals(ek)))) {
oldVal = e.val; //hash和key都一样,替换原来的val
if (!onlyIfAbsent)
e.val = value;
break;
}
Node<K,V> pred = e;
if ((e = e.next) == null) {
pred.next = new Node<K,V>(hash, key,
value, null); //把结点插入链表尾部
break;
}
}
}
else if (f instanceof TreeBin) { //如果结点不是要插入链表,那就要插入红黑树
Node<K,V> p;
binCount = 2;
if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
value)) != null) {
oldVal = p.val;
if (!onlyIfAbsent)
p.val = value;
}
}
}
}
if (binCount != 0) {
if (binCount >= TREEIFY_THRESHOLD) //桶里面的节点数大于
TREEIFY_THRESHOLDtreeifyBin(tab, i); //如果链表长度已经达到临界值8 就需要把链表转换为树结构
if (oldVal != null)
return oldVal;
break;
}
}
}
addCount(1L, binCount);
return null;
}
putVal操作可以总结如下:
一,判空,ConcurrentHashMap的key、value都不允许为null
二,计算hash。利用方法计算hash值。
三,遍历table,进行节点插入操作,过程如下:
如果table为空,则表示ConcurrentHashMap还没有初始化,则进行初始化操作:initTable() 根据hash值获取节点的位置i,若该位置为空,则直接插入,这个过程是不需要加锁的。计算f位置:i=(n – 1) & hash 如果检测到fh = f.hash == -1,则f是ForwardingNode节点,表示有其他线程正在进行扩容操作,则帮助线程一起进行扩容操作 如果f.hash >= 0 表示是链表结构,则遍历链表,如果存在当前key节点则替换value,否则插入到链表尾部。如果f是TreeBin类型节点,则按照红黑树的方法更新或者增加节点 若链表长度 >= TREEIFY_THRESHOLD(默认是8),则将链表转换为红黑树结构
四,调用addCount方法,ConcurrentHashMap的size + 1
ConcurrentHashMap的treeifyBin方法 在putVal方法中,我们可以看到如果binCount超过了界限值TREEIFY_THRESHOLD,会调用treeifyBin来把链表转化成红黑树。
private final void treeifyBin(Node<K,V>[] tab, int index) {
Node<K,V> b; int n, sc;
if (tab != null) {
if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //table的size小于最小treeify的size
tryPresize(n << 1); //扩容,避免同一个桶里面的结点数过多
else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //桶中存在结点并且结点的hash值大于等于0
synchronized (b) { //对桶中第一个结点加锁,也就是对单个桶进行加锁
if (tabAt(tab, index) == b) {
TreeNode<K,V> hd = null, tl = null;
for (Node<K,V> e = b; e != null; e = e.next) { //遍历链表,修改成红黑树
TreeNode<K,V> p =
new TreeNode<K,V>(e.hash, e.key, e.val,
null, null);
if ((p.prev = tl) == null)
hd = p;
else
tl.next = p;
tl = p;
}
setTabAt(tab, index, new TreeBin<K,V>(hd));
}
}
}
}
}
此函数用于将桶中的数据结构转化为红黑树,
其中,值得注意的是,当table的长度未达到阈值时,会进行一次扩容操作,该操作会使得触发treeifyBin操作的某个桶中的所有元素进行一次重新分配,这样可以避免某个桶中的结点数量太大。 转化的时候,要把对应的桶加锁。
ConcurrentHashMap的size方法
size 和mappingCount都可以用来返回ConcurrentHashMap的估计size,因为在进行统计的时候有其他线程正在进行插入和删除操作,所以size只能是一个估计值。当然为了这个不精确的值,ConcurrentHashMap也是操碎了心。
public int size() {
long n = sumCount();
return ((n < 0L) ? 0 :
(n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
(int)n);
}
public long mappingCount() {
long n = sumCount();
return (n < 0L) ? 0L : n; // ignore transient negative values
}
从
mappingCount的方法注释上可以知道,计算size的时候更推崇
mappingCount方法。
size方法调用了sumCount这个方法。
final long sumCount() {
CounterCell[] as = counterCells; CounterCell a;
long sum = baseCount; //ConcurrentHashMap中元素个数,但返回的不一定是当前Map的真实元素个数。基于CAS无锁更新
if (as != null) {
for (int i = 0; i < as.length; ++i) {
if ((a = as[i]) != null)
sum += a.value;
}
}
return sum;
}
sumCount()就是迭代counterCells来统计sum的过程。我们知道put操作时,肯定会影响size()。putVal时,会调用addCount方法来修改map的size,在addCount中就是通过操作
counterCells来保留size的信息的。
ConcurrentHashMap的扩容操作
当ConcurrentHashMap中table元素个数达到了容量阈值(sizeCtl)时,则需要进行扩容操作。在put操作时最后一个会调用addCount(long x, int check),该方法主要做两个工作:1.更新baseCount;2.检测是否需要扩容操作。
private final void addCount(long x, int check) {
CounterCell[] as; long b, s;
// 这里一段代码省略了,作用是更新baseCount
//check >= 0 :则需要进行扩容操作
if (check >= 0) {
Node<K,V>[] tab, nt; int n, sc;
while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
(n = tab.length) < MAXIMUM_CAPACITY) {
int rs = resizeStamp(n);
if (sc < 0) {
if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
transferIndex <= 0)
break;
if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
transfer(tab, nt);
}
//当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
else if (U.compareAndSwapInt(this, SIZECTL, sc,
(rs << RESIZE_STAMP_SHIFT) + 2))
transfer(tab, null);
s = sumCount();
}
}
}
上面的代码中,有一个重要的方法transfer,这个就是核心的扩容操作。分两步: 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗
private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
int n = tab.length, stride;
// 每核处理的量小于16,则强制赋值16
if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
stride = MIN_TRANSFER_STRIDE; // subdivide range
if (nextTab == null) { // initiating
try {
@SuppressWarnings("unchecked")
Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //构建一个nextTable对象,其容量为原来容量的两倍
nextTab = nt;
} catch (Throwable ex) { // try to cope with OOME
sizeCtl = Integer.MAX_VALUE;
return;
}
nextTable = nextTab;
transferIndex = n;
}
int nextn = nextTab.length;
// 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab)
ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
// 当advance == true时,表明该节点已经处理过了
boolean advance = true;
boolean finishing = false; // to ensure sweep before committing nextTab
for (int i = 0, bound = 0;;) {
Node<K,V> f; int fh;
// 控制 --i ,遍历原hash表中的节点
while (advance) {
int nextIndex, nextBound;
if (--i >= bound || finishing)
advance = false;
else if ((nextIndex = transferIndex) <= 0) {
i = -1;
advance = false;
}
// 用CAS计算得到的transferIndex
else if (U.compareAndSwapInt
(this, TRANSFERINDEX, nextIndex,
nextBound = (nextIndex > stride ?
nextIndex - stride : 0))) {
bound = nextBound;
i = nextIndex - 1;
advance = false;
}
}
if (i < 0 || i >= n || i + n >= nextn) {
int sc;
if (finishing) {
// 已经完成所有节点复制了nextTable = null;
table = nextTab; // table 指向nextTable
sizeCtl = (n << 1) - (n >>> 1); // sizeCtl阈值为原来的1.5倍
return; // 跳出死循环,
}
// CAS 更新扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
return;
finishing = advance = true;
i = n; // recheck before commit
}
}
// 遍历的节点为null,则放入到ForwardingNode 指针节点
else if ((f = tabAt(tab, i)) == null)
advance = casTabAt(tab, i, null, fwd);
// f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
// 这里是控制并发扩容的核心
else if ((fh = f.hash) == MOVED)
advance = true; // already processed
else {
// 节点加锁
synchronized (f) {
// 节点复制工作
if (tabAt(tab, i) == f) {
Node<K,V> ln, hn;
// fh >= 0 ,表示为链表节点
if (fh >= 0) {
// 构造两个链表 一个是原链表 另一个是原链表的反序排列
int runBit = fh & n;
Node<K,V> lastRun = f;
for (Node<K,V> p = f.next; p != null; p = p.next) {
int b = p.hash & n;
if (b != runBit) {
runBit = b;
lastRun = p;
}
}
if (runBit == 0) {
ln = lastRun;
hn = null;
}
else {
hn = lastRun;
ln = null;
}
for (Node<K,V> p = f; p != lastRun; p = p.next) {
int ph = p.hash; K pk = p.key; V pv = p.val;
if ((ph & n) == 0)
ln = new Node<K,V>(ph, pk, pv, ln);
else
hn = new Node<K,V>(ph, pk, pv, hn);
}
// 在nextTable i 位置处插上链表
setTabAt(nextTab, i, ln);
// 在nextTable i + n 位置处插上链表
setTabAt(nextTab, i + n, hn);
// 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
setTabAt(tab, i, fwd);
// advance = true 可以执行--i动作,遍历节点
advance = true;
}
// 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
else if (f instanceof TreeBin) {
TreeBin<K,V> t = (TreeBin<K,V>)f;
TreeNode<K,V> lo = null, loTail = null;
TreeNode<K,V> hi = null, hiTail = null;
int lc = 0, hc = 0;
for (Node<K,V> e = t.first; e != null; e = e.next) {
int h = e.hash;
TreeNode<K,V> p = new TreeNode<K,V>
(h, e.key, e.val, null, null);
if ((h & n) == 0) {
if ((p.prev = loTail) == null)
lo = p;
else
loTail.next = p;
loTail = p;
++lc;
}
else {
if ((p.prev = hiTail) == null)
hi = p;
else
hiTail.next = p;
hiTail = p;
++hc;
}
}
// 扩容后树节点个数若<=6,将树转链表
ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
(hc != 0) ? new TreeBin<K,V>(lo) : t;
hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
(lc != 0) ? new TreeBin<K,V>(hi) : t;
setTabAt(nextTab, i, ln);
setTabAt(nextTab, i + n, hn);
setTabAt(tab, i, fwd);
advance = true;
}
}
}
}
}
}
先抛开多线程的环境,在单线程的环境下,分析transfer的步骤如下: 一,为每个内核分任务,并保证其不小于16 二,检查nextTable是否为null,如果是,则初始化nextTable,使其容量为table的两倍 三,死循环遍历节点,知道finished:节点从table复制到nextTable中,支持并发,思路如下:
如果节点 f 为null,则插入ForwardingNode(采用Unsafe.compareAndSwapObjectf方法实现),当一个线程遍历到的节点如果ForwardingNode,则继续往后遍历,如果不是,则将该节点加锁,防止其他线程进入 如果f为链表的头节点(fh >= 0),则先构造一个反序链表,然后把他们分别放在nextTable的i和i + n位置,并将ForwardingNode 插入原节点位置,代表已经处理过了 如果f为TreeBin节点,同样也是构造一个反序 ,同时需要判断是否需要进行unTreeify()操作,并把处理的结果分别插入到nextTable的i 和i+nw位置,并插入ForwardingNode 节点
四,所有节点复制完成后,则将table指向nextTable,同时更新sizeCtl = nextTable的0.75倍(如果又有结点加入map,总的结点数达到这个值,就会触发扩容)。
在多线程环境下,ConcurrentHashMap用两点来保证正确性:ForwardingNode和synchronized。当一个线程遍历到的节点如果是ForwardingNode,则继续往后遍历,如果不是,则将该节点加锁,防止其他线程进入,完成后设置ForwardingNode节点,以便要其他线程可以看到该节点已经处理过了,如此交叉进行,高效而又安全。
扩容过程如下。
参考:
http://www.cnblogs.com/leesf456/p/5453341.html
http://cmsblogs.com/?p=2283
http://www.cnblogs.com/leesf456/p/5453341.html 红黑树:
http://cmsblogs.com/?p=2329 ConcurrentHashMap的弱一致性
http://ifeve.com/concurrenthashmap-weakly-consistent/
HashMap、Hashtable、HashSet 和 ConcurrentHashMap 的比较
http://www.54tianzhisheng.cn/2017/06/13/HashMap-Hashtable/