JUC--ConcurrentHashMap源码分析(一)(基于JDK1.7)

1 概述

前面分析研究了HashMap,我们知道HashMap不是线程安全的,那么如果需要线程安全的HashMap怎么办呢?这个时候我们有几种解决方法。

(1)使用HashTable代替HashMap;

(2)使用Collections.synchronizeMap(hashMap);

(3)使用ConcurrentHashMap;

针对这三种方法得区别我们在文章后面再谈,下面我们就针对ConcurrentHashMap(JDK1.7)进行分析。

2 数据结构

ConcurrentHashMap与HashTable不同的是,这里并不是对整个数组加锁,而是对数组进行分段加锁(即对Segment[])进行加锁。所以针对ConcurrentHashMap,只要不同线程获取的不是相同的Segment,就不会产生锁竞争。所以ConcnrrentHashMap比HashTable的效率高很多。

总结来说ConcurrentHashMap的数据结构就是:Segment 数组、HashEntry 数组和HashEntry链表组成。如图:

《JUC--ConcurrentHashMap源码分析(一)(基于JDK1.7)》

3 内部类

针对内部类,这里我们仅仅看一下Segment这个内部类的结构。

static final class Segment<K,V> extends ReentrantLock implements Serializable {
	private static final long serialVersionUID = 2249069246763182397L;
	
	//和HashMap中的HashEntry相同,真正存放数据的桶
	transient volatile HashEntry<K,V>[] table;
	transient int count;
	transient int modCount;
	transient int threshold;
	final float loadFactor;

    ... ...
}

我们可以看出来,Segment内部实际上就拥有了对HashEntry数组的一个分段存储。

4 属性

...
//默认的并发水平,也即segment的数量
static final int DEFAULT_CONCURRENCY_LEVEL = 16;   

// 最大的segment的数量
static final int MAX_SEGMENTS = 1 << 16; 

//segment的掩码,用来对segment进行定位,判断哪个segment
final int segmentMask; 

//segment的偏移,segment中的索引
final int segmentShift; 

//Segment 数组,存放数据时首先需要定位到具体的 Segment 中。 
final Segment<K,V>[] segments;
...

Segment数组就是用于对HashEntry数组进行分段后的一个存储,主要是为了操作HashEntry的时候进行分段加锁,从而减小对锁的竞争。这就是与HashTable的区别所在,HashTable是对整个HashEntry数组进行加锁,所以执行的效率相对较低。

5 构造函数

针对构造函数,我们这里仅仅查看一个最核心的函数即可。

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
		
        //验证参数的有效性    		
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();
			
		//segment的最大容量只能为2^16	
        if (concurrencyLevel > MAX_SEGMENTS)  
            concurrencyLevel = MAX_SEGMENTS;
			
        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) { //另ssize为2的幂
            ++sshift;
            ssize <<= 1;
        }
        this.segmentShift = 32 - sshift;
        this.segmentMask = ssize - 1;
        
		//利用segmentShift和segmentMask可以通过key的hash值与这个值做&运算确定Segment索引
        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        //检查给的容量值是否大于允许的最大容量值
        int c = initialCapacity / ssize;        
        //计算每个Segment平均应该放置多少个元素,向上取整
        if (c * ssize < initialCapacity)
            ++c;
        int cap = MIN_SEGMENT_TABLE_CAPACITY;
        while (cap < c)
            cap <<= 1;
        
		//创建一个segment实例,并作为segment数组的一个元素
        Segment<K,V> s0 =
            new Segment<K,V>(loadFactor, (int)(cap * loadFactor),
                             (HashEntry<K,V>[])new HashEntry[cap]);
        Segment<K,V>[] ss = (Segment<K,V>[])new Segment[ssize];
        UNSAFE.putOrderedObject(ss, SBASE, s0); 
        this.segments = ss;
    }

6 核心函数

针对核心函数,这里我们仅仅看两个用得比较多的函数put和get。

6.1 put函数

public V put(K key, V value) {
	Segment<K,V> s;
	
	//验证value是否为空
	if (value == null)
	    throw new NullPointerException();
	
    //获取key的hash值	
	int hash = hash(key);
	
	//获取Segment数组的索引
	int j = (hash >>> segmentShift) & segmentMask;
	
	//定位到Segment数组的位置
	if ((s = (Segment<K,V>)UNSAFE.getObject (segments, (j << SSHIFT) + SBASE)) == null)
	s = ensureSegment(j);
	
	//调用Segment中的put进行值的设置
	return s.put(key, hash, value, false);
}

从上面的源码我们可以发现:当确定好了具体是哪一个Segment之后,我们就直接调用Segment的put函数来进行value的存储了。

