ConcurrentHashMap源码分析(基于JDK1.8)


前言 面试中常常问到Hashtable、HashMap和ConcurrentHashMap的区别。大家都知道HashMap是线程不安全的,
Hashtable和
ConcurrentHashMap是线程安全的。
Hashtable保证线程安全的方法,基本都是在操作集合的方法上加synchronized关键字,我们有必要知道
ConcurrentHashMap底层实现和如何保证线程安全性。
ConcurrentHashMap
引入了分段锁的概念,对于不同Bucket的操作不需要全局锁来保证线程安全。
建议学习
ConcurrentHashMap之前,先学习Hashtable和
HashMap,并且要有一些并发的基础。
在看源码的时候一定要注意,本文讲的是JDK1.8的源码,ConcurrentHashMap在JDK1.8相比之前的版本有很大变化,1.8版本的源码达到6000+行,而1.6的源码只有一千多行。

在JDK1.6的实现中,使用的是一个segments数组来存储。

  1. final Segment<K,V>[] segments;

JDK1.8中是使用Node数组来存储。

  1. transient volatile Node<K,V>[] table;

JDK1.8中,Segment类只有在序列化和反序列化时才会被用到。






ConcurrentHashMap数据结构

其底层数据结构实现如下。
《ConcurrentHashMap源码分析(基于JDK1.8)》


可以看到,桶中的结构可能是链表,也可能是红黑树,红黑树是为了提高查找效率。

ConcurrentHashMap的继承关系

  1. public class ConcurrentHashMap<K,V> extends AbstractMap<K,V>
  2. implements ConcurrentMap<K,V>, Serializable

其内部框架图如下。
《ConcurrentHashMap源码分析(基于JDK1.8)》

ConcurrentHashMap为什么高效? Hashtable低效主要是因为所有访问Hashtable的线程都争夺一把锁。如果容器有很多把锁,每一把锁控制容器中的一部分数据,那么当多个线程访问容器里的不同部分的数据时,线程之前就不会存在锁的竞争,这样就可以有效的提高并发的访问效率。 这也正是ConcurrentHashMap使用的分段锁技术。将ConcurrentHashMap容器的数据分段存储,每一段数据分配一个Segment(锁),当线程占用其中一个Segment时,其他线程可正常访问其他段数据。

  1. static class Segment<K,V> extends ReentrantLock implements Serializable {
  2. private static final long serialVersionUID = 2249069246763182397L;
  3. final float loadFactor;
  4. Segment(float lf) { this.loadFactor = lf; }
  5. }

ConcurrentHashMap的用到的重要内部类



Node类
该类是此集合最重要的一个内部类。只要是插入ConcurrentHashMap的结点,都会被包装成Node

  1. static class Node<K,V> implements Map.Entry<K,V> {
  2. final int hash;
  3. final K key;
  4. volatile V val;
  5. volatile Node<K,V> next;
  6. Node(int hash, K key, V val, Node<K,V> next) {
  7. this.hash = hash;
  8. this.key = key;
  9. this.val = val;
  10. this.next = next;
  11. }
  12. public final K getKey() { return key; }
  13. public final V getValue() { return val; }
  14. public final int hashCode() { return key.hashCode() ^ val.hashCode(); }
  15. public final String toString(){ return key + "=" + val; }
  16. public final V setValue(V value) {
  17. throw new UnsupportedOperationException(); //不允许被修改val
  18. }
  19. public final boolean equals(Object o) {
  20. Object k, v, u; Map.Entry<?,?> e;
  21. return ((o instanceof Map.Entry) &&
  22. (k = (e = (Map.Entry<?,?>)o).getKey()) != null &&
  23. (v = e.getValue()) != null &&
  24. (k == key || k.equals(key)) &&
  25. (v == (u = val) || v.equals(u)));
  26. }
  27. /**
  28. * 支持map的get方法,通过hashcode和Key来获取一个node结点
  29. */
  30. Node<K,V> find(int h, Object k) {
  31. Node<K,V> e = this;
  32. if (k != null) {
  33. do {
  34. K ek;
  35. if (e.hash == h &&
  36. ((ek = e.key) == k || (ek != null && k.equals(ek))))
  37. return e;
  38. } while ((e = e.next) != null);
  39. }
  40. return null;
  41. }
  42. }


