Jdk1.7 JUC源码增量解析(4)-ForkJoin-ForkJoin任务的fork/join过程

Jdk1.7 JUC源码增量解析(4)-ForkJoin-ForkJoin任务的fork/join过程

作者:大飞

 

概述:

  • 这篇通过分析一个ForkJoin任务的执行过程来分析ForkJoin的相关代码,主要侧重于分裂(fork)/合并(join)过程。

 

源码分析:
         还是先看一个代码示例,这个示例介绍篇出现过,这里只贴出任务代码:

public class SumTask extends RecursiveTask<Long>{
	private static final int THRESHOLD = 10;
	
	private long start;
	private long end;
	
	public SumTask(long n) {
		this(1,n);
	}
	
	private SumTask(long start, long end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Long compute() {
		long sum = 0;
		if((end - start) <= THRESHOLD){
			for(long l = start; l <= end; l++){
				sum += l;
			}
		}else{
			long mid = (start + end) >>> 1;
		    SumTask left = new SumTask(start, mid);
		    SumTask right = new SumTask(mid + 1, end);
			left.fork();
			right.fork();
			sum = left.join() + right.join();
		}
		return sum;
	}
	private static final long serialVersionUID = 1L;
}

   

  • 这里略过任务提交、执行的一些过程,上篇都分析过了。任务执行过程中,新建了子任务,然后进行fork操作,看下fork的源码:
    public final ForkJoinTask<V> fork() {
        ((ForkJoinWorkerThread) Thread.currentThread())
            .pushTask(this);
        return this;
    }

       fork里面其实是将任务放到当前工作线程的任务队列里面了,看下pushTask方法细节:

