Jdk1.7 JUC源码增量解析(5)-ForkJoin-ForkJoin框架其他过程及方法
作者:大飞
概述:
- 这篇会看一下ForkJoin框架的其他过程,如取消任务、关闭Pool,以及前面没分析到一些方法。
源码分析:
- 前面我们看到,ForkJoinTask本身也是Future的实现,所以也会有取消过程,看下实现:
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
}
private static final int CANCELLED = -2;
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
过程很简单,就是设置任务运行状态为取消,然后唤醒在任务上等待的线程。其实前面几篇也分析到了这个流程,这里再提一下。
- 看下ForkJoinPool的本身关闭过程:
public void shutdown() {
checkPermission();
shutdown = true;
tryTerminate(false);
}
public List<Runnable> shutdownNow() {
checkPermission();
shutdown = true;
tryTerminate(true);
return Collections.emptyList();
}
可以用过调用shutdown或者shutdownNow来关闭Pool,shutdown方法会等待之前提交到Pool的task完成再真正关闭Pool,同时不会接受新提交的任务;而shutdownNow方法会尝试取消之前提交到Pool且没有完成的任务并关闭Pool,也不会接受新提交的任务。
注意到里面都调用了tryTerminate方法,看下这个方法:
private boolean tryTerminate(boolean now) {
long c;
while (((c = ctl) & STOP_BIT) == 0) {
if (!now) {
if ((int)(c >> AC_SHIFT) != -parallelism)
return false;
if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
queueBase != queueTop) {
if (ctl == c) // staleness check
return false;
continue;
}
}
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
startTerminating();
}
if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
termination.signalAll();
} finally {
lock.unlock();
}
}
return true;
}
细节:
传参为true,首先会尝试设置停止标记总控信息ctl,设置成功的话为调用startTerminating开始结束Pool(设置失败会再次尝试直到成功),然后会判断Pool中是否还有工作线程,没有的话会唤醒termination条件上的等待线程,然后返回true;没有就直接返回true。
传参为false,如果当前还有活动工作线程,或还有阻塞等待join的工作线程,或者还有未完成的任务,方法会直接返回false。调用shutdown方法,里面的tryTerminate返回false的话,因为shutdown里面设置了关闭状态shutdown为true,当工作线程处理完所有任务,空闲(idle)的时候会判断shutdown标识,如果为true的话会再次调用tryTerminate方法。所以Pool最终会关闭。
tryTerminate方法中会调用startTerminating:
private void startTerminating() {
cancelSubmissions();
for (int pass = 0; pass < 3; ++pass) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null) {
for (ForkJoinWorkerThread w : ws) {
if (w != null) {
w.terminate = true;
if (pass > 0) {
w.cancelTasks();
if (pass > 1 && !w.isInterrupted()) {
try {
w.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
terminateWaiters();
}
}
}
这个方法前面几篇也分析到过,这里就不仔细分析了,方法流程如下:
1.取消Pool中submissionQueue中的任务。
2.将所有的工作线程的结束状态设置为true。
3.取消所有工作线程的任务队列中未完成的任务。
4.中断所有工作线程。
5.结束还在等待的工作线程。
实际应用时,我们可能会先调用shutdown方法,然后做一些其他工作,最后会等待Pool真正结束,通过调用awaitTermination方法:
public boolean awaitTermination(long timeout, TimeUnit unit)
throws InterruptedException {
long nanos = unit.toNanos(timeout);
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
for (;;) {
if (isTerminated())
return true;
if (nanos <= 0)
return false;
nanos = termination.awaitNanos(nanos);
}
} finally {
lock.unlock();
}
}
方法实现很简单,就是判断Pool是否真正结束了,没结束的话,就会在termination条件上等待;关闭了就返回true。当然这个方法可能超时。
看下isTerminated方法:
public boolean isTerminated() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
(short)(c >>> TC_SHIFT) == -parallelism);
}
判断逻辑就是同时满足Pool的总控信息中有停止标记 且 总的工作线程数为0。
ForkJoinPool还提供了其他几个相关的判断方法:
public boolean isTerminating() {
long c = ctl;
return ((c & STOP_BIT) != 0L &&
(short)(c >>> TC_SHIFT) != -parallelism);
}
判断Pool是否正在结束过程中,判断逻辑是Pool的总控信息中有停止标记,但总的工作线程数不为0。
final boolean isAtLeastTerminating() {
return (ctl & STOP_BIT) != 0L;
}
判断Pool是否正在结束过程中或者已经结束,只要Pool的总控信息中有停止标记就可以了。
public boolean isShutdown() {
return shutdown;
}
判断Pool是否关闭,只要调用过shutdown或者shutdownNow方法,这个就是true了。
- 到这里,ForkJoin过程中主要的流程都覆盖到了,下面看一些没之前没涉及的代码分析,首先看下ForkJoinWorkerThread相关的。
大家可能会发现ForkJoinWorkerThread中定义了casSlotNull和writeSlot方法,但是貌似没地方用到,这是因为为了提高性能,这两个方法已经被手工内联到相关方法内部了。
ForkJoinWorkerThread方法中提供了一个peekTask方法:
final ForkJoinTask<?> peekTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q == null || (m = q.length - 1) < 0)
return null;
int i = locallyFifo ? queueBase : (queueTop - 1);
return q[i & m];
}
方法逻辑很简单,就是根据工作模式看一下当前任务队列中的下一个(可获取的)任务,这个方法主要是来支持ForkJoinTask的peekNextLocalTask方法:
protected static ForkJoinTask<?> peekNextLocalTask() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.peekTask();
}
ForkJoinTask的子类可以利用这个方法做一些逻辑。
ForkJoinWorkerThread方法中还提供了一个drainTasksTo方法:
final int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
int n = 0;
while (queueBase != queueTop) {
ForkJoinTask<?> t = deqTask();
if (t != null) {
c.add(t);
++n;
}
}
return n;
}
这个方法的逻辑是将当前任务队列中所有方法拿出来放到一个给定的集合里面,并返回放入的任务数量,这个方法是用来支持ForkJoinPool的drainTasksTo方法:
protected int drainTasksTo(Collection<? super ForkJoinTask<?>> c) {
int count = 0;
while (queueBase != queueTop) {
ForkJoinTask<?> t = pollSubmission();
if (t != null) {
c.add(t);
++count;
}
}
ForkJoinWorkerThread[] ws;
if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
(ws = workers) != null) {
for (ForkJoinWorkerThread w : ws)
if (w != null)
count += w.drainTasksTo(c);
}
return count;
}
可见ForkJoinPool的drainTasksTo方法的逻辑是将自身任务队列中的任务和所有工作线程中任务队列的任务都拿出来,放入一个给定集合,并返回放入集合的任务数量。
继续看ForkJoinWorkerThread的getQueueSize方法:
final int getQueueSize() {
return queueTop - queueBase;
}
用来支持ForkJoinTask的getQueuedTaskCount方法,返回当前工作线程任务队列中的任务数量。
public static int getQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.getQueueSize();
}
再看下ForkJoinWorkerThread的pollTask方法:
final ForkJoinTask<?> pollTask() {
ForkJoinWorkerThread[] ws;
ForkJoinTask<?> t = pollLocalTask();
if (t != null || (ws = pool.workers) == null)
return t;
int n = ws.length; // cheap version of FJP.scan
int steps = n << 1;
int r = nextSeed();
int i = 0;
while (i < steps) {
ForkJoinWorkerThread w = ws[(i++ + r) & (n - 1)];
if (w != null && w.queueBase != w.queueTop && w.queue != null) {
if ((t = w.deqTask()) != null)
return t;
i = 0;
}
}
return null;
}
final ForkJoinTask<?> pollLocalTask() {
return locallyFifo ? locallyDeqTask() : popTask();
}
pollTask方法中首先通过pollLocalTask来获取一个本地任务。如果没获取到的话,会继续扫描其它工作线程,来窃取一个任务。如果最后没扫描到,就返回null。这个方法是用来支持ForkJoinTask的pollTask方法:
protected static ForkJoinTask<?> pollTask() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.pollTask();
}
ForkJoinTask的子类可以利用这个方法做一些逻辑。
继续看下ForkJoinWorkerThread的getEstimatedSurplusTaskCount方法:
final int getEstimatedSurplusTaskCount() {
return queueTop - queueBase - pool.idlePerActive();
}
方法返回一个估计的剩余任务数量,内部调用了ForkJoinPool的idlePerActive方法:
final int idlePerActive() {
// Approximate at powers of two for small values, saturate past 4
int p = parallelism;
int a = p + (int)(ctl >> AC_SHIFT);
return (a > (p >>>= 1) ? 0 :
a > (p >>>= 1) ? 1 :
a > (p >>>= 1) ? 2 :
a > (p >>>= 1) ? 4 :
8);
}
idlePerActive方法的大概意思是,当前活动线程的数量越少,返回的值越大。假设p(当前并行度,默认是cpu核数)为32,那么有a=1,返回8;a=3,返回4;a=5,返回2;a=9,返回1;a>16,都返回0。 对于getEstimatedSurplusTaskCount来说就是,如果当前活动的工作线程越多,那么估计的剩余任务数量就越接近自身任务队列中的任务数量(因为大家都在忙,被别人窃取的可能性少一些)。 这个方法主要用于支持ForkJoinTask的getSurplusQueuedTaskCount方法:
public static int getSurplusQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.getEstimatedSurplusTaskCount();
}
ForkJoinTask的子类可以利用这个方法做一些逻辑,比如通过这个方法的判断来决定是否fork任务,减少任务的窃取率,提高整体性能。
最后,ForkJoinWorkerThread中还有一个方法,helpQuiescePool:
final void helpQuiescePool() {
boolean active = true;
ForkJoinTask<?> ps = currentSteal; // to restore below
ForkJoinPool p = pool;
//增加Pool的quiescerCount。
p.addQuiescerCount(1);
for (;;) {
ForkJoinWorkerThread[] ws = p.workers;
ForkJoinWorkerThread v = null;
int n;
//选一个窃取牺牲者。
if (queueTop != queueBase)
//当前队列有任务就选自己。
v = this;
//否则扫描工作线程数组,选一个队列里有任务的作为牺牲者。
else if (ws != null && (n = ws.length) > 1) {
ForkJoinWorkerThread w;
int r = nextSeed(); //就是xor-shift算法,不贴代码了。
int steps = n << 1;
for (int i = 0; i < steps; ++i) {
if ((w = ws[(i + r) & (n - 1)]) != null &&
w.queueBase != w.queueTop) {
v = w;
break;
}
}
}
if (v != null) {
ForkJoinTask<?> t;
if (!active) {
active = true;
p.addActiveCount(1);
}
//窃取并执行任务。
if ((t = (v != this) ? v.deqTask() :
locallyFifo ? locallyDeqTask() : popTask()) != null) {
currentSteal = t;
t.doExec();
currentSteal = ps;
}
}
else {
if (active) {
active = false;
p.addActiveCount(-1);
}
//直到Pool休眠再推出。
if (p.isQuiescent()) {
p.addActiveCount(1);
p.addQuiescerCount(-1);
break;
}
}
}
}
helpQuiescePool方法的主要逻辑就是不断的窃取任务(包括从自身的任务队列中窃取)来执行,直到Pool处于休眠状态。方法执行过程中会修改Pool的quiescerCount和activeCount数量,这起到了防止Pool过早结束的作用,可以回头看一下ForkJoinPool的tryAwaitWork方法,里面可能会tryTerminate方法,会有一个自动结束Pool的代码路径。 看下上面方法对Pool休眠的判断:
public boolean isQuiescent() {
return parallelism + (int)(ctl >> AC_SHIFT) + blockedCount == 0;
}
很简单,逻辑就是当前活动线程数量为0,且阻塞等到join的工作线程数量也为0。 再看下方法中修改quiescerCount和activeCount数量的方法:
final void addQuiescerCount(int delta) {
int c;
do {} while (!UNSAFE.compareAndSwapInt(this, quiescerCountOffset,
c = quiescerCount, c + delta));
}
final void addActiveCount(int delta) {
long d = delta < 0 ? -AC_UNIT : AC_UNIT;
long c;
do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
((c + d) & AC_MASK) |
(c & ~AC_MASK)));
}
典型的原子操作,不用解释了。 当然,ForkJoinWorkerThread的getSurplusQueuedTaskCount方法也是用来支持ForkJoinTask的getSurplusQueuedTaskCount方法的:
public static int getSurplusQueuedTaskCount() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.getEstimatedSurplusTaskCount();
}
- 再看下ForkJoinTask相关的。
看下ForkJoinTask中的externalAwaitDone方法,这个方法在之前分析过程中出现过,但是没有分析。这个方法是在非ForkJoin工作线程合并ForkJoinTask的时候会调用到,看下实现:
private int externalAwaitDone() {
int s;
if ((s = status) >= 0) {
boolean interrupted = false;
synchronized (this) {
while ((s = status) >= 0) {
if (s == 0)
UNSAFE.compareAndSwapInt(this, statusOffset,
0, SIGNAL);
else {
try {
wait();
} catch (InterruptedException ie) {
interrupted = true;
}
}
}
}
if (interrupted)
Thread.currentThread().interrupt();
}
return s;
}
实现很简单,就是给ForkJoinTask加个SIGNAL状态(没有的话),然后在ForkJoinTask上等待,注意被唤醒后会传递中断状态。
继续看下ForkJoinTask中获取执行结果的方法,之前没细节分析,先看下不支持超时的get方法:
public final V get() throws InterruptedException, ExecutionException {
int s = (Thread.currentThread() instanceof ForkJoinWorkerThread) ?
doJoin() : externalInterruptibleAwaitDone(0L);
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
throw new ExecutionException(ex);
return getRawResult();
}
逻辑很明了,如果当前线程是ForkJoin工作线程,那么调用doJoin来获取结果;否则调用externalInterruptibleAwaitDone来获取结果。后面还会处理取消和异常的情况,里面涉及到的方法大部分都分析过,这里只看一下externalInterruptibleAwaitDone方法:
private int externalInterruptibleAwaitDone(long millis)
throws InterruptedException {
int s;
if (Thread.interrupted())
throw new InterruptedException();
if ((s = status) >= 0) {
synchronized (this) {
while ((s = status) >= 0) {
if (s == 0)
UNSAFE.compareAndSwapInt(this, statusOffset,
0, SIGNAL);
else {
wait(millis);
if (millis > 0L)
break;
}
}
}
}
return s;
}
这个方法也是在非ForkJoin线程调用时执行的,逻辑很简单,就是给当前ForkJoin任务加个SIGNAL状态(没有的话),然后在ForkJoinTask上等待,方法支持等待超时和中断。
再看下支持超时的get方法:
public final V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
long nanos = unit.toNanos(timeout);
if (status >= 0) {
boolean completed = false;
if (w.unpushTask(this)) {
try {
completed = exec();
} catch (Throwable rex) {
setExceptionalCompletion(rex);
}
}
if (completed)
setCompletion(NORMAL);
else if (status >= 0 && nanos > 0)
w.pool.timedAwaitJoin(this, nanos);
}
}
else {
long millis = unit.toMillis(timeout);
if (millis > 0)
externalInterruptibleAwaitDone(millis);
}
int s = status;
if (s != NORMAL) {
Throwable ex;
if (s == CANCELLED)
throw new CancellationException();
if (s != EXCEPTIONAL)
throw new TimeoutException();
if ((ex = getThrowableException()) != null)
throw new ExecutionException(ex);
}
return getRawResult();
}
方法逻辑也很简单: 如果当前线程是ForkJoin线程,先尝试从自己的任务队列里pop出给定任务(如果给定任务恰好在当前任务队列的顶端),然后执行任务;否则会调用一个支持超时版本的等待合并任务的方法timedAwaitJoin,等待任务完成。最后返回任务执行结果。 如果当前线程不是ForkJoin线程,会调用externalInterruptibleAwaitDone方法,等待任务完成。最后返回任务执行结果。 方法中调用到了ForkJoinPool的timedAwaitJoin方法:
final void timedAwaitJoin(ForkJoinTask<?> joinMe, long nanos) {
//判断任务状态
while (joinMe.status >= 0) {
//如果任务状态是未完成。
//先清空中断标记。
Thread.interrupted();
if ((ctl & STOP_BIT) != 0L) {
//如果Pool关闭了,取消任务。
joinMe.cancelIgnoringExceptions();
break;
}
//阻塞前的工作。
if (tryPreBlock()) {
long last = System.nanoTime();
while (joinMe.status >= 0) {
long millis = TimeUnit.NANOSECONDS.toMillis(nanos);
if (millis <= 0)
break;
joinMe.tryAwaitDone(millis);//等待任务完成。
if (joinMe.status < 0)
break;
if ((ctl & STOP_BIT) != 0L) {
joinMe.cancelIgnoringExceptions();
break;
}
long now = System.nanoTime();
nanos -= now - last;
last = now;
}
//唤醒后的工作。
postBlock();
break;
}
}
}
逻辑很简单,不说明了。看下内部调用的ForkJoinTask的tryAwaitDone:
final void tryAwaitDone(long millis) {
int s;
try {
if (((s = status) > 0 ||
(s == 0 &&
UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
status > 0) {
synchronized (this) {
if (status > 0)
wait(millis);
}
}
} catch (InterruptedException ie) {
// caller must check termination
}
}
ForkJoinTask中还定义了invoke方法,在Pool的invoke(ForkJoinTask<T> task)中可能会被调用到,看下这个方法:
public final V invoke() {
if (doInvoke() != NORMAL)
return reportResult();
else
return getRawResult();
}
方法实现中调用了doInvoke来执行任务,然后获取结果。看下doInvoke方法:
private int doInvoke() {
int s; boolean completed;
if ((s = status) < 0)
return s;
try {
completed = exec();
} catch (Throwable rex) {
return setExceptionalCompletion(rex);
}
if (completed)
return setCompletion(NORMAL);
else
return doJoin();
}
doInvoke中的逻辑也很简单,先执行任务,然后判断任务是否结束,结束的话设置完成状态;否则join任务等待结果。
再看下ForkJoinTask中的几个invokeAll方法:
public static void invokeAll(ForkJoinTask<?> t1, ForkJoinTask<?> t2) {
t2.fork();
t1.invoke();
t2.join();
}
同时执行两个方法,都执行完毕后返回。注意如果其中有一个方法抛异常的话,另一个方法就可能会被取消。
public static void invokeAll(ForkJoinTask<?>... tasks) {
Throwable ex = null;
int last = tasks.length - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = tasks[i];
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = tasks[i];
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL && ex == null)
ex = t.getException();
}
}
if (ex != null)
UNSAFE.throwException(ex);
}
和上面方法逻辑一致,只是处理多个任务。
public static <T extends ForkJoinTask<?>> Collection<T> invokeAll(Collection<T> tasks) {
if (!(tasks instanceof RandomAccess) || !(tasks instanceof List<?>)) {
invokeAll(tasks.toArray(new ForkJoinTask<?>[tasks.size()]));
return tasks;
}
@SuppressWarnings("unchecked")
List<? extends ForkJoinTask<?>> ts =
(List<? extends ForkJoinTask<?>>) tasks;
Throwable ex = null;
int last = ts.size() - 1;
for (int i = last; i >= 0; --i) {
ForkJoinTask<?> t = ts.get(i);
if (t == null) {
if (ex == null)
ex = new NullPointerException();
}
else if (i != 0)
t.fork();
else if (t.doInvoke() < NORMAL && ex == null)
ex = t.getException();
}
for (int i = 1; i <= last; ++i) {
ForkJoinTask<?> t = ts.get(i);
if (t != null) {
if (ex != null)
t.cancel(false);
else if (t.doJoin() < NORMAL && ex == null)
ex = t.getException();
}
}
if (ex != null)
UNSAFE.throwException(ex);
return tasks;
}
和上面方法逻辑一致,可以返回任务集合。
再看下几个判断任务完成状态的方法:
public final boolean isCompletedAbnormally() {
return status < NORMAL;
}
判断任务是否异常结束。
public final boolean isCompletedNormally() {
return status == NORMAL;
}
判断任务是否正常完成。
继续看下join和invoke另外的版本,它们不会返回结果或者抛出异常:
public final void quietlyJoin() {
doJoin();
}
public final void quietlyInvoke() {
doInvoke();
}
ForkJoinTask中还定义了一个重置方法:
public void reinitialize() {
if (status == EXCEPTIONAL)
//如果发生过异常,清空异常表。
clearExceptionalCompletion();
else
status = 0;
}
还有一个tryUnfork方法:
public boolean tryUnfork() {
return ((ForkJoinWorkerThread) Thread.currentThread())
.unpushTask(this);
}
方法内部会尝试将自身从当前工作线程的队列顶端移除,相当于在当前任务没有被调度执行之前取消了自己。
剩下的方法一起看一下:
public static ForkJoinPool getPool() {
Thread t = Thread.currentThread();
return (t instanceof ForkJoinWorkerThread) ?
((ForkJoinWorkerThread) t).pool : null;
}
public static boolean inForkJoinPool() {
return Thread.currentThread() instanceof ForkJoinWorkerThread;
}
方法都很简单,不用解释了。
- 最后看下ForkJoinPool相关的。
先看下ForkJoinPool中的managedBlock方法:
public static void managedBlock(ManagedBlocker blocker)
throws InterruptedException {
Thread t = Thread.currentThread();
if (t instanceof ForkJoinWorkerThread) {
ForkJoinWorkerThread w = (ForkJoinWorkerThread) t;
w.pool.awaitBlocker(blocker);
}
else {
do {} while (!blocker.isReleasable() && !blocker.block());
}
}
managedBlock逻辑如下: 如果当前线程是ForkJoin工作线程,会调用工作线程所在的Pool的awaitBlocker方法。 如果当前线程不是ForkJoin工作线程,就相当于在blocker上阻塞,被唤醒后方法就结束了。
看下managedBlock方法中调用的awaitBlocker方法:
private void awaitBlocker(ManagedBlocker blocker)
throws InterruptedException {
while (!blocker.isReleasable()) {
if (tryPreBlock()) {
try {
do {} while (!blocker.isReleasable() && !blocker.block());
} finally {
postBlock();
}
break;
}
}
}
可见,和managedBlock中非ForkJoin工作的逻辑差不多,只是加了阻塞前后的处理,可以回顾一下这两个方法都做了哪些工作。
细节:
managedBlock的目的是为了保证Pool的并行性。假设具体的ForkJoinTask在执行过程中,被外部阻塞了,相当于执行当前任务的ForkJoin工作线程在Pool以外的阻塞对象(blocker)上阻塞了,对于Pool来说,它还认为这个工作线程是活动的(不像内部阻塞,如join某个任务,在内部会有记录),如果这个阻塞过程持续的时间很长的话,一定会影响Pool的并行性能。如果能将这种外部阻塞也记录到Pool里面,那么工作线程被外部阻塞(阻塞时间很长)的话,Pool就可能会产生一个新的工作线程来保证并行性能。managedBlock方法的目的就是这个,具体使用方式就是通过实现ManagedBlocker接口将外部阻塞记录进来。
看下ManagedBlocker接口,定义在ForkJoinPool里面:
public static interface ManagedBlocker {
/**
* Possibly blocks the current thread, for example waiting for
* a lock or condition.
*
* @return {@code true} if no additional blocking is necessary
* (i.e., if isReleasable would return true)
* @throws InterruptedException if interrupted while waiting
* (the method is not required to do so, but is allowed to)
*/
boolean block() throws InterruptedException;
/**
* Returns {@code true} if blocking is unnecessary.
*/
boolean isReleasable();
}
ManagedBlocker的doc上还提供了示例,看一个:
class ManagedLocker implements ManagedBlocker {
final ReentrantLock lock;
boolean hasLock = false;
ManagedLocker(ReentrantLock lock) { this.lock = lock; }
public boolean block() {
if (!hasLock)
lock.lock();
return true;
}
public boolean isReleasable() {
return hasLock || (hasLock = lock.tryLock());
}
再看下ForkJoinPool中的invoke方法,和提交不同,这个是执行任务然后返回结果,不会异步的,看下实现:
public <T> T invoke(ForkJoinTask<T> task) {
Thread t = Thread.currentThread();
if (task == null)
throw new NullPointerException();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
((ForkJoinWorkerThread)t).pool == this)
return task.invoke(); // bypass submit if in same pool
else {
addSubmission(task);
return task.join();
}
}
逻辑也很清楚,如果当前线程是ForkJoin工作线程,就执行给定任务的invoke方法;否则将任务提交到Pool的任务队列里面,然后join等待任务。
继续看下两个execute方法,这个是异步执行的,无任何返回:
public void execute(ForkJoinTask<?> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
}
public void execute(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
forkOrSubmit(job);
}
再看个invokeAll方法:
public <T> List<Future<T>> invokeAll(Collection<? extends Callable<T>> tasks) {
ArrayList<ForkJoinTask<T>> forkJoinTasks =
new ArrayList<ForkJoinTask<T>>(tasks.size());
for (Callable<T> task : tasks)
forkJoinTasks.add(ForkJoinTask.adapt(task));
invoke(new InvokeAll<T>(forkJoinTasks));
@SuppressWarnings({"unchecked", "rawtypes"})
List<Future<T>> futures = (List<Future<T>>) (List) forkJoinTasks;
return futures;
}
static final class InvokeAll<T> extends RecursiveAction {
final ArrayList<ForkJoinTask<T>> tasks;
InvokeAll(ArrayList<ForkJoinTask<T>> tasks) { this.tasks = tasks; }
public void compute() {
try { invokeAll(tasks); }
catch (Exception ignore) {}
}
private static final long serialVersionUID = -7914297376763021607L;
}
其实内部调用的是ForkJoinTask的invokeAll。
其他的查询相关的方法可以一起看下,都很简单:
/**
* 获取工作线程工厂。
*/
public ForkJoinWorkerThreadFactory getFactory() {
return factory;
}
/**
* 获取内部工作线程未获取异常处理器。
*/
public Thread.UncaughtExceptionHandler getUncaughtExceptionHandler() {
return ueh;
}
/**
* 获取当前Pool的并行度。
*/
public int getParallelism() {
return parallelism;
}
/**
* 获取总的工作线程数量。
*/
public int getPoolSize() {
return parallelism + (short)(ctl >>> TC_SHIFT);
}
/**
* 获取工作模式(是否异步模式)。
*/
public boolean getAsyncMode() {
return locallyFifo;
}
/**
* 获取运行中的工作线程数量。(近似值)
*/
public int getRunningThreadCount() {
int r = parallelism + (int)(ctl >> AC_SHIFT);
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
/**
* 获取活动的工作数量(包括阻塞的,近似值)。
*/
public int getActiveThreadCount() {
int r = parallelism + (int)(ctl >> AC_SHIFT) + blockedCount;
return (r <= 0) ? 0 : r; // suppress momentarily negative values
}
/**
* 获取Pool内部所有工作线程窃取的任务总数。(近似值)
*/
public long getStealCount() {
return stealCount;
}
/**
* 获取所有工作线程任务队列中的任务总数。(近似值)
*/
public long getQueuedTaskCount() {
long count = 0;
ForkJoinWorkerThread[] ws;
if ((short)(ctl >>> TC_SHIFT) > -parallelism &&
(ws = workers) != null) {
for (ForkJoinWorkerThread w : ws)
if (w != null)
count -= w.queueBase - w.queueTop; // must read base first
}
return count;
}
/**
* 获取Pool的任务队列中的任务数量。(近似值)
*/
public int getQueuedSubmissionCount() {
return -queueBase + queueTop;
}
/**
* 判断Pool的任务队列中是否有任务。
*/
public boolean hasQueuedSubmissions() {
return queueBase != queueTop;
}
最后: Jdk1.7 ForkJoin框架的源码解析都到这里,有不对的地方请指正,欢迎一起讨论交流,感谢。