JUC源码——ConcurrentHashMap(1.8)

    1.8以前用分段锁实现

static class Segment<K, V> extends ReentrantLock implements Serializable {
	final float loadFactor;

	Segment(float lf) {
		this.loadFactor = lf;
	}
}

    1.8及之后用了Unsafe类的CAS操作和并发扩容机制

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

	/**
	 * 很重要,-1表示正在初始化;
	 * -N表示有N-1条线程在正在扩容;
	 * 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,
	 * 0代表使用默认的容量16
	 * 类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,
	 * 这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。
	 */
	private transient volatile int sizeCtl;
	
	transient volatile Node<K,V>[] table;
	
	// 只有扩容是不为null
    private transient volatile Node<K,V>[] nextTable;

	// 参数、Node、TreeNode和HashMap类似
	private static final int MAXIMUM_CAPACITY = 1 << 30;

	private static final int DEFAULT_CAPACITY = 16;

	static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

	private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

	private static final float LOAD_FACTOR = 0.75f;

	static final int TREEIFY_THRESHOLD = 8;

	static final int UNTREEIFY_THRESHOLD = 6;

	static final int MIN_TREEIFY_CAPACITY = 64;

	static class Node<K, V> implements Map.Entry<K, V> {
		......
	}

	static final class TreeNode<K, V> extends Node<K, V> {
		......
	}

	// 封装TreeNode,包含树根,hash为常量TREEBIN(<0),可以用hash<0判断是链表节点还是树节点,红黑树存放的是TreeBin
	static final class TreeBin<K, V> extends Node<K, V> {
		TreeNode<K, V> root;
		volatile TreeNode<K, V> first;
		......
	}

	// 利用Unsafe类的CAS方法
	private static final sun.misc.Unsafe U;

	// CAS地获得i位置的节点
	static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) {
		return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
	}

	// 将c与tab[i]比较,相同则插入v
	static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
		return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
	}

	// 设置tab[i]的值,仅在上锁区被调用
	static final <K, V> void setTabAt(Node<K, V>[] tab, int i, Node<K, V> v) {
		U.putObjectVolatile(tab, ((long) i << ASHIFT) + ABASE, v);
	}

	// 不直接利用key。hashCode(),不允许key == null,HashMap key == null时hash为0
	static final int spread(int h) {
		return (h ^ (h >>> 16)) & HASH_BITS;
	}

	public V put(K key, V value) {
		return putVal(key, value, false);
	}

	final V putVal(K key, V value, boolean onlyIfAbsent) {
		// 不允许key、value为null
		if (key == null || value == null)
			throw new NullPointerException();
		int hash = spread(key.hashCode());
		int binCount = 0;
		for (Node<K, V>[] tab = table;;) {
			Node<K, V> f;
			int n, i, fh;
			if (tab == null || (n = tab.length) == 0)
				tab = initTable();
			// 对应的tab[i]为空,CAS地放入节点
			else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
				if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))
					break;
			// MOVED是ForwardingNode的哈希值,如果检测到ForwardingNode,帮助扩容
			} else if ((fh = f.hash) == MOVED)
				tab = helpTransfer(tab, f);
			else {
				V oldVal = null;
				// 如果tab[i]不为空,且未检测到ForwardingNode,对tab[i]头结点加锁,下面的过程和HashMap类似
				synchronized (f) {
					if (tabAt(tab, i) == f) {
						if (fh >= 0) {
							binCount = 1;
							for (Node<K, V> e = f;; ++binCount) {
								K ek;
								if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
									oldVal = e.val;
									if (!onlyIfAbsent)
										e.val = value;
									break;
								}
								Node<K, V> pred = e;
								if ((e = e.next) == null) {
									pred.next = new Node<K, V>(hash, key, value, null);
									break;
								}
							}
						} else if (f instanceof TreeBin) {
							Node<K, V> p;
							binCount = 2;
							if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
								oldVal = p.val;
								if (!onlyIfAbsent)
									p.val = value;
							}
						}
					}
				}
				if (binCount != 0) {
					if (binCount >= TREEIFY_THRESHOLD)
						treeifyBin(tab, i);
					if (oldVal != null)
						return oldVal;
					break;
				}
			}
		}
		// 增加计数
		addCount(1L, binCount);
		return null;
	}
	
	// tab的初始化只能由一条线程进行
	private final Node<K, V>[] initTable() {
		Node<K, V>[] tab;
		int sc;
		while ((tab = table) == null || tab.length == 0) {
			// tab为null,sizeCtl<0,说明其它线程在初始化tab,等待
			if ((sc = sizeCtl) < 0)
				Thread.yield(); 
			// CAS将sizeCtl改为 -1,成功的话由当前线程初始化tab
			else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
				try {
					if ((tab = table) == null || tab.length == 0) {
						// sc == 0的话使用默认的容量
						int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
						@SuppressWarnings("unchecked")
						Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
						table = tab = nt;
						// 将sizeCtl设置为表大小的0.75
						sc = n - (n >>> 2);
					}
				} finally {
					sizeCtl = sc;
				}
				break;
			}
		}
		return tab;
	}
	
	// 不加锁,只是CAS地获得头结点,再遍历头结点的链表或者树
	public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
        	// CAS获得头结点
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // tab[i]是红黑树
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // tab[i]是链表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
	
	// 这个方法返回值是一个估计值,由于存在并发的插入和删除,因此返回值可能与实际值会有出入。
	public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
	
	// 线程put()、remove()时如果检测到ForwardingNode会进行扩容
	static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
        	// ForwardingNode的hash值是MOVED
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

        Node<K,V> find(int h, Object k) {
        	......
        }
    }
	
	// 检查到ForwardingNode则帮助扩容
	final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
        	// 利用ForwardingNode中的nextTable
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
	
	/** 
	 * 实际扩容时,节点为空则置为ForwardingNode,处理下一个节点;
	 * 节点为ForwardingNode时表示该节点已经处理完毕,处理下一个节点;
	 * 否则锁住该处的头结点,分成两条链插入nextTab中,类似HashMap,最后置为ForwardingNode
	 * */
	private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
		......
	}
}

    总结一下,ConcurrentHashMap主要利用了Unsafe类的CAS操作和并发扩容机制,唯一需要加锁的地方是put()、remove()、扩容等操作时对头节点加锁,并发度比分段锁的实现更高。

    (1)内部的volatile int sizeCtl表示table是否在初始化、是否在扩容,由线程CAS地去改变;

    (2)3个比较重要的CAS方法,get()时利用了CAS,无需加锁;

    (3)用内部类ForwardingNode帮助并发扩容,当put()、remove()等操作检测到ForwardingNode时(通过hash值,ForwardingNode的hash为MOVED),线程优先加入扩容的过程,对某个节点转移完毕后置为ForwardingNode,表示该节点转移完毕;

    (4)不允许key、value为null。我是这样理解的,get(key) == null时无法判断是key不存在还是value == null,HashMap是单线程的,可以通过if(map.contains(key)) {map.get(key);}来判断,而ConcurrentHashMap则无法判断,除非加锁。

    原文作者:JUC
    原文地址: https://blog.csdn.net/weixin_39420024/article/details/80078859
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