    final void pushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q; int s, m;
        if ((q = queue) != null) {    // ignore if queue removed
            //这里首先根据当前的queueTop对队列(数组)长度取模来算出放置任务的下标
            //然后再通过下标算出偏移地址,提供给Unsafe使用。
            long u = (((s = queueTop) & (m = q.length - 1)) << ASHIFT) + ABASE;
            //设置任务。
            UNSAFE.putOrderedObject(q, u, t);
            //修改queueTop
            queueTop = s + 1;         // or use putOrderedInt
            if ((s -= queueBase) <= 2)
                pool.signalWork();
            else if (s == m)
                growQueue(); //如果队列满了,扩展一下队列容量。
        }
    }

        将任务push到任务队列,如果任务队列满了,扩展一下。

        看下扩展队列调用的growQueue方法:

    private void growQueue() {
        ForkJoinTask<?>[] oldQ = queue;
        int size = oldQ != null ? oldQ.length << 1 : INITIAL_QUEUE_CAPACITY;
        if (size > MAXIMUM_QUEUE_CAPACITY)
            throw new RejectedExecutionException("Queue capacity exceeded");
        if (size < INITIAL_QUEUE_CAPACITY)
            size = INITIAL_QUEUE_CAPACITY;
        ForkJoinTask<?>[] q = queue = new ForkJoinTask<?>[size];
        int mask = size - 1;
        int top = queueTop;
        int oldMask;
        if (oldQ != null && (oldMask = oldQ.length - 1) >= 0) {
            for (int b = queueBase; b != top; ++b) {
                long u = ((b & oldMask) << ASHIFT) + ABASE;
                Object x = UNSAFE.getObjectVolatile(oldQ, u);
                if (x != null && UNSAFE.compareAndSwapObject(oldQ, u, x, null))
                    UNSAFE.putObjectVolatile
                        (q, ((b & mask) << ASHIFT) + ABASE, x);
            }
        }
    }

        容量为原来的2倍,不超过MAXIMUM_QUEUE_CAPACITY(1 << 24),最小为INITIAL_QUEUE_CAPACITY(1 << 13)。  

  • 完事儿了,fork过程就这么简单。fork出子任务后,当前任务的计算可能会需要子任务的结果,需要join子任务:
    sum = left.join() + right.join();

 

       看下join的源码:

    public final V join() {
        if (doJoin() != NORMAL)
            return reportResult();
        else
            return getRawResult();
    }

        join中会先调用doJoin方法,如果doJoin方法返回NORMAL,那么通过getRawResult方法来获取结果;否则会调用reportResult来处理和获取结果。

        先看下doJoin方法:

    private int doJoin() {
        Thread t; ForkJoinWorkerThread w; int s; boolean completed;
        if ((t = Thread.currentThread()) instanceof ForkJoinWorkerThread) {
            if ((s = status) < 0)
                return s; //如果当前任务已经完成,直接返回状态。
            if ((w = (ForkJoinWorkerThread)t).unpushTask(this)) {
                //如果当前任务恰好是当前工作线程的队列顶端的第一个任务
                //那么将该任务出队,然后执行。
                try {
                    completed = exec();
                } catch (Throwable rex) {
                    return setExceptionalCompletion(rex);
                }
                if (completed)
                    return setCompletion(NORMAL);
            }
            //否则调用当前工作线程的joinTask方法。
            return w.joinTask(this);
        }
        else
            //如果当前线程不是ForkJoin工作线程,那么调用externalAwaitDone
            return externalAwaitDone();
    }

        doJoin方法中,如果要join的任务正好在当前任务队列的顶端,那么pop出这个任务,然后执行任务。

        注意这里调用的是ForkJoinWorkerThread的unpushTash方法,这是另一个版本的pop,看下实现:

    final boolean unpushTask(ForkJoinTask<?> t) {
        ForkJoinTask<?>[] q;
        int s;
        if ((q = queue) != null && (s = queueTop) != queueBase &&
            UNSAFE.compareAndSwapObject
            (q, (((q.length - 1) & --s) << ASHIFT) + ABASE, t, null)) {
            queueTop = s; // or putOrderedInt
            return true;
        }
        return false;
    }

        unpushTash方法中仅当给定的t是队列顶端的任务,才会返回并移除t。  

        继续doJoin方法,如果unpushTask方法失败,就会调用ForkJoinWorkerThread的joinTask方法,看下这个方法: 

    private static final int MAX_HELP = 16;
    ...
    final int joinTask(ForkJoinTask<?> joinMe) {
        //记录之前的合并任务。
        ForkJoinTask<?> prevJoin = currentJoin;
        //设置当前工作线程的合并任务。
        currentJoin = joinMe;
        for (int s, retries = MAX_HELP;;) {
            if ((s = joinMe.status) < 0) {
                //如果合并任务已经完成,恢复之前的合并任务。
                currentJoin = prevJoin;
                return s; //返回任务状态。
            }
            if (retries > 0) {
                if (queueTop != queueBase) {
                    /*
                     * 如果当前任务队列中有任务,尝试从当前队列顶端获取给定任务
                     * (如果给定任务恰好在当前任务队列顶端的话)或者其他一个已经
                     * 被取消的任务。
                     */
                    if (!localHelpJoinTask(joinMe))
                        retries = 0;           // cannot help
                }
                else if (retries == MAX_HELP >>> 1) {
                    --retries;                 // check uncommon case
                    /*
                     * 这里尝试一种特殊情况:如果给定的任务正好在其他工作线程的
                     * 队列的底部,那么尝试窃取这个任务并执行。
                     */
                    if (tryDeqAndExec(joinMe) >= 0)
                        Thread.yield();        // 如果没成功,这里出让一下CPU。
                }
                else
                    retries = helpJoinTask(joinMe) ? MAX_HELP : retries - 1;
            }
            else {
                retries = MAX_HELP;           // restart if not done
                pool.tryAwaitJoin(joinMe);
            }
        }
    }

        
