Jdk1.6 JUC源码解析(27)-Exchanger
作者:大飞
功能简介:
- Exchanger是一种线程间安全交换数据的机制。可以和之前分析过的SynchronousQueue对比一下:线程A通过SynchronousQueue将数据a交给线程B;线程A通过Exchanger和线程B交换数据,线程A把数据a交给线程B,同时线程B把数据b交给线程A。可见,SynchronousQueue是交给一个数据,Exchanger是交换两个数据。
源码分析:
- 先看下内部结构:
private static final class Node extends AtomicReference<Object> {
/** 创建这个节点的线程提供的用于交换的数据。 */
public final Object item;
/** 等待唤醒的线程 */
public volatile Thread waiter;
/**
* Creates node with given item and empty hole.
* @param item the item
*/
public Node(Object item) {
this.item = item;
}
}
/**
* 一个Slot就是一对线程交换数据的地方。
* 这里对Slot做了缓存行填充,能够避免伪共享问题。
* 虽然填充导致浪费了一些空间,但Slot是按需创建,一般没什么问题。
*/
private static final class Slot extends AtomicReference<Object> {
// Improve likelihood of isolation on <= 64 byte cache lines
long q0, q1, q2, q3, q4, q5, q6, q7, q8, q9, qa, qb, qc, qd, qe;
}
/**
* Slot数组,在需要时才进行初始化。
* 用volatile修饰,因为这样可以安全的使用双重锁检测方式构建。
*/
private volatile Slot[] arena = new Slot[CAPACITY];
/**
* arena(Slot数组)的容量。设置这个值用来避免竞争。
*/
private static final int CAPACITY = 32;
/**
* 正在使用的slot下标的最大值。当一个线程经历了多次CAS竞争后,
* 这个值会递增;当一个线程自旋等待超时后,这个值会递减。
*/
private final AtomicInteger max = new AtomicInteger();
内部结构很清晰,首先内部包含一个Slot数组,默认容量是32,用来避免以一些竞争,有点类似于ConcurrentHashMap的策略;其次,交换数据的场所就是Slot,它本身进行了cache line填充,避免了伪共享问题;最后,每个要进行数据交换的线程在内部会用一个Node来表示。
伪共享说明:假设一个类的两个相互独立的属性a和b在内存地址上是连续的(比如FIFO队列的头尾指针),那么它们通常会被加载到相同的cpu cache line里面。并发情况下,如果一个线程修改了a,会导致整个cache line失效(包括b),这时另一个线程来读b,就需要从内存里再次加载了,这种多线程频繁修改ab的情况下,虽然a和b看似独立,但它们会互相干扰,非常影响性能。
- 看完了内部结构,接下来就从Exchanger的交换数据方法exchange入手来分析代码:
/**
* 等待其他线程到达交换点,然后与其进行数据交换。
*
* 如果其他线程到来,那么交换数据,返回。
*
* 如果其他线程未到来,那么当前线程等待,知道如下情况发生:
* 1.有其他线程来进行数据交换。
* 2.当前线程被中断。
*/
public V exchange(V x) throws InterruptedException {
if (!Thread.interrupted()) {//检测当前线程是否被中断。
//进行数据交换。
Object v = doExchange(x == null? NULL_ITEM : x, false, 0);
if (v == NULL_ITEM)
return null; //检测结果是否为null。
if (v != CANCEL) //检测是否被取消。
return (V)v;
Thread.interrupted(); // 清除中断标记。
}
throw new InterruptedException();
}
/**
* 等待其他线程到达交换点,然后与其进行数据交换。
*
* 如果其他线程到来,那么交换数据,返回。
*
* 如果其他线程未到来,那么当前线程等待,知道如下情况发生:
* 1.有其他线程来进行数据交换。
* 2.当前线程被中断。
* 3.超时。
*/
public V exchange(V x, long timeout, TimeUnit unit)
throws InterruptedException, TimeoutException {
if (!Thread.interrupted()) {
Object v = doExchange(x == null? NULL_ITEM : x,
true, unit.toNanos(timeout));
if (v == NULL_ITEM)
return null;
if (v != CANCEL)
return (V)v;
if (!Thread.interrupted())
throw new TimeoutException();
}
throw new InterruptedException();
}
上面的方法都调用了doExchange方法,主要逻辑在这个方法里,分析下这个方法:
/**
* 这个方法会处理不同的情况,使用Object而不是泛型,主要是为了返回一些
* 哨兵值(比如表示null和取消的对象)。
*
* @param item 用来进行交换的数据。
* @param timed 如果有超时延迟,设置为true
* @param nanos 具体的超时时间。
* @return 返回另一个线程(与当前线程交换数据)的数据,或者CANCEL(表示取消)
*/
private Object doExchange(Object item, boolean timed, long nanos) {
Node me = new Node(item); // 创建当前节点me。
int index = hashIndex(); // 计算出当前slot的下标。
int fails = 0; // 用来保存CAS失败的次数。
for (;;) {
Object y; // 用来保存当前slot中可能存在的Node。
Slot slot = arena[index]; // 按照前面计算出的下标获取当前slot。
if (slot == null)
createSlot(index); // 如果slot为null,那么创建一个slot,然后继续循环。
else if ((y = slot.get()) != null && slot.compareAndSet(y, null)) { // 如果slot不为空,那么slot可能被另一个Node给占了,如果确实存在这个Node,尝试将其置空。(表示当前节点要和这个Node交换数据了)
Node you = (Node)y; // 给这个Node转型,赋给you。
if (you.compareAndSet(null, item)) { // 将item设置给you,注意you本身是一个AtomicReference,这里相当于把item设置到you的value字段上。
LockSupport.unpark(you.waiter); // 然后唤醒you节点上等待的线程。
return you.item; // 返回you的item。
} // 竞争失败,放弃,继续循环。
}
else if (y == null && // 如果slot为空,那么说明没有要和当前线程交换数据的线程,
slot.compareAndSet(null, me)) { //那么当前线程先尝试把这个slot给占了。
if (index == 0) // 如果slot下标为0,那么阻塞等待。
return timed? awaitNanos(me, slot, nanos): await(me, slot); // 有超时的话,会阻塞给定的时间。
Object v = spinWait(me, slot); // 如果slot下标不是0,自旋等待,等待其他线程来和当前线程交换数据,然后返回交换后的数据。
if (v != CANCEL)
return v;
me = new Node(item); // 如果取消的话,重试,重建一个Node,之前的Node就丢弃了。
int m = max.get(); // 获取当前slot下标的最大值。
if (m > (index >>>= 1)) // 如果当前允许的最大索引太大。
max.compareAndSet(m, m - 1); // 递减最大索引
}
else if (++fails > 1) { // 如果1个slot竞争失败超过2次。
int m = max.get();
if (fails > 3 && m < FULL && max.compareAndSet(m, m + 1)) //如果竞争失败超过3次,尝试递增最大索引值。
index = m + 1; // 增加索引值。
else if (--index < 0) // 换个index。
index = m; // 绕回逻辑,防止index越界。
}
}
}
这里形象的理解一下:
其实就是”我”和”你”(可能有多个”我”,多个”你”)在一个叫Slot的地方做交易(一手交钱,一手交货),过程分以下步骤:
1.我到交易地点(Slot)的时候,你已经到了,那我就尝试喊你交易,如果你回应了我,决定和我交易那么进入第2步;如果别人抢先一步把你喊走了,那我只能再找别人了,进入第5步。
2.我拿出钱交给你,你可能会接收我的钱,然后把货给我,交易结束;也可能嫌我掏钱太慢(超时)或者接个电话(中断),TM的不卖了,走了,那我只能再找别人买货了(从头开始)。
3.我到交易地点的时候,你不在,那我先尝试把这个交易点给占了(一屁股做凳子上…),如果我成功抢占了单间(交易点),那就坐这儿等着你拿货来交易,进入第4步;如果被别人抢座了,那我只能在找别的地方儿了,进入第5步。
4.你拿着货来了,喊我交易,然后完成交易;也可能我等了好长时间你都没来,我不等了,继续找别人交易去,走的时候我看了一眼,一共没多少人,弄了这么多单间(交易地点Slot),太TM浪费了,我喊来交易地点管理员:一共也没几个人,搞这么多单间儿干毛,给哥撤一个!。然后再找别人买货(从头开始);或者我老大给我打了个电话,不让我买货了(中断)。
5.如果之前我尝试交易了2次都没成功,那我就想我TM选的这个位置(Slot下标)是不是风水不好啊,换个地儿继续(从头开始);如果之前都尝试交易了4次还没成功,我怒了,喊过来交易地点的管理员:给哥再开一个单间(Slot),加一个凳子,这么多人就这么几个破凳子够谁用!
看一下doExchange调用的计算slot下标的方法:
/**
* Returns a hash index for the current thread. Uses a one-step
* FNV-1a hash code (http://www.isthe.com/chongo/tech/comp/fnv/)
* based on the current thread's Thread.getId(). These hash codes
* have more uniform distribution properties with respect to small
* moduli (here 1-31) than do other simple hashing functions.
*
* <p>To return an index between 0 and max, we use a cheap
* approximation to a mod operation, that also corrects for bias
* due to non-power-of-2 remaindering (see {@link
* java.util.Random#nextInt}). Bits of the hashcode are masked
* with "nbits", the ceiling power of two of table size (looked up
* in a table packed into three ints). If too large, this is
* retried after rotating the hash by nbits bits, while forcing new
* top bit to 0, which guarantees eventual termination (although
* with a non-random-bias). This requires an average of less than
* 2 tries for all table sizes, and has a maximum 2% difference
* from perfectly uniform slot probabilities when applied to all
* possible hash codes for sizes less than 32.
*
* @return a per-thread-random index, 0 <= index < max
*/
private final int hashIndex() {
long id = Thread.currentThread().getId();
int hash = (((int)(id ^ (id >>> 32))) ^ 0x811c9dc5) * 0x01000193;
int m = max.get();
int nbits = (((0xfffffc00 >> m) & 4) | // Compute ceil(log2(m+1))
((0x000001f8 >>> m) & 2) | // The constants hold
((0xffff00f2 >>> m) & 1)); // a lookup table
int index;
while ((index = hash & ((1 << nbits) - 1)) > m) // May retry on
hash = (hash >>> nbits) | (hash << (33 - nbits)); // non-power-2 m
return index;
}
这里就是根据当前线程的ID,算一个hash值,然后针对slot最大index值做了一个近似取模的操作来计算slot的下标。
接下来看一下createSlot方法:
private void createSlot(int index) {
// 在同步块外面创建Slot实例,以减小同步块范围。
Slot newSlot = new Slot();
Slot[] a = arena;
synchronized (a) {
if (a[index] == null)
a[index] = newSlot;
}
}
再看一下awaitNanos方法:
/**
* 在下标为0的Slot上等待获取其他线程填充的值。
* 如果在Slot被填充之前超时或者被中断,那么操作失败。
*/
private Object awaitNanos(Node node, Slot slot, long nanos) {
int spins = TIMED_SPINS;
long lastTime = 0;
Thread w = null;
for (;;) {
Object v = node.get();
if (v != null)
//如果已经被其他线程填充了值,那么返回这个值。
return v;
long now = System.nanoTime();
if (w == null)
w = Thread.currentThread();
else
nanos -= now - lastTime;
lastTime = now;
if (nanos > 0) {
if (spins > 0)
--spins; //先自旋几次。
else if (node.waiter == null)
node.waiter = w; //自旋阶段完毕后,将当前线程设置到node的waiter域。
else if (w.isInterrupted())
tryCancel(node, slot); //如果当前线程被中断,尝试取消node。
else
LockSupport.parkNanos(node, nanos); //阻塞给定的时间。
}
else if (tryCancel(node, slot) && !w.isInterrupted())
//超时后,如果当前线程没有被中断,那么从Slot数组的其他位置看看有没有等待交换数据的节点
return scanOnTimeout(node);
}
}
awaitNanos中的自旋次数为TIMED_SPINS,这里说明一下自旋次数:
/**
* 单核处理器下这个自旋次数为0
* 多核情况下,这个值设置为大多数系统中上下文切换时间的平均值。
*/
private static final int SPINS = (NCPU == 1) ? 0 : 2000;
/**
* 在有超时情况下阻塞等待之前自旋的次数。.
* 超时等待的自旋次数之所以更少,是因为检测时间也需要耗费时间。
* 这里的值是一个经验值。
*/
private static final int TIMED_SPINS = SPINS / 20;
继续看一下tryCancel方法:
private static boolean tryCancel(Node node, Slot slot) {
if (!node.compareAndSet(null, CANCEL))//尝试取消node
return false;
if (slot.get() == node) // pre-check to minimize contention
slot.compareAndSet(node, null); //如果还关联在sot上,断开关联。
return true;
}
继续看awaitNanos方法中最后调用的scanOnTimeout方法,这个方法在要取消的时候调用,找一下其他下标的Slot上有没有可以交换数据的节点,找到的话就可以成功交换数据,而不取消了:
private Object scanOnTimeout(Node node) {
Object y;
for (int j = arena.length - 1; j >= 0; --j) {
//从Slot数组的后面往前找
Slot slot = arena[j];
if (slot != null) {
//找到了有初始化好的Slot,然后看看里面有没有node。
while ((y = slot.get()) != null) {
//发现有node,尝试和这个node进行数据交换。
if (slot.compareAndSet(y, null)) {
Node you = (Node)y;
//尝试进行数据交换,
if (you.compareAndSet(null, node.item)) {
//如果交换成功(把当前节点的数据交给you),唤醒you上面等待的线程。
LockSupport.unpark(you.waiter);
//返回you的数据。
return you.item;
}
}
}
}
}
//没找到其他等待交换数据的线程,最后取消当前节点node。
return CANCEL;
}
上面看的awaitNanos方法是在下标为0的Slot里面,有超时情况下的处理方式。再看下没有超时情况的处理方法await:
private static Object await(Node node, Slot slot) {
Thread w = Thread.currentThread();
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
//如果已经被其他线程填充了值,那么返回这个值。
return v;
else if (spins > 0) // 先自旋几次。
--spins;
else if (node.waiter == null) // 自旋阶段完毕后,将当前线程设置到node的waiter域。
node.waiter = w;
else if (w.isInterrupted()) // 如果当前线程被中断,尝试取消当前node。
tryCancel(node, slot);
else // 否则阻塞当前线程。
LockSupport.park(node);
}
}
之前看的awaitNanos和await方法都是在下标为0的Slot的情况下采取的有阻塞行为的处理方式,如果下标不为0,采取完全自旋的方式,调用方法spinWait:
private static Object spinWait(Node node, Slot slot) {
int spins = SPINS;
for (;;) {
Object v = node.get();
if (v != null)
return v;
else if (spins > 0)
--spins; //先自旋
else
tryCancel(node, slot); //自旋了指定的次数还没等到交换的数据,尝试取消。
}
}
最后看一下arena(Slot数组),默认的容量和实际使用的下标最大值:
private static final int CAPACITY = 32;
/**
* The value of "max" that will hold all threads without
* contention. When this value is less than CAPACITY, some
* otherwise wasted expansion can be avoided.
*/
private static final int FULL =
Math.max(0, Math.min(CAPACITY, NCPU / 2) - 1);
前面说过arena容量默认为32,目的是为了减少线程的竞争,但实际上对arena的使用不会超过FULL这个值(避免一些空间浪费)。这个值取的是32(默认CAPACITY)和CPU核心数量的一半,这两个数的较小值在减1的数和0的较大值…. 也就是说,如果CPU核很多的情况下,这个值最大也就是31,;如果是单核或者双核CPU,这个值就是0,也就是说只能用arena[0]。这也是为什么前面的hashIndex方法里面会做的(近似)取模操作比较复杂,因为实际的能使用的Slot数组范围可能不是2的幂。 Exchanger的代码解析完毕!