引言
大家都知道基于map集合,线程安全用Hashtable,不安全用HashMap,基于这个问题下的细节又是怎样的呢?
HashMap
多线程用的下问题
1、多线程put操作后,get操作导致死循环。
2、多线程put非NULL元素后,get操作得到NULL值。
3、多线程put操作,导致元素丢失。
本次主要关注[HashMap]-死循环问题。
为何出现死循环?
大家都知道,HashMap采用链表解决Hash冲突,具体的HashMap的分析可以参考一下HashMap源码 。因为是链表结构,那么就很容易形成闭合的链路,这样在循环的时候只要有线程对这个HashMap进行get操作就会产生死循环。但是,我好奇的是,这种闭合的链路是如何形成的呢。在单线程情况下,只有一个线程对HashMap的数据结构进行操作,是不可能产生闭合的回路的。那就只有在多线程并发的情况下才会出现这种情况,那就是在put操作的时候,如果size>initialCapacity*loadFactor,那么这时候HashMap就会进行rehash操作,随之HashMap的结构就会发生翻天覆地的变化。很有可能就是在两个线程在这个时候同时触发了rehash操作,产生了闭合的回路。
下面我们从源码中一步一步地分析这种回路是如何产生的。先看一下put操作:
存储数据put
public V put(K key, V value)
{
......
//算Hash值
int hash = hash(key.hashCode());
int i = indexFor(hash, table.length);
//如果该key已被插入,则替换掉旧的value (链接操作)
for (Entry<K,V> e = table[i]; e != null; e = e.next) {
Object k;
if (e.hash == hash && ((k = e.key) == key || key.equals(k))) {
V oldValue = e.value;
e.value = value;
e.recordAccess(this);
return oldValue;
}
}
modCount++;
//该key不存在,需要增加一个结点
addEntry(hash, key, value, i);
return null;
}
当我们往HashMap中put元素的时候,先根据key的hash值得到这个元素在数组中的位置(即下标),然后就可以把这个元素放到对应的位置中了。 如果这个元素所在的位置上已经存放有其他元素了,那么在同一个位子上的元素将以链表的形式存放,新加入的放在链头,而先前加入的放在链尾。
检查容量是否超标addEntry
void addEntry(int hash, K key, V value, int bucketIndex)
{
Entry<K,V> e = table[bucketIndex];
table[bucketIndex] = new Entry<K,V>(hash, key, value, e);
//查看当前的size是否超过了我们设定的阈值threshold,如果超过,需要resize
if (size++ >= threshold)
resize(2 * table.length);
}
可以看到,如果现在size已经超过了threshold,那么就要进行resize操作,新建一个更大尺寸的hash表,然后把数据从老的Hash表中迁移到新的Hash表中:
调整Hash表大小resize
void resize(int newCapacity)
{
Entry[] oldTable = table;
int oldCapacity = oldTable.length;
......
//创建一个新的Hash Table
Entry[] newTable = new Entry[newCapacity];
//将Old Hash Table上的数据迁移到New Hash Table上
transfer(newTable);
table = newTable;
threshold = (int)(newCapacity * loadFactor);
}
当table[]数组容量较小,容易产生哈希碰撞,所以,Hash表的尺寸和容量非常的重要。一般来说,Hash表这个容器当有数据要插入时,都会检查容量有没有超过设定的thredhold,如果超过,需要增大Hash表的尺寸,这个过程称为resize。
多个线程同时往HashMap添加新元素时,多次resize会有一定概率出现死循环,因为每次resize需要把旧的数据映射到新的哈希表,这一部分代码在HashMap#transfer() 方法,如下:
void transfer(Entry[] newTable)
{
Entry[] src = table;
int newCapacity = newTable.length;
//下面这段代码的意思是:
// 从OldTable里摘一个元素出来,然后放到NewTable中
for (int j = 0; j < src.length; j++) {
Entry<K,V> e = src[j];
if (e != null) {
src[j] = null;
do { Entry<K,V> next = e.next; int i = indexFor(e.hash, newCapacity); e.next = newTable[i]; newTable[i] = e; e = next; } while (e != null);
}
}
}
红色部分代码是导致多线程使用hashmap出现CUP使用率骤增,从而多个线程阻塞的罪魁祸首。
Hashtable
看API它的大部分方法都用synchronized在HashMap的上层封装一层。这所有对Map的操作都线程安全了。
但是这样就完了吗?大量操作用Hashtable代替HashMap的话,性能会有很大差距。
那么如何保证线程安全,又不损耗太大的性能呢?
ConcurrentHashMap
如是在java的并发开发API包下出来了一个接口:ConcurrentMap,它的实现类:ConcurrentHashMap。
介绍
在并发编程中使用HashMap可能导致程序死循环。而使用线程安全的HashTable效率又非 常低下,基于以上两个原因,便有了ConcurrentHashMap
多线程会导致HashMap的Entry链表 形成环形数据结构,一旦形成环形数据结构,Entry的next节点永远不为空,就会产生死循环获 取Entry。
HashTable容器使用synchronized来保证线程安全,但在线程竞争激烈的情况下HashTable 的效率非常低下。因为当一个线程访问HashTable的同步方法,其他线程也访问HashTable的同 步方法时,会进入阻塞或轮询状态。如线程1使用put进行元素添加,线程2不但不能使用put方 法添加元素,也不能使用get方法来获取元素,所以竞争越激烈效率越低。
HashTable容器在竞争激烈的并发环境下表现出效率低下的原因是所有访问HashTable的 线程都必须竞争同一把锁。
ConcurrentHashMap所使用的锁分段技术。首先将数据分成一段一段地存 储,然后给每一段数据配一把锁,当一个线程占用锁访问其中一个段数据的时候,其他段的数 据也能被其他线程访问。
ConcurrentHashMap是由Segment数组结构和HashEntry数组结构组成。Segment是一种可重 入锁(ReentrantLock),在ConcurrentHashMap里扮演锁的角色;HashEntry则用于存储键值对数 据。一个ConcurrentHashMap里包含一个Segment数组。Segment的结构和HashMap类似,是一种 数组和链表结构。一个Segment里包含一个HashEntry数组,每个HashEntry是一个链表结构的元 素,每个Segment守护着一个HashEntry数组里的元素,当对HashEntry数组的数据进行修改时, 必须首先获得与它对应的Segment锁。
看起来已经完美解决的问题,真的如此么?
使用案例
ConcurrentHashMap通常只被看做并发效率更高的Map,用来替换其他线程安全的Map容器,比如Hashtable和Collections.synchronizedMap。实际上,线程安全的容器,特别是Map,应用场景没有想象中的多,很多情况下一个业务会涉及容器的多个操作,即复合操作,并发执行时,线程安全的容器只能保证自身的数据不被破坏,但无法保证业务的行为是否正确。
举个例子:统计文本中单词出现的次数,把单词出现的次数记录到一个Map中,代码如下:
private final Map<String, Long> wordCounts = new ConcurrentHashMap<>();
public long increase(String word) {
Long oldValue = wordCounts.get(word);
Long newValue = (oldValue == null) ? 1L : oldValue + 1;
wordCounts.put(word, newValue);
return newValue;
}
如果多个线程并发调用这个increase()方法,increase()的实现就是错误的,因为多个线程用相同的word调用时,很可能会覆盖相互的结果,造成记录的次数比实际出现的次数少。
除了用锁解决这个问题,另外一个选择是使用ConcurrentMap接口定义的方法:
public interface ConcurrentMap<K, V> extends Map<K, V> {
V putIfAbsent(K key, V value);
boolean remove(Object key, Object value);
boolean replace(K key, V oldValue, V newValue);
V replace(K key, V value);
}
这是个被很多人忽略的接口,也经常见有人错误地使用这个接口。ConcurrentMap接口定义了几个基于 CAS(Compare and Set)操作,很简单,但非常有用,下面的代码用ConcurrentMap解决上面问题:
private final ConcurrentMap<String, Long> wordCounts = new ConcurrentHashMap<>();
public long increase(String word) {
Long oldValue, newValue;
while (true) {
oldValue = wordCounts.get(word);
if (oldValue == null) {
// Add the word firstly, initial the value as 1
newValue = 1L;
if (wordCounts.putIfAbsent(word, newValue) == null) {
break;
}
} else {
newValue = oldValue + 1;
if (wordCounts.replace(word, oldValue, newValue)) {
break;
}
}
}
return newValue;
}
代码有点复杂,主要因为ConcurrentMap中不能保存value为null的值,所以得同时处理word不存在和已存在两种情况。
上面的实现每次调用都会涉及Long对象的拆箱和装箱操作,很明显,更好的实现方式是采用AtomicLong,下面是采用AtomicLong后的代码:
private final ConcurrentMap<String, AtomicLong> wordCounts = new ConcurrentHashMap<>();
public long increase(String word) {
AtomicLong number = wordCounts.get(word);
if (number == null) {
AtomicLong newNumber = new AtomicLong(0);
number = wordCounts.putIfAbsent(word, newNumber);
if (number == null) {
number = newNumber;
}
}
return number.incrementAndGet();
}
这个实现仍然有一处需要说明的地方,如果多个线程同时增加一个目前还不存在的词,那么很可能会产生多个newNumber对象,但最终只有一个newNumber有用,其他的都会被扔掉。对于这个应用,这不算问题,创建AtomicLong的成本不高,而且只在添加不存在词是出现。但换个场景,比如缓存,那么这很可能就是问题了,因为缓存中的对象获取成本一般都比较高,而且通常缓存都会经常失效,那么避免重复创建对象就有价值了。下面的代码演示了怎么处理这种情况:
private final ConcurrentMap<String, Future<ExpensiveObj>> cache = new ConcurrentHashMap<>();
public ExpensiveObj get(final String key) {
Future<ExpensiveObj> future = cache.get(key);
if (future == null) {
Callable<ExpensiveObj> callable = new Callable<ExpensiveObj>() {
@Override
public ExpensiveObj call() throws Exception {
return new ExpensiveObj(key);
}
};
FutureTask<ExpensiveObj> task = new FutureTask<>(callable);
future = cache.putIfAbsent(key, task);
if (future == null) {
future = task;
task.run();
}
}
try {
return future.get();
} catch (Exception e) {
cache.remove(key);
throw new RuntimeException(e);
}
}
解决方法其实就是用一个Proxy对象来包装真正的对象,跟常见的lazy load原理类似;使用FutureTask主要是为了保证同步,避免一个Proxy创建多个对象。注意,上面代码里的异常处理是不准确的。
再补充一下,如果真要实现前面说的统计单词次数功能,最合适的方法是Guava包中AtomicLongMap。
MapMaker
因为不完美,就会有大神去补充它,如是在谷歌的第三方jar-Guava中的有了MapMaker实现(或cache实现,不在此补充)。
使用案例
以下摘自网上来源:
Google Collections中的MapMaker融合了 Weak Reference , 线程安全 , 高并发性能 , 异步超时清理 , 自定义构建元素 等强大功能于一身。
常阅读优秀源代码的童鞋都知道,一般叫Maker的对象都是Builder模式,而这个MapMaker就是来” Build “Map的.
一、google collection工具包的MapMaker使用:
public static void main(String[] args) {
/**
* expiration(3, TimeUnit.SECONDS)设置超时时间为3秒
*/
ConcurrentMap<String , String> map = new MapMaker().concurrencyLevel(32).softKeys().weakValues()
.expiration(3, TimeUnit.SECONDS).makeComputingMap(
/**
* 提供当Map里面不包含所get的项,可以自动加入到Map的功能
* 可以将这里的返回值放到对应的key的value中
*/
new Function<String, String>() {
public String apply(String s) {
return "creating " + s + " -> Object";
}
}
);
map.put("a","testa");
map.put("b","testb");
System.out.println(map.get("a"));
System.out.println(map.get("b"));
System.out.println(map.get("c"));
try {
// 4秒后,大于超时时间,缓存失效。
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(map.get("a"));
System.out.println(map.get("b"));
System.out.println(map.get("c"));
}
结果如下:
testa
testb
creating c -> Object
creating a -> Object
creating b -> Object
creating c -> Object
二、先看下其api的相关demo片段:
// 使用案例:存储验证码
// <String, String> == <用户唯一,验证码>
// expiration(15, TimeUnit.MINUTES) 有效期15分钟
ConcurrentMap<String,String> capthcaMap = new MapMaker().expiration(15, TimeUnit.MINUTES).makeMap();
// 设置ConcurrentMap的concurrencyLevel参数 ,例如ConcurrentHashMap是用来控制其Segment数组的大小
ConcurrentMap<String,Object> map1 = new MapMaker().concurrencyLevel(8).makeMap();
// 构造各种不同reference作为key和value的map
ConcurrentMap<String,Object> map2 = new MapMaker().softKeys().weakValues().makeMap();
// 提供当Map里面不包含所get的项,可以自动加入到Map的功能
ConcurrentMap<String,Integer> map3 = new MapMaker()
.makeComputingMap(
new Function<String, Integer>() {
public Integer apply(String key) {
return 1;
}
}
);
可以看出过了4秒后,缓存失效,所以呈现如此结果。