JUC-Map的选择

引言

       大家都知道基于map集合,线程安全用Hashtable,不安全用HashMap,基于这个问题下的细节又是怎样的呢?

HashMap

多线程用的下问题

1、多线程put操作后,get操作导致死循环。
2、多线程put非NULL元素后,get操作得到NULL值。
3、多线程put操作,导致元素丢失。

 

本次主要关注[HashMap]-死循环问题。

 

为何出现死循环?

大家都知道,HashMap采用链表解决Hash冲突,具体的HashMap的分析可以参考一下HashMap源码 。因为是链表结构,那么就很容易形成闭合的链路,这样在循环的时候只要有线程对这个HashMap进行get操作就会产生死循环。但是,我好奇的是,这种闭合的链路是如何形成的呢。在单线程情况下,只有一个线程对HashMap的数据结构进行操作,是不可能产生闭合的回路的。那就只有在多线程并发的情况下才会出现这种情况,那就是在put操作的时候,如果size>initialCapacity*loadFactor,那么这时候HashMap就会进行rehash操作,随之HashMap的结构就会发生翻天覆地的变化。很有可能就是在两个线程在这个时候同时触发了rehash操作,产生了闭合的回路。

下面我们从源码中一步一步地分析这种回路是如何产生的。先看一下put操作:

 

 

存储数据put

public V put(K key, V value)
{
    ......
    //算Hash值
    int hash = hash(key.hashCode());
    int i = indexFor(hash, table.length);
    //如果该key已被插入,则替换掉旧的value (链接操作)
    for (Entry<K,V> e = table[i]; e != null; e = e.next) {
        Object k;
        if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
            V oldValue = e.value;
            e.value = value;
            e.recordAccess(this);
            return oldValue;
        }
    }
    modCount++;
    //该key不存在,需要增加一个结点
    addEntry(hash, key, value, i);
    return null;
}

当我们往HashMap中put元素的时候,先根据key的hash值得到这个元素在数组中的位置(即下标),然后就可以把这个元素放到对应的位置中了。 如果这个元素所在的位置上已经存放有其他元素了,那么在同一个位子上的元素将以链表的形式存放,新加入的放在链头,而先前加入的放在链尾。

 

 

检查容量是否超标addEntry



void addEntry(int hash, K key, V value, int bucketIndex)
{
    Entry<K,V> e = table[bucketIndex];
    table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
    //查看当前的size是否超过了我们设定的阈值threshold,如果超过,需要resize
    if (size++ >= threshold)
        resize(2 * table.length);
}

可以看到,如果现在size已经超过了threshold,那么就要进行resize操作,新建一个更大尺寸的hash表,然后把数据从老的Hash表中迁移到新的Hash表中:

 

 

调整Hash表大小resize



void resize(int newCapacity)
{
    Entry[] oldTable = table;
    int oldCapacity = oldTable.length;
    ......
    //创建一个新的Hash Table
    Entry[] newTable = new Entry[newCapacity];
    //将Old Hash Table上的数据迁移到New Hash Table上
    transfer(newTable);
    table = newTable;
    threshold = (int)(newCapacity * loadFactor);
}

当table[]数组容量较小,容易产生哈希碰撞,所以,Hash表的尺寸和容量非常的重要。一般来说,Hash表这个容器当有数据要插入时,都会检查容量有没有超过设定的thredhold,如果超过,需要增大Hash表的尺寸,这个过程称为resize。

多个线程同时往HashMap添加新元素时,多次resize会有一定概率出现死循环,因为每次resize需要把旧的数据映射到新的哈希表,这一部分代码在HashMap#transfer() 方法,如下:



void transfer(Entry[] newTable)
{
    Entry[] src = table;
    int newCapacity = newTable.length;
    //下面这段代码的意思是:
    //  从OldTable里摘一个元素出来,然后放到NewTable中
    for (int j = 0; j < src.length; j++) {
        Entry<K,V> e = src[j];
        if (e != null) {
            src[j] = null;
            do { Entry<K,V> next = e.next; int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } while (e != null);
        }
    }
}