TreeNode类

  1. static final class TreeNode<K,V> extends Node<K,V> {
  2. TreeNode<K,V> parent; // red-black tree links
  3. TreeNode<K,V> left;
  4. TreeNode<K,V> right;
  5. TreeNode<K,V> prev; // needed to unlink next upon deletion
  6. boolean red;
  7. TreeNode(int hash, K key, V val, Node<K,V> next,
  8. TreeNode<K,V> parent) {
  9. super(hash, key, val, next);
  10. this.parent = parent;
  11. }
  12. Node<K,V> find(int h, Object k) {
  13. return findTreeNode(h, k, null);
  14. }
  15. /**
  16. * Returns the TreeNode (or null if not found) for the given key
  17. * starting at given root.
  18. */
  19. final TreeNode<K,V> findTreeNode(int h, Object k, Class<?> kc) {
  20. if (k != null) {
  21. TreeNode<K,V> p = this;
  22. do {
  23. int ph, dir; K pk; TreeNode<K,V> q;
  24. TreeNode<K,V> pl = p.left, pr = p.right;
  25. if ((ph = p.hash) > h)
  26. p = pl;
  27. else if (ph < h)
  28. p = pr;
  29. else if ((pk = p.key) == k || (pk != null && k.equals(pk)))
  30. return p;
  31. else if (pl == null)
  32. p = pr;
  33. else if (pr == null)
  34. p = pl;
  35. else if ((kc != null ||
  36. (kc = comparableClassFor(k)) != null) &&
  37. (dir = compareComparables(kc, k, pk)) != 0)
  38. p = (dir < 0) ? pl : pr;
  39. else if ((q = pr.findTreeNode(h, k, kc)) != null)
  40. return q;
  41. else
  42. p = pl;
  43. } while (p != null);
  44. }
  45. return null;
  46. }
  47. }

如果链表的数据过长是会转换为红黑树来处理。当它并不是直接转换,而是将这些链表的节点包装成TreeNode放在TreeBin对象中,然后由TreeBin完成红黑树的转换。TreeBin类的代码很长,其实
TreeBin的构造方法就是一个建立红黑树的过程。

ConcurrentHashMap的初始化
ConcurrentHashMap的table初始化是在putVal

时发生的。
putVal时,如果发现table没有初始化,那么会调用
initTable来初始化table,注意
initTable在扩容时也会被执行

  1. private final Node<K,V>[] initTable() {
  2. Node<K,V>[] tab; int sc;
  3. while ((tab = table) == null || tab.length == 0) {
  4. if ((sc = sizeCtl) < 0) //sizeCtl为负数时,说明有其他线程在初始化
  5. Thread.yield(); //让出时间片
  6. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //把SIZECTL置为-1,接下来本线程对table进行操作
  7. try {
  8. if ((tab = table) == null || tab.length == 0) {
  9. int n = (sc > 0) ? sc : DEFAULT_CAPACITY;
  10. @SuppressWarnings("unchecked")
  11. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n];
  12. table = tab = nt;
  13. sc = n - (n >>> 2); //下一次扩容的阀值,大小为0.75*n
  14. }
  15. } finally {
  16. sizeCtl = sc;
  17. }
  18. break;
  19. }
  20. }
  21. return tab;
  22. }

上面的代码中,sizeCtl是一个比较重要的变量。

  1. /**
  2. * Table initialization and resizing control. When negative, the
  3. * table is being initialized or resized: -1 for initialization,
  4. * else -(1 + the number of active resizing threads). Otherwise,
  5. * when table is null, holds the initial table size to use upon
  6. * creation, or 0 for default. After initialization, holds the
  7. * next element count value upon which to resize the table.
  8. */
  9. private transient volatile int sizeCtl;

sizeCtl是一个控制标识符,用来控制table初始化和扩容操作的,在不同的地方有不同的用途,其值也不同,所代表的含义也不同:

负数代表正在进行初始化或扩容操作,-1时表示在初始化中,-N表示
有N-1个线程正在进行扩容操作。 正数或0代表hash表还没有被初始化,这个数值表示初始化或下一次进行扩容的大小。

