线程间交换数据的Exchanger

今天给大家介绍一个并发包中的线程工具Exchanger,他的主要作用是用来进行线程之间的数据交换的,一起来看看吧。

Exchanger

背书中:Exchanger是一个用来进行线程之间的数据交换的工具类,它提供了一个同步点,在这个同步点,两个线程可以互相交换数据,当一个线程执行exchange()方法时,会等待第二个线程执行exchange()方法,这个时刻就应该是书中所说的同步点,这两个线程就可以互相交换数据。

举个栗子

就拿今天晚上吃饭做例子吧,a去买鸡翅包饭,b去买了汉堡,然后互相替换了一个,上代码吧

package thread.exchanger;

import java.util.concurrent.Exchanger;

/**
 * @author ZhaoWeinan
 * @date 2018/10/16
 * @description
 */
public class ExchangerDemo {

    private static Exchanger<String> exchanger = new Exchanger<>();

    public static void main(String[] args){
        new Thread(new Runnable() {
            @Override
            public void run() {
                String a = "我买了鸡翅包饭!";
                System.out.println(Thread.currentThread().getName() + "说:" + a);
                try {
                    System.out.println(Thread.currentThread().getName() + "说:等着b买汉堡!");
                    exchanger.exchange(a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();

        new Thread(new Runnable() {
            @Override
            public void run() {
                String b = "我买了汉堡!";
                System.out.println(Thread.currentThread().getName() + "说:" + b);

                try {
                    System.out.println(Thread.currentThread().getName() + "说:现在我们来交换吧!");
                    String a = exchanger.exchange(b);
                    //把第一个线程中 a 变量拿了过来
                    System.out.println(Thread.currentThread().getName() + "说我拿到了鸡翅包饭!");
                    System.out.println(Thread.currentThread().getName() + "说:" + a);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

这段代码很好的描述了这个场景,创建了两个线程,在线程一中定义了一个变量a,a的内容是“我买了鸡翅包饭!”,在线程二中定义了一个变量b,b的内容是“我买了汉堡!”,然后线程一调用了exchanger.exchange(a)方法,进行了阻塞,也就是书中所说的等待同步点,在线程二中,调用exchanger.exchange(b)方法,也就是说,这两个线程都到达了同步点,所以线程一获取到了线程二中的变量“我买了汉堡!”,线程二获取到了线程一中的变量“我买了鸡翅包饭!”,代码运行结果如下:

《线程间交换数据的Exchanger》 运行结果

栗子就先说到这,来看一下源码吧

看下源码

《线程间交换数据的Exchanger》 Exchanger类的结构

简单说一下,他主要有两个个内部类Node,Participant

Node类
@sun.misc.Contended static final class Node {
    // Arena的索引
    int index;
    //最后记录的Exchanger的bound属性值
    //Exchanger类中有个bound属性,用volatile修饰的
    int bound;
    //当前bound中原子操作的失败次数
    //可以推断 Exchanger是用volatile、原子操作来保证线程安全的
    int collides;
    //用于自旋的伪随机数
    int hash;
    //这个线程的数据对象吧,是用来交换的对象
    Object item;
    //用来释放线程的对象
    volatile Object match;
    //当挂起,设置为次线程,否则为空
    volatile Thread parked;
}

这个类看来主要是用来进行数据交换的类,Exchanger类中,有一个Node类型的slot属性,一个Node[]类型的arena属性,这两个就是所谓的单槽、多槽的两种模式,是在这两模式中用来交换的槽位,来看看Participant类

Participant类
 static final class Participant extends ThreadLocal<Node> {
        public Node initialValue() { return new Node(); }
    }

Participant类继承了ThreadLocal,主要作用就是初始化一个node对象
再来看看核心的交换方法

slotExchange单槽交换

为了方便解说,整段代码就不贴了,一段一段的说吧

 Node p = participant.get();
 Thread t = Thread.currentThread();

创建一个Node对象p,获取了当前线程t

for (Node q;;) {
            //先判断slot属性,
            //不为空,证明有线程在等待着数据交换
            if ((q = slot) != null) {
          .........
              }
        }

先判断slot属性,不为空,证明有线程在等待着数据交换

if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }

说说这段代码,compareAndSwapObject方法是sun.misc.Unsafe类的一个方法

/* 在obj的offset位置比较object field和期望的值,如果相同则更新。这个方法
     * 的操作应该是原子的,因此提供了一种不可中断的方式更新object field。
     * @param var1 包含要修改field的对象
     * @param var2 object中field的偏移量
     * @param var4 希望field中存在的值
     * @param var5 如果期望值var4与var1的field当前值相同,设置var1的field值为这个新值
     * @return true 如果field的值被更改,则返回true             
     */
    public final native boolean compareAndSwapObject(Object var1, long var2, Object var4, Object var5);

有机会给大家细讲,在这主要作用是,用于比较this对象,this对象就是Exchanger的实例,this对象的SLOT偏移量上的属性与是否与q相等,如果相等则把this对象的这个属性更新为null,返回true,反之不更新,返回false,看一眼SLOT怎么定义的

private static final long SLOT;
....
SLOT = U.objectFieldOffset
                (ek.getDeclaredField("slot"));

这个SLOT对应的是“slot”字段,也就是上面那个方法比较的是Exchanger实例中的slot属性

if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }

如果compareAndSwapObject执行成功,就把该线程挂起,然后唤起等待的线程,返回交换的结果,如果没有执行成功,那么我们来看看下面else块中的代码:

    for (Node q;;) {
            //先判断slot属性,
            //不为空,证明有线程在等待着数据交换
            if ((q = slot) != null) {
                if (U.compareAndSwapObject(this, SLOT, q, null)) {
                    Object v = q.item;
                    q.match = item;
                    Thread w = q.parked;
                    if (w != null)
                        U.unpark(w);
                    return v;
                }
                // 存在线程间cpu的竞争,构建一个多槽位arena为解决问题
                if (NCPU > 1 && bound == 0 &&
                        U.compareAndSwapInt(this, BOUND, 0, SEQ))
                    arena = new Node[(FULL + 2) << ASHIFT];
            }
            else if (arena != null)
                //如果构建多槽位,返回,使用多槽位模式解决问题
                return null;
            else {
                //继续使用compareAndSwapObject就行比较交换
               //这边入参变了,大家注意一下
              //总结下这段代码的意思就是用当前线程来占领槽位
                p.item = item;
                if (U.compareAndSwapObject(this, SLOT, null, p))
                    break;
                p.item = null;
            }
        }

如果存在线程间cpu的竞争,构建一个多槽位arena为解决问题,否则,判断当前操作是否为空,如果为空就跳出循环,否则,无线循环来走上面的流程

       //当前线程占领了槽位,等待其它线程来交换数据
        int h = p.hash;
        long end = timed ? System.nanoTime() + ns : 0L;
        int spins = (NCPU > 1) ? SPINS : 1;
        Object v;
        //循环,直到match为空
        while ((v = p.match) == null) {
            if (spins > 0) {
                //自旋
                h ^= h << 1; h ^= h >>> 3; h ^= h << 10;
                if (h == 0)
                    h = SPINS | (int)t.getId();
                else if (h < 0 && (--spins & ((SPINS >>> 1) - 1)) == 0)
                      //线程自旋等待中,为了避免别的线程都在等待,先让出cpu执行权
                      Thread.yield();
            }
           //其它线程来交换数据了,修改槽位
            else if (slot != p)
                spins = SPINS;
           //线程没有发生中断
           //还是单槽模式
           //而且还没有超时
            else if (!t.isInterrupted() && arena == null &&
                    (!timed || (ns = end - System.nanoTime()) > 0L)) {
                //设置BLOCKER
                U.putObject(t, BLOCKER, this);
                p.parked = t;
                if (slot == p)
                    //阻塞
                    U.park(false, ns);
                //清空 BLOCKER
                p.parked = null;
                U.putObject(t, BLOCKER, null);
            }
            else if (U.compareAndSwapObject(this, SLOT, p, null)) {
                //使用compareAndSwapObject进行比较交换,如果成功跳出循环
               //如果是超时了,或者线程被中断,则返回null
                v = timed && ns <= 0L && !t.isInterrupted() ? TIMED_OUT : null;
                break;
            }
        }
        U.putOrderedObject(p, MATCH, null);
        p.item = null;
        p.hash = h;
        //交换完毕,返回v
        //成功的话就是要交换的数据,如果超时、线程中断等情况返回null
        return v;

这段代码的主要意思是,当前线程占领了槽位,等待其它线程来交换数据,自旋进行等待,如果有别的线程来交换数据了,那么使用compareAndSwapObject进行比较交换,交换到数据,唤起被挂起的线程,返回交换结果,如果是超时或者线程被中断时,返回null。
这就是单槽位模式,多槽位,没有怎么研究过,相信原理差不多,文章有点冗长,就不说多槽位模式了。

应用场景

说说应用场景吧,书上说可以用用于同步任务队列,遗传算法(表示没有听过),数据校对(这个还感觉比较靠谱),说说我第一次见到他吧,是我继承别人的代码的时候,一个定时任务用到的,有很多张mysql表,需要计算汇总结果到一张表里面,使用了Exchanger,但是感觉实现方式很多,这一种并不是很惊艳,不知道大家有没有好的使用场景,可以说一下。

Exchanger就为大家简单的说到这,欢迎大家来交流,指出文中一些说错的地方,让我加深认识,愿大家没有bug,谢谢!

    原文作者:小草莓子桑
    原文地址: https://www.jianshu.com/p/71b9a1db9c5b
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