JUC SynchronousQueue 分析

基本介绍

SynchronousQueue JUC阻塞队列的一种,队列无实际容量,一个put操作必须等待一个take操作的执行,才会解除阻塞状态。将put考虑为生产者的话,需要等待消费者,即take操作来取它的数据,才能够从阻塞等待中返回。数据在SynchronousQueue中是被直接传递的,由put线程传递给take线程

使用场景

SynchronousQueue的使用场景比较少见,具体的一个场景可以参考ExecutorsnewCachedThreadPool方法

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

JDK线程池实现中,每当没有空闲线程执行任务的时候,会尝试将任务offer到任务队列,若队列满了,如果线程数上限(maxThreadSize,构造方法第二个参数)还没达到,则构造一个新的线程,使用该线程来运行任务。

newCachedThreadPool使用SynchronousQueue作为任务队列,提交任务时若遇到无空闲线程,则每次offer都会失败,因为线程上限接近无穷大,则每次都会创建新的线程来执行,并且因为设置了线程空闲超时时间(默认60s),所以该线程池构造方法构造的线程池具备线程数可伸缩(空闲太多则销毁,不够则创建)的能力,不会出现任务被reject的现象(但是若任务一直处理不过来,很可能会因为创建太多线程跑任务导致OOM)

源码分析

Tips:

1.学习源码之前需要掌握数据结构知识里面的链表、队列(链表方式实现的队列)、栈(链表方式实现的栈)的一些概念和常规操作

2.采用源码+注释的方式来讲解,分析主要在中文注释上面

同其他阻塞队列实现,重点还是在于put和take的实现,SynchronousQueue使用内部类Transfer来统一实现存取操作,put/take等操作都是调用Transfer的transfer方法来实现其逻辑:

abstract static class Transferer<E> {
    abstract E transfer(E e, boolean timed, long nanos);
}

put调用transfer传入实际的数据,而take调用transfer则传入null,代表是要取数据的一方

Transfer有两种实现方案,公平实现和非公平实现。公平实现采用队列的思想(fifo)实现;而非公平实现采用栈的思想(lifo)实现。公平性体现在先进的阻塞等待线程(可以是阻塞的put or 阻塞的take)优先获得匹配的权利,解除阻塞状态(先进先出)

公平实现

实现类为TransferQueue,首先看Queue的构造节点实现类QNode的表示:

static final class QNode {
    volatile QNode next;          // 指向队列下一个节点
    volatile Object item;         // 存储数据,同时用于处理canceled的情况(设置为自身this)
    volatile Thread waiter;       // 存储当前线程,控制线程阻塞与唤醒
    final boolean isData;         // 该节点是否为数据节点(true: 生产者,put操作; false: 消费者,take操作)

    // 构造方法
    QNode(Object item, boolean isData) {
        this.item = item;
        this.isData = isData;
    }

    // 定义的一些方法,用于实现cas设置next,item,同时提供tryCanncel方法以及判断当前节点是否为cancel等方法

