Jdk1.6 JUC源码解析(15)-SynchronousQueue
作者:大飞
功能简介:
- SynchronousQueue是一种特殊的阻塞队列,它本身没有容量,只有当一个线程从队列取数据的同时,另一个线程才能放一个数据到队列中,反之亦然。存取过程相当于一个线程把数据(安全的)交给另一个线程的过程。
- SynchronousQueue也支持公平和非公平模式。
源码分析:
- SynchronousQueue内部采用伪栈和伪队列来实现,分别对应非公平模式和公平模式。先看下这部分实现。
伪栈和伪队列的公共基类:
static abstract class Transferer {
/**
* 转移数据的方法,用来实现put或者take。
*
* @param e 如果不为null,相当于将一个数据交给消费者;
* 如果为null,相当于从一个生产者接收一个消费者交出的数据。
* @param timed 操作是否支持超时。
* @param nanos 超时时间,单位纳秒。
* @return if non-null, the item provided or received; if null,
* the operation failed due to timeout or interrupt --
* the caller can distinguish which of these occurred
* by checking Thread.interrupted.
*/
abstract Object transfer(Object e, boolean timed, long nanos);
}
先看下伪栈实现,内部结构如下:
static final class TransferStack extends Transferer {
/** 表示一个没有得到数据的消费者 */
static final int REQUEST = 0;
/** 表示一个没有交出数据的生产者 */
static final int DATA = 1;
/**
* 表示正在匹配另一个生产者或者消费者。
*/
static final int FULFILLING = 2;
/** 判断是否包含正在匹配(FULFILLING)的标记 */
static boolean isFulfilling(int m) { return (m & FULFILLING) != 0; }
/** Node class for TransferStacks. */
static final class SNode {
volatile SNode next; // 栈中的下一个节点
volatile SNode match; // 和当前节点完成匹配的节点
volatile Thread waiter; // to control park/unpark
Object item; // data; or null for REQUESTs
int mode;
// Note: item和mode不需要volatile修饰,
// 是因为它们在其他的volatile/atomic操作之前写,之后读。
SNode(Object item) {
this.item = item;
}
static final AtomicReferenceFieldUpdater<SNode, SNode>
nextUpdater = AtomicReferenceFieldUpdater.newUpdater
(SNode.class, SNode.class, "next");
boolean casNext(SNode cmp, SNode val) {
return (cmp == next &&
nextUpdater.compareAndSet(this, cmp, val));
}
static final AtomicReferenceFieldUpdater<SNode, SNode>
matchUpdater = AtomicReferenceFieldUpdater.newUpdater
(SNode.class, SNode.class, "match");
/**
* 尝试匹配节点s和当前节点,如果匹配成功,唤醒等待线程。
* (向消费者传递数据或向生产者获取数据)调用tryMatch方法
* 来确定它们的等待线程,然后唤醒这个等待线程。
*
* @param s the node to match
* @return true if successfully matched to s
*/
boolean tryMatch(SNode s) {
if (match == null &&
matchUpdater.compareAndSet(this, null, s)) {
//如果当前节点的match为空,那么CAS设置s为match,然后唤醒waiter。
Thread w = waiter;
if (w != null) { // waiters need at most one unpark
waiter = null;
LockSupport.unpark(w);
}
return true;
}
//如果match不为null,或者CAS设置match失败,那么比较match和s是否为相同对象。
//如果相同,说明已经完成匹配,匹配成功。
return match == s;
}
/**
* 尝试取消当前节点(有线程等待),通过将match设置为自身。
*/
void tryCancel() {
matchUpdater.compareAndSet(this, null, this);
}
boolean isCancelled() {
return match == this;
}
}
/** The head (top) of the stack */
volatile SNode head;
static final AtomicReferenceFieldUpdater<TransferStack, SNode>
headUpdater = AtomicReferenceFieldUpdater.newUpdater
(TransferStack.class, SNode.class, "head");
下面看下伪栈中transfer方法实现细节吧:
/**
* Puts or takes an item.
*/
Object transfer(Object e, boolean timed, long nanos) {
/*
* 基本算法是在一个无限循环中尝试下面三种情况里面的一种:
*
* 1. 如果当前栈为空或者包含与给定节点模式相同的节点,尝试
* 将节点压入栈内,并等待一个匹配节点,最后返回匹配节点
* 或者null(如果被取消)。
*
* 2. 如果当前栈包含于给定节点模式互补的节点,尝试将这个节
* 点打上FULFILLING标记,然后压入栈中,和相应的节点进行
* 匹配,然后将两个节点(当前节点和互补节点)弹出栈,并返
* 回匹配节点的数据。匹配和删除动作不是必须要做的,因为
* 其他线程会执行动作3:
*
* 3. 如果栈顶已经存在一个FULFILLING(正在满足其他节点)的节
* 点,帮助这个节点完成匹配和移除(出栈)的操作。然后继续
* 执行(主循环)。这部分代码基本和动作2的代码一样,只是
* 不会返回节点的数据。
*/
SNode s = null; // constructed/reused as needed
int mode = (e == null)? REQUEST : DATA;
for (;;) {
SNode h = head;
if (h == null || h.mode == mode) { // head为null或者head和e的mode相同。
if (timed && nanos <= 0) { // 如果超时
if (h != null && h.isCancelled())
casHead(h, h.next); // 如果h不为null且被取消,弹出h。
else
return null; // 否则返回null。
} else if (casHead(h, s = snode(s, e, h, mode))) {//创建一个SNode,赋给s,将原本的head节点做为其next节点,并尝试将其设置为新的head。
//等待其他线程来满足当前线程。
SNode m = awaitFulfill(s, timed, nanos);
if (m == s) { // awaitFulfill方法返回后,判断下是否被取消。
clean(s); // 如果取消,清理一下s节点。
return null;
}
if ((h = head) != null && h.next == s) //因为上面已经将s设置为head,如果满足这个条件说明有其他节点t插入到s前面,变成了head,而且这个t就是和s匹配的节点,他们已经完成匹配。
casHead(h, s.next); // 将s的next节点设置为head。相当于把s和t一起移除了。
return mode == REQUEST? m.item : s.item;
}
} else if (!isFulfilling(h.mode)) {
/*
* 如果栈中存在头节点,且和当前节点不是相同模式,
* 那么说明它们是一对儿对等的节点,尝试用当前节
* 点s来满足h节点。
*/
if (h.isCancelled()) // 如果h节点已经被取消
casHead(h, h.next); // 将h节点弹出,并将h节点的next节点设置为栈的head。
else if (casHead(h, s=snode(s, e, h, FULFILLING|mode))) {//尝试将当前节点打上"正在匹配"的标记,并设置为head。
for (;;) {
SNode m = s.next; // s是当前节点,m是s的next节点,它们是正在匹配的两个节点。
if (m == null) { // 如果m为空,可能其他节点把m匹配走了。
casHead(s, null); // 将s弹出
s = null; // 将s置空,下轮循环的时候还会新建。
break; // 回退到主循环再来一次。
}
SNode mn = m.next; // 获取m的next节点,如果s和m匹配成功,mn就得补上head的位置了。
if (m.tryMatch(s)) { // 尝试匹配一下,匹配成功的话会把m上等待的线程唤醒。
casHead(s, mn); // 如果匹配成功,把s和m弹出。
return (mode == REQUEST)? m.item : s.item;
} else // 没匹配成功的话,说明m可能被其他节点满足了。
s.casNext(m, mn); // 说明m已经被其他节点匹配了,那就把m移除掉。
}
}
} else { // 到这儿的话,说明栈顶的h正在匹配过程中。
SNode m = h.next; // m是h的配对儿,h正在和m匹配。
if (m == null) // 如果m为空,其他节点把m匹配走了。
casHead(h, null); // 弹出h。
else {
SNode mn = m.next; // 获取m的next节点,如果m和h匹配成功,mn就得补上head的位置了。
if (m.tryMatch(h)) // 帮忙匹配一下m和h。
casHead(h, mn); // 匹配成功的话,把h和m弹出。
else // 没匹配成功的话,说明m可能被其他节点满足了。
h.casNext(m, mn); // 没成功的话,说明m已经被其他节点匹配了,那就把m移除掉。
}
}
}
}
看下上面方法中调用的创建节点的snode方法:
static SNode snode(SNode s, Object e, SNode next, int mode) {
if (s == null) s = new SNode(e);
s.mode = mode;
s.next = next;
return s;
}
再看下等待被匹配的方法:
/**
* 自旋/阻塞直到节点被匹配。
*
* @param s the waiting node
* @param timed true if timed wait
* @param nanos timeout value
* @return matched node, or s if cancelled
*/
SNode awaitFulfill(SNode s, boolean timed, long nanos) {
/*
* 在s节点真正阻塞之前,将当前线程设置到s上面,然后检
* 查中断状态(不少于一次),以确保后续和s匹配的节点来
* 唤醒当前线程。
*
* 当执行此方法时,如果执行节点恰好在栈顶,阻塞之前会
* 做一些自旋,为的是如果有生产者或消费者马上到来,就
* 不需要执行节点阻塞了。这种优化在多核下是有意义的。
*/
long lastTime = (timed)? System.nanoTime() : 0;
Thread w = Thread.currentThread();
SNode h = head;
int spins = (shouldSpin(s)?
(timed? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel();//如果当前线程被中断了,那么取消当前节点。
SNode m = s.match;
if (m != null)
return m; //如果已经匹配成功,就返回匹配的节点。
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel(); //如果超时了,也取消当前节点。
continue;
}
}
if (spins > 0)
spins = shouldSpin(s)? (spins-1) : 0; //自旋控制,每次循环都检测是否满足自旋条件,满足的话,自旋值就减去1,然后进入下次循环(一直减到0)
else if (s.waiter == null)
s.waiter = w; //第一次循环时,会将当前线程设置到s上。
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos); //有超时条件下,会检测超时时间是否大于超时阀值(这应该是一个经验值),大于就阻塞,小于就自旋。
}
}
/**
* 如果s节点就是当前栈中头节点,或者头节点正在匹配过程中,那么可以自旋一下。
*/
boolean shouldSpin(SNode s) {
SNode h = head;
return (h == s || h == null || isFulfilling(h.mode));
}
//=========下面是自旋相关参数,定义在SynchronousQueue类中============================================
/** The number of CPUs, for spin control */
static final int NCPUS = Runtime.getRuntime().availableProcessors();
/**
* The number of times to spin before blocking in timed waits.
* The value is empirically derived -- it works well across a
* variety of processors and OSes. Empirically, the best value
* seems not to vary with number of CPUs (beyond 2) so is just
* a constant.
*/
static final int maxTimedSpins = (NCPUS < 2)? 0 : 32;
/**
* The number of times to spin before blocking in untimed waits.
* This is greater than timed value because untimed waits spin
* faster since they don't need to check times on each spin.
*/
static final int maxUntimedSpins = maxTimedSpins * 16;
/**
* The number of nanoseconds for which it is faster to spin
* rather than to use timed park. A rough estimate suffices.
*/
static final long spinForTimeoutThreshold = 1000L;
最后看清理节点方法:
/**
* 当s节点被取消时,才会调用这个方法。
*/
void clean(SNode s) {
s.item = null; // forget item
s.waiter = null; // forget thread
/*
* At worst we may need to traverse entire stack to unlink
* s. If there are multiple concurrent calls to clean, we
* might not see s if another thread has already removed
* it. But we can stop when we see any node known to
* follow s. We use s.next unless it too is cancelled, in
* which case we try the node one past. We don't check any
* further because we don't want to doubly traverse just to
* find sentinel.
*/
SNode past = s.next;
if (past != null && past.isCancelled())
past = past.next;
// 将从栈顶节点开始到past的连续的取消节点移除。
SNode p;
while ((p = head) != null && p != past && p.isCancelled())
casHead(p, p.next);
// 如果p本身未取消(上面的while碰到一个未取消的节点就会退出,但这个节点和past节点之间可能还有取消节点),再把p到past之间的取消节点都移除。
while (p != null && p != past) {
SNode n = p.next;
if (n != null && n.isCancelled())
p.casNext(n, n.next);
else
p = n;
}
}
}
再看下伪队列实现,内部结构如下:
static final class TransferQueue extends Transferer {
/** Node class for TransferQueue. */
static final class QNode {
volatile QNode next; // next node in queue
volatile Object item; // CAS'ed to or from null
volatile Thread waiter; // to control park/unpark
final boolean isData;
QNode(Object item, boolean isData) {
this.item = item;
this.isData = isData;
}
...
/**
* 尝试取消节点。
* 取消就是将节点的item域指向自身。
*/
void tryCancel(Object cmp) {
itemUpdater.compareAndSet(this, cmp, this);
}
boolean isCancelled() {
return item == this;
}
/**
* 判断节点是否离开了队列。
*/
boolean isOffList() {
return next == this;
}
}
/** 队列头节点 */
transient volatile QNode head;
/** 队列尾节点 */
transient volatile QNode tail;
/**
* 指向一个被取消的节点,如果取消这个节点时,它是最后一个进入队列的节点,
* 那么这个节点可能还没有离开队列。
*/
transient volatile QNode cleanMe;
TransferQueue() {
QNode h = new QNode(null, false); // 初始化一个哨兵节点。
head = h;
tail = h;
}
看下伪队列中transfer方法实现:
/**
* Puts or takes an item.
*/
Object transfer(Object e, boolean timed, long nanos) {
/*
* 基本算法是在一个无限循环中尝试下面两种动作里面的一种:
*
* 1. 如果队列为空,或者包含相同模式(存或者取)的节点。
* 尝试将节点加入等待的队列,直到被匹配(或被取消),
* 同时返回匹配节点的数据。
*
* 2. 如果队列中包含等待的节点,并且当前节点和这个等待
* 节点能相互匹配,那么尝试匹配等待节点并将这个节点
* 出队,然后返回匹配节点的数据。
*
* 在每个动作里面,都会检测并帮助其他线程来完成节点推进。
*
* 在循环开始的时候会做一个非空检测,以避免当前线程看到
* 未初始化的头尾节点。这种情况在当前SynchronousQueue中
* 永远不会发生,但如果调用者持有一个非volatile/final域
* 的话,就有可能会发生。在循环开始的时间做这个非空检测
* 要比在内部(分支里)做性能好一些。
*/
QNode s = null; // constructed/reused as needed
boolean isData = (e != null);
for (;;) {
QNode t = tail;
QNode h = head;
if (t == null || h == null) // 如果看到未初始化的头尾节点
continue;
if (h == t || t.isData == isData) { // 队列为空或者当前节点和队列中节点模式相同。
QNode tn = t.next;
if (t != tail) // 读取到不一致的结果,说明同时有其他线程修改了tail。
continue;
if (tn != null) { // 说明其他线程已经添加了新节点tn,但还没将其设置为tail。
advanceTail(t, tn); // 当前线程帮忙推进尾节点,就是尝试将tn设置为尾节点。
continue;
}
if (timed && nanos <= 0) // 超时。
return null;
if (s == null)
s = new QNode(e, isData); // 初始化s。
if (!t.casNext(null, s)) // 尝试将当前节点s拼接到t后面。
continue; // 不成功就继续下次循环
advanceTail(t, s); // 尝试将s设置为队列尾节点。
Object x = awaitFulfill(s, e, timed, nanos); // 然后等着被匹配。
if (x == s) { // 如果被取消。
clean(t, s); // 清理s节点。
return null;
}
if (!s.isOffList()) { // 如果s节点还没有离开队列。
advanceHead(t, s); // 尝试将s设置为头节点,移除t。
if (x != null)
s.item = s;
s.waiter = null;
}
return (x != null)? x : e;
} else { // 模式正好互补。
QNode m = h.next; // 找到能匹配的节点。
if (t != tail || m == null || h != head)
continue; // 读取到不一致的结果,进入下一轮循环。
Object x = m.item;
if (isData == (x != null) || // 如果m已经被匹配了。
x == m || // 或者m被取消了。
!m.casItem(x, e)) { // 如果尝试将数据e设置到m上失败。
advanceHead(h, m); // 将h出队,m设置为头结点,然后重试。
continue;
}
advanceHead(h, m); // 成功匹配,推进头节点。
LockSupport.unpark(m.waiter); // 唤醒m上的等待线程。
return (x != null)? x : e;
}
}
}
看下transfer方法中调用的advanceHead和advanceTail方法:
/**
* Tries to cas nh as new head; if successful, unlink
* old head's next node to avoid garbage retention.
*/
void advanceHead(QNode h, QNode nh) {
if (h == head && headUpdater.compareAndSet(this, h, nh))
h.next = h; // forget old next
}
/**
* Tries to cas nt as new tail.
*/
void advanceTail(QNode t, QNode nt) {
if (tail == t)
tailUpdater.compareAndSet(this, t, nt);
}
再看下transfer方法中调用的awaitFulfill方法:
/**
* Spins/blocks until node s is fulfilled.
*
* @param s the waiting node
* @param e the comparison value for checking match
* @param timed true if timed wait
* @param nanos timeout value
* @return matched item, or s if cancelled
*/
Object awaitFulfill(QNode s, Object e, boolean timed, long nanos) {
/* Same idea as TransferStack.awaitFulfill */
long lastTime = (timed)? System.nanoTime() : 0;
Thread w = Thread.currentThread();
int spins = ((head.next == s) ?
(timed? maxTimedSpins : maxUntimedSpins) : 0);
for (;;) {
if (w.isInterrupted())
s.tryCancel(e);
Object x = s.item;
if (x != e)
return x;
if (timed) {
long now = System.nanoTime();
nanos -= now - lastTime;
lastTime = now;
if (nanos <= 0) {
s.tryCancel(e);
continue;
}
}
if (spins > 0)
--spins;
else if (s.waiter == null)
s.waiter = w;
else if (!timed)
LockSupport.park(this);
else if (nanos > spinForTimeoutThreshold)
LockSupport.parkNanos(this, nanos);
}
}
TransferStack的awaitFulfill方法思路差不多,代码就不做解析了。再看下transfer中调用的clean方法:
void clean(QNode pred, QNode s) {
s.waiter = null; // forget thread
/*
* 在任意给定的时间点,能删除的节点一定是最后入队的节点。
* 为了满足这个条件,如果当前无法删除s,就将其前驱节点保
* 存为"cleanMe",先删除之前保存的版本。至少节点s和之前
* 保存的节点里面有一个能被删除,所以方法一定会结束。
*/
while (pred.next == s) { // Return early if already unlinked
QNode h = head;
QNode hn = h.next;
if (hn != null && hn.isCancelled()) {
advanceHead(h, hn); //如果head节点的next节点被取消,那么推进一下head节点。
continue;
}
QNode t = tail; // Ensure consistent read for tail
if (t == h) // 如果队列为空,
return;
QNode tn = t.next;
if (t != tail) // 出现不一致读,重试。
continue;
if (tn != null) {
advanceTail(t, tn); // 帮助推进尾节点。
continue;
}
if (s != t) { // 如果s不是尾节点,移除s。
QNode sn = s.next;
if (sn == s || pred.casNext(s, sn)) //如果s已经被移除退出循环,否则尝试断开s
return;
}
/*
* 下面要做的事情大体就是:如果s是位节点,那么不会马上删除s,
* 而是将s的前驱节点设置为cleanMe,下次清理其他取消节点的时候
* 会顺便把s移除。
*/
QNode dp = cleanMe;
if (dp != null) { // 如果dp不为null,说明是前一个被取消节点,将其移除。
QNode d = dp.next;
QNode dn;
if (d == null || // d is gone or
d == dp || // d is off list or
!d.isCancelled() || // d not cancelled or
(d != t && // d not tail and
(dn = d.next) != null && // has successor
dn != d && // that is on list
dp.casNext(d, dn))) // 把之前标记为cleanMe节点的next节点d移除。
casCleanMe(dp, null);
if (dp == pred)
return; // 说明s的前驱已经是cleanMe了(后续会被删掉)。
} else if (casCleanMe(null, pred))
return; // 如果当前cleanMe为null,那么将s前驱节点设置为cleanMe,并退出。
}
}
小总结一下:
从上面的分析可以看出,伪栈的结构下,新来的线程会作为栈顶节点或者优先和栈顶的等待节点进行匹配,并不是公平的;但伪队列的结构下,新来的线程会在队尾,或者和队头的等待节点(最前到的)进行匹配,能够保证一定的公平性。
- 有了内部伪栈和伪队列的实现,SynchronousQueue实现起来就很容易了,看下代码:
private transient volatile Transferer transferer;
public SynchronousQueue() {
this(false);
}
public SynchronousQueue(boolean fair) {
transferer = (fair)? new TransferQueue() : new TransferStack();
}
/**
* 添加一个数据到队列,等到其他线程接收这个数据。
*
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public void put(E o) throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, false, 0) == null) {
Thread.interrupted();
throw new InterruptedException();
}
}
/**
* 添加一个数据到队列,等到其他线程接收这个数据或者超时。
*
* @return <tt>true</tt> if successful, or <tt>false</tt> if the
* specified waiting time elapses before a consumer appears.
* @throws InterruptedException {@inheritDoc}
* @throws NullPointerException {@inheritDoc}
*/
public boolean offer(E o, long timeout, TimeUnit unit)
throws InterruptedException {
if (o == null) throw new NullPointerException();
if (transferer.transfer(o, true, unit.toNanos(timeout)) != null)
return true;
if (!Thread.interrupted())
return false;
throw new InterruptedException();
}
/**
* 添加一个数据到队列,如果有其他线程正等待接收这个数据且接收成功,返回true;否则返回false。
*
* 这个方法不阻塞。
* @param e the element to add
* @return <tt>true</tt> if the element was added to this queue, else
* <tt>false</tt>
* @throws NullPointerException if the specified element is null
*/
public boolean offer(E e) {
if (e == null) throw new NullPointerException();
return transferer.transfer(e, true, 0) != null;
}
/**
* 获取并移除队列前端的数据,如果队列中没有数据,就等待其他线程添加一个数据。
*
* @return the head of this queue
* @throws InterruptedException {@inheritDoc}
*/
public E take() throws InterruptedException {
Object e = transferer.transfer(null, false, 0);
if (e != null)
return (E)e;
Thread.interrupted();
throw new InterruptedException();
}
/**
* 获取并移除队列前端的数据,如果队列中没有数据,就等待其他线程添加一个数据或者超时。
*
* @return the head of this queue, or <tt>null</tt> if the
* specified waiting time elapses before an element is present.
* @throws InterruptedException {@inheritDoc}
*/
public E poll(long timeout, TimeUnit unit) throws InterruptedException {
Object e = transferer.transfer(null, true, unit.toNanos(timeout));
if (e != null || !Thread.interrupted())
return (E)e;
throw new InterruptedException();
}
/**
* 如果其他线程正在添加数据到队列,那么尝试获取并移除这个数据。
*
* 这个方法不阻塞。
* @return the head of this queue, or <tt>null</tt> if no
* element is available.
*/
public E poll() {
return (E)transferer.transfer(null, true, 0);
}
由于SynchronousQueue没有实际的容量,所以其他方法实现起来很简单了:
public boolean isEmpty() {
return true;
}
public int size() {
return 0;
}
public int remainingCapacity() {
return 0;
}
public void clear() {
}
public boolean contains(Object o) {
return false;
}
public boolean remove(Object o) {
return false;
}
public boolean containsAll(Collection<?> c) {
return c.isEmpty();
}
public boolean removeAll(Collection<?> c) {
return false;
}
public boolean retainAll(Collection<?> c) {
return false;
}
...
- 最后看一下SynchronousQueue的序列化,序列化比较特别,因为transferer域本身不需要序列化,但需要记住transferer是内部伪栈和伪队列:
static class WaitQueue implements java.io.Serializable { }
static class LifoWaitQueue extends WaitQueue {
private static final long serialVersionUID = -3633113410248163686L;
}
static class FifoWaitQueue extends WaitQueue {
private static final long serialVersionUID = -3623113410248163686L;
}
private ReentrantLock qlock;
private WaitQueue waitingProducers;
private WaitQueue waitingConsumers;
/**
* Save the state to a stream (that is, serialize it).
*
* @param s the stream
*/
private void writeObject(java.io.ObjectOutputStream s)
throws java.io.IOException {
//序列化时根据TransferQueue类型来创建WaitQueue实例。
boolean fair = transferer instanceof TransferQueue;
if (fair) {
qlock = new ReentrantLock(true);
waitingProducers = new FifoWaitQueue();
waitingConsumers = new FifoWaitQueue();
}
else {
qlock = new ReentrantLock();
waitingProducers = new LifoWaitQueue();
waitingConsumers = new LifoWaitQueue();
}
s.defaultWriteObject();
}
private void readObject(final java.io.ObjectInputStream s)
throws java.io.IOException, ClassNotFoundException {
s.defaultReadObject();
if (waitingProducers instanceof FifoWaitQueue)
transferer = new TransferQueue();
else
transferer = new TransferStack();
}
SynchronousQueue的代码解析完毕! 参见:
Jdk1.6 JUC源码解析(5)-locks-LockSupport