上篇文章一直追踪到了ForkJoinWorkerThread的pushTask方法,仍然没有办法解释Fork的原理,那么不妨来看看ForkJoinWorkerThread的run方法:
public void run() { Throwable exception = null; try { // 初始化任务队列 onStart(); // 线程运行 pool.work(this); } catch (Throwable ex) { exception = ex; } finally { // 结束后的工作 onTermination(exception); } }
因此我们需要再次回到ForkJoinPool,看看work方法:
final void work(ForkJoinWorkerThread w) { boolean swept = false; // 下面scan方法没有扫描到任务返回true long c; // ctl是一个64位长的数据,它的格式如下: // 48-63:AC,正在运行的worker线程数减去系统的并发数(减去系统的并发得出的实际是在某一瞬间等待并发资源的线程数量) // 32-47:TC,所有的worker线程数减去系统的并发数 // 31: ST,1表示线程池正在关闭 // 16-30:EC,第一个等待线程的等待数 // 0- 15:ID,Treiber栈(存储等待线程)顶的worker线程在线程池的线程队列中的索引 // (int)(c = ctl) >= 0表示ST位为0,即线程池不是正在关闭的状态 while (!w.terminate && (int)(c = ctl) >= 0) { int a; // 正在运行的worker线程数,ctl中的AC部分 // swept为false可能有三种: // 1. scan返回false // 2. 首次循环 // 3. tryAwaitWork成功 if (!swept && (a = (int)(c >> AC_SHIFT)) <= 0) swept = scan(w, a); else if (tryAwaitWork(w, c)) swept = false; } }
接下来分析scan方法,我承认我看得有点晕。
private boolean scan(ForkJoinWorkerThread w, int a) { int g = scanGuard; // mask 0 avoids useless scans if only one active int m = (parallelism == 1 - a && blockedCount == 0) ? 0 : g & SMASK; ForkJoinWorkerThread[] ws = workers; if (ws == null || ws.length <= m) // staleness check return false; // 代码看起来晕啊,看来当前的ForkJoinWorkerThread不一定是运行自己的 // Task,可以运行其他ForkJoinWorkerThread的Task。 // 似乎有点明白了,这样可以实现Fork出来的任务被多线程执行 // 看起来这是一个较为复杂的算法 for (int r = w.seed, k = r, j = -(m + m); j <= m + m; ++j) { ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; ForkJoinWorkerThread v = ws[k & m]; if (v != null && (b = v.queueBase) != v.queueTop && (q = v.queue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && v.queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { int d = (v.queueBase = b + 1) - v.queueTop; v.stealHint = w.poolIndex; if (d != 0) signalWork(); // propagate if nonempty w.execTask(t); } r ^= r << 13; r ^= r >>> 17; w.seed = r ^ (r << 5); return false; // store next seed } else if (j < 0) { // xorshift r ^= r << 13; r ^= r >>> 17; k = r ^= r << 5; } else ++k; } if (scanGuard != g) // staleness check return false; else { // try to take submission ForkJoinTask<?> t; ForkJoinTask<?>[] q; int b, i; if ((b = queueBase) != queueTop && (q = submissionQueue) != null && (i = (q.length - 1) & b) >= 0) { long u = (i << ASHIFT) + ABASE; if ((t = q[i]) != null && queueBase == b && UNSAFE.compareAndSwapObject(q, u, t, null)) { queueBase = b + 1; w.execTask(t); } return false; } return true; // all queues empty } }
但是起码能看出来,Fork出来的任务是如何被其他线程运行以实现多线程运行的了。面对这么个有点复杂的算法,我只能先去查查,发现原来叫做Work-Stealing,好吧,下一篇来研究这个Work-Stealing。