java.util.concurrent
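This package provides many classes that perform well under concurrency. This post looks at one of them, java.util.concurrent.atomic.Striped64.
The basic idea behind Striped64 is to split up hot data. The name hints at this: to "stripe" something is to divide it into strips. Striped64 is an abstract, package-private class that extends Number.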
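Striped64 cannot be used directly from application code, but its public subclass LongAdder can. The following is a minimal sketch of the "split hot data" idea seen from the outside: several threads increment one LongAdder, and the total is read back with sum(). The class name LongAdderDemo and the method countWithThreads are made up for this illustration.

```java
import java.util.concurrent.atomic.LongAdder;

// Hypothetical demo class; LongAdder is a public subclass of Striped64.
class LongAdderDemo {
    static long countWithThreads(int threads, int incrementsPerThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < incrementsPerThread; j++) {
                    adder.increment(); // may land on base or on one of the cells
                }
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return adder.sum(); // base plus the values of all cells
    }

    public static void main(String[] args) {
        System.out.println(countWithThreads(4, 100_000));
    }
}
```

Once all workers have been joined, sum() reflects every increment, no matter which cell each one was applied to.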
Let's start with Cell, an important nested class of Striped64 and the key to splitting hot data. Here is its source:
@sun.misc.Contended static final class Cell {
    volatile long value;
    Cell(long x) { value = x; }
    final boolean cas(long cmp, long val) {
        return UNSAFE.compareAndSwapLong(this, valueOffset, cmp, val);
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long valueOffset;
    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> ak = Cell.class;
            valueOffset = UNSAFE.objectFieldOffset
                (ak.getDeclaredField("value"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}
@sun.misc.Contended
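This annotation is a cleaner way to avoid false sharing (readers unfamiliar with false sharing can look it up); it trades space for time. Before Java 8, developers had to pad cache lines by hand, which was awkward, and in Java 7 the JVM could optimize away unused padding fields so that the padding had no effect. Java 8 added this annotation as a more elegant way to avoid false sharing.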
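For comparison, here is a rough sketch of the hand-written padding that was common before Java 8. It assumes 64-byte cache lines; the class name PaddedCounter and the field names are hypothetical. The JVM decides the actual field layout, so this only illustrates the intent and guarantees nothing. As far as I know, @sun.misc.Contended is also ignored on application classes unless the JVM is started with -XX:-RestrictContended.

```java
// Hypothetical sketch of pre-Java 8 manual padding; assumes 64-byte cache lines.
class PaddedCounter {
    // Seven longs (56 bytes) intended to sit before the hot field
    long p1, p2, p3, p4, p5, p6, p7;

    // The hot field that should not share a cache line with other hot data
    volatile long value;

    // Seven longs intended to sit after the hot field
    long q1, q2, q3, q4, q5, q6, q7;

    // Reading the padding fields makes them look "used"; the layout is still up to the JVM
    long paddingSum() {
        return p1 + p2 + p3 + p4 + p5 + p6 + p7
             + q1 + q2 + q3 + q4 + q5 + q6 + q7;
    }
}
```

The cost is visible here: each counter carries 112 extra bytes of padding, which is the space-for-time trade mentioned above.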
Next, the important member variables of Cell.
volatile long value:
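value holds the actual data. Declaring it volatile addresses visibility when multiple threads access it: a write by one thread becomes visible to subsequent reads by other threads.
For more on volatile, see https://www.cnblogs.com/tangyanbo/p/6538488.html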
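The visibility guarantee can be illustrated with a small sketch: a worker thread spins on a volatile flag and stops once another thread sets it. The class VolatileFlagDemo, its fields, and the 10 ms delay are all made up for this example.

```java
// Hypothetical demo of volatile visibility between two threads.
class VolatileFlagDemo {
    // Without volatile, the worker would not be guaranteed to observe the write
    private volatile boolean stop;
    private long iterations; // written only by the worker thread

    static boolean workerStops(long timeoutMillis) {
        VolatileFlagDemo demo = new VolatileFlagDemo();
        Thread worker = new Thread(() -> {
            while (!demo.stop) {
                demo.iterations++;
            }
        });
        worker.setDaemon(true); // never keep the JVM alive because of the demo
        worker.start();
        try {
            Thread.sleep(10);
            demo.stop = true;           // volatile write, visible to the worker
            worker.join(timeoutMillis); // wait for the worker to notice it
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return !worker.isAlive();
    }

    public static void main(String[] args) {
        System.out.println("worker stopped: " + workerStops(5000));
    }
}
```

Note that volatile gives visibility only; a read-modify-write such as value++ is still not atomic, which is why Cell pairs the volatile field with a CAS method.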
private static final sun.misc.Unsafe UNSAFE :
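As the name suggests, this class is "unsafe". What counts as unsafe in computing? Anyone who has learned C or C++ knows that raw pointers are unsafe. Java hides direct memory addressing from developers, but some low-level code cannot avoid it, and Unsafe is the class that provides such memory operations.
Inside Unsafe, the constructor is private, so we cannot create an instance through it. Unsafe does provide a static method that returns the instance: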
@CallerSensitive
public static Unsafe getUnsafe() {
    Class var0 = Reflection.getCallerClass();
    if (!VM.isSystemDomainLoader(var0.getClassLoader())) {
        throw new SecurityException("Unsafe");
    } else {
        return theUnsafe;
    }
}

// From sun.misc.VM
public static boolean isSystemDomainLoader(ClassLoader var0) {
    return var0 == null;
}
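Unfortunately, if the class loader of the calling class is not null (that is, the caller was not loaded by the bootstrap class loader), the method throws a SecurityException, so application code cannot obtain an Unsafe instance this way either.
There are workarounds, and many can be found online. One is to change some JVM options, which I have not tried in practice. The recommended approach is reflection: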
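import java.lang.reflect.Field;
import sun.misc.Unsafe;

public static Unsafe getUnsafe() {
    try {
        Class<Unsafe> unsafeClass = Unsafe.class;
        // theUnsafe is the private static field that holds the singleton
        Field theUnsafe = unsafeClass.getDeclaredField("theUnsafe");
        theUnsafe.setAccessible(true);
        // Static field, so no instance is needed for the lookup
        return (Unsafe) theUnsafe.get(null);
    } catch (NoSuchFieldException | IllegalAccessException e) {
        e.printStackTrace();
    }
    // Callers must be prepared for null if the lookup failed
    return null;
}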
This returns the Unsafe instance.
private static final long valueOffset :
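This field holds the offset of the value member within a Cell object, measured from the start of the object. With this offset, Unsafe can locate the field in memory directly and operate on it, as the cas method of Cell does.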
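Application code can get the same "CAS on a field of an object" effect without Unsafe by using the public AtomicLongFieldUpdater API. The sketch below mirrors the shape of Cell under that substitution; SimpleCell is a hypothetical name, and this is not how the JDK implements Cell.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;

// Hypothetical Cell look-alike built on a public API instead of Unsafe.
class SimpleCell {
    volatile long value;

    // Plays the role of UNSAFE + valueOffset: it knows how to reach the "value" field
    private static final AtomicLongFieldUpdater<SimpleCell> VALUE =
        AtomicLongFieldUpdater.newUpdater(SimpleCell.class, "value");

    SimpleCell(long x) {
        value = x;
    }

    // Sets value to val only if it currently equals cmp
    boolean cas(long cmp, long val) {
        return VALUE.compareAndSet(this, cmp, val);
    }
}
```

A call such as new SimpleCell(5).cas(5, 8) succeeds, while a second cas(5, 9) on the same object fails because the value is no longer 5.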
With Cell covered, back to Striped64:
/** Number of CPUS, to place bound on table size */
static final int NCPU = Runtime.getRuntime().availableProcessors();

/**
 * Table of cells. When non-null, size is a power of 2.
 */
transient volatile Cell[] cells;

/**
 * Base value, used mainly when there is no contention, but also as
 * a fallback during table initialization races. Updated via CAS.
 */
transient volatile long base;

/**
 * Spinlock (locked via CAS) used when resizing and/or creating Cells.
 */
transient volatile int cellsBusy;
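NCPU :
The number of CPUs, used as the upper bound when the cells array is expanded.
cells :
An array of Cell objects (the nested class above) and the key to splitting hot data. When non-null, its size is a power of 2.
base :
The base value. When contention is low, updates are accumulated mainly in this field.
cellsBusy :
A lightweight spinlock used when creating Cells or resizing the array: 0 means free, 1 means held. It is acquired via CAS and released by writing 0 back, which is cheaper than a heavyweight lock such as Lock.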
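The cellsBusy protocol can be sketched with an AtomicInteger in place of the Unsafe-based field. TrySpinLock and its method names are hypothetical; the point is only the 0/1 protocol: acquire with a CAS from 0 to 1, release with a plain write of 0.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical try-lock that follows the cellsBusy protocol.
class TrySpinLock {
    private final AtomicInteger busy = new AtomicInteger(); // 0 = free, 1 = held

    // Cheap read first, then a single CAS attempt; never blocks
    boolean tryAcquire() {
        return busy.get() == 0 && busy.compareAndSet(0, 1);
    }

    // Only the holder calls this, so a plain volatile write is enough
    void release() {
        busy.set(0);
    }
}
```

Typical usage wraps the critical section in try/finally so the lock is always released:

```java
TrySpinLock lock = new TrySpinLock();
if (lock.tryAcquire()) {
    try {
        // create a cell or resize the table here
    } finally {
        lock.release();
    }
}
```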
Here is the source showing how some of Striped64's static fields are initialized:
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long BASE;
private static final long CELLSBUSY;
private static final long PROBE;
static {
    try {
        UNSAFE = sun.misc.Unsafe.getUnsafe();
        Class<?> sk = Striped64.class;
        BASE = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("base"));
        CELLSBUSY = UNSAFE.objectFieldOffset
            (sk.getDeclaredField("cellsBusy"));
        Class<?> tk = Thread.class;
        PROBE = UNSAFE.objectFieldOffset
            (tk.getDeclaredField("threadLocalRandomProbe"));
    } catch (Exception e) {
        throw new Error(e);
    }
}
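UNSAFE : needs no further introduction; see the UNSAFE field of Cell above.
BASE : the offset of the base field within a Striped64 object.
CELLSBUSY : the offset of the cellsBusy spinlock field within a Striped64 object.
PROBE : the offset of the threadLocalRandomProbe field within a Thread object. The JDK source describes the probe as follows: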
The Thread probe fields maintained via ThreadLocalRandom serve as per-thread hash codes. We let them remain uninitialized as zero (if they come in this way) until they contend at slot 0. They are then initialized to values that typically do not often conflict with others. Contention and/or table collisions are indicated by failed CASes when performing an update operation. Upon a collision, if the table size is less than the capacity, it is doubled in size unless some other thread holds the lock. If a hashed slot is empty, and lock is available, a new Cell is created. Otherwise, if the slot exists, a CAS is tried. Retries proceed by "double hashing", using a secondary hash (Marsaglia XorShift) to try to find a free slot.
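The probe serves as a per-thread hash code. It stays at its uninitialized value of 0 until the thread contends at slot 0. After that it is initialized to a value that typically does not often conflict with those of other threads. (This is my reading of the English comment above.)
For the details, see how ThreadLocalRandom initializes the probe: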
public static ThreadLocalRandom current() {
    if (UNSAFE.getInt(Thread.currentThread(), PROBE) == 0)
        localInit();
    return instance;
}
If the current thread's probe is 0, the thread-local initialization runs:
/**
 * Initialize Thread fields for the current thread. Called only
 * when Thread.threadLocalRandomProbe is zero, indicating that a
 * thread local seed value needs to be generated. Note that even
 * though the initialization is purely thread-local, we need to
 * rely on (static) atomic generators to initialize the values.
 */
static final void localInit() {
    int p = probeGenerator.addAndGet(PROBE_INCREMENT);
    int probe = (p == 0) ? 1 : p; // skip 0
    long seed = mix64(seeder.getAndAdd(SEEDER_INCREMENT));
    Thread t = Thread.currentThread();
    UNSAFE.putLong(t, SEED, seed);
    UNSAFE.putInt(t, PROBE, probe);
}
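This mainly performs atomic updates on shared generators plus some hash mixing; I won't go into the details. The resulting probe value is finally stored in the current thread's threadLocalRandomProbe field.
Next, a few helper methods in Striped64.
First, the basic idea of CAS.
CAS:
Compare And Swap
CAS involves three values: the value in memory V, the expected value E, and the new value U. If V equals E, V is updated to U; otherwise nothing happens. The operation relies on the CPU's atomic compare-and-swap instruction, so its efficiency comes from the hardware level.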
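The V / E / U description above maps directly onto AtomicLong.compareAndSet. Here is a small sketch of the usual retry loop; CasLoopDemo and addWithCas are hypothetical names chosen for this example.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical demo of a CAS retry loop on top of AtomicLong.
class CasLoopDemo {
    static long addWithCas(AtomicLong target, long delta) {
        for (;;) {
            long expected = target.get();    // E: what we believe V currently is
            long updated = expected + delta; // U: the value we want to install
            // Succeeds only if V still equals E; otherwise another thread won, so retry
            if (target.compareAndSet(expected, updated)) {
                return updated;
            }
        }
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong(10);
        System.out.println(addWithCas(counter, 5));
    }
}
```

Under heavy contention many threads fail the CAS on the same variable and loop again, which is exactly the hot spot that Striped64 spreads across several cells.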
Here is the source of four helper methods in Striped64:
/**
 * CASes the base field.
 */
final boolean casBase(long cmp, long val) {
    return UNSAFE.compareAndSwapLong(this, BASE, cmp, val);
}

/**
 * CASes the cellsBusy field from 0 to 1 to acquire lock.
 */
final boolean casCellsBusy() {
    return UNSAFE.compareAndSwapInt(this, CELLSBUSY, 0, 1);
}

/**
 * Returns the probe value for the current thread.
 * Duplicated from ThreadLocalRandom because of packaging restrictions.
 */
static final int getProbe() {
    return UNSAFE.getInt(Thread.currentThread(), PROBE);
}

/**
 * Pseudo-randomly advances and records the given probe value for the
 * given thread.
 * Duplicated from ThreadLocalRandom because of packaging restrictions.
 */
static final int advanceProbe(int probe) {
    probe ^= probe << 13;   // xorshift
    probe ^= probe >>> 17;
    probe ^= probe << 5;
    UNSAFE.putInt(Thread.currentThread(), PROBE, probe);
    return probe;
}
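casBase(long cmp, long val)
Updates the base field via CAS. If base equals cmp, it is set to val and the method returns true; otherwise base is left unchanged and the method returns false.
casCellsBusy()
Tries to acquire the cellsBusy spinlock. If cellsBusy is 0 (free), it is set to 1 (held) and the method returns true. If cellsBusy is already 1 (held), the attempt fails, nothing changes, and the method returns false.
getProbe()
Returns the probe (hash) value of the current thread.
advanceProbe(int probe)
Updates the current thread's probe (hash) value with shift and XOR operations (xorshift), stores it back into the thread, and returns the new value.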
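To make the xorshift step concrete, here is a sketch that applies the same three shifts to a plain int and shows how a probe is mapped to a slot of a power-of-two table. ProbeDemo, advance, and slot are hypothetical names; unlike the real advanceProbe, nothing is written back into the Thread object.

```java
// Hypothetical, side-effect-free version of the probe arithmetic.
class ProbeDemo {
    // Same xorshift steps as advanceProbe, without the Unsafe write-back
    static int advance(int probe) {
        probe ^= probe << 13;
        probe ^= probe >>> 17;
        probe ^= probe << 5;
        return probe;
    }

    // Slot selection as in longAccumulate; tableLength must be a power of 2
    static int slot(int probe, int tableLength) {
        return (tableLength - 1) & probe;
    }

    public static void main(String[] args) {
        int next = advance(1);
        System.out.println(next);          // prints 270369
        System.out.println(slot(next, 8)); // prints 1
    }
}
```

Because the table length is a power of 2, (tableLength - 1) & probe keeps only the low bits of the probe, which is a cheap replacement for a modulo operation.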
With all that groundwork in place, on to today's main topic.
Show Me The Code:
/**
 * Handles cases of updates involving initialization, resizing,
 * creating new Cells, and/or contention. See above for
 * explanation. This method suffers the usual non-modularity
 * problems of optimistic retry code, relying on rechecked sets of
 * reads.
 *
 * @param x the value
 * @param fn the update function, or null for add (this convention
 * avoids the need for an extra field or function in LongAdder).
 * @param wasUncontended false if CAS failed before call
 */
final void longAccumulate(long x, LongBinaryOperator fn,
                          boolean wasUncontended) {
    int h;
    if ((h = getProbe()) == 0) {
        ThreadLocalRandom.current(); // force initialization
        h = getProbe();
        wasUncontended = true;
    }
    boolean collide = false;                // True if last slot nonempty
    for (;;) {
        Cell[] as; Cell a; int n; long v;
        if ((as = cells) != null && (n = as.length) > 0) {
            if ((a = as[(n - 1) & h]) == null) {
                if (cellsBusy == 0) {       // Try to attach new Cell
                    Cell r = new Cell(x);   // Optimistically create
                    if (cellsBusy == 0 && casCellsBusy()) {
                        boolean created = false;
                        try {               // Recheck under lock
                            Cell[] rs; int m, j;
                            if ((rs = cells) != null &&
                                (m = rs.length) > 0 &&
                                rs[j = (m - 1) & h] == null) {
                                rs[j] = r;
                                created = true;
                            }
                        } finally {
                            cellsBusy = 0;
                        }
                        if (created)
                            break;
                        continue;           // Slot is now non-empty
                    }
                }
                collide = false;
            }
            else if (!wasUncontended)       // CAS already known to fail
                wasUncontended = true;      // Continue after rehash
            else if (a.cas(v = a.value, ((fn == null) ? v + x :
                                         fn.applyAsLong(v, x))))
                break;
            else if (n >= NCPU || cells != as)
                collide = false;            // At max size or stale
            else if (!collide)
                collide = true;
            else if (cellsBusy == 0 && casCellsBusy()) {
                try {
                    if (cells == as) {      // Expand table unless stale
                        Cell[] rs = new Cell[n << 1];
                        for (int i = 0; i < n; ++i)
                            rs[i] = as[i];
                        cells = rs;
                    }
                } finally {
                    cellsBusy = 0;
                }
                collide = false;
                continue;                   // Retry with expanded table
            }
            h = advanceProbe(h);
        }
        else if (cellsBusy == 0 && cells == as && casCellsBusy()) {
            boolean init = false;
            try {                           // Initialize table
                if (cells == as) {
                    Cell[] rs = new Cell[2];
                    rs[h & 1] = new Cell(x);
                    cells = rs;
                    init = true;
                }
            } finally {
                cellsBusy = 0;
            }
            if (init)
                break;
        }
        else if (casBase(v = base, ((fn == null) ? v + x :
                                    fn.applyAsLong(v, x))))
            break;                          // Fall back on using base
    }
}
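First, the three parameters:
x :
The value to accumulate; nothing more to say.
fn :
A LongBinaryOperator. LongBinaryOperator is a functional interface whose single method is long applyAsLong(long left, long right); it describes a function that combines two long values into one. When fn is null, the method simply adds. fn can be assigned with a lambda expression, for example:
1>fn = (long left, long right) -> left + right;
2>fn = (long left, long right) -> left - right;
3>fn = (long left, long right) -> left * right;
…
All of these are valid LongBinaryOperator values; the concrete implementation is up to the programmer.
wasUncontended : whether the earlier attempt was uncontended; it is false if a CAS failed before the call.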
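From application code, the fn parameter is supplied through the public LongAccumulator class, which takes a LongBinaryOperator and an identity value in its constructor. The sketch below uses Math.max as the function; AccumulatorDemo and maxOf are hypothetical names for this example.

```java
import java.util.concurrent.atomic.LongAccumulator;
import java.util.function.LongBinaryOperator;

// Hypothetical demo: a running maximum built on LongAccumulator.
class AccumulatorDemo {
    static long maxOf(long... values) {
        // The lambda plays the role of fn; parameter types are inferred
        LongBinaryOperator fn = (left, right) -> Math.max(left, right);
        // Long.MIN_VALUE is the identity for max
        LongAccumulator acc = new LongAccumulator(fn, Long.MIN_VALUE);
        for (long v : values) {
            acc.accumulate(v);
        }
        return acc.get();
    }

    public static void main(String[] args) {
        System.out.println(maxOf(3, 9, 4)); // prints 9
    }
}
```

LongAdder, by contrast, corresponds to the fn == null case, where the update is plain addition.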
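The method contains an infinite loop, the standard pattern for CAS retries.
It declares several local variables:
as : the Cell array
a : the Cell in the slot selected by the current thread's probe
n : the length of the Cell array
v : the current value of a Cell (or of base)
The loop body has three main branches, described in turn below: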
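Branch 1:
Sub-branch 1:
1. Check that the cells array is not null and its length is greater than 0.
2. Use the current thread's probe to select a slot, (n - 1) & h, and check that the slot is empty (null).
3. Check that the spinlock is free (cellsBusy is 0).
4. Create a new Cell whose value is x.
5. Check again that the spinlock is free, then try to acquire it with CAS.
6. After acquiring the lock, initialize the created flag to false.
7. Under the lock, recheck that cells is not null, that its length is greater than 0, and that the slot selected by the probe is still null. If so, store the new Cell in the slot and set created to true.
8. Release the spinlock in the finally block. If created is true, exit the loop; otherwise continue with the next iteration.
9. If cellsBusy is 1 in step 3, or the lock cannot be acquired in step 5, set collide to false, "rehash" the current thread's probe, and start the next iteration.
Sub-branch 2:
If the CAS before the call is known to have failed (wasUncontended is false), set wasUncontended to true, "rehash" the current thread's probe, and start the next iteration.
Sub-branch 3:
Update the value of the Cell in the current slot via CAS. On success, exit the loop; on failure, fall through to the checks below.
Sub-branch 4:
If the length of cells is greater than or equal to the number of CPUs, or cells has been replaced by another thread, set collide to false, rehash, and start the next iteration.
Sub-branch 5:
If collide is false, set it to true, rehash, and start the next iteration.
Sub-branch 6:
Acquire the spinlock and, if cells has not changed, double the size of the cells array, copying the existing Cells over. Then release the lock, set collide to false, and retry with the expanded table without rehashing.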
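Branch 2:
1. The cells field has been assigned to as; this branch is reached when as is null or its length is 0.
2. Check that cellsBusy is 0 (the spinlock is free) and that cells is still equal to as, then set cellsBusy to 1 via CAS.
3. Initialize the init flag to false.
4. If cells == as still holds, create a Cell array of length 2.
5. Use h & 1 to select a slot and store a new Cell holding x there.
6. Assign the new array to the cells field and set init to true.
7. Release the spinlock in the finally block by setting cellsBusy to 0.
8. If init is true, exit the loop; otherwise start the next iteration.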
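Branch 3:
This is the fallback when the table does not exist yet and cannot be initialized right now, for example because another thread holds the spinlock. Update the base field via CAS. If the given LongBinaryOperator is null, the new value is base + x.
If the given LongBinaryOperator is not null, the new value is the result of fn.applyAsLong(v, x).
If the CAS succeeds, exit the loop; otherwise start the next iteration.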
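To tie the three branches together, here is a heavily simplified sketch of a striped counter built only on public APIs. It is not the JDK implementation. SimpleStripedCounter and all its members are hypothetical, AtomicLong stands in for Cell, an AtomicInteger stands in for cellsBusy, and a fresh random hash per slow-path call stands in for the per-thread probe. Only addition is supported (the fn == null case).

```java
import java.util.Arrays;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified striped counter; not the JDK implementation.
class SimpleStripedCounter {
    private static final int NCPU = Runtime.getRuntime().availableProcessors();
    private final AtomicLong base = new AtomicLong();
    private final AtomicInteger busy = new AtomicInteger(); // spinlock: 0 free, 1 held
    private volatile AtomicLong[] cells;                    // AtomicLong stands in for Cell

    void add(long x) {
        long b;
        // Fast path: no table yet and the CAS on base succeeds
        if (cells == null && base.compareAndSet(b = base.get(), b + x)) return;
        // Assumption: a random non-zero hash per call replaces the per-thread probe
        int h = ThreadLocalRandom.current().nextInt() | 1;
        boolean collide = false;
        for (;;) {
            AtomicLong[] as = cells;
            if (as != null) {                          // branch 1: table exists
                int n = as.length;
                AtomicLong a = as[(n - 1) & h];
                if (a == null) {                       // empty slot: attach a cell
                    if (busy.compareAndSet(0, 1)) {
                        try {                          // recheck under lock
                            AtomicLong[] rs = cells;
                            int j = (rs.length - 1) & h;
                            if (rs[j] == null) { rs[j] = new AtomicLong(x); return; }
                        } finally { busy.set(0); }
                        continue;                      // slot is now non-empty
                    }
                    collide = false;
                } else {
                    long v = a.get();
                    if (a.compareAndSet(v, v + x)) return;
                    if (n >= NCPU || cells != as) collide = false; // max size or stale
                    else if (!collide) collide = true;
                    else if (busy.compareAndSet(0, 1)) {
                        try {                          // double the table unless stale
                            if (cells == as) cells = Arrays.copyOf(as, n << 1);
                        } finally { busy.set(0); }
                        collide = false;
                        continue;                      // retry with expanded table
                    }
                }
                h ^= h << 13; h ^= h >>> 17; h ^= h << 5;  // xorshift "rehash"
            } else if (busy.compareAndSet(0, 1)) {     // branch 2: initialize table
                try {
                    if (cells == null) {
                        AtomicLong[] rs = new AtomicLong[2];
                        rs[h & 1] = new AtomicLong(x);
                        cells = rs;
                        return;
                    }
                } finally { busy.set(0); }
            } else {                                   // branch 3: fall back on base
                long v = base.get();
                if (base.compareAndSet(v, v + x)) return;
            }
        }
    }

    long sum() {                                       // base plus all non-null cells
        long s = base.get();
        AtomicLong[] as = cells;
        if (as != null) for (AtomicLong a : as) if (a != null) s += a.get();
        return s;
    }

    static long demo(int threads, int addsPerThread) {
        SimpleStripedCounter counter = new SimpleStripedCounter();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < addsPerThread; j++) counter.add(1);
            });
            workers[i].start();
        }
        for (Thread t : workers) {
            try { t.join(); } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return counter.sum();
    }

    public static void main(String[] args) {
        System.out.println(demo(4, 100_000));
    }
}
```

Every successful path applies x exactly once (to base, to an existing cell, or as the initial value of a new cell), so the sum is exact once all writers have finished. Like the original, sum() is only a snapshot while updates are still in flight.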
My ability is limited; if anything here is wrong, corrections are welcome.