所以,如果某个线程想要初始化table或者对table扩容,需要去竞争sizeCtl这个共享变量,获得变量的线程才有许可去进行接下来的操作,没能获得的线程将会一直自旋来尝试获得这个共享变量,所以获得sizeCtl这个变量的线程在完成工作之后需要设置回来,使得其他的线程可以走出自旋进行接下来的操作。

initTable方法中我们可以看到,当线程发现sizeCtl小于0的时候,他就会让出CPU时间,而稍后再进行尝试,当发现sizeCtl不再小于0的时候,就会通过调用方法compareAndSwapInt来把sizeCtl共享变量变为-1,以告诉其他试图获得sizeCtl变量的线程,目前正在由本线程在享用该变量,在我完成我的任务之前你可以先休息一会,等会再来试试吧,我完成工作之后会释放掉的。在完成初始化table的任务之后,线程需要将sizeCtl设置成可以使得其他线程获得变量的状态。

这里需要解释一下这句代码的重要性。

  1. else if (U.compareAndSwapInt(this, SIZECTL, sc, -1)) { //把SIZECTL置为-1,接下来本线程对table进行操作

在并发编程的情况下,线程A和线程B都试图初始化table,线程A优先进行了初始化,但是此时初始化还没有完成,所以table为null,但其实已经在初始化当中了,而线程B发现table为null,所以他会继续竞争sizeCtl来尝试初始化table,如果没有上面的第二次检测的代码,线程B也会进行初始化。

ConcurrentHashMap查询记录方法详解,get方法 在ConcurrentHashMap中查询一条记录首先需要知道这条记录存储的table的位置(可以看成卡槽,每个卡槽中都会有一个链表或者一棵红黑树),该位置上可能为null,如果为null,说明想要查询的记录还不存在于ConcurrentHashMap中,否则,就在该位置上的链表或者红黑树中查找记录。

  1. public V get(Object key) {
  2. Node<K,V>[] tab; Node<K,V> e, p; int n, eh; K ek;
  3. int h = spread(key.hashCode()); //计算key对应的hashcode
  4. if ((tab = table) != null && (n = tab.length) > 0 &&
  5. (e = tabAt(tab, (n - 1) & h)) != null) { // 表不为空并且表的长度大于0并且key所在的桶不为空
  6. if ((eh = e.hash) == h) { //获取到的val去hash然后和前面的hashcode比较
  7. if ((ek = e.key) == key || (ek != null && key.equals(ek)))
  8. return e.val;
  9. }
  10. else if (eh < 0) //hash值小于0时,在此桶对应的红黑树查找
  11. return (p = e.find(h, key)) != null ? p.val : null;
  12. while ((e = e.next) != null) { //结点hash值大于0的情况,在链表中查找
  13. if (e.hash == h &&
  14. ((ek = e.key) == key || (ek != null && key.equals(ek))))
  15. return e.val;
  16. }
  17. }
  18. return null;
  19. }

get
操作可以总结如下: 计算hash值 判断table是否为空,如果为空,直接返回null 根据hash值获取table中的Node节点(tabAt(tab, (n – 1) & h)),然后根据链表或者树形方式找到相对应的节点,返回其value值。

ConcurrentHashMap的put方法

ConcurrentHashMap的put操作与HashMap并没有多大区别,其核心思想依然是根据hash值计算节点插入在table的位置,如果该位置为空,则直接插入,否则插入到链表或者树中。但是ConcurrentHashMap会涉及到多线程情况就会复杂很多。

  1. public V put(K key, V value) {
  2. return putVal(key, value, false);
  3. }
  4. /** Implementation for put and putIfAbsent */
  5. final V putVal(K key, V value, boolean onlyIfAbsent) {
  6. if (key == null || value == null) throw new NullPointerException();
  7. int hash = spread(key.hashCode()); //计算hash
  8. int binCount = 0;
  9. for (Node<K,V>[] tab = table;;) {
  10. Node<K,V> f; int n, i, fh;
  11. if (tab == null || (n = tab.length) == 0)
  12. tab = initTable(); //初始化table
  13. else if ((f = tabAt(tab, i = (n - 1) & hash)) == null) { //hash到的桶为null,直接保存结点
  14. if (casTabAt(tab, i, null,
  15. new Node<K,V>(hash, key, value, null)))
  16. break; // no lock when adding to empty bin
  17. }
  18. else if ((fh = f.hash) == MOVED)
  19. tab = helpTransfer(tab, f); // 有线程正在进行扩容操作,则先帮助扩容
  20. else {
  21. V oldVal = null;
  22. synchronized (f) {
  23. if (tabAt(tab, i) == f) {
  24. if (fh >= 0) { //fh >= 0 表示为链表,将该节点插入到链表尾部
  25. binCount = 1; //binCount表示每个桶里面的node数
  26. for (Node<K,V> e = f;; ++binCount) {
  27. K ek;
  28. if (e.hash == hash &&
  29. ((ek = e.key) == key ||
  30. (ek != null && key.equals(ek)))) {
  31. oldVal = e.val; //hash和key都一样,替换原来的val
  32. if (!onlyIfAbsent)
  33. e.val = value;
  34. break;
  35. }
  36. Node<K,V> pred = e;
  37. if ((e = e.next) == null) {
  38. pred.next = new Node<K,V>(hash, key,
  39. value, null); //把结点插入链表尾部
  40. break;
  41. }
  42. }
  43. }
  44. else if (f instanceof TreeBin) { //如果结点不是要插入链表,那就要插入红黑树
  45. Node<K,V> p;
  46. binCount = 2;
  47. if ((p = ((TreeBin<K,V>)f).putTreeVal(hash, key,
  48. value)) != null) {
  49. oldVal = p.val;
  50. if (!onlyIfAbsent)
  51. p.val = value;
  52. }
  53. }
  54. }
  55. }
  56. if (binCount != 0) {
  57. if (binCount >= TREEIFY_THRESHOLD) //桶里面的节点数大于TREEIFY_THRESHOLD
  58. treeifyBin(tab, i); //如果链表长度已经达到临界值8 就需要把链表转换为树结构
  59. if (oldVal != null)
  60. return oldVal;
  61. break;
  62. }
  63. }
  64. }
  65. addCount(1L, binCount);
  66. return null;
  67. }

putVal操作可以总结如下:

一,判空,ConcurrentHashMap的key、value都不允许为null

二,计算hash。利用方法计算hash值。

三,遍历table,进行节点插入操作,过程如下:

如果table为空,则表示ConcurrentHashMap还没有初始化,则进行初始化操作:initTable() 根据hash值获取节点的位置i,若该位置为空,则直接插入,这个过程是不需要加锁的。计算f位置:i=(n – 1) & hash 如果检测到fh = f.hash == -1,则f是ForwardingNode节点,表示有其他线程正在进行扩容操作,则帮助线程一起进行扩容操作 如果f.hash >= 0 表示是链表结构,则遍历链表,如果存在当前key节点则替换value,否则插入到链表尾部。如果f是TreeBin类型节点,则按照红黑树的方法更新或者增加节点 若链表长度 >= TREEIFY_THRESHOLD(默认是8),则将链表转换为红黑树结构

四,调用addCount方法,ConcurrentHashMap的size + 1

ConcurrentHashMap的treeifyBin方法 在putVal方法中,我们可以看到如果binCount超过了界限值TREEIFY_THRESHOLD,会调用treeifyBin来把链表转化成红黑树。

  1. private final void treeifyBin(Node<K,V>[] tab, int index) {
  2. Node<K,V> b; int n, sc;
  3. if (tab != null) {
  4. if ((n = tab.length) < MIN_TREEIFY_CAPACITY) //table的size小于最小treeify的size
  5. tryPresize(n << 1); //扩容,避免同一个桶里面的结点数过多
  6. else if ((b = tabAt(tab, index)) != null && b.hash >= 0) { //桶中存在结点并且结点的hash值大于等于0
  7. synchronized (b) { //对桶中第一个结点加锁,也就是对单个桶进行加锁
  8. if (tabAt(tab, index) == b) {
  9. TreeNode<K,V> hd = null, tl = null;
  10. for (Node<K,V> e = b; e != null; e = e.next) { //遍历链表,修改成红黑树
  11. TreeNode<K,V> p =
  12. new TreeNode<K,V>(e.hash, e.key, e.val,
  13. null, null);
  14. if ((p.prev = tl) == null)
  15. hd = p;
  16. else
  17. tl.next = p;
  18. tl = p;
  19. }
  20. setTabAt(tab, index, new TreeBin<K,V>(hd));
  21. }
  22. }
  23. }
  24. }
  25. }

此函数用于将桶中的数据结构转化为红黑树,
其中,值得注意的是,当table的长度未达到阈值时,会进行一次扩容操作,该操作会使得触发treeifyBin操作的某个桶中的所有元素进行一次重新分配,这样可以避免某个桶中的结点数量太大。 转化的时候,要把对应的桶加锁。

ConcurrentHashMap的size方法
size 和mappingCount都可以用来返回ConcurrentHashMap的估计size,因为在进行统计的时候有其他线程正在进行插入和删除操作,所以size只能是一个估计值。当然为了这个不精确的值,ConcurrentHashMap也是操碎了心。

  1. public int size() {
  2. long n = sumCount();
  3. return ((n < 0L) ? 0 :
  4. (n > (long)Integer.MAX_VALUE) ? Integer.MAX_VALUE :
  5. (int)n);
  6. }
  1. public long mappingCount() {
  2. long n = sumCount();
  3. return (n < 0L) ? 0L : n; // ignore transient negative values
  4. }




mappingCount的方法注释上可以知道,计算size的时候更推崇
mappingCount方法。

size方法调用了sumCount这个方法。

  1. final long sumCount() {
  2. CounterCell[] as = counterCells; CounterCell a;
  3. long sum = baseCount; //ConcurrentHashMap中元素个数,但返回的不一定是当前Map的真实元素个数。基于CAS无锁更新
  4. if (as != null) {
  5. for (int i = 0; i < as.length; ++i) {
  6. if ((a = as[i]) != null)
  7. sum += a.value;
  8. }
  9. }
  10. return sum;
  11. }

sumCount()就是迭代counterCells来统计sum的过程。我们知道put操作时,肯定会影响size()。putVal时,会调用addCount方法来修改map的size,在addCount中就是通过操作
counterCells来保留size的信息的。

ConcurrentHashMap的扩容操作

当ConcurrentHashMap中table元素个数达到了容量阈值(sizeCtl)时,则需要进行扩容操作。在put操作时最后一个会调用addCount(long x, int check),该方法主要做两个工作:1.更新baseCount;2.检测是否需要扩容操作。

  1. private final void addCount(long x, int check) {
  2. CounterCell[] as; long b, s;
  3. // 这里一段代码省略了,作用是更新baseCount
  4. //check >= 0 :则需要进行扩容操作
  5. if (check >= 0) {
  6. Node<K,V>[] tab, nt; int n, sc;
  7. while (s >= (long)(sc = sizeCtl) && (tab = table) != null &&
  8. (n = tab.length) < MAXIMUM_CAPACITY) {
  9. int rs = resizeStamp(n);
  10. if (sc < 0) {
  11. if ((sc >>> RESIZE_STAMP_SHIFT) != rs || sc == rs + 1 ||
  12. sc == rs + MAX_RESIZERS || (nt = nextTable) == null ||
  13. transferIndex <= 0)
  14. break;
  15. if (U.compareAndSwapInt(this, SIZECTL, sc, sc + 1))
  16. transfer(tab, nt);
  17. }
  18. //当前线程是唯一的或是第一个发起扩容的线程 此时nextTable=null
  19. else if (U.compareAndSwapInt(this, SIZECTL, sc,
  20. (rs << RESIZE_STAMP_SHIFT) + 2))
  21. transfer(tab, null);
  22. s = sumCount();
  23. }
  24. }
  25. }

上面的代码中,有一个重要的方法transfer,这个就是核心的扩容操作。分两步: 构建一个nextTable,其大小为原来大小的两倍,这个步骤是在单线程环境下完成的 将原来table里面的内容复制到nextTable中,这个步骤是允许多线程操作的,所以性能得到提升,减少了扩容的时间消耗

  1. private final void transfer(Node<K,V>[] tab, Node<K,V>[] nextTab) {
  2. int n = tab.length, stride;
  3. // 每核处理的量小于16,则强制赋值16
  4. if ((stride = (NCPU > 1) ? (n >>> 3) / NCPU : n) < MIN_TRANSFER_STRIDE)
  5. stride = MIN_TRANSFER_STRIDE; // subdivide range
  6. if (nextTab == null) { // initiating
  7. try {
  8. @SuppressWarnings("unchecked")
  9. Node<K,V>[] nt = (Node<K,V>[])new Node<?,?>[n << 1]; //构建一个nextTable对象,其容量为原来容量的两倍
  10. nextTab = nt;
  11. } catch (Throwable ex) { // try to cope with OOME
  12. sizeCtl = Integer.MAX_VALUE;
  13. return;
  14. }
  15. nextTable = nextTab;
  16. transferIndex = n;
  17. }
  18. int nextn = nextTab.length;
  19. // 连接点指针,用于标志位(fwd的hash值为-1,fwd.nextTable=nextTab)
  20. ForwardingNode<K,V> fwd = new ForwardingNode<K,V>(nextTab);
  21. // 当advance == true时,表明该节点已经处理过了
  22. boolean advance = true;
  23. boolean finishing = false; // to ensure sweep before committing nextTab
  24. for (int i = 0, bound = 0;;) {
  25. Node<K,V> f; int fh;
  26. // 控制 --i ,遍历原hash表中的节点
  27. while (advance) {
  28. int nextIndex, nextBound;
  29. if (--i >= bound || finishing)
  30. advance = false;
  31. else if ((nextIndex = transferIndex) <= 0) {
  32. i = -1;
  33. advance = false;
  34. }
  35. // 用CAS计算得到的transferIndex
  36. else if (U.compareAndSwapInt
  37. (this, TRANSFERINDEX, nextIndex,
  38. nextBound = (nextIndex > stride ?
  39. nextIndex - stride : 0))) {
  40. bound = nextBound;
  41. i = nextIndex - 1;
  42. advance = false;
  43. }
  44. }
  45. if (i < 0 || i >= n || i + n >= nextn) {
  46. int sc;
  47. if (finishing) { // 已经完成所有节点复制了
  48. nextTable = null;
  49. table = nextTab; // table 指向nextTable
  50. sizeCtl = (n << 1) - (n >>> 1); // sizeCtl阈值为原来的1.5倍
  51. return; // 跳出死循环,
  52. }
  53. // CAS 更新扩容阈值,在这里面sizectl值减一,说明新加入一个线程参与到扩容操作
  54. if (U.compareAndSwapInt(this, SIZECTL, sc = sizeCtl, sc - 1)) {
  55. if ((sc - 2) != resizeStamp(n) << RESIZE_STAMP_SHIFT)
  56. return;
  57. finishing = advance = true;
  58. i = n; // recheck before commit
  59. }
  60. }
  61. // 遍历的节点为null,则放入到ForwardingNode 指针节点
  62. else if ((f = tabAt(tab, i)) == null)
  63. advance = casTabAt(tab, i, null, fwd);
  64. // f.hash == -1 表示遍历到了ForwardingNode节点,意味着该节点已经处理过了
  65. // 这里是控制并发扩容的核心
  66. else if ((fh = f.hash) == MOVED)
  67. advance = true; // already processed
  68. else {
  69. // 节点加锁
  70. synchronized (f) {
  71. // 节点复制工作
  72. if (tabAt(tab, i) == f) {
  73. Node<K,V> ln, hn;
  74. // fh >= 0 ,表示为链表节点
  75. if (fh >= 0) {
  76. // 构造两个链表 一个是原链表 另一个是原链表的反序排列
  77. int runBit = fh & n;
  78. Node<K,V> lastRun = f;
  79. for (Node<K,V> p = f.next; p != null; p = p.next) {
  80. int b = p.hash & n;
  81. if (b != runBit) {
  82. runBit = b;
  83. lastRun = p;
  84. }
  85. }
  86. if (runBit == 0) {
  87. ln = lastRun;
  88. hn = null;
  89. }
  90. else {
  91. hn = lastRun;
  92. ln = null;
  93. }
  94. for (Node<K,V> p = f; p != lastRun; p = p.next) {
  95. int ph = p.hash; K pk = p.key; V pv = p.val;
  96. if ((ph & n) == 0)
  97. ln = new Node<K,V>(ph, pk, pv, ln);
  98. else
  99. hn = new Node<K,V>(ph, pk, pv, hn);
  100. }
  101. // 在nextTable i 位置处插上链表
  102. setTabAt(nextTab, i, ln);
  103. // 在nextTable i + n 位置处插上链表
  104. setTabAt(nextTab, i + n, hn);
  105. // 在table i 位置处插上ForwardingNode 表示该节点已经处理过了
  106. setTabAt(tab, i, fwd);
  107. // advance = true 可以执行--i动作,遍历节点
  108. advance = true;
  109. }
  110. // 如果是TreeBin,则按照红黑树进行处理,处理逻辑与上面一致
  111. else if (f instanceof TreeBin) {
  112. TreeBin<K,V> t = (TreeBin<K,V>)f;
  113. TreeNode<K,V> lo = null, loTail = null;
  114. TreeNode<K,V> hi = null, hiTail = null;
  115. int lc = 0, hc = 0;
  116. for (Node<K,V> e = t.first; e != null; e = e.next) {
  117. int h = e.hash;
  118. TreeNode<K,V> p = new TreeNode<K,V>
  119. (h, e.key, e.val, null, null);
  120. if ((h & n) == 0) {
  121. if ((p.prev = loTail) == null)
  122. lo = p;
  123. else
  124. loTail.next = p;
  125. loTail = p;
  126. ++lc;
  127. }
  128. else {
  129. if ((p.prev = hiTail) == null)
  130. hi = p;
  131. else
  132. hiTail.next = p;
  133. hiTail = p;
  134. ++hc;
  135. }
  136. }
  137. // 扩容后树节点个数若<=6,将树转链表
  138. ln = (lc <= UNTREEIFY_THRESHOLD) ? untreeify(lo) :
  139. (hc != 0) ? new TreeBin<K,V>(lo) : t;
  140. hn = (hc <= UNTREEIFY_THRESHOLD) ? untreeify(hi) :
  141. (lc != 0) ? new TreeBin<K,V>(hi) : t;
  142. setTabAt(nextTab, i, ln);
  143. setTabAt(nextTab, i + n, hn);
  144. setTabAt(tab, i, fwd);
  145. advance = true;
  146. }
  147. }
  148. }
  149. }
  150. }
  151. }

先抛开多线程的环境,在单线程的环境下,分析transfer的步骤如下: 一,为每个内核分任务,并保证其不小于16 二,检查nextTable是否为null,如果是,则初始化nextTable,使其容量为table的两倍 三,死循环遍历节点,知道finished:节点从table复制到nextTable中,支持并发,思路如下:

如果节点 f 为null,则插入ForwardingNode(采用Unsafe.compareAndSwapObjectf方法实现),当一个线程遍历到的节点如果ForwardingNode,则继续往后遍历,如果不是,则将该节点加锁,防止其他线程进入 如果f为链表的头节点(fh >= 0),则先构造一个反序链表,然后把他们分别放在nextTable的i和i + n位置,并将ForwardingNode 插入原节点位置,代表已经处理过了 如果f为TreeBin节点,同样也是构造一个反序 ,同时需要判断是否需要进行unTreeify()操作,并把处理的结果分别插入到nextTable的i 和i+nw位置,并插入ForwardingNode 节点

四,所有节点复制完成后,则将table指向nextTable,同时更新sizeCtl = nextTable的0.75倍(如果又有结点加入map,总的结点数达到这个值,就会触发扩容)。

在多线程环境下,ConcurrentHashMap用两点来保证正确性:ForwardingNode和synchronized。当一个线程遍历到的节点如果是ForwardingNode,则继续往后遍历,如果不是,则将该节点加锁,防止其他线程进入,完成后设置ForwardingNode节点,以便要其他线程可以看到该节点已经处理过了,如此交叉进行,高效而又安全。



扩容过程如下。
《ConcurrentHashMap源码分析(基于JDK1.8)》









参考:
http://www.cnblogs.com/leesf456/p/5453341.html
http://cmsblogs.com/?p=2283
http://www.cnblogs.com/leesf456/p/5453341.html 红黑树:
http://cmsblogs.com/?p=2329 ConcurrentHashMap的弱一致性
http://ifeve.com/concurrenthashmap-weakly-consistent/

HashMap、Hashtable、HashSet 和 ConcurrentHashMap 的比较
http://www.54tianzhisheng.cn/2017/06/13/HashMap-Hashtable/

    原文作者:xuzombie
    原文地址: https://blog.csdn.net/xu768840497/article/details/79194701
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