    boolean casNext(QNode cmp, QNode val) {
        return next == cmp &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    boolean casItem(Object cmp, Object val) {
        return item == cmp &&
            UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
    }

    // cancel将item设置为自身引用
    void tryCancel(Object cmp) {
        UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
    }

    boolean isCancelled() {
        return item == this;
    }

    boolean isOffList() {
        return next == this;
    }

    // next以及item cas的设置依赖Unsafe机制
    private static final sun.misc.Unsafe UNSAFE;
    private static final long itemOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = QNode.class;
            itemOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("item"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

TransferQueue中使用head和tail分别指向队列头和队列尾,注意head是一个dumy node,除此之外还有一个cleanMe字段,在讲解后面的clean逻辑会涉及到该字段,最后也使用Unsafe来实现CAS操作

这里就不贴代码了,感兴趣的直接看下源码SynchronousQueueTransferQueue的定义即可

TransferQueue实现了Transfer抽象类,实现具体传递逻辑在transfer方法内,下面分析重点方法transfer:

主要流程:
1. 如果队列为空或者调用者与队列中元素属于同一模式(put, take),则构造一个新节点添加到队尾,且使得当前调用线程处于阻塞等待(等待被匹配),直到被唤醒,由于唤醒可能是由于cancel引起的,因此需要处理cancel的情况(调用clean进行清理动作),若不是由于cancel引起的唤醒,则表示被匹配
2. 否则(相对于1),尝试匹配并唤醒队列头节点,若匹配成功,则返回其值(take线程获取匹配到的节点的item,put线程返回null)

过程中会有很多强化判断,防止出现不一致读(多线程并发无锁的实现,一定是需要考虑很多不一致读的场景)

E transfer(E e, boolean timed, long nanos) {

    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // 防空判断
            continue;                       // spin

        if (h == t || t.isData == isData) { // 队列为空或者是当前操作与队列中节点(队尾节点)属于同一个操作,take or put
            QNode tn = t.next;
            if (t != tail)                  // 有并发操作更新了tail
                continue;
            if (tn != null) {               // 并发操作导致tail改变,需要重新设置tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // 达到超时等待时间,直接返回null
                return null;

            // 构造节点并追加到队尾
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // CAS失败,从头来过
                continue;

            advanceTail(t, s);              // 前面追加到尾部之后,需要重新设置tail

            // 然后需要将进行该操作的线程进行阻塞等待,等待出现匹配动作,由这个动作的触发者对它进行唤醒,返回的是匹配节点(QNode)的item
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // cancel的时候会将item(x)设置为自身(s),这里判断是否为被canncel,如果是则对节点进行清理
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // 发现节点还没有从队列中摘除,进行摘除操作
                advanceHead(t, s);          // unlink if head
                if (x != null)              // 断开s(QNode)中属性的引用
                    s.item = s;
                s.waiter = null;
            }

            // Tips: take操作返回之前put的数据,put操作则返回null(e为null)
            return (x != null) ? (E)x : e;

        } else {                            // 可以尝试匹配的分支
            QNode m = h.next;               // 队列头(真实节点是head的next)是待匹配的节点
            if (t != tail || m == null || h != head)
                continue;                   // 判断是否有并发操作引起导致tail,head,head.next的变化

            Object x = m.item;
            if (isData == (x != null) ||    // m(待匹配的节点)已经被匹配过?并发匹配问题?
                x == m ||                   // m 是canceled的(Tips: tryCancel会将item设置为节点自身引用)
                !m.casItem(x, e)) {         // CAS递交元素失败
                advanceHead(h, m);          // 满足上述条件,将节点出队
                continue;
            }

            // 走到这里证明成功匹配,节点出队,并唤醒匹配节点的等待线程,并将结果带回
            advanceHead(h, m); 
            LockSupport.unpark(m.waiter);
            // Tips: take操作返回之前put的数据,put操作则返回null(e为null)
            return (x != null) ? (E)x : e;
        }
    }
}

下面看下阻塞等待方法awaitFulfill:

Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    // 自旋次数计算
    int spins = ((head.next == s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted())
            s.tryCancel(e); // 线程被中断,取消节点
        Object x = s.item;
        // item的值被替换过,有两种情况:
        // 1. 有匹配的节点线程对其进行过处理,返回匹配的值(该阻塞线程若是put,返回null,若是take,返回put给它CAS设置的item)
        // 2. 由下面的超时阻塞等待,若到了超时时间,则tryCancel设置item为自身引用,此时返回的是s自身,这也解释了为何在transfer方法中阻塞等待返回之后需要判断是否cancel的处理: if (x == s)
        if (x != e)
            return x;
        if (timed) { // 超时阻塞等待的处理
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(e);
                continue;
            }
        }
        if (spins > 0)
            --spins;
        else if (s.waiter == null)
            s.waiter = w;
        else if (!timed)
            LockSupport.park(this); // 非超时的阻塞等待,等待被唤醒,具体的唤醒逻辑在transfer的里面(unpark调用)
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos); // 超时的阻塞等待
    }
}