joinTask方法中的逻辑也有些复杂,总结一下:
        1.joinTask方法中主体是一个无限循环,里面会先尝试帮助合并的一些操作,失败的话会继续重试,最多尝试MAX_HELP次。超过了MAX_HELP无法继续尝试的话,就会调用tryAwaitJoin方法等待合并任务。
        2.尝试过程中,如果当前任务队列中有任务,会调用localHelpJoinTask方法,如果方法调用失败会直接进入合并等待;否则会先进行helpJoinTask的尝试,尝试MAX_HELP/2次,成功的话会一直尝试,直到给定的任务完成;如果helpJoinTask尝试了MAX_HELP/2次都没有成功过,且本地队列一直没有任务,那么会进行一个特殊的尝试,会假设给定的任务在其他工作线程的任务队列的底部,然后去窃取这个任务,也会尝试MAX_HELP/2次。  

        看下localHelpJoinTask方法: 

    private boolean localHelpJoinTask(ForkJoinTask<?> joinMe) {
        int s, i; ForkJoinTask<?>[] q; ForkJoinTask<?> t;
        if ((s = queueTop) != queueBase && (q = queue) != null &&
            (i = (q.length - 1) & --s) >= 0 &&
            (t = q[i]) != null) {
            if (t != joinMe && t.status >= 0)
                //如果当前工作线程的任务队列顶端的任务不是给定任务,
                //且任务的状态是未取消(这里如果<0,一定是取消的任务),返回false。
                return false; 
            if (UNSAFE.compareAndSwapObject
                (q, (i << ASHIFT) + ABASE, t, null)) {
                //取出给定任务或者一个被取消的任务。
                queueTop = s;           // or putOrderedInt
                t.doExec();
            }
        }
        return true;
    }

        localHelpJoinTask方法中的逻辑是:如果当前任务队列顶端的任务是要合并的任务,或者是一个被取消的任务,那么尝试处理这个任务,返回成功;否则失败。  

        再看下joinTask方法中调用的tryDeqAndExec方法:

    private int tryDeqAndExec(ForkJoinTask<?> t) {
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        //扫描所有工作线程
        if (ws != null && ws.length > m && t.status >= 0) {
            for (int j = 0; j <= m; ++j) {
                ForkJoinTask<?>[] q; int b, i;
                ForkJoinWorkerThread v = ws[j];
                if (v != null &&
                    (b = v.queueBase) != v.queueTop &&
                    (q = v.queue) != null &&
                    (i = (q.length - 1) & b) >= 0 &&
                    q[i] ==  t) {
                    //如果有工作线程的任务队列的底部正好是给定任务t。
                    //尝试窃取t后执行。
                    long u = (i << ASHIFT) + ABASE;
                    if (v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                    }
                    break;
                }
            }
        }
        return t.status;
    }

        tryDeqAndExec方法中的逻辑就是扫描所有工作线程,如果有工作线程的队列底部的任务正好是要合并的任务,那么窃取该任务然后处理之。  

        最后看下joinTask方法中调用的helpJoinTask方法:

    private boolean helpJoinTask(ForkJoinTask<?> joinMe) {
        boolean helped = false;
        int m = pool.scanGuard & SMASK;
        ForkJoinWorkerThread[] ws = pool.workers;
        if (ws != null && ws.length > m && joinMe.status >= 0) {
            int levels = MAX_HELP;              // remaining chain length
            ForkJoinTask<?> task = joinMe;      // base of chain
            outer:for (ForkJoinWorkerThread thread = this;;) {
                // 找到线程thread的窃取者v
                ForkJoinWorkerThread v = ws[thread.stealHint & m];
                if (v == null || v.currentSteal != task) {
                    //如果thread没有窃取者或者v当前窃取的任务不是task,扫描工作线程数组。
                    for (int j = 0; ;) {        // search array
                        if ((v = ws[j]) != null && v.currentSteal == task) {
                            //如果找到了窃取线程,将其设置为thread的窃取线程。
                            thread.stealHint = j;
                            break;              // save hint for next time
                        }
                        if (++j > m)
                            break outer;        // 没找到的话,直接跳出outer循环。
                    }
                }
                // 找到了窃取者v。
                for (;;) {
                    ForkJoinTask<?>[] q; int b, i;
                    if (joinMe.status < 0)
                        break outer; //如果joinMe任务已经完成,跳出outer循环。
                    if ((b = v.queueBase) == v.queueTop ||
                        (q = v.queue) == null ||
                        (i = (q.length-1) & b) < 0)
                        break;                  //如果v的队列是空的,跳出当前循环。
                    long u = (i << ASHIFT) + ABASE;
                    ForkJoinTask<?> t = q[i]; 
                    if (task.status < 0)
                        break outer;            //如果task任务已经完成,跳出outer循环。
                    //尝试窃取v的任务队列底部的任务。
                    if (t != null && v.queueBase == b &&
                        UNSAFE.compareAndSwapObject(q, u, t, null)) {
                        //窃取成功后,执行任务。
                        v.queueBase = b + 1;
                        v.stealHint = poolIndex;
                        ForkJoinTask<?> ps = currentSteal;
                        currentSteal = t;
                        t.doExec();
                        currentSteal = ps;
                        helped = true;
                    }
                }
                // 再去找v的窃取者,注意这里是一个链。
                ForkJoinTask<?> next = v.currentJoin;
                if (--levels > 0 && task.status >= 0 &&
                    next != null && next != task) {
                    task = next;
                    thread = v;
                }
                else
                    break;  // 如果超过最大深度(MAX_HELP) 或者 task已经执行完成 或者 找到了头(next==null) 或者 出现循环  退出。
            }
        }
        return helped;
    }

       
