Jdk1.7 JUC源码增量解析(3)-ForkJoin-非ForkJoin任务的执行过程
作者:大飞
概述:
- 这篇通过分析一个非ForkJoin(Runnable或者Callable)任务的执行过程来分析ForkJoin的相关代码,注意这里说的非ForkJoin任务实际上也是ForkJoinTask,只是没有分裂(fork)/合并(join)过程。
源码分析: 我们看一个非ForkJoin任务的代码示例:
public static void main(String[] args) {
ForkJoinPool forkJoinPool = new ForkJoinPool();
ForkJoinTask<?> task = forkJoinPool.submit(new Runnable() {
@Override
public void run() {
System.out.println("呵呵哒~");
}
});
try {
task.get();
forkJoinPool.shutdown();
} catch (Exception e) {
e.printStackTrace();
}
}
代码中提交了一个Runnable的任务到ForkJoinPool,任务的执行就是打印一句话。 下面就从提交一个Runnable型的任务到ForkJoinPool,直到任务被执行的过程来分析源码。
- 首先需要创建一个ForkJoinPool,我们来看下ForkJoinPool的构造方法:
public ForkJoinPool() {
this(Runtime.getRuntime().availableProcessors(),
defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism) {
this(parallelism, defaultForkJoinWorkerThreadFactory, null, false);
}
public ForkJoinPool(int parallelism,
ForkJoinWorkerThreadFactory factory,
Thread.UncaughtExceptionHandler handler,
boolean asyncMode) {
checkPermission();
if (factory == null)
throw new NullPointerException();
if (parallelism <= 0 || parallelism > MAX_ID)
throw new IllegalArgumentException();
//设置并行度。
this.parallelism = parallelism;
//设置工作线程工厂。
this.factory = factory;
//设置线程未捕获异常处理器。
this.ueh = handler;
//设置是否为异步模式。
this.locallyFifo = asyncMode;
/*
* 参考我们之前介绍的ctl的内容,由于ctl中的AC表示当前活动
* 工作线程数量减去并行度,所以这里要将这个信息加到ctl上。
*/
long np = (long)(-parallelism);
//初始化ForkJoinPool控制信息。
this.ctl = ((np << AC_SHIFT) & AC_MASK) | ((np << TC_SHIFT) & TC_MASK);
//初始化提交任务队列。
this.submissionQueue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
/*
* 这里需要根据并行度来算出工作线程数组的大小。
* 由于数组大小必须的2的幂,这里的算法是算出比
* parallelism的2倍大的最小的2的幂,但不能超过
* MAX_ID + 1(1 << 16)的数作为工作线程数组大小
*/
int n = parallelism << 1;
if (n >= MAX_ID)
n = MAX_ID;
else { // See Hackers Delight, sec 3.2, where n < (1 << 16)
n |= n >>> 1; n |= n >>> 2; n |= n >>> 4; n |= n >>> 8;
}
//创建存放工作线程的数组。
workers = new ForkJoinWorkerThread[n + 1];
this.submissionLock = new ReentrantLock();
this.termination = submissionLock.newCondition();
//生成工作线程名称前缀。
StringBuilder sb = new StringBuilder("ForkJoinPool-");
sb.append(poolNumberGenerator.incrementAndGet());
sb.append("-worker-");
this.workerNamePrefix = sb.toString();
}
细节:
1.并行度如果未提供,默认就是当前处理器核数。
2.初始化控制信息的部分,注意在AC和TC的信息上减掉了并行度,比如如果并行度为4,那么初始的AC和TC就都是-4,那么如果后续发现AC等于0,就说明当前活动的线程数正好等于处理器核心数量。
3.确定工作线程数组大小的过程是这样的,首先取一个数n,默认是并行度的2倍。然后会使用来自HD的一个位操作技巧,就是将n的位模式的前导1后面所有的位都变成1,其实就是一个比n大的2的幂减1的数。当然n最大不能超过MAX_ID,最终数组的大小是n+1,是一个2的幂,能简化后续的取模操作。
ForkJoinPool中还提供了默认的工作线程工厂:
static {
...
defaultForkJoinWorkerThreadFactory =
new DefaultForkJoinWorkerThreadFactory();
...
}
public static interface ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool);
}
static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
- 创建了ForkJoinPool,下面看看将任务提交到Pool方法submit:
public <T> ForkJoinTask<T> submit(ForkJoinTask<T> task) {
if (task == null)
throw new NullPointerException();
forkOrSubmit(task);
return task;
}
public <T> ForkJoinTask<T> submit(Callable<T> task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task);
forkOrSubmit(job);
return job;
}
public <T> ForkJoinTask<T> submit(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<T> job = ForkJoinTask.adapt(task, result);
forkOrSubmit(job);
return job;
}
public ForkJoinTask<?> submit(Runnable task) {
if (task == null)
throw new NullPointerException();
ForkJoinTask<?> job;
if (task instanceof ForkJoinTask<?>) // avoid re-wrap
job = (ForkJoinTask<?>) task;
else
job = ForkJoinTask.adapt(task, null);
forkOrSubmit(job);
return job;
}
ForkJoinPool中定义了一些列重载的submit方法,这些submit方法内部会先将Callable、Runnable包装(适配)成ForkJoinTask,然后再进行提交。看下包装的方法:
public static <T> ForkJoinTask<T> adapt(Callable<? extends T> callable) {
return new AdaptedCallable<T>(callable);
}
public static ForkJoinTask<?> adapt(Runnable runnable) {
return new AdaptedRunnable<Void>(runnable, null);
}
public static <T> ForkJoinTask<T> adapt(Runnable runnable, T result) {
return new AdaptedRunnable<T>(runnable, result);
}
针对Callable会包装成一个AdaptedCallable:
static final class AdaptedCallable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Callable<? extends T> callable;
T result;
AdaptedCallable(Callable<? extends T> callable) {
if (callable == null) throw new NullPointerException();
this.callable = callable;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
try {
result = callable.call();
return true;
} catch (Error err) {
throw err;
} catch (RuntimeException rex) {
throw rex;
} catch (Exception ex) {
throw new RuntimeException(ex);
}
}
public void run() { invoke(); }
private static final long serialVersionUID = 2838392045355241008L;
}
针对Runnable会包装成一个AdaptedRunnable:
static final class AdaptedRunnable<T> extends ForkJoinTask<T>
implements RunnableFuture<T> {
final Runnable runnable;
final T resultOnCompletion;
T result;
AdaptedRunnable(Runnable runnable, T result) {
if (runnable == null) throw new NullPointerException();
this.runnable = runnable;
this.resultOnCompletion = result;
}
public T getRawResult() { return result; }
public void setRawResult(T v) { result = v; }
public boolean exec() {
runnable.run();
result = resultOnCompletion;
return true;
}
public void run() { invoke(); }
private static final long serialVersionUID = 5232453952276885070L;
}
所有submit方法内部最终都会调用一个forkOrSubmit方法,看下这个方法:
private <T> void forkOrSubmit(ForkJoinTask<T> task) {
ForkJoinWorkerThread w;
Thread t = Thread.currentThread();
if (shutdown)
throw new RejectedExecutionException();
if ((t instanceof ForkJoinWorkerThread) &&
(w = (ForkJoinWorkerThread)t).pool == this)
w.pushTask(task);
else
addSubmission(task);
}
细节:
如果当前线程是ForkJoin工作线程,说明是在ForkJoinTask内部提交的任务(比如分裂出子任务然后提交执行),这种情况下会将任务添加到工作线程的任务队列中;如果当前线程不是ForkJoin工作线程,说明是初始提交ForkJoin任务(外部将ForkJoinTask初次提交给ForkJoinPool),这种情况下会将任务添加到ForkJoinPool的提交任务队列中。 由于我们是初始提交(外部提交任务),在forkOrSubmit方法中一定会走addSubmission的分支,看下这个方法:
private void addSubmission(ForkJoinTask<?> t) {
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
ForkJoinTask<?>[] q; int s, m;
if ((q = submissionQueue) != null) { // ignore if queue removed
//这步计算内存偏移地址
long u = (((s = queueTop) & (m = q.length-1)) << ASHIFT)+ABASE;
//将t设置到q的对应位置。(LazySet)
UNSAFE.putOrderedObject(q, u, t);
queueTop = s + 1;
if (s - queueBase == m)
//如果队列满了,扩展队列。
growSubmissionQueue();
}
} finally {
lock.unlock();
}
//唤醒工作线程。
signalWork();
}
细节:
代码可以看到通过Unsafe设置Task到数组的方式,之后所有的设置任务到数组都会采取这种方式,之所以这样做是为了提高性能:这种方式其实和数组原子量(AtomicReferenceArray)中一致,但减少了2方面的性能损耗:1.不用像AtomicReferenceArray内部一样再做边界检测(由外部保证);2.由于队列最大容量的限制(工作线程中的任务队列也一样),不用像AtomicReferenceArray一样在计算偏移量过程中不会进行从int到long的提升。 addSubmission中逻辑很清晰:添加一个任务到提交任务队列(过程要加锁),如果队列满了,扩展一下。然后唤醒工作线程。先看下扩展队列方法:
private void growSubmissionQueue() {
ForkJoinTask<?>[] oldQ = submissionQueue;
int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
if (size > MAXIMUM_QUEUE_CAPACITY)
throw new RejectedExecutionException("Queue capacity exceeded");
if (size < INITIAL_QUEUE_CAPACITY)
size = INITIAL_QUEUE_CAPACITY;
ForkJoinTask<?>[] q = submissionQueue = new ForkJoinTask<?>[size];
int mask = size - 1;
int top = queueTop;
int oldMask;
if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
for (int b = queueBase; b != top; ++b) {
long u = ((b & oldMask) << ASHIFT) + ABASE;
Object x = UNSAFE.getObjectVolatile(oldQ, u);
if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
UNSAFE.putObjectVolatile
(q, ((b & mask) << ASHIFT) + ABASE, x);
}
}
}
很容易看懂,最小容量为INITIAL_QUEUE_CAPACITY=8,每次扩展为原来的2倍,最大不能超过MAXIMUM_QUEUE_CAPACITY = 1 << 24(16777216)。
- 现在任务已经提交到ForkJoinPool的submissionQueue中了,那么接下来任务如何被执行呢?继续往下看。
在addSubmission方法中添加一个Task到Pool的submissionQueue之后,还会执行一个唤醒工作线程的动作,这样就会有工作线程来执行我们提交的任务了,看下唤醒工作线程的方法signalWork:
final void signalWork() {
long c; int e, u;
while ((((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) &
(INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0) {
if (e > 0) {
//唤醒一个等待的工作线程。
int i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
if ((ws = workers) == null || //如果看到workers为null
(i = ~e & SMASK) >= ws.length || //等待工作线程的下标比workers的size大。
(w = ws[i]) == null) //获取不到等待线程。
break;
long nc = (((long)(w.nextWait & E_MASK)) | //将Treiber stack中下一个等待线程的信息(下标)放到控制信息上。
((long)(u + UAC_UNIT) << 32)); //控制信息上添加一个AC计数。
if (w.eventCount == e &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
w.eventCount = (e + EC_UNIT) & E_MASK; //累加w的等待次数。
if (w.parked)
UNSAFE.unpark(w); //唤醒w。
break;
}
}
//添加一个工作线程。
else if (UNSAFE.compareAndSwapLong
(this, ctlOffset, c,
(long)(((u + UTC_UNIT) & UTC_MASK) | //累加控制信息上AC和TC。
((u + UAC_UNIT) & UAC_MASK)) << 32)) {
addWorker(); //添加工作线程。
break;
}
}
}
细节:
首先来看下while里面的条件:(((e = (int)(c = ctl)) | (u = (int)(c >>> 32))) & (INT_SIGN|SHORT_SIGN)) == (INT_SIGN|SHORT_SIGN) && e >= 0
先看下前半部分:当e的第32位bit为1或者u的第32位bit为1 并且 e的第16位bit为1或者u的第16位bit为1,结合上一篇我们了解到的ForkJoinPool中的控制信息可知:
e的第32位bit为1,说明ST为1,表示当前ForkJoinPool正在关闭。
u的第32位bit为1,说明AC为负数,表示没有足够多的活动的工作线程。
e的第16位bit为1,说明ID为负数,表示至少有一个等待线程。
u的第16位bit为1,说明TC为负数,表示没有足够多的(总的)工作线程。
再看下前半部分:e >= 0,排除了当前ForkJoinPool正在关闭的情况。
while循环内部由e来区分,当e==0,表示当前没有等待的工作线程,这种情况下要创建一个工作线程;当e>0,说明当前有等待线程,这种情况下唤醒一下等待的工作线程。 按照流程,由于是新创建的Pool,里面还没有工作线程呢,所以signalWork中一定会走添加工作线程的流程了,addWorker方法:
private void addWorker() {
Throwable ex = null;
ForkJoinWorkerThread t = null;
try {
t = factory.newThread(this);
} catch (Throwable e) {
ex = e;
}
if (t == null) {
//如果发生了异常导致工作线程创建失败,需要把之前累加的到控制信息的AC和TC计数减回去。
long c;
do {} while (!UNSAFE.compareAndSwapLong
(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))));
// 如果调用来之外部,需要将异常传递出去。
if (!tryTerminate(false) && ex != null &&
!(Thread.currentThread() instanceof ForkJoinWorkerThread))
UNSAFE.throwException(ex);
}
else
t.start(); //创建成功的话,启动工作线程。
}
这个方法中要注意下工作线程的创建过程,上面看到Pool会提供了默认的工作线程工厂来创建线程,看下具体过程:
static class DefaultForkJoinWorkerThreadFactory
implements ForkJoinWorkerThreadFactory {
public ForkJoinWorkerThread newThread(ForkJoinPool pool) {
return new ForkJoinWorkerThread(pool);
}
}
再看下ForkJoinWorkerThread的构造方法:
protected ForkJoinWorkerThread(ForkJoinPool pool) {
//设置线程名称,名称由Pool的nextWorkerName方法产生。
super(pool.nextWorkerName());
//由于ForkJoinWorkerThread中有一个Pool的引用,这里设置引用。
this.pool = pool;
//将自己注册到Pool里面。
int k = pool.registerWorker(this);
//保存在Pool里面的下标。
poolIndex = k;
//这里的eventCount初始化为工作线程在Pool中的下标取反值。
eventCount = ~k & SMASK;
//设置工作模式。
locallyFifo = pool.locallyFifo;
//设置线程未捕获异常处理器。
Thread.UncaughtExceptionHandler ueh = pool.ueh;
if (ueh != null)
setUncaughtExceptionHandler(ueh);
//设置为守护线程。
setDaemon(true);
}
工作线程名称的生成方法在Pool中,看下:
private volatile int nextWorkerNumber;
final String nextWorkerName() {
for (int n;;) {
if (UNSAFE.compareAndSwapInt(this, nextWorkerNumberOffset,
n = nextWorkerNumber, ++n))
return workerNamePrefix + n;
}
}
主要看下创建工作线程过程中,注册到Pool的过程:
final int registerWorker(ForkJoinWorkerThread w) {
for (int g;;) {
ForkJoinWorkerThread[] ws;
//如果当前scanGuard中没有SG_UNIT标记,尝试设置SG_UNIT标记。
//这是一个加锁的过程。
if (((g = scanGuard) & SG_UNIT) == 0 &&
UNSAFE.compareAndSwapInt(this, scanGuardOffset,
g, g | SG_UNIT)) {
int k = nextWorkerIndex;
try {
if ((ws = workers) != null) { // ignore on shutdown
int n = ws.length;
if (k < 0 || k >= n || ws[k] != null) {
for (k = 0; k < n && ws[k] != null; ++k)
;
if (k == n)
ws = workers = Arrays.copyOf(ws, n << 1);
}
ws[k] = w;
nextWorkerIndex = k + 1;
int m = g & SMASK;
g = (k > m) ? ((m << 1) + 1) & SMASK : g + (SG_UNIT<<1);
}
} finally {
scanGuard = g;
}
return k;
}
else if ((ws = workers) != null) { // help release others
for (ForkJoinWorkerThread u : ws) {
if (u != null && u.queueBase != u.queueTop) {
if (tryReleaseWaiter())
break;
}
}
}
}
}
细节:
registerWorker方法的主要逻辑就是要注册一个工作线程到ForkJoinPool,然后返回工作线程在Pool内部工作线程数组的下标。说明一下:
1.方法内部的主流程(无限循环)中,首先尝试获取一个顺序锁。如果获取失败,会遍历下所有的工作线程,如果发现有工作线程的任务队列里还有未处理的任务,就会尝试唤醒等待的工作线程,然后再尝试去获取顺序锁。
2.如果获取顺序锁成功,内部会将传入的工作线程设置到相应的位置(必要的时候工作线程数组需要扩容),然后返回下标。过程中会调整scanGuard的低16比特。
注:这里再次分析一下scanGuard这个神奇的域。
它的高16位用来做顺序锁,我们看到在主流程中首先会查看scanGuard中是否含有SG_UNIT对应的bit信息,如果有说明已经有其他线程持有这个锁了;如果没有,就可以通过一个CAS操作来获取这个锁,获取动作就是给scanGuard上添加上SG_UNIT对应的bit信息,在内部完成逻辑后会清除scanGuard上的SG_UNIT信息。
它的低16位就像它的命名一样,表示扫描的边界。上面的代码中可以看到,在注册工作线程时候会调整这个边界值,规律是这样,边界值的大小=比当前工作线程最大下标大的最小的2的幂减1(有点绕,不要晕),举几个栗子:maxIndex=5,guard=7、maxIndex=10,guard=15,看出规律了吧。这个值的作用是为了避免不必要的扫描,因为Pool内部的工作线程数组size可能比较大(初始化的时候就比并行度要大很多,回去看下Pool的构造方法。而且还有可能扩展),但实际的工作线程数量可能比较小(比如可能数组size是16,但里面只有4个工作线程,14个空位),如果扫描的时候扫描范围是全部数组,那一定会在空位上浪费很多时间,有了这个guard作为边界(而不是数组的length-1),会避免这些时间的浪费。 好了,继续我们的流程。前面创建、注册并启动了一个工作线程。接下来,这个工作线程应该会去执行我们提交的任务了吧,继续看ForkJoinWorkerThread的run方法了:
public void run() {
Throwable exception = null;
try {
onStart();
pool.work(this);
} catch (Throwable ex) {
exception = ex;
} finally {
onTermination(exception);
}
}
ForkJoinWorkerThread的逻辑很明确,启动前回调下onStart方法,然后是主流程(pool.work),如果方法结束,还会回调下onTermination方法。 先看下onStart和onTermination:
protected void onStart() {
//初始化工作线程的任务队列。
queue = new ForkJoinTask<?>[INITIAL_QUEUE_CAPACITY];
//生成工作线程的种子。
int r = pool.workerSeedGenerator.nextInt();
//确保种子不为0。
seed = (r == 0) ? 1 : r; // must be nonzero
}
protected void onTermination(Throwable exception) {
try {
//设置关闭标志。
terminate = true;
//取消任务。
cancelTasks();
//从ForkJoinPool上注销当前工作线程。
pool.deregisterWorker(this, exception);
} catch (Throwable ex) { // Shouldn't ever happen
if (exception == null) // but if so, at least rethrown
exception = ex;
} finally {
if (exception != null)
UNSAFE.throwException(exception);
}
}
下面重点看一下ForkJoinWorkerThread运行过程的主流程-pool.work方法,这个方法定义在ForkJoinPool中,看下实现:
final void work(ForkJoinWorkerThread w) {
boolean swept = false; // true on empty scans
long c;
while (!w.terminate && (int)(c = ctl) >= 0) {
int a; // active count
if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0)
swept = scan(w, a);
else if (tryAwaitWork(w, c))
swept = false;
}
}
work方法中,while条件就是当前工作线程未结束且pool未关闭。while循环中,会不断的扫描(scan)或等待(tryAwaitWork)。如果扫描标志(起始为false)表示上一次扫描不是空扫描(false),并且当前活动线程数量小于处理器核数(这里要注意下,返回去在看下控制信息中AC的定义),那么进行扫描动作;否则进行等待(也就是说,当前没有任务需要执行的任务 或者 当前活动的线程数量已经大于处理器核心数)。 而按照我们的流程,这里一定会先扫描到我们提交的任务,先看下scan方法:
private boolean scan(ForkJoinWorkerThread w, int a) {
int g = scanGuard;
/*
* 如果当前只有一个工作线程,将m设置为0,避免没用的扫描。
* 否则获取guard值。
*/
int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws == null || ws.length <= m) // 过期检测
return false;
for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
//随机选出一个牺牲者(工作线程)。
ForkJoinWorkerThread v = ws[k & m];
//一系列检查...
if (v != null && (b = v.queueBase) != v.queueTop &&
(q = v.queue) != null && (i = (q.length - 1) & b) >= 0) {
//如果这个牺牲者的任务队列中还有任务,尝试窃取这个任务。
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && v.queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
//窃取成功后,调整queueBase
int d = (v.queueBase = b + 1) - v.queueTop;
//将牺牲者的stealHint设置为当前工作线程在pool中的下标。
v.stealHint = w.poolIndex;
if (d != 0)
signalWork(); // 如果牺牲者的任务队列还有任务,继续唤醒(或创建)线程。
w.execTask(t); //执行窃取的任务。
}
//计算出下一个随机种子。
r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5);
return false; // 返回false,表示不是一个空扫描。
}
//前2*m次,随机扫描。
else if (j < 0) { // xorshift
r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5;
}
//后2*m次,顺序扫描。
else
++k;
}
if (scanGuard != g) // staleness check
return false;
else {
//如果扫描完毕后没找到可窃取的任务,那么从Pool的提交任务队列中取一个任务来执行。
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
if ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null && queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
w.execTask(t);
}
return false;
}
return true; // 如果所有的队列(工作线程的任务队列和pool的任务队列)都是空的,返回true。
}
}
细节:
首先,需要确定扫描边界值m,如果当前只有一个工作线程,那么m就为0,避免多余的扫描;如果当前有多个工作线程,那么m就是scanGuard的低16位表示的值(去前面看看scanGuard)。
其次,确定了m以后,开始一个for循环进行扫描。扫描的目的就是要通过工作线程的seed(这个域之前没提,使用来选择一个窃取牺牲者的)算出一个牺牲者(victim)的下标,牺牲者也是一个工作线程,然后当前工作线程便会从牺牲者的任务队列里面窃取一个任务来执行。当然如果算出的下标对应的位置上没有牺牲者(工作线程),或者牺牲者的任务队列里没有任务,就会进行下一次尝试。整个for循环最多会尝试4*m次,前2*m次是随机算下标,每次会通过xorshift算法来生成新的k。后2*m次是顺序递增k来算下标。
最后,如果for循环结束了还没有扫描到任务,那么就会从Pool的submissionQueue中窃取一个任务来执行了。 由于是初始提交,scan方法中一定是从Pool的submissionQueue中获取我们提交的任务,然后执行。继续看执行任务调用的ForkJoinWorkerThread的execTask方法:
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
//执行任务
t.doExec();
if (queueTop == queueBase)
break; //如果当前工作线程的任务队列里没有任务了,退出循环。
//否则,根据模式来获取任务。
//如果Pool中指定为异步模式,这里从当前任务队列的尾部获取任务;否则,从任务队列头部获取任务。
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount; //最后累加窃取任务计数。
currentSteal = null;
}
看下execTask方法中调用的的ForJoinTask的doExec方法:
final void doExec() {
if (status >= 0) {
boolean completed;
try {
//调用exec方法执行任务
completed = exec();
} catch (Throwable rex) {
//设置异常完成结果
setExceptionalCompletion(rex);
return;
}
if (completed)
setCompletion(NORMAL); // 设置正常完成结果
}
}
ForJoinTask中的exec方法是一个抽象方法,具体执行逻辑交给子类去实现,我们前面看到的适配类AdaptedRunnable和AdaptedCallable里面,会在exec里面分别调用runnable.run()和callable.call()方法;而在ForJoinTask的两个子类RecursiveAction和RecursiveTask里面,exec里面调用的是compute方法:
public abstract class RecursiveAction extends ForkJoinTask<Void> {
private static final long serialVersionUID = 5232453952276485070L;
/**
* The main computation performed by this task.
*/
protected abstract void compute();
public final Void getRawResult() { return null; }
protected final void setRawResult(Void mustBeNull) { }
/**
* Implements execution conventions for RecursiveActions.
*/
protected final boolean exec() {
compute();
return true;
}
}
public abstract class RecursiveTask<V> extends ForkJoinTask<V> {
private static final long serialVersionUID = 5232453952276485270L;
V result;
/**
* The main computation performed by this task.
*/
protected abstract V compute();
public final V getRawResult() {
return result;
}
protected final void setRawResult(V value) {
result = value;
}
/**
* Implements execution conventions for RecursiveTask.
*/
protected final boolean exec() {
result = compute();
return true;
}
}
注意ForJoinTask的exec方法有返回值,表示任务是否执行完毕。doExec方法中会根据这个返回值来设置任务的完成状态,如果任务正常完成,会调用setCompletion(NORMAL):
/** The run status of this task */
volatile int status; // accessed directly by pool and workers
private static final int NORMAL = -1;
...
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
//尝试将任务状态设置为正常完成。
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
//同时唤醒合并当前任务的等待线程。
synchronized (this) { notifyAll(); }
return completion;
}
}
}
doExec方法中如果执行exec方法发生异常,会调用setExceptionalCompletion方法来设置异常完成状态:
private int setExceptionalCompletion(Throwable ex) {
//计算当前对象的原生hashCode。
int h = System.identityHashCode(this);
final ReentrantLock lock = exceptionTableLock;
lock.lock();
try {
//删除异常表中过期的异常。
expungeStaleExceptions();
//获取异常表
ExceptionNode[] t = exceptionTable;
//通过当前对象hashCode获取在异常表中的下标。
int i = h & (t.length - 1);
//设置异常节点。
for (ExceptionNode e = t[i]; ; e = e.next) {
if (e == null) {
t[i] = new ExceptionNode(this, ex, t[i]);
break;
}
if (e.get() == this) // already present
break;
}
} finally {
lock.unlock();
}
return setCompletion(EXCEPTIONAL);
}
细节:
注意到setExceptionalCompletion方法中最后还是调用了setCompletion,但之前会做一下将异常放入一个异常表的工作。
看到这里可能会有疑问,我们通过2个问题来了解下这个异常表:1.这里为什么会有一个异常表呢?2.异常表的结构是怎么样的呢?
首先回答第1个问题:首先,因为异常很少发生,所以没有将异常保存在任务对象内,而是放在一个弱引用异常表里(异常表里不会保存取消异常)。
再回答第2个问题:这里的异常表结构上是一个哈希表,每个桶里是单链,弱引用Key。 结构细节参见异常表相关的结构代码:
private static final ExceptionNode[] exceptionTable;
private static final ReentrantLock exceptionTableLock;
private static final ReferenceQueue<Object> exceptionTableRefQueue;
private static final int EXCEPTION_MAP_CAPACITY = 32;
static final class ExceptionNode extends WeakReference<ForkJoinTask<?>>{
final Throwable ex;
ExceptionNode next;
final long thrower; // use id not ref to avoid weak cycles
ExceptionNode(ForkJoinTask<?> task, Throwable ex, ExceptionNode next) {
super(task, exceptionTableRefQueue);
this.ex = ex;
this.next = next;
this.thrower = Thread.currentThread().getId();
}
}
...
// Unsafe mechanics
private static final sun.misc.Unsafe UNSAFE;
private static final long statusOffset;
static {
exceptionTableLock = new ReentrantLock();
exceptionTableRefQueue = new ReferenceQueue<Object>();
exceptionTable = new ExceptionNode[EXCEPTION_MAP_CAPACITY];
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
statusOffset = UNSAFE.objectFieldOffset
(ForkJoinTask.class.getDeclaredField("status"));
} catch (Exception e) {
throw new Error(e);
}
}
继续我们的流程,执行完doExec,方法返回到execTask,接下来由于当前工作线程自身的任务队列中并没有任务,所以queueTop == queueBase成立,execTask方法退出到scan方法,scan方法返回false到ForkJoinPool的work方法,进行下一次扫描(scan),由于工作线程本身的任务队列和Pool的任务队列都为空,所以下一次扫描一定是个空扫描,然后程序会走到work方法的tryAwaitWork分支,看下tryAwaitWork方法:
private boolean tryAwaitWork(ForkJoinWorkerThread w, long c) {
int v = w.eventCount;
//w的nextWait保存的是等待之前Pool的控制信息。
w.nextWait = (int)c;
//这里是将当前线程的ID信息(下标取反)记录到Pool控制信息上
//同时将控制信息上的活动工作线程计数减1。
long nc = (long)(v & E_MASK) | ((c - AC_UNIT) & (AC_MASK|TC_MASK));
if (ctl != c || !UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
long d = ctl;
// 如果和另外的一个窃取线程竞争并失败,这里返回true,work方法中会继续扫描。
return (int)d != (int)c && ((d - c) & AC_MASK) >= 0L;
}
for (int sc = w.stealCount; sc != 0;) {
//将工作线程上的stealCount原子累加到Pool的stealCount上面。
long s = stealCount;
if (UNSAFE.compareAndSwapLong(this, stealCountOffset, s, s + sc))
sc = w.stealCount = 0;
else if (w.eventCount != v)
return true; //如果eventCount发生变化,重试。
}
if ((!shutdown || !tryTerminate(false)) &&
(int)c != 0 && parallelism + (int)(nc >> AC_SHIFT) == 0 &&
blockedCount == 0 && quiescerCount == 0)
//如果满足上面的条件,说明当前Pool休眠了,需要调用下idleAwaitWork进行处理。
//上面的条件是:Pool未关闭 且 有工作线程 且 活动的工作线程数量等于cpu核心数量
//且没有工作线程在合并过程中阻塞 且 没有工作线程休眠。
idleAwaitWork(w, nc, c, v); // quiescent
for (boolean rescanned = false;;) {
if (w.eventCount != v)
return true; //如果eventCount发生变化,重试。
if (!rescanned) {
int g = scanGuard, m = g & SMASK;
ForkJoinWorkerThread[] ws = workers;
if (ws != null && m < ws.length) {
rescanned = true;
//这里再重新扫描一下,如果从其他工作线程任务队列里找到任务,尝试唤醒等待的工作线程。
for (int i = 0; i <= m; ++i) {
ForkJoinWorkerThread u = ws[i];
if (u != null) {
if (u.queueBase != u.queueTop &&
!tryReleaseWaiter())
rescanned = false; // 发生竞争,再次扫描。
if (w.eventCount != v)
return true;
}
}
}
if (scanGuard != g || // scanGuard发生变化
(queueBase != queueTop && !tryReleaseWaiter())) //或者从Pool任务队列中找到任务
rescanned = false; //再次扫描
if (!rescanned)
Thread.yield(); // 出让cpu,减少竞争。
else
Thread.interrupted(); // park前清除中断标记。
}
else {
w.parked = true; // 设置park标记
if (w.eventCount != v) { //再检测一下。
w.parked = false;
return true;
}
LockSupport.park(this);
rescanned = w.parked = false;
}
}
}
细节:
这个方法的目的就是阻塞工作线程w,但过程中有一些细节。首先,方法中要调整Pool的控制信息ctl,将w的ID信息设置到ctl上并将ctl上保存的活动工作线程数量减1;其次,在阻塞前,还会将w的窃取任务数量累计到Pool的总窃取任务数量(stealCount)上;再次,如果当前Pool正好处理休眠状态,那么要调用idleAwaitWork处理一下(可能会结束工作线程);最后,再真正阻塞前还要扫描一下工作线程任务队列和Pool任务队列,如果发现有任务,会尝试唤醒一个等待工作线程(可能是自己),这个过程要结束时会清除一下当前线程的中断标记,然后进行阻塞(以Pool为Blocker,相当于进入Pool的等待队列)。
- 如果按照我们的流程,到这儿就结束了,工作线程已经执行了我们提交的任务,然后阻塞等待了。但如果当前Pool正好处于休眠状态了,看看会怎么样,继续看下上面方法中调用的idleAwaitWork方法:
private void idleAwaitWork(ForkJoinWorkerThread w, long currentCtl,
long prevCtl, int v) {
if (w.eventCount == v) {
if (shutdown)
tryTerminate(false); //如果关闭方法已经被调用,那么调用tryTerminate
ForkJoinTask.helpExpungeStaleExceptions(); // 清理一下异常表中weak key
while (ctl == currentCtl) {
long startTime = System.nanoTime();
w.parked = true;
if (w.eventCount == v)
LockSupport.parkNanos(this, SHRINK_RATE); //阻塞给定时间(4秒)
w.parked = false;
if (w.eventCount != v)
break;
else if (System.nanoTime() - startTime <
SHRINK_RATE - (SHRINK_RATE / 10))
Thread.interrupted(); // 如果发生伪唤醒,清除中断标志。
//恢复之前的ctl,如果ctl一直没发生变化,会成功。
else if (UNSAFE.compareAndSwapLong(this, ctlOffset,
currentCtl, prevCtl)) {
//结束工作线程
w.terminate = true; // 设置结束标志。
w.eventCount = ((int)currentCtl + EC_UNIT) & E_MASK; //设置w的eventCount。
break;
}
}
}
}
可见,idleAwaitWork方法中开始会检测一下Pool是否正在关闭,是的话要调用tryTerminate;否则会先将工作线程w阻塞一段时间(4s),如果超过了这个时间,Pool的控制信息还没发生变化(说明Pool还是休眠状态),那么就需要将w结束掉,会设置w的结束标记为true,同时设置w的eventCount,然后退出到tryAwaitWork方法,tryAwaitWork方法中检测到w的eventCount发生变化,会退出到work方法,work方法中检测到w的结束标记为true,主循环回退出,工作线程w就要结束了,结束时会调用w的onTermination方法(上面提到过):
protected void onTermination(Throwable exception) {
try {
terminate = true;
cancelTasks();
pool.deregisterWorker(this, exception);
} catch (Throwable ex) { // Shouldn't ever happen
if (exception == null) // but if so, at least rethrown
exception = ex;
} finally {
if (exception != null)
UNSAFE.throwException(exception);
}
}
ForkJoinWorkerThread的onTermination方法中主要做了两件事:取消任务和从Pool中注销自己。
先看下取消任务:
final void cancelTasks() {
ForkJoinTask<?> cj = currentJoin;
if (cj != null && cj.status >= 0)
cj.cancelIgnoringExceptions(); //取消正在合并的任务
ForkJoinTask<?> cs = currentSteal;
if (cs != null && cs.status >= 0)
cs.cancelIgnoringExceptions(); //取消窃取的任务
while (queueBase != queueTop) {
ForkJoinTask<?> t = deqTask(); //取消工作线程任务队列中的所有任务。
if (t != null)
t.cancelIgnoringExceptions();
}
}
看下这个取消方法cancelIgnoringExceptions:
private static final int CANCELLED = -2;
...
final void cancelIgnoringExceptions() {
try {
cancel(false);
} catch (Throwable ignore) {
}
}
public boolean cancel(boolean mayInterruptIfRunning) {
return setCompletion(CANCELLED) == CANCELLED;
}
private int setCompletion(int completion) {
for (int s;;) {
if ((s = status) < 0)
return s;
if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
if (s != 0)
synchronized (this) { notifyAll(); }
return completion;
}
}
}
再看下ForkJoinWorkerThread从Pool中注销自己的代码:
final void deregisterWorker(ForkJoinWorkerThread w, Throwable ex) {
int idx = w.poolIndex;
int sc = w.stealCount;
int steps = 0;
// Remove from array, adjust worker counts and collect steal count.
// We can intermix failed removes or adjusts with steal updates
do {
long s, c;
int g;
if (steps == 0 && ((g = scanGuard) & SG_UNIT) == 0 &&
UNSAFE.compareAndSwapInt(this, scanGuardOffset,
g, g |= SG_UNIT)) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null && idx >= 0 &&
idx < ws.length && ws[idx] == w)
ws[idx] = null; // verify
nextWorkerIndex = idx;
scanGuard = g + SG_UNIT;
steps = 1;
}
if (steps == 1 &&
UNSAFE.compareAndSwapLong(this, ctlOffset, c = ctl,
(((c - AC_UNIT) & AC_MASK) |
((c - TC_UNIT) & TC_MASK) |
(c & ~(AC_MASK|TC_MASK)))))
steps = 2;
if (sc != 0 &&
UNSAFE.compareAndSwapLong(this, stealCountOffset,
s = stealCount, s + sc))
sc = 0;
} while (steps != 2 || sc != 0);
if (!tryTerminate(false)) {
if (ex != null) // possibly replace if died abnormally
signalWork();
else
tryReleaseWaiter();
}
}
deregisterWorker方法中首先在一个while无限循环中完成工作线程注销的工作,包括3个阶段,全部完成后再执行下一步。之所以这样做是由于每个阶段都可能发生竞争,需要重试。第1阶段,会在获取顺序锁(scanGuard高16位)的情况下将Pool中工作线程对应的数组位置置空,并调整nextWorkerIndex;第2阶段,将控制信息ctl中的活动工作线程数量和总工作线程数量减1;第3阶段,将要注销工作线程的窃取任务数量累加到Pool的总窃取任务数量上。
deregisterWorker在完成注销线程工作后,还有可能会唤醒其他等待线程,首先会调用一下tryTerminate(false):
private boolean tryTerminate(boolean now) {
long c;
while (((c = ctl) & STOP_BIT) == 0) {
if (!now) {
if ((int)(c >> AC_SHIFT) != -parallelism)
return false;
if (!shutdown || blockedCount != 0 || quiescerCount != 0 ||
queueBase != queueTop) {
if (ctl == c) // staleness check
return false;
continue;
}
}
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, c | STOP_BIT))
startTerminating();
}
if ((short)(c >>> TC_SHIFT) == -parallelism) { // signal when 0 workers
final ReentrantLock lock = this.submissionLock;
lock.lock();
try {
termination.signalAll();
} finally {
lock.unlock();
}
}
return true;
}
tryTerminate中,如果参数为false(表示不会马上关闭Pool),那么实现中会查看是否有活动的工作线程,有的话返回false。然后检查Pool是否还在运行中(包括Pool有没有被关闭、有没有等待合并的工作线程、有没有空闲的工作线程、提交队列中是否有任务等),如果还在运行中,返回false。 否则会尝试设置关闭标志到控制信息,然后调用startTerminating方法,如果参数为true的话也会直接进行这些操作。然后再检测下如果当前总的工作线程为0,就会唤醒在termination条件上等待的线程了。
再看下这个startTerminating方法:
private void startTerminating() {
//取消Pool提交任务队列中的任务。
cancelSubmissions();
for (int pass = 0; pass < 3; ++pass) {
ForkJoinWorkerThread[] ws = workers;
if (ws != null) {
for (ForkJoinWorkerThread w : ws) {
if (w != null) {
//结束工作线程。
w.terminate = true;
if (pass > 0) {
//取消工作线程中任务队列的任务。
w.cancelTasks();
if (pass > 1 && !w.isInterrupted()) {
try {
//中断工作线程。
w.interrupt();
} catch (SecurityException ignore) {
}
}
}
}
}
//结束等待的工作线程。
terminateWaiters();
}
}
}
startTerminating中大体流程如下:
1.取消Pool中submissionQueue中的任务。
2.将所有的工作线程的结束状态设置为true。
3.取消所有工作线程的任务队列中未完成的任务。
4.中断所有工作线程。
5.结束还在等待的工作线程。
startTerminating中第1步会调用cancelSubmissions方法:
private void cancelSubmissions() {
while (queueBase != queueTop) {
ForkJoinTask<?> task = pollSubmission();
if (task != null) {
try {
task.cancel(false);
} catch (Throwable ignore) {
}
}
}
}
protected ForkJoinTask<?> pollSubmission() {
ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i;
while ((b = queueBase) != queueTop &&
(q = submissionQueue) != null &&
(i = (q.length - 1) & b) >= 0) {
long u = (i << ASHIFT) + ABASE;
if ((t = q[i]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueBase = b + 1;
return t;
}
}
return null;
}
startTerminating中第5步会调用terminateWaiters方法:
private void terminateWaiters() {
ForkJoinWorkerThread[] ws = workers;
if (ws != null) {
ForkJoinWorkerThread w; long c; int i, e;
int n = ws.length;
while ((i = ~(e = (int)(c = ctl)) & SMASK) < n &&
(w = ws[i]) != null && w.eventCount == (e & E_MASK)) {
if (UNSAFE.compareAndSwapLong(this, ctlOffset, c,
(long)(w.nextWait & E_MASK) |
((c + AC_UNIT) & AC_MASK) |
(c & (TC_MASK|STOP_BIT)))) {
w.terminate = true;
w.eventCount = e + EC_UNIT;
if (w.parked)
UNSAFE.unpark(w);
}
}
}
}
细节:
这个方法要注意一下,看到这里可能忽略一个细节,就是隐式的等待工作线程组成的链(也叫Treiber stack)。这里可能猛地一看,方法内部只唤醒了一个等待线程,这个等待线程的ID信息存储在Pool的控制信息中。但仔细看会发现,在设置(调整)ctl的时候,有这句w.nextWait & E_MASK,也就是说,唤醒了w之后,ctl里又会有另一个等待工作线程的ID信息(这个信息之前是存在w的nextWait上面的)。
好吧,回到deregisterWorker方法,假设当前Pool还在运行,那么tryTerminate(false)返回false,就会执行接下来的语句:
if (!tryTerminate(false)) {
if (ex != null) // possibly replace if died abnormally
signalWork();
else
tryReleaseWaiter();
}
如果有异常发生,会调用signalWork方法来唤醒或者创建一个工作线程;否则调用tryReleaseWaiter方法来尝试唤醒一个等待工作线程:
private boolean tryReleaseWaiter() {
long c; int e, i; ForkJoinWorkerThread w; ForkJoinWorkerThread[] ws;
if ((e = (int)(c = ctl)) > 0 && //如果有等待线程
(int)(c >> AC_SHIFT) < 0 && //且当前活动线程数小于CPU核数
(ws = workers) != null && //检测工作线程数组合法性
(i = ~e & SMASK) < ws.length && //检测控制信息中等待的工作线程的ID信息的合法性。
(w = ws[i]) != null) { //检测工作线程的合法性。
long nc = ((long)(w.nextWait & E_MASK) |
((c + AC_UNIT) & (AC_MASK|TC_MASK))); //尝试调整控制信息,增加活动工作线程计数,将Treiber stack下一个等待线程的ID信息设置到ctl
if (w.eventCount != e ||
!UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
return false; //如果发生冲突。
w.eventCount = (e + EC_UNIT) & E_MASK; //累加w的eventCount
if (w.parked)
UNSAFE.unpark(w); //唤醒w。
}
return true;
}
- OK,让我们在回到ForkJoinWorkerThread的execTask方法(不要晕):
final void execTask(ForkJoinTask<?> t) {
currentSteal = t;
for (;;) {
if (t != null)
t.doExec();
if (queueTop == queueBase)
break;
t = locallyFifo ? locallyDeqTask() : popTask();
}
++stealCount;
currentSteal = null;
}
假如执行完t,发现当前工作的任务队列中还有任务,那么接下来就会根据当前Pool的工作模式(是否是同步模式),来通过locallyDeqTask或者popTask获取一个任务出来继续执行。
细节:
工作线程中的任务队列(Pool中的任务队列也一样),形式上是一个数组,但概念上是一个双端队列。但和其他双端队列(JDK里另外的双端队列实现)不一样的是,这里的队列只定义了三种操作:从队列首部入队(push)、从队列首部出队(pop)、从队列尾部出队(deg)。这里既然说是队列,但又说什么push、pop可能会让人感觉晕晕的,其实可以这样理解,双端队列就可以看成是栈和队列的混血。结合使用Pool内部工作原理来说,如果不是异步模式(默认),那么就会把任务队列当成一个FIFO队列来使用;否则就相当于把任务队列当成一个栈来使用。
如果Pool不是异步模式(locallyFifo为false),那么会执行popTask方法:
private ForkJoinTask<?> popTask() {
int m;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
for (int s; (s = queueTop) != queueBase;) {
int i = m & --s;
long u = (i << ASHIFT) + ABASE; // raw offset
ForkJoinTask<?> t = q[i];
if (t == null) // lost to stealer
break;
if (UNSAFE.compareAndSwapObject(q, u, t, null)) {
queueTop = s; // or putOrderedInt
return t;
}
}
}
return null;
}
很明确,就是从当前任务队列pop一个任务出来。
如果Pool是异步模式(locallyFifo为true),那么会执行locallyDeqTask方法:
final ForkJoinTask<?> locallyDeqTask() {
ForkJoinTask<?> t; int m, b, i;
ForkJoinTask<?>[] q = queue;
if (q != null && (m = q.length - 1) >= 0) {
while (queueTop != (b = queueBase)) {
if ((t = q[i = m & b]) != null &&
queueBase == b &&
UNSAFE.compareAndSwapObject(q, (i << ASHIFT) + ABASE,
t, null)) {
queueBase = b + 1;
return t;
}
}
}
return null;
}
也很简单,就是从当前任务队列deg一个任务出来。
- 简单总结一下ForkJoinPool中的工作线程的工作流程:
1.被创建,注册到Pool中,启动。
2.扫描所有工作线程的队列和Pool中的任务队列,窃取一个任务来执行。
3.执行完毕后,在从自身工作队列中获取任务来执行。
4.没任务执行了,会阻塞等待,如果正好赶上了Pool休眠,那么会被结束掉,从Pool中注销。
最后:
本篇虽然是通过一个非ForkJoin任务的执行过程来分析代码,但已经包含ForkJoinPool运行过程的大部分流程。下篇会再次通过一个ForkJoin任务的流程来重点分析下Fork/Join过程中涉及到的源码。