还有个clean方法,比较难懂,猜测其存在的意义如下:

我们注意到在TransferQueue的定义中,除了定义指向哨兵队头的head,队尾的tail,还有一个属性叫cleanMe,该属性在clean方法里面会被用到,主要作用是用来辅助进行清理。当TransferQueue的最后一个元素需要被移除(可能是cancel等原因导致的移除),此时该元素代表队尾,移除动作无法直接在队尾上面执行!因为TransferQueue是单链表的形式实现,总不可能从头遍历来进行移除,一是效率,二是还要考虑超级多的并发,所以遍历过程可能会因为多线程并发操作导致非一致性的读。所以这里使用了一个叫cleanMe的内部属性来暂存此时需要被移除的队尾节点的前继节点,等待下一次进入该方法的时候进行移除操作(下一次进来tail肯定发生了变化)

clean方法的代码分析如下:

void clean(QNode pred, QNode s) {
    s.waiter = null; // forget thread

    // pred为期望的s的前继节点
    // 若pred不为s的前继节点,不进入处理逻辑,证明s已经被unlinked了
    while (pred.next == s) { // Return early if already unlinked
        QNode h = head;
        QNode hn = h.next;   // Absorb cancelled first node as head

        // 处理节点被cancel的情况,直接让节点出队
        if (hn != null && hn.isCancelled()) {
            advanceHead(h, hn);
            continue;
        }

        // 确保tail是真正的tail
        QNode t = tail;      // Ensure consistent read for tail
        if (t == h)
            return;
        QNode tn = t.next;
        if (t != tail)
            continue;
        if (tn != null) {
            advanceTail(t, tn);
            continue;
        }

        // 如果s不是tail,那么尝试直接将s unlinked
        if (s != t) {        // If not tail, try to unsplice
            QNode sn = s.next;
            // s.next = s 表示节点已经unlinked了
            if (sn == s || pred.casNext(s, sn))
                return; // 成功unlinked则直接返回
        }

        // 走到这里,主要场景是 s == tail(要清除的节点是tail的处理场景)
        QNode dp = cleanMe;
        if (dp != null) {    // cleanMe不为null,有节点需要处理
            QNode d = dp.next; // 指向需要unlinked的节点(Tips: cleanMe总是暂存待删除节点的前继,不明白的建议了解链表删除的知识点)
            QNode dn; // unlinked节点的next
            if (d == null ||               // d is gone or
                d == dp ||                 // d is off list or
                !d.isCancelled() ||        // d not cancelled or
                (d != t &&                 // d not tail and
                 (dn = d.next) != null &&  // has successor
                 dn != d &&                // that is on list
                 dp.casNext(d, dn)))       // d unspliced -> 走到这里会执行unlinked动作
                casCleanMe(dp, null); // 重置cleanMe为null
            if (dp == pred) // cleanMe == pred,证明待处理的s的前继已经被其他线程给暂存了,那么可以直接返回
                return;      // s is already saved node
        } else if (casCleanMe(null, pred)) // cleanMe为空,尝试将s的前继pred设置到cleanMe,成功后返回,等待下一次调用clean方法再处理
            return;          // Postpone cleaning s
    }
}

非公平实现

非公平实现使用stack来实现,lifo的思想,最后进的元素先完成匹配,使用内部实现类TransferStack来实现非公平版本,与TransferQueue类似,定义了SNode来代表构成stack的节点:

static final class SNode {
    volatile SNode next;        // 指向stack中下一个节点
    volatile SNode match;       // 指向匹配节点的引用
    volatile Thread waiter;     // 指向需要阻塞等待的线程引用
    Object item;                // data; or null for REQUESTs
    int mode;
    // Note: item and mode fields don't need to be volatile
    // since they are always written before, and read after,
    // other volatile/atomic operations.