细节:
        这个方法的前提是当前线程需要join给定的任务joinMe,但是这个任务被其他线程(窃取者)窃取了。所以方法中首先找到窃取joinMe任务的工作线程v,如果找到了窃取者v,就会从v的任务队列中窃取任务来完成(帮助v完成任务)。但也有可能v也在join其他的任务(比如当前线程执行任务过程中,分裂出一个子任务A,工作线程v窃取了A,然后执行,执行过程中A由分裂出子任务A1,A1又被另一个工作线程v1给窃取了…是一个链),所以方法中要顺着这个链一直找下去,目的就是能尽快的合并joinMe任务。为了避免一些情况,这里尝试的最大链深度限定为MAX_HELP。  

        接上面joinTask方法,如果尝试不成功,会调用Pool的tryAwaitJoin方法:

    final void tryAwaitJoin(ForkJoinTask<?> joinMe) {
        int s;
        Thread.interrupted(); // clear interrupts before checking termination
        //如果joinMe未完成
        if (joinMe.status >= 0) {
            //尝试阻塞等待之前的预操作
            if (tryPreBlock()) {
                //在joinMe任务上阻塞等待
                joinMe.tryAwaitDone(0L);
                //被唤醒后的操作
                postBlock();
            }
            else if ((ctl & STOP_BIT) != 0L)
                //如果Pool关闭了,取消任务。
                joinMe.cancelIgnoringExceptions();
        }
    }

        先看下tryAwaitJoin方法中调用的tryPreBlock方法:

    /**
     * Tries to increment blockedCount, decrement active count
     * (sometimes implicitly) and possibly release or create a
     * compensating worker in preparation for blocking. Fails
     * on contention or termination.
     *
     * @return true if the caller can block, else should recheck and retry
     */
    private boolean tryPreBlock() {
        int b = blockedCount;
        //累加等待join任务的计数。
        if (UNSAFE.compareAndSwapInt(this, blockedCountOffset, b, b + 1)) {
            int pc = parallelism;
            do {
                ForkJoinWorkerThread[] ws; ForkJoinWorkerThread w;
                int e, ac, tc, rc, i;
                long c = ctl;
                int u = (int)(c >>> 32);
                if ((e = (int)c) < 0) {
                                                 // 如果Pool关闭了,跳过。
                }
                else if ((ac = (u >> UAC_SHIFT)) <= 0 && e != 0 &&
                         (ws = workers) != null &&
                         (i = ~e & SMASK) < ws.length &&
                         (w = ws[i]) != null) {
                    //如果当前活动的工作线程不大于cpu核数,且有线程在等待任务(处于空闲状态)。
                    //那么唤醒这个工作线程。
                    long nc = ((long)(w.nextWait & E_MASK) |
                               (c & (AC_MASK|TC_MASK)));
                    if (w.eventCount == e &&
                        UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                        w.eventCount = (e + EC_UNIT) & E_MASK;
                        if (w.parked)
                            UNSAFE.unpark(w);
                        return true;             
                    }
                }
                else if ((tc = (short)(u >>> UTC_SHIFT)) >= 0 && ac + pc > 1) {
                    //如果总的工作线程数量不少于cpu核心数量,且至少有一个活动的工作线程。
                    //尝试在总控信息上将AC递减。
                    long nc = ((c - AC_UNIT) & AC_MASK) | (c & ~AC_MASK);
                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc))
                        return true;             
                }
                else if (tc + pc < MAX_ID) {
                    //如果不满足上面条件,这里会增加一个工作线程。
                    long nc = ((c + TC_UNIT) & TC_MASK) | (c & ~TC_MASK);
                    if (UNSAFE.compareAndSwapLong(this, ctlOffset, c, nc)) {
                        addWorker();
                        return true;            
                    }
                }
                //如果失败,这里会把刚才对b增加的1给减回去。
            } while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset,
                                               b = blockedCount, b - 1));
        }
        return false;
    }

        继续看下tryAwaitJoin方法中调用的ForkJoinTask的tryAwaitDone方法:

    final void tryAwaitDone(long millis) {
        int s;
        try {
            if (((s = status) > 0 ||
                 (s == 0 &&
                  UNSAFE.compareAndSwapInt(this, statusOffset, 0, SIGNAL))) &&
                status > 0) {
                synchronized (this) {
                    if (status > 0)
                        wait(millis);
                }
            }
        } catch (InterruptedException ie) {
            // caller must check termination
        }
    }

       
