JDK 1.8 以前,ConcurrentHashMap 用分段锁(Segment)实现,每把锁只保护一部分桶:
// Pre-JDK 8 segment: each Segment is itself a ReentrantLock guarding one
// slice of the buckets, so writers only contend when they hit the same slice.
static class Segment<K, V> extends ReentrantLock implements Serializable {
    // A Serializable class should pin its serialVersionUID explicitly;
    // this is the value the real JDK uses for Segment.
    private static final long serialVersionUID = 2249069246763182397L;
    // Load factor for this segment's table, fixed at construction.
    final float loadFactor;
    Segment(float lf) {
        this.loadFactor = lf;
    }
}
JDK 1.8 及之后改用 Unsafe 类的 CAS 操作、synchronized 锁住桶的头节点,以及并发扩容机制:
// NOTE: study-notes transcription of the JDK source; "......" marks elided code.
public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {
    /**
     * Table initialization and resize control (the most important field).
     * -1   : the table is being initialized by some thread;
     * < -1 : a resize is in progress (the popular reading "-N means N-1
     *        threads are resizing" is only approximate — the high bits hold
     *        a resize stamp; see resizeStamp() in helpTransfer below);
     * >= 0 : the table has not been allocated yet and this value is the
     *        capacity to allocate (0 means the default capacity, 16).
     * After initialization it holds the next resize threshold, kept at
     * 0.75 * capacity (matching the load factor): once the actual element
     * count reaches sizeCtl, a resize starts.
     */
    private transient volatile int sizeCtl;
    transient volatile Node<K,V>[] table;
    // Non-null only while a resize is in progress.
    private transient volatile Node<K,V>[] nextTable;
    // Constants, Node and TreeNode are analogous to HashMap's.
    private static final int MAXIMUM_CAPACITY = 1 << 30;
    private static final int DEFAULT_CAPACITY = 16;
    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;
    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;
    private static final float LOAD_FACTOR = 0.75f;
    static final int TREEIFY_THRESHOLD = 8;
    static final int UNTREEIFY_THRESHOLD = 6;
    static final int MIN_TREEIFY_CAPACITY = 64;
    static class Node<K, V> implements Map.Entry<K, V> {
        ......
    }
    static final class TreeNode<K, V> extends Node<K, V> {
        ......
    }
    // Wrapper around the TreeNodes of a treeified bin; holds the tree root.
    // Its hash is the negative constant TREEBIN, so hash < 0 distinguishes
    // special nodes from plain list nodes; a treeified bin stores a TreeBin
    // (not a bare TreeNode) as its head.
    static final class TreeBin<K, V> extends Node<K, V> {
        TreeNode<K, V> root;
        volatile TreeNode<K, V> first;
        ......
    }
    // Unsafe instance backing the atomic array accesses below.
    private static final sun.misc.Unsafe U;
    // Volatile READ of tab[i] — note this is getObjectVolatile, not a CAS;
    // it gives volatile visibility for an array element.
    static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) {
        return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
    }
    // CAS tab[i]: if it still equals c, atomically replace it with v.
    static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
        return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
    }
    // Volatile write of tab[i]; only called while holding the bin lock.
    static final <K, V> void setTabAt(Node<K, V>[] tab, int i, Node<K, V> v) {
        U.putObjectVolatile(tab, ((long) i << ASHIFT) + ABASE, v);
    }
    // Mix the high bits of key.hashCode() into the low bits and clear the sign
    // bit (HASH_BITS) so user hashes are always >= 0; negative hashes are
    // reserved for special nodes (MOVED, TREEBIN). Null keys are not allowed
    // (HashMap instead maps a null key to hash 0).
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }
    public V put(K key, V value) {
        return putVal(key, value, false);
    }
    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // Neither key nor value may be null (unlike HashMap).
        if (key == null || value == null)
            throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K, V>[] tab = table;;) {
            Node<K, V> f;
            int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // Bin tab[i] is empty: publish the new node with a CAS, no lock.
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))
                    break;
            // MOVED is the hash of a ForwardingNode; seeing one means a resize
            // is in progress, so help transfer before retrying.
            } else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // Non-empty bin, no ForwardingNode: lock the bin's head node;
                // the rest mirrors HashMap's insertion logic.
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K, V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K, V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K, V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // Bump the element count; addCount (elided) also checks whether a
        // resize is needed — see the JDK source.
        addCount(1L, binCount);
        return null;
    }
    // Only one thread gets to allocate the table.
    private final Node<K, V>[] initTable() {
        Node<K, V>[] tab;
        int sc;
        while ((tab = table) == null || tab.length == 0) {
            // sizeCtl < 0: another thread is already initializing; back off.
            if ((sc = sizeCtl) < 0)
                Thread.yield();
            // CAS sizeCtl to -1; the winning thread performs the allocation.
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        // sc == 0 means use the default capacity (16).
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        // Next resize threshold: n - n/4 == 0.75 * n.
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    // Lock-free read path: a volatile read of the bin head, then a plain
    // traversal of the list or tree hanging off it.
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
            // Volatile read of the bin's head node.
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // eh < 0: special node (TreeBin or ForwardingNode); its find()
            // knows how to search the tree / the next table.
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // Plain bin: walk the linked list.
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    // Returns an estimate: concurrent inserts and removes may make the result
    // differ from the true size at any instant.
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
            (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
            (int)n);
    }
    // Placed at a bin while it is being transferred; threads that hit one
    // during put()/remove() help with the resize instead of blocking.
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
            // A ForwardingNode always carries the sentinel hash MOVED.
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }
        Node<K,V> find(int h, Object k) {
            ......
        }
    }
    // Called when a ForwardingNode is seen: join the resize in progress.
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
            // Reuse the next table recorded in the ForwardingNode.
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                (sc = sizeCtl) < 0) {
                // NOTE(review): transcribed from JDK 8, which carries the known
                // typo JDK-8214427 — 'sc == rs + 1' / 'sc == rs + MAX_RESIZERS'
                // should compare against (rs << RESIZE_STAMP_SHIFT) + 1 etc.;
                // fixed in JDK 12.
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                // CAS sizeCtl + 1: register this thread as one more resizer.
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    /**
     * The actual transfer: an empty bin is set to a ForwardingNode and the
     * next bin is taken; a bin that already holds a ForwardingNode has been
     * processed, skip it; otherwise the head node is locked, the bin is split
     * into two lists (low/high, as in HashMap) inserted into nextTab, and the
     * old bin is finally replaced by a ForwardingNode.
     */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        ......
    }
}
总结一下,ConcurrentHashMap主要利用了Unsafe类的CAS操作和并发扩容机制,唯一需要加锁的地方是put()、remove()、扩容等操作时对头节点加锁,并发度比分段锁的实现更高。
(1)内部的volatile int sizeCtl表示table是否在初始化、是否在扩容,由线程CAS地去改变;
(2)3个比较重要的原子访问方法(tabAt/casTabAt/setTabAt);get()只依赖volatile读(getObjectVolatile)获得头节点,无需加锁,也不需要CAS;
(3)用内部类ForwardingNode帮助并发扩容,当put()、remove()等操作检测到ForwardingNode时(通过hash值,ForwardingNode的hash为MOVED),线程优先加入扩容的过程,对某个节点转移完毕后置为ForwardingNode,表示该节点转移完毕;
(4)不允许key、value为null。我是这样理解的,get(key) == null时无法判断是key不存在还是value == null。HashMap用于单线程场景,可以通过if (map.containsKey(key)) { map.get(key); }来区分;而在并发环境下,ConcurrentHashMap在containsKey和get两次调用之间可能被其它线程修改,无法可靠区分,除非加锁。