    SNode(Object item) {
        this.item = item;
    }

    boolean casNext(SNode cmp, SNode val) {
        return cmp == next &&
            UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
    }

    // match的时候会尝试唤醒阻塞等待的线程
    boolean tryMatch(SNode s) {
        if (match == null &&
            UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
            Thread w = waiter;
            if (w != null) {    // waiters need at most one unpark
                waiter = null;
                LockSupport.unpark(w);
            }
            return true;
        }
        return match == s;
    }

    // 将match设置为自身引用来表达节点cancel的场景
    void tryCancel() {
        UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
    }

    boolean isCancelled() {
        return match == this;
    }

    // Unsafe mechanics
    private static final sun.misc.Unsafe UNSAFE;
    private static final long matchOffset;
    private static final long nextOffset;

    static {
        try {
            UNSAFE = sun.misc.Unsafe.getUnsafe();
            Class<?> k = SNode.class;
            matchOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("match"));
            nextOffset = UNSAFE.objectFieldOffset
                (k.getDeclaredField("next"));
        } catch (Exception e) {
            throw new Error(e);
        }
    }
}

栈节点定义中有个mode字段,取值来自TransferStack定义的常量:

// 代表take操作
static final int REQUEST    = 0;
// 代表put操作
static final int DATA       = 1;
// 代表节点正在进行匹配动作
static final int FULFILLING = 2;

同样的,重点还是在内部实现的transfer方法:

@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {

    SNode s = null; // constructed/reused as needed
    int mode = (e == null) ? REQUEST : DATA;

    for (;;) {
        SNode h = head;
        if (h == null || h.mode == mode) {  // stack为空或者该操作与栈顶节点是相同模式(take / put)
            if (timed && nanos <= 0) {      // 超时等待达到超时时间
                if (h != null && h.isCancelled())
                    casHead(h, h.next);     // 这里需要处理节点被cancel的场景
                else
                    return null;
            } else if (casHead(h, s = snode(s, e, h, mode))) { // 节点入栈顶
                SNode m = awaitFulfill(s, timed, nanos); // 阻塞等待,等待被匹配,返回匹配的节点
                if (m == s) {               // m == s表示s被cancel,那么执行清理动作(Tips: 这里的clean比queue的实现不太一样,后面分析)
                    clean(s);
                    return null;
                }
                // 匹配,弹出节点,然后返回数据(take返回匹配节点的数据,put返回null)
                if ((h = head) != null && h.next == s)
                    casHead(h, s.next);     // help s's fulfiller
                return (E) ((mode == REQUEST) ? m.item : s.item);
            }
        } else if (!isFulfilling(h.mode)) { // 栈顶元素还没有被匹配,尝试匹配
            if (h.isCancelled())            // 同样需要判断是否cancel
                casHead(h, h.next);         // pop and retry
            else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
                for (;;) { // 循环找匹配节点,若发现stack所有能匹配的节点都被匹配完了,那么清空stack并跳出循环
                    SNode m = s.next;       // m is s's match
                    if (m == null) {        // 没有找到匹配节点,栈空,尝试清空栈,跳出循环
                        casHead(s, null);   // pop fulfill node
                        s = null;           // use new node next time
                        break;              // restart main loop
                    }
                    // 找到匹配节点m, mn指向匹配节点的下一个节点
                    SNode mn = m.next;
                    if (m.tryMatch(s)) {
                        // 成功匹配,弹出当前节点s和匹配的节点m,然后返回数据
                        casHead(s, mn);     // pop both s and m
                        return (E) ((mode == REQUEST) ? m.item : s.item);
                    } else                  // 匹配失败,证明match节点被其他线程进行匹配了,尝试unlink match节点
                        s.casNext(m, mn);   // help unlink
                }
            }
        } else {                            // 栈顶元素被标记为匹配中模式,协助进行匹配
            SNode m = h.next;               // m is h's match
            if (m == null)                  // 没有找到匹配节点,尝试清空stack
                casHead(h, null);           // pop fulfilling node
            else {
                SNode mn = m.next;
                if (m.tryMatch(h))          // 尝试匹配,成功则弹出栈顶和匹配的match节点
                    casHead(h, mn);         // pop both h and m
                else                        // 匹配失败,证明已经被其他线程匹配了,那么unlink这个match节点
                    h.casNext(m, mn);       // help unlink
            }
        }
    }
}

