基本介绍
SynchronousQueue
JUC阻塞队列的一种,队列无实际容量,一个put操作必须等待一个take操作的执行,才会解除阻塞状态。将put考虑为生产者的话,需要等待消费者,即take操作来取它的数据,才能够从阻塞等待中返回。数据在SynchronousQueue
中是被直接传递的,由put线程传递给take线程
使用场景
SynchronousQueue
的使用场景比较少见,具体的一个场景可以参考Executors
的newCachedThreadPool
方法
public static ExecutorService newCachedThreadPool() {
return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
60L, TimeUnit.SECONDS,
new SynchronousQueue<Runnable>());
}
JDK线程池实现中,每当没有空闲线程执行任务的时候,会尝试将任务offer到任务队列,若队列满了,如果线程数上限(maxThreadSize,构造方法第二个参数)还没达到,则构造一个新的线程,使用该线程来运行任务。
newCachedThreadPool使用SynchronousQueue
作为任务队列,提交任务时若遇到无空闲线程,则每次offer都会失败,因为线程上限接近无穷大,则每次都会创建新的线程来执行,并且因为设置了线程空闲超时时间(默认60s),所以该线程池构造方法构造的线程池具备线程数可伸缩(空闲太多则销毁,不够则创建)的能力,不会出现任务被reject的现象(但是若任务一直处理不过来,很可能会因为创建太多线程跑任务导致OOM)
源码分析
Tips:
1.学习源码之前需要掌握数据结构知识里面的链表、队列(链表方式实现的队列)、栈(链表方式实现的栈)的一些概念和常规操作
2.采用源码+注释的方式来讲解,分析主要在中文注释上面
同其他阻塞队列实现,重点还是在于put和take的实现,SynchronousQueue
使用内部类Transfer
来统一实现存取操作,put/take等操作都是调用Transfer
的transfer方法来实现其逻辑:
abstract static class Transferer<E> {
abstract E transfer(E e, boolean timed, long nanos);
}
put调用transfer传入实际的数据,而take调用transfer则传入null,代表是要取数据的一方
Transfer
有两种实现方案,公平实现和非公平实现。公平实现采用队列的思想(fifo)实现;而非公平实现采用栈的思想(lifo)实现。公平性体现在先进的阻塞等待线程(可以是阻塞的put or 阻塞的take)优先获得匹配的权利,解除阻塞状态(先进先出)
公平实现
实现类为TransferQueue
,首先看Queue的构造节点实现类QNode
的表示:
static final class QNode {
volatile QNode next; // 指向队列下一个节点
volatile Object item; // 存储数据,同时用于处理canceled的情况(设置为自身this)
volatile Thread waiter; // 存储当前线程,控制线程阻塞与唤醒
final boolean isData; // 该节点是否为数据节点(true: 生产者,put操作; false: 消费者,take操作)
// 构造方法
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
// 定义的一些方法,用于实现cas设置next,item,同时提供tryCanncel方法以及判断当前节点是否为cancel等方法
boolean casNext(QNode cmp, QNode val) {
return next == cmp &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
boolean casItem(Object cmp, Object val) {
return item == cmp &&
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, val);
}
// cancel将item设置为自身引用
void tryCancel(Object cmp) {
UNSAFE.compareAndSwapObject(this, itemOffset, cmp, this);
}
boolean isCancelled() {
return item == this;
}
boolean isOffList() {
return next == this;
}
// next以及item cas的设置依赖Unsafe机制
private static final sun.misc.Unsafe UNSAFE;
private static final long itemOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = QNode.class;
itemOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("item"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
TransferQueue
中使用head和tail分别指向队列头和队列尾,注意head是一个dumy node,除此之外还有一个cleanMe字段,在讲解后面的clean逻辑会涉及到该字段,最后也使用Unsafe来实现CAS操作
这里就不贴代码了,感兴趣的直接看下源码
SynchronousQueue
中TransferQueue
的定义即可
TransferQueue
实现了Transfer
抽象类,实现具体传递逻辑在transfer方法内,下面分析重点方法transfer:
主要流程:
1. 如果队列为空或者调用者与队列中元素属于同一模式(put, take),则构造一个新节点添加到队尾,且使得当前调用线程处于阻塞等待(等待被匹配),直到被唤醒,由于唤醒可能是由于cancel引起的,因此需要处理cancel的情况(调用clean进行清理动作),若不是由于cancel引起的唤醒,则表示被匹配
2. 否则(相对于1),尝试匹配并唤醒队列头节点,若匹配成功,则返回其值(take线程获取匹配到的节点的item,put线程返回null)
过程中会有很多强化判断,防止出现不一致读(多线程并发无锁的实现,一定是需要考虑很多不一致读的场景)
E transfer(E e, boolean timed, long nanos) {
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 防空判断
continue; // spin
if (h == t || t.isData == isData) { // 队列为空或者是当前操作与队列中节点(队尾节点)属于同一个操作,take or put
QNode tn = t.next;
if (t != tail) // 有并发操作更新了tail
continue;
if (tn != null) { // 并发操作导致tail改变,需要重新设置tail
advanceTail(t, tn);
continue;
}
if (timed && nanos <= 0) // 达到超时等待时间,直接返回null
return null;
// 构造节点并追加到队尾
if (s == null)
s = new QNode(e, isData);
if (!t.casNext(null, s)) // CAS失败,从头来过
continue;
advanceTail(t, s); // 前面追加到尾部之后,需要重新设置tail
// 然后需要将进行该操作的线程进行阻塞等待,等待出现匹配动作,由这个动作的触发者对它进行唤醒,返回的是匹配节点(QNode)的item
Object x = awaitFulfill(s, e, timed, nanos);
if (x == s) { // cancel的时候会将item(x)设置为自身(s),这里判断是否为被canncel,如果是则对节点进行清理
clean(t, s);
return null;
}
if (!s.isOffList()) { // 发现节点还没有从队列中摘除,进行摘除操作
advanceHead(t, s); // unlink if head
if (x != null) // 断开s(QNode)中属性的引用
s.item = s;
s.waiter = null;
}
// Tips: take操作返回之前put的数据,put操作则返回null(e为null)
return (x != null) ? (E)x : e;
} else { // 可以尝试匹配的分支
QNode m = h.next; // 队列头(真实节点是head的next)是待匹配的节点
if (t != tail || m == null || h != head)
continue; // 判断是否有并发操作引起导致tail,head,head.next的变化
Object x = m.item;
if (isData == (x != null) || // m(待匹配的节点)已经被匹配过?并发匹配问题?
x == m || // m 是canceled的(Tips: tryCancel会将item设置为节点自身引用)
!m.casItem(x, e)) { // CAS递交元素失败
advanceHead(h, m); // 满足上述条件,将节点出队
continue;
}
// 走到这里证明成功匹配,节点出队,并唤醒匹配节点的等待线程,并将结果带回
advanceHead(h, m);
LockSupport.unpark(m.waiter);
// Tips: take操作返回之前put的数据,put操作则返回null(e为null)
return (x != null) ? (E)x : e;
}
}
}
下面看下阻塞等待方法awaitFulfill:
Object awaitFulfill(QNode s, E e, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
// 自旋次数计算
int spins = ((head.next == s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e); // 线程被中断,取消节点
Object x = s.item;
// item的值被替换过,有两种情况:
// 1. 有匹配的节点线程对其进行过处理,返回匹配的值(该阻塞线程若是put,返回null,若是take,返回put给它CAS设置的item)
// 2. 由下面的超时阻塞等待,若到了超时时间,则tryCancel设置item为自身引用,此时返回的是s自身,这也解释了为何在transfer方法中阻塞等待返回之后需要判断是否cancel的处理: if (x == s)
if (x != e)
return x;
if (timed) { // 超时阻塞等待的处理
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this); // 非超时的阻塞等待,等待被唤醒,具体的唤醒逻辑在transfer的里面(unpark调用)
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos); // 超时的阻塞等待
}
}
还有个clean方法,比较难懂,猜测其存在的意义如下:
我们注意到在
TransferQueue
的定义中,除了定义指向哨兵队头的head,队尾的tail,还有一个属性叫cleanMe,该属性在clean方法里面会被用到,主要作用是用来辅助进行清理。当TransferQueue
的最后一个元素需要被移除(可能是cancel等原因导致的移除),此时该元素代表队尾,移除动作无法直接在队尾上面执行!因为TransferQueue
是单链表的形式实现,总不可能从头遍历来进行移除,一是效率,二是还要考虑超级多的并发,所以遍历过程可能会因为多线程并发操作导致非一致性的读。所以这里使用了一个叫cleanMe的内部属性来暂存此时需要被移除的队尾节点的前继节点,等待下一次进入该方法的时候进行移除操作(下一次进来tail肯定发生了变化)
clean方法的代码分析如下:
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
// pred为期望的s的前继节点
// 若pred不为s的前继节点,不进入处理逻辑,证明s已经被unlinked了
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next; // Absorb cancelled first node as head
// 处理节点被cancel的情况,直接让节点出队
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn);
continue;
}
// 确保tail是真正的tail
QNode t = tail; // Ensure consistent read for tail
if (t == h)
return;
QNode tn = t.next;
if (t != tail)
continue;
if (tn != null) {
advanceTail(t, tn);
continue;
}
// 如果s不是tail,那么尝试直接将s unlinked
if (s != t) { // If not tail, try to unsplice
QNode sn = s.next;
// s.next = s 表示节点已经unlinked了
if (sn == s || pred.casNext(s, sn))
return; // 成功unlinked则直接返回
}
// 走到这里,主要场景是 s == tail(要清除的节点是tail的处理场景)
QNode dp = cleanMe;
if (dp != null) { // cleanMe不为null,有节点需要处理
QNode d = dp.next; // 指向需要unlinked的节点(Tips: cleanMe总是暂存待删除节点的前继,不明白的建议了解链表删除的知识点)
QNode dn; // unlinked节点的next
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // d unspliced -> 走到这里会执行unlinked动作
casCleanMe(dp, null); // 重置cleanMe为null
if (dp == pred) // cleanMe == pred,证明待处理的s的前继已经被其他线程给暂存了,那么可以直接返回
return; // s is already saved node
} else if (casCleanMe(null, pred)) // cleanMe为空,尝试将s的前继pred设置到cleanMe,成功后返回,等待下一次调用clean方法再处理
return; // Postpone cleaning s
}
}
非公平实现
非公平实现使用stack来实现,lifo的思想,最后进的元素先完成匹配,使用内部实现类TransferStack
来实现非公平版本,与TransferQueue
类似,定义了SNode
来代表构成stack的节点:
static final class SNode {
volatile SNode next; // 指向stack中下一个节点
volatile SNode match; // 指向匹配节点的引用
volatile Thread waiter; // 指向需要阻塞等待的线程引用
Object item; // data; or null for REQUESTs
int mode;
// Note: item and mode fields don't need to be volatile
// since they are always written before, and read after,
// other volatile/atomic operations.
SNode(Object item) {
this.item = item;
}
boolean casNext(SNode cmp, SNode val) {
return cmp == next &&
UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
// match的时候会尝试唤醒阻塞等待的线程
boolean tryMatch(SNode s) {
if (match == null &&
UNSAFE.compareAndSwapObject(this, matchOffset, null, s)) {
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
return match == s;
}
// 将match设置为自身引用来表达节点cancel的场景
void tryCancel() {
UNSAFE.compareAndSwapObject(this, matchOffset, null, this);
}
boolean isCancelled() {
return match == this;
}
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long matchOffset;
private static final long nextOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = SNode.class;
matchOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("match"));
nextOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("next"));
} catch (Exception e) {
throw new Error(e);
}
}
}
栈节点定义中有个mode字段,取值来自TransferStack
定义的常量:
// 代表take操作
static final int REQUEST = 0;
// 代表put操作
static final int DATA = 1;
// 代表节点正在进行匹配动作
static final int FULFILLING = 2;
同样的,重点还是在内部实现的transfer方法:
@SuppressWarnings("unchecked")
E transfer(E e, boolean timed, long nanos) {
SNode s = null; // constructed/reused as needed
int mode = (e == null) ? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // stack为空或者该操作与栈顶节点是相同模式(take / put)
if (timed && nanos <= 0) { // 超时等待达到超时时间
if (h != null && h.isCancelled())
casHead(h, h.next); // 这里需要处理节点被cancel的场景
else
return null;
} else if (casHead(h, s = snode(s, e, h, mode))) { // 节点入栈顶
SNode m = awaitFulfill(s, timed, nanos); // 阻塞等待,等待被匹配,返回匹配的节点
if (m == s) { // m == s表示s被cancel,那么执行清理动作(Tips: 这里的clean比queue的实现不太一样,后面分析)
clean(s);
return null;
}
// 匹配,弹出节点,然后返回数据(take返回匹配节点的数据,put返回null)
if ((h = head) != null && h.next == s)
casHead(h, s.next); // help s's fulfiller
return (E) ((mode == REQUEST) ? m.item : s.item);
}
} else if (!isFulfilling(h.mode)) { // 栈顶元素还没有被匹配,尝试匹配
if (h.isCancelled()) // 同样需要判断是否cancel
casHead(h, h.next); // pop and retry
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {
for (;;) { // 循环找匹配节点,若发现stack所有能匹配的节点都被匹配完了,那么清空stack并跳出循环
SNode m = s.next; // m is s's match
if (m == null) { // 没有找到匹配节点,栈空,尝试清空栈,跳出循环
casHead(s, null); // pop fulfill node
s = null; // use new node next time
break; // restart main loop
}
// 找到匹配节点m, mn指向匹配节点的下一个节点
SNode mn = m.next;
if (m.tryMatch(s)) {
// 成功匹配,弹出当前节点s和匹配的节点m,然后返回数据
casHead(s, mn); // pop both s and m
return (E) ((mode == REQUEST) ? m.item : s.item);
} else // 匹配失败,证明match节点被其他线程进行匹配了,尝试unlink match节点
s.casNext(m, mn); // help unlink
}
}
} else { // 栈顶元素被标记为匹配中模式,协助进行匹配
SNode m = h.next; // m is h's match
if (m == null) // 没有找到匹配节点,尝试清空stack
casHead(h, null); // pop fulfilling node
else {
SNode mn = m.next;
if (m.tryMatch(h)) // 尝试匹配,成功则弹出栈顶和匹配的match节点
casHead(h, mn); // pop both h and m
else // 匹配失败,证明已经被其他线程匹配了,那么unlink这个match节点
h.casNext(m, mn); // help unlink
}
}
}
}
awaitFulfill方法相比TransferQueue
要简单一些:
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
final long deadline = timed ? System.nanoTime() + nanos : 0L;
Thread w = Thread.currentThread();
int spins = (shouldSpin(s) ?
(timed ? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted()) // 判断线程是否被中断
s.tryCancel();
SNode m = s.match; // 取match, 不为null 则证明被其他线程匹配了(Tips: SNode的tryMatch方法), 此时可以返回
if (m != null)
return m;
if (timed) { // 超时await的处理
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
s.tryCancel(); // 超时则设置cancel,Tips: cancel是把match设置为自身引用
continue;
}
}
if (spins > 0)
spins = shouldSpin(s) ? (spins-1) : 0;
else if (s.waiter == null)
s.waiter = w; // establish waiter so can park next iter
else if (!timed)
LockSupport.park(this); // 阻塞等待
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos); // 超时阻塞等待
}
}
再看看clean方法,与TransferQueue
的处理不一样,没有使用暂存的思路来做,而是每一次调用clean必然确保要清除的节点会被直接清除掉,而且还会额外处理掉其他cancel的节点:
根据注释的提示,最差的情况会导致遍历整个stack来实现清除指定的节点,而且在多线程条件下,会出现需要clean的节点被其他线程removed的情况(其他线程也调用了clean方法),当然最后也能达到效果,只是该clean的执行者是其他线程而已,无意中而为之
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
// 找s的第一个合理后继节点(非cancel,当然有可能为null)
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 协助处理从栈顶被取消的节点,设置新的栈顶
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// 处理新的栈顶p和和前面s的第一个合理后继past之间被cancel的节点(unlinked的处理)
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
上面的处理逻辑,总能保证s一定被unlinked掉,无论是自己做,还是通过其他协助者完成(其他线程调用clean)
总结
SynchronousQueue
,阻塞队列,无实际容量,take需要等待一个put才能解除阻塞,反之亦然- 内部有公平实现和非公平实现,默认使用非公平实现,吞吐量更高,非公平性采用栈的概念,后来的先匹配,而公平性可想而知就是FIFO,采用队列的思想,先来的先匹配
- 使用场景不多,可参考
Executors
中的newCachedThreadPool实现
反反复复看了多次
SynchronousQueue
,才渐渐地摸透当中的核心原理,思想比较简单,但是代码细节比较难琢磨透彻,写下这篇文章的时候还有一些细节看得不是很明白(比如公平实现和非公平实现中clean的处理为何会不一样?TransferStack
中FULFILLING定义存在的意义是啥?等等)以后若有时间摸得更透会继续补充