细节:
               阻塞等待(join任务)时,首先会检查join任务的状态,如果join任务未完成的话,才可以在join任务上等待。也就是说,join的运行状态(再回顾一下task的运行状态定义)必须大于等于0。如果join的status大于0,说明join任务上已经有其他工作线程等待了,当前线程直接等待就可以了;如果join的status等于0,说明当前线程是第一个要在join任务上阻塞等待的线程,那么会尝试将join的status改为SIGNAL(1),然后进行阻塞等待工作。注意方法中不会处理中断异常,需要外部来处理。  

        在看下tryAwaitJoin方法中调用的postBlock方法:

    private void postBlock() {
        long c;
        do {} while (!UNSAFE.compareAndSwapLong(this, ctlOffset,  // 累加活动线程计数
                                                c = ctl, c + AC_UNIT));
        int b;
        do {} while (!UNSAFE.compareAndSwapInt(this, blockedCountOffset, // 递减等待join任务的计数。
                                               b = blockedCount, b - 1));
    }

          

        最后,tryAwaitJoin方法中如果发现Pool关闭,会取消joinMe任务,调用其cancelIgnoringExceptions方法:

    final void cancelIgnoringExceptions() {
        try {
            cancel(false);
        } catch (Throwable ignore) {
        }
    }
    private static final int CANCELLED   = -2;
    public boolean cancel(boolean mayInterruptIfRunning) {
        return setCompletion(CANCELLED) == CANCELLED;
    }
    private int setCompletion(int completion) {
        for (int s;;) {
            if ((s = status) < 0)
                return s;
            if (UNSAFE.compareAndSwapInt(this, statusOffset, s, completion)) {
                if (s != 0)
                    synchronized (this) { notifyAll(); }
                return completion;
            }
        }
    }

        可见,cancelIgnoringExceptions中的逻辑就是将任务运行状态设置为CANCELLED,然后唤醒在任务上等待的线程(如果有的话)。  

        最后回到join方法,如果正常完成会调用getRawResult方法:

    public abstract V getRawResult();

        ForkJoinTask中的getRawResult方法未实现,交由子类去实现,比如在RecursiveTask中:

    V result;
    ...
    public final V getRawResult() {
        return result;
    }
    ...

    protected final boolean exec() {
        result = compute();
        return true;
    }

      

 

        如果join方法中,任务非正常结束,会调用reportResult方法:

    private V reportResult() {
        int s; Throwable ex;
        if ((s = status) == CANCELLED)
            throw new CancellationException();
        if (s == EXCEPTIONAL && (ex = getThrowableException()) != null)
            UNSAFE.throwException(ex);
        return getRawResult();
    }

        方法中,如果任务状态为取消,抛出取消异常;如果任务状态是异常结束,会从异常表中获取异常,获取到的话,抛出异常。    

  • 简单总结一下ForkJoinPool中的ForkJoinTask的fork/join流程:

          1.fork时,会将任务添加到当前工作线程的任务队列的里面。
          2.join某个任务后,当前线程要做的首先是想办法完成这个任务,或者帮助加快这个任务的完成,如果这些尝试失败,当前线程就会在要join的任务(等待队列)上进行阻塞等待,等任务完成后被唤醒。      
最后:

       本篇通过一个ForkJoin任务的fork/join过程来分析代码,结合上一篇的话,已经涵盖了ForkJoinTask一个完整执行过程的相关代码了。下篇会做一个收尾工作,将本篇和上篇未涉及到的ForkJoin框架源码分析一下。 

 

 

    原文作者:JUC
    原文地址: https://blog.csdn.net/iteye_11160/article/details/82643814
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