awaitFulfill方法相比TransferQueue要简单一些:


SNode awaitFulfill(SNode s, boolean timed, long nanos) {
    final long deadline = timed ? System.nanoTime() + nanos : 0L;
    Thread w = Thread.currentThread();
    int spins = (shouldSpin(s) ?
                 (timed ? maxTimedSpins : maxUntimedSpins) : 0);
    for (;;) {
        if (w.isInterrupted()) // 判断线程是否被中断
            s.tryCancel();
        SNode m = s.match; // 取match, 不为null 则证明被其他线程匹配了(Tips: SNode的tryMatch方法), 此时可以返回
        if (m != null)
            return m;
        if (timed) { // 超时await的处理
            nanos = deadline - System.nanoTime();
            if (nanos <= 0L) {
                s.tryCancel(); // 超时则设置cancel,Tips: cancel是把match设置为自身引用
                continue;
            }
        }
        if (spins > 0)
            spins = shouldSpin(s) ? (spins-1) : 0;
        else if (s.waiter == null)
            s.waiter = w; // establish waiter so can park next iter
        else if (!timed)
            LockSupport.park(this); // 阻塞等待
        else if (nanos > spinForTimeoutThreshold)
            LockSupport.parkNanos(this, nanos); // 超时阻塞等待
    }
}

再看看clean方法,与TransferQueue的处理不一样,没有使用暂存的思路来做,而是每一次调用clean必然确保要清除的节点会被直接清除掉,而且还会额外处理掉其他cancel的节点:

根据注释的提示,最差的情况会导致遍历整个stack来实现清除指定的节点,而且在多线程条件下,会出现需要clean的节点被其他线程removed的情况(其他线程也调用了clean方法),当然最后也能达到效果,只是该clean的执行者是其他线程而已,无意中而为之

void clean(SNode s) {
    s.item = null;   // forget item
    s.waiter = null; // forget thread

    // 找s的第一个合理后继节点(非cancel,当然有可能为null)
    SNode past = s.next;
    if (past != null && past.isCancelled())
        past = past.next;

    // 协助处理从栈顶被取消的节点,设置新的栈顶
    SNode p;
    while ((p = head) != null && p != past && p.isCancelled())
        casHead(p, p.next);

    // 处理新的栈顶p和和前面s的第一个合理后继past之间被cancel的节点(unlinked的处理)
    while (p != null && p != past) {
        SNode n = p.next;
        if (n != null && n.isCancelled())
            p.casNext(n, n.next);
        else
            p = n;
    }
}

上面的处理逻辑,总能保证s一定被unlinked掉,无论是自己做,还是通过其他协助者完成(其他线程调用clean)

总结

  • SynchronousQueue,阻塞队列,无实际容量,take需要等待一个put才能解除阻塞,反之亦然
  • 内部有公平实现和非公平实现,默认使用非公平实现,吞吐量更高,非公平性采用栈的概念,后来的先匹配,而公平性可想而知就是FIFO,采用队列的思想,先来的先匹配
  • 使用场景不多,可参考Executors中的newCachedThreadPool实现

反反复复看了多次SynchronousQueue,才渐渐地摸透当中的核心原理,思想比较简单,但是代码细节比较难琢磨透彻,写下这篇文章的时候还有一些细节看得不是很明白(比如公平实现和非公平实现中clean的处理为何会不一样?TransferStack中FULFILLING定义存在的意义是啥?等等)以后若有时间摸得更透会继续补充

    原文作者:JUC
    原文地址: https://blog.csdn.net/d6619309/article/details/80958228
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