JUC源码——ConcurrentHashMap(1.8)

    1.8以前用分段锁实现

static class Segment<K, V> extends ReentrantLock implements Serializable {
    final float loadFactor;

    Segment(float lf) {
        this.loadFactor = lf;
    }
}

    1.8及之后用了Unsafe类的CAS操作和并发扩容机制

public class ConcurrentHashMap<K, V> extends AbstractMap<K, V> implements ConcurrentMap<K, V>, Serializable {

    /**
     * 很重要,-1表示正在初始化;
     * -N表示有N-1条线程在正在扩容;
     * 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小,
     * 0代表使用默认的容量16
     * 类似于扩容阈值。它的值始终是当前ConcurrentHashMap容量的0.75倍,
     * 这与loadfactor是对应的。实际容量>=sizeCtl,则扩容。
     */
    private transient volatile int sizeCtl;
    
    transient volatile Node<K,V>[] table;
    
    // 只有扩容是不为null
    private transient volatile Node<K,V>[] nextTable;

    // 参数、Node、TreeNode和HashMap类似
    private static final int MAXIMUM_CAPACITY = 1 << 30;

    private static final int DEFAULT_CAPACITY = 16;

    static final int MAX_ARRAY_SIZE = Integer.MAX_VALUE - 8;

    private static final int DEFAULT_CONCURRENCY_LEVEL = 16;

    private static final float LOAD_FACTOR = 0.75f;

    static final int TREEIFY_THRESHOLD = 8;

    static final int UNTREEIFY_THRESHOLD = 6;

    static final int MIN_TREEIFY_CAPACITY = 64;

    static class Node<K, V> implements Map.Entry<K, V> {
        ......
    }

    static final class TreeNode<K, V> extends Node<K, V> {
        ......
    }

    // 封装TreeNode,包含树根,hash为常量TREEBIN(<0),可以用hash<0判断是链表节点还是树节点,红黑树存放的是TreeBin
    static final class TreeBin<K, V> extends Node<K, V> {
        TreeNode<K, V> root;
        volatile TreeNode<K, V> first;
        ......
    }

    // 利用Unsafe类的CAS方法
    private static final sun.misc.Unsafe U;

    // CAS地获得i位置的节点
    static final <K, V> Node<K, V> tabAt(Node<K, V>[] tab, int i) {
        return (Node<K, V>) U.getObjectVolatile(tab, ((long) i << ASHIFT) + ABASE);
    }

    // 将c与tab[i]比较,相同则插入v
    static final <K, V> boolean casTabAt(Node<K, V>[] tab, int i, Node<K, V> c, Node<K, V> v) {
        return U.compareAndSwapObject(tab, ((long) i << ASHIFT) + ABASE, c, v);
    }

    // 设置tab[i]的值,仅在上锁区被调用
    static final <K, V> void setTabAt(Node<K, V>[] tab, int i, Node<K, V> v) {
        U.putObjectVolatile(tab, ((long) i << ASHIFT) + ABASE, v);
    }

    // 不直接利用key。hashCode(),不允许key == null,HashMap key == null时hash为0
    static final int spread(int h) {
        return (h ^ (h >>> 16)) & HASH_BITS;
    }

    public V put(K key, V value) {
        return putVal(key, value, false);
    }

    final V putVal(K key, V value, boolean onlyIfAbsent) {
        // 不允许key、value为null
        if (key == null || value == null)
            throw new NullPointerException();
        int hash = spread(key.hashCode());
        int binCount = 0;
        for (Node<K, V>[] tab = table;;) {
            Node<K, V> f;
            int n, i, fh;
            if (tab == null || (n = tab.length) == 0)
                tab = initTable();
            // 对应的tab[i]为空,CAS地放入节点
            else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) {
                if (casTabAt(tab, i, null, new Node<K, V>(hash, key, value, null)))
                    break;
            // MOVED是ForwardingNode的哈希值,如果检测到ForwardingNode,帮助扩容
            } else if ((fh = f.hash) == MOVED)
                tab = helpTransfer(tab, f);
            else {
                V oldVal = null;
                // 如果tab[i]不为空,且未检测到ForwardingNode,对tab[i]头结点加锁,下面的过程和HashMap类似
                synchronized (f) {
                    if (tabAt(tab, i) == f) {
                        if (fh >= 0) {
                            binCount = 1;
                            for (Node<K, V> e = f;; ++binCount) {
                                K ek;
                                if (e.hash == hash && ((ek = e.key) == key || (ek != null && key.equals(ek)))) {
                                    oldVal = e.val;
                                    if (!onlyIfAbsent)
                                        e.val = value;
                                    break;
                                }
                                Node<K, V> pred = e;
                                if ((e = e.next) == null) {
                                    pred.next = new Node<K, V>(hash, key, value, null);
                                    break;
                                }
                            }
                        } else if (f instanceof TreeBin) {
                            Node<K, V> p;
                            binCount = 2;
                            if ((p = ((TreeBin<K, V>) f).putTreeVal(hash, key, value)) != null) {
                                oldVal = p.val;
                                if (!onlyIfAbsent)
                                    p.val = value;
                            }
                        }
                    }
                }
                if (binCount != 0) {
                    if (binCount >= TREEIFY_THRESHOLD)
                        treeifyBin(tab, i);
                    if (oldVal != null)
                        return oldVal;
                    break;
                }
            }
        }
        // 增加计数
        addCount(1L, binCount);
        return null;
    }
    
    // tab的初始化只能由一条线程进行
    private final Node<K, V>[] initTable() {
        Node<K, V>[] tab;
        int sc;
        while ((tab = table) == null || tab.length == 0) {
            // tab为null,sizeCtl<0,说明其它线程在初始化tab,等待
            if ((sc = sizeCtl) < 0)
                Thread.yield(); 
            // CAS将sizeCtl改为 -1,成功的话由当前线程初始化tab
            else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) {
                try {
                    if ((tab = table) == null || tab.length == 0) {
                        // sc == 0的话使用默认的容量
                        int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
                        @SuppressWarnings("unchecked")
                        Node<K, V>[] nt = (Node<K, V>[]) new Node<?, ?>[n];
                        table = tab = nt;
                        // 将sizeCtl设置为表大小的0.75
                        sc = n - (n >>> 2);
                    }
                } finally {
                    sizeCtl = sc;
                }
                break;
            }
        }
        return tab;
    }
    
    // 不加锁,只是CAS地获得头结点,再遍历头结点的链表或者树
    public V get(Object key) {
        Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
        int h = spread(key.hashCode());
        if ((tab = table) != null && (n = tab.length) > 0 &&
        	// CAS获得头结点
            (e = tabAt(tab, (n - 1) & h)) != null) {
            if ((eh = e.hash) == h) {
                if ((ek = e.key) == key || (ek != null && key.equals(ek)))
                    return e.val;
            }
            // tab[i]是红黑树
            else if (eh < 0)
                return (p = e.find(h, key)) != null ? p.val : null;
            // tab[i]是链表
            while ((e = e.next) != null) {
                if (e.hash == h &&
                    ((ek = e.key) == key || (ek != null && key.equals(ek))))
                    return e.val;
            }
        }
        return null;
    }
    
    // 这个方法返回值是一个估计值,由于存在并发的插入和删除,因此返回值可能与实际值会有出入。
    public int size() {
        long n = sumCount();
        return ((n < 0L) ? 0 :
                (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
                (int)n);
    }
    
    // 线程put()、remove()时如果检测到ForwardingNode会进行扩容
    static final class ForwardingNode<K,V> extends Node<K,V> {
        final Node<K,V>[] nextTable;
        ForwardingNode(Node<K,V>[] tab) {
        	// ForwardingNode的hash值是MOVED
            super(MOVED, null, null, null);
            this.nextTable = tab;
        }

        Node<K,V> find(int h, Object k) {
        	......
        }
    }
    
    // 检查到ForwardingNode则帮助扩容
    final Node<K,V>[] helpTransfer(Node<K,V>[] tab, Node<K,V> f) {
        Node<K,V>[] nextTab; int sc;
        if (tab != null && (f instanceof ForwardingNode) &&
        	// 利用ForwardingNode中的nextTable
            (nextTab = ((ForwardingNode<K,V>)f).nextTable) != null) {
            int rs = resizeStamp(tab.length);
            while (nextTab == nextTable && table == tab &&
                   (sc = sizeCtl) < 0) {
                if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
                    sc == rs + MAX_RESIZERS || transferIndex <= 0)
                    break;
                if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1)) {
                    transfer(tab, nextTab);
                    break;
                }
            }
            return nextTab;
        }
        return table;
    }
    
    /** 
     * 实际扩容时,节点为空则置为ForwardingNode,处理下一个节点;
     * 节点为ForwardingNode时表示该节点已经处理完毕,处理下一个节点;
     * 否则锁住该处的头结点,分成两条链插入nextTab中,类似HashMap,最后置为ForwardingNode
     * */
    private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
        ......
    }
}

    总结一下,ConcurrentHashMap主要利用了Unsafe类的CAS操作和并发扩容机制,唯一需要加锁的地方是put()、remove()、扩容等操作时对头节点加锁,并发度比分段锁的实现更高。

    (1)内部的volatile int sizeCtl表示table是否在初始化、是否在扩容,由线程CAS地去改变;

    (2)3个比较重要的CAS方法,get()时利用了CAS,无需加锁;

    (3)用内部类ForwardingNode帮助并发扩容,当put()、remove()等操作检测到ForwardingNode时(通过hash值,ForwardingNode的hash为MOVED),线程优先加入扩容的过程,对某个节点转移完毕后置为ForwardingNode,表示该节点转移完毕;

    (4)不允许key、value为null。我是这样理解的,get(key) == null时无法判断是key不存在还是value == null,HashMap是单线程的,可以通过if(map.contains(key)) {map.get(key);}来判断,而ConcurrentHashMap则无法判断,除非加锁。

    原文作者:JUC
    原文地址: https://blog.csdn.net/weixin_39420024/article/details/80078859
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