那么这个存储的过程到底是怎样进行一个加锁的呢?我们继续来看一下Segment中put函数的实现。

    final V put(K key, int hash, V value, boolean onlyIfAbsent) {
	
        //如果加锁失败,则调用scanAndLockForPut方法
        HashEntry<K,V> node = tryLock() ? null :
            scanAndLockForPut(key, hash, value); 
        V oldValue;
        try {
            HashEntry<K,V>[] tab = table;
			
            //同hashMap相同的哈希定位方式
            int index = (tab.length - 1) & hash; 
            HashEntry<K,V> first = entryAt(tab, index);
            for (HashEntry<K,V> e = first;;) {
			
                //若不为null,则持续查找,直到找到key和hash值相同的节点,将其value更新
                if (e != null) { 
                    K k;
                    if ((k = e.key) == key ||
                        (e.hash == hash && key.equals(k))) {
                        oldValue = e.value;
                        if (!onlyIfAbsent) {
                            e.value = value;
                            ++modCount;
                        }
                        break;
                    }
                    e = e.next;
                }
				
                //若头结点为null
                else { 
				
                    //在遍历key对应节点链时没有找到相应的节点
                    if (node != null) 
                        node.setNext(first);
					
                    //当前修改并不需要让其他线程知道,在锁退出时修改自然会
                    //更新到内存中,可提升性能
                    else
                        node = new HashEntry<K,V>(hash, key, value, first);
                    int c = count + 1;
					
                    //如果超过阈值,则进行rehash操作
                    if (c > threshold && tab.length < MAXIMUM_CAPACITY)
                        rehash(node);
                    else
                        setEntryAt(tab, index, node);
                    ++modCount;
                    count = c;
                    oldValue = null;
                    break;
                }
            }
        } finally {
            unlock();
        }
        return oldValue;
    }

可以看见这个方法在进入的时候就要进行一次获取锁的操作,如果没有获取到锁,我们可以猜想就要进行再次尝试或者进行自旋等待,这个再次获取锁或者自旋等待的操作,多半就是在scanAndLockForPut函数中执行了。

我们来看一下scanAndLockForPut函数里面做了什么。

    private HashEntry<K,V> scanAndLockForPut(K key, int hash, V value) {
        
		//根据hash值找到segment中的HashEntry节点
        HashEntry<K,V> first = entryForHash(this, hash); //首先获取头结点
        HashEntry<K,V> e = first;
        HashEntry<K,V> node = null;
        int retries = -1; // negative while locating node
        while (!tryLock()) {  //持续遍历该哈希链
            HashEntry<K,V> f; // to recheck first below
            if (retries < 0) {
                if (e == null) {
					
					//若不存在要插入的节点,则创建一个新的节点
                    if (node == null) 
                        node = new HashEntry<K,V>(hash, key, value, null);
                    retries = 0;
                }
                else if (key.equals(e.key))
                    retries = 0;
                else
                    e = e.next;
            }
            else if (++retries > MAX_SCAN_RETRIES) { 
            //尝试次数超出限制,则进行自旋等待
                lock();
                break;
            }
            /*当在自旋过程中发现节点链的链头发生了变化,则更新节点链的链头,
            并重置retries值为-1,重新为尝试获取锁而自旋遍历*/
            else if ((retries & 1) == 0 &&
                     (f = entryForHash(this, hash)) != first) {
                e = first = f; // re-traverse if entry changed
                retries = -1;
            }
        }
        return node;
    }

scanAndLockForPut方法,该操作持续查找key对应的节点链中是否已存在该节点,如 果没有找到已存在的节点,则预创建一个新节点,并且尝试n次,直到尝试次数超出限 制,才真正进入等待状态,即所谓的自旋等待。

上面就是put函数的分析,接下来我们看一看get函数又是咋个实现的。

6.2 get函数

这里我们直接上源码:

public V get(Object key) {
	 Segment<K,V> s;
	 HashEntry<K,V>[] tab;
	 int h = hash(key);
	 
	 //定位到具体的Segment
	 long u = (((h >>> segmentShift) & segmentMask) << SSHIFT) + SBASE;
	 
	 //获取Segment即对应的HashEntry数组
	 if ((s = (Segment<K,V>)UNSAFE.getObjectVolatile(segments, u)) != null &&
		 (tab = s.table) != null) {
			 
		//循环获取HashEntry,并判断key和hash值是否相等	 
		 for (HashEntry<K,V> e = (HashEntry<K,V>) UNSAFE.getObjectVolatile
				  (tab, ((long)(((tab.length - 1) & h)) << TSHIFT) + TBASE);
			 e != null; e = e.next) {
			K k;
			if ((k = e.key) == key || (e.hash == h && key.equals(k)))
				return e.value;
		}
	}
	return null;
}

get函数的实现比较简单,只需要将 Key 通过 Hash 之后定位到具体的 Segment ,再通过一次 Hash 定位到具体的元素上。

由于 HashEntry 中的 value 属性是用 volatile 关键词修饰的,保证了内存可见性,所以每次获取时都是最新值。

ConcurrentHashMap 的 get 方法是非常高效的,因为整个过程都不需要加锁。

7 总结

上面我们分析了ConcurrentHashMap(JDK1.7)的实现,在文章开头我们也提到了想要实现线程安全的Map的三种方式,现在我们就来总结一下这三种方式的区别。

(1)使用HashTable代替HashMap;

HashTable的内部其实是使用synchronized关键字来对HashTable对象进行加锁来实现线程安全的,所以无论是get操作还是put操作都需要获取到锁,这样依赖锁竞争比较严重,效率较低。

(2)使用Collections.synchronizeMap(hashMap);

这种方法内部其实也是试用synchronized关键字来对mutex对象进行加锁的操作,其实和第一种方法的效率相同。

(3)使用ConcurrentHashMap;

这种方法对HashEntry数组进行分段加锁,使进入不同的HashEntry数组需要获取的锁不同,从而产生锁竞争的概率大大减小。另外,get操作也不需要获取锁,所以这种方法失效率最高的。

 

    原文作者:JUC
    原文地址: https://blog.csdn.net/ONROAD0612/article/details/82754051
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