红色部分代码是导致多线程使用hashmap出现CUP使用率骤增,从而多个线程阻塞的罪魁祸首。


Hashtable

      看API它的大部分方法都用synchronized在HashMap的上层封装一层。这所有对Map的操作都线程安全了。

      但是这样就完了吗?大量操作用Hashtable代替HashMap的话,性能会有很大差距。

     那么如何保证线程安全,又不损耗太大的性能呢?

ConcurrentHashMap

     如是在java的并发开发API包下出来了一个接口:ConcurrentMap,它的实现类:ConcurrentHashMap。

   介绍

      在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非 常低下,基于以上两个原因,便有了ConcurrentHashMap

      多线程会导致HashMap的Entry链表 形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获 取Entry。

      HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同 步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方 法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。

       HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁。

       ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存 储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数 据也能被其他线程访问。

       ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重 入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数 据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种 数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元 素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时, 必须首先获得与它对应的Segment锁。

       看起来已经完美解决的问题,真的如此么?

    使用案例

       ConcurrentHashMap通常只被看做并发效率更高的Map,用来替换其他线程安全的Map容器,比如Hashtable和Collections.synchronizedMap。实际上,线程安全的容器,特别是Map,应用场景没有想象中的多,很多情况下一个业务会涉及容器的多个操作,即复合操作,并发执行时,线程安全的容器只能保证自身的数据不被破坏,但无法保证业务的行为是否正确。

举个例子:统计文本中单词出现的次数,把单词出现的次数记录到一个Map中,代码如下:

private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();

public long increase(String word) {
    Long oldValue = wordCounts.get(word);
    Long newValue = (oldValue == null) ? 1L : oldValue + 1;
    wordCounts.put(word, newValue);
    return newValue;
}

如果多个线程并发调用这个increase()方法,increase()的实现就是错误的,因为多个线程用相同的word调用时,很可能会覆盖相互的结果,造成记录的次数比实际出现的次数少。

除了用锁解决这个问题,另外一个选择是使用ConcurrentMap接口定义的方法:

public interface ConcurrentMap<K, V> extends Map<K, V> {
    V putIfAbsent(K key, V value);
    boolean remove(Object key, Object value);
    boolean replace(K key, V oldValue, V newValue);
    V replace(K key, V value);
}

这是个被很多人忽略的接口,也经常见有人错误地使用这个接口。ConcurrentMap接口定义了几个基于 CAS(Compare and Set)操作,很简单,但非常有用,下面的代码用ConcurrentMap解决上面问题:

private final ConcurrentMap<String, Long> wordCounts = new ConcurrentHashMap<>();

public long increase(String word) {
    Long oldValue, newValue;
    while (true) {
        oldValue = wordCounts.get(word);
        if (oldValue == null) {
            // Add the word firstly, initial the value as 1
            newValue = 1L;
            if (wordCounts.putIfAbsent(word, newValue) == null) {
                break;
            }
        } else {
            newValue = oldValue + 1;
            if (wordCounts.replace(word, oldValue, newValue)) {
                break;
            }
        }
    }
    return newValue;
}

代码有点复杂,主要因为ConcurrentMap中不能保存value为null的值,所以得同时处理word不存在和已存在两种情况。

上面的实现每次调用都会涉及Long对象的拆箱和装箱操作,很明显,更好的实现方式是采用AtomicLong,下面是采用AtomicLong后的代码:

private final ConcurrentMap<String, AtomicLong> wordCounts = new ConcurrentHashMap<>();

public long increase(String word) {
    AtomicLong number = wordCounts.get(word);
    if (number == null) {
        AtomicLong newNumber = new AtomicLong(0);
        number = wordCounts.putIfAbsent(word, newNumber);
        if (number == null) {
            number = newNumber;
        }
    }
    return number.incrementAndGet();
}

这个实现仍然有一处需要说明的地方,如果多个线程同时增加一个目前还不存在的词,那么很可能会产生多个newNumber对象,但最终只有一个newNumber有用,其他的都会被扔掉。对于这个应用,这不算问题,创建AtomicLong的成本不高,而且只在添加不存在词是出现。但换个场景,比如缓存,那么这很可能就是问题了,因为缓存中的对象获取成本一般都比较高,而且通常缓存都会经常失效,那么避免重复创建对象就有价值了。下面的代码演示了怎么处理这种情况:

private final ConcurrentMap<String, Future<ExpensiveObj>> cache = new ConcurrentHashMap<>();

public ExpensiveObj get(final String key) {
    Future<ExpensiveObj> future = cache.get(key);
    if (future == null) {
        Callable<ExpensiveObj> callable = new Callable<ExpensiveObj>() {
            @Override
            public ExpensiveObj call() throws Exception {
                return new ExpensiveObj(key);
            }
        };
        FutureTask<ExpensiveObj> task = new FutureTask<>(callable);

        future = cache.putIfAbsent(key, task);
        if (future == null) {
            future = task;
            task.run();
        }
    }

    try {
        return future.get();
    } catch (Exception e) {
        cache.remove(key);
        throw new RuntimeException(e);
    }
}

解决方法其实就是用一个Proxy对象来包装真正的对象,跟常见的lazy load原理类似;使用FutureTask主要是为了保证同步,避免一个Proxy创建多个对象。注意,上面代码里的异常处理是不准确的。

 再补充一下,如果真要实现前面说的统计单词次数功能,最合适的方法是Guava包中AtomicLongMap。

MapMaker

       因为不完美,就会有大神去补充它,如是在谷歌的第三方jar-Guava中的有了MapMaker实现(或cache实现,不在此补充)。

    使用案例

       以下摘自网上来源:

       Google Collections中的MapMaker融合了 Weak Reference 线程安全高并发性能异步超时清理自定义构建元素 等强大功能于一身。

常阅读优秀源代码的童鞋都知道,一般叫Maker的对象都是Builder模式,而这个MapMaker就是来” Build “Map的.

一、google collection工具包的MapMaker使用:

public static void main(String[] args) {
        /**
         * expiration(3, TimeUnit.SECONDS)设置超时时间为3秒
         */
        ConcurrentMap<String , String> map = new MapMaker().concurrencyLevel(32).softKeys().weakValues()
                .expiration(3, TimeUnit.SECONDS).makeComputingMap(
                        /**
                         * 提供当Map里面不包含所get的项,可以自动加入到Map的功能
                         * 可以将这里的返回值放到对应的key的value中
                         */
                        new Function<String, String>() {
                            public String apply(String s) {
                                return "creating " + s + " -> Object";
                            }
                        }
                );

        map.put("a","testa");
        map.put("b","testb");

        System.out.println(map.get("a"));
        System.out.println(map.get("b"));
        System.out.println(map.get("c"));

        try {
            // 4秒后,大于超时时间,缓存失效。
            Thread.sleep(4000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        System.out.println(map.get("a"));
        System.out.println(map.get("b"));
        System.out.println(map.get("c"));
    }

结果如下:

testa
testb
creating c -> Object
creating a -> Object
creating b -> Object
creating c -> Object

二、先看下其api的相关demo片段:

// 使用案例:存储验证码
    // <String, String> == <用户唯一,验证码>
    // expiration(15, TimeUnit.MINUTES) 有效期15分钟
    ConcurrentMap<String,String> capthcaMap = new MapMaker().expiration(15, TimeUnit.MINUTES).makeMap();

    // 设置ConcurrentMap的concurrencyLevel参数 ,例如ConcurrentHashMap是用来控制其Segment数组的大小
    ConcurrentMap<String,Object> map1 = new MapMaker().concurrencyLevel(8).makeMap();

    // 构造各种不同reference作为key和value的map
    ConcurrentMap<String,Object> map2 = new MapMaker().softKeys().weakValues().makeMap();

    // 提供当Map里面不包含所get的项,可以自动加入到Map的功能
    ConcurrentMap<String,Integer> map3 = new MapMaker()
            .makeComputingMap(
                    new Function<String, Integer>() {
                        public Integer apply(String key) {
                            return 1;
                        }
                    }
            );

可以看出过了4秒后,缓存失效,所以呈现如此结果。






    原文作者:JUC
    原文地址: https://blog.csdn.net/qq342643414/article/details/60334483
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