Jdk1.7 JUC源码增量解析(2)-ForkJoin-框架概览

Jdk1.7 JUC源码增量解析(2)-ForkJoin-框架概览

作者:大飞

 

  • ForkJoin框架是什么?

       ForkJoin框架是jdk1.7提供的一个并行计算框架。

  • ForkJoin框架能干什么?

       首先ForkJoin框架是针对一些符合
ForkJoin模型的任务而设计的,那什么是ForkJoin模型呢?看个图先:

《Jdk1.7 JUC源码增量解析(2)-ForkJoin-框架概览》

 
       
注:图片来之
https://en.wikipedia.org/wiki/Fork%E2%80%93join_model          ForkJoin模型是指一些任务在执行过程中会分裂出一些子任务(或子过程),而且这些子任务是可以并行执行的(相互没有制约),子任务们执行完毕后合并到主任务中,主任务继续执行。这种模型本身也体现了分而治之的思想。          能体现ForkJoin模型的典型栗子就是归并排序,归并排序这个老黄历就不解释了,还是看个图吧: 

《Jdk1.7 JUC源码增量解析(2)-ForkJoin-框架概览》

 
 
       
注:图片来之
http://www.cnblogs.com/horizonice/p/4102553.html          可见在归并过程中,会不断分裂出许多子任务,而同一层级子合并任务之间不会相互制约,可以并行执行。        ForkJoin框架就是针对这种模型,提供的一套并行计算框架,可以很好的利用多核的能力来挖掘任务的并行性。目的是缩短任务的总体执行时间,同时也能充分的利用计算机资源。        
注:ForkJoin是一个单机框架,类似的分布式的框架有Hadoop这类的,它们的计算模型是MapReduce,体现了和ForkJoin一样的思想-分而治之。  

  • 能不能给个ForkJoin框架的使用示例?

       为了更关注于框架本身,这里给一个最简单的示例,计算整数1到n的累加结果。(假设我们不知道高斯求和公式…)        首先需要定义一个任务:

public class SumTask extends RecursiveTask<Long>{
	private static final int THRESHOLD = 10;
	
	private long start;
	private long end;
	
	public SumTask(long n) {
		this(1,n);
	}
	
	private SumTask(long start, long end) {
		this.start = start;
		this.end = end;
	}
	@Override
	protected Long compute() {
		long sum = 0;
		if((end - start) <= THRESHOLD){
			for(long l = start; l <= end; l++){
				sum += l;
			}
		}else{
			long mid = (start + end) >>> 1;
		    SumTask left = new SumTask(start, mid);
		    SumTask right = new SumTask(mid + 1, end);
			left.fork();
			right.fork();
			sum = left.join() + right.join();
		}
		return sum;
	}
	private static final long serialVersionUID = 1L;
}

       可见我们定义的任务首先要继承RecursiveTask类。
简单说明一下RecursiveTask,它表示一种有返回值的ForkJoin任务,对应的无返回值(比如给数组排序这种任务)的版本是RecursiveAction。        代码中定义了一个阀值10,也就是说我们会把1-n拆分成1-10、10-20…这样的长度小于10的分段(代码中fork出子任务),然后分别计算这些分段和,然后再不断汇总(通过join来获取子任务的结果),得到最终的结果。          任务定义好了,下面我们来执行这个任务: 

	public static void main(String[] args) {
		ForkJoinPool forkJoinPool = new ForkJoinPool();
		//计算1-100的和。
		SumTask sumTask = new SumTask(100);
		ForkJoinTask<Long> forkJoinTask = forkJoinPool.submit(sumTask);
		try {
			long result = forkJoinTask.get();
			System.out.println("result="+result);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}
	}

       输出结果: 

result=5050

       通过执行代码可见,我们会将任务提交到ForkJoinPool中,等待任务被执行完毕后,获取执行结果。是不是发现这种使用方式和JDK线程池的使用方式有些类似,没错,ForkJoinPool也是ExecutorService的一种实现并且ForkJoinTask也是Future的一种实现。        
注:JDK1.8中的Arrays提供了并行排序(parallelSort),就是利用ForkJoin框架来做的。  

  • ForkJoin框架内部组件及工作原理

       ForkJoin框架包括下面这些类:              
ForkJoinPool
              ForkJoinWorkerThread
              ForkJoinTask
              RecursiveTask
              RecursiveAction        
 ForkJoinPool充当ForkJoin工作线程池的角色,管理和维护ForkJoin的工作线程,也管理一部分任务。
       ForkJoinWorkerThread就是执行ForkJoin任务的工作线程。
       ForkJoinTask是ForkJoin任务,它定义ForkJoin任务的一些基础行为。
       RecursiveTask和RecursiveAction都继承自ForkJoinTask,分别表示有返回值和无返回值的任务。         ForkJoin框架的主要逻辑功能都实现在ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask中,RecursiveTask和RecursiveAction中只是简单的做了返回值的逻辑,然后开放了compute方法交由子类去实现具体的计算逻辑。          ForkJoin框架内部采用了work-stealing(工作窃取)模式:每个工作线程都有自己的任务队列(一般是双端队列),工作线程首先会处理自己任务队列里面的任务,如果自己任务队列中没有任务了,会从其他工作线程的任务队列中窃取任务来执行。        
注:关于工作窃取的理论可以看下《多处理器编程的艺术》中的第16章相关内容,另外本书包含了JUC包实现的很多理论基础,虽然比较枯燥,但很值得一读。  

  • ForkJoin框架内部重要类概览

       下面对ForkJoinPool、ForkJoinWorkerThread和ForkJoinTask这三个类做下概览,后续的源码分析也主要针对这三个类进行。          
首先看下ForkJoinPool,这个类是整个ForkJoin框架的主体,简单看下内部一些重要的域: 

    /**
     * ForkJoinPool的总控制信息,包含在一个long里面:
     * AC: 表示当前活动的工作线程的数量减去并行度得到的数值。(16 bits)
     * TC: 表示全部工作线程的数量减去并行度得到的数值。(16bits)
     * ST: 表示当前ForkJoinPool是否正在关闭。(1 bit)
     * EC: 表示Treiber stack顶端的等待工作线程的等待次数。(15 bits)
     * ID: Treiber stack顶端的等待工作线程的下标取反(16 bits)
     *
     * 1111111111111111 1111111111111111  1  111111111111111 1111111111111111
     * AC               TC                ST EC              ID 
     *
     * 如果AC为负数,说明没有足够的活动工作线程。
     * 如果TC为负数,说明工作线程数量没达到最大工作线程数量。
     * 如果ID为负数,说明至少有一个等待的工作线程。
     * 如果(int)ctl为负数,说明ForkJoinPool正在关闭。
     */
    volatile long ctl;
    // bit positions/shifts for fields
    private static final int  AC_SHIFT   = 48;
    private static final int  TC_SHIFT   = 32;
    private static final int  ST_SHIFT   = 31;
    private static final int  EC_SHIFT   = 16;
    // bounds
    private static final int  MAX_ID     = 0x7fff;  // max poolIndex
    private static final int  SMASK      = 0xffff;  // mask short bits
    private static final int  SHORT_SIGN = 1 << 15;
    private static final int  INT_SIGN   = 1 << 31;
    // masks
    private static final long STOP_BIT   = 0x0001L << ST_SHIFT;
    private static final long AC_MASK    = ((long)SMASK) << AC_SHIFT;
    private static final long TC_MASK    = ((long)SMASK) << TC_SHIFT;
    // units for incrementing and decrementing
    private static final long TC_UNIT    = 1L << TC_SHIFT;
    private static final long AC_UNIT    = 1L << AC_SHIFT;
    // masks and units for dealing with u = (int)(ctl >>> 32)
    private static final int  UAC_SHIFT  = AC_SHIFT - 32;
    private static final int  UTC_SHIFT  = TC_SHIFT - 32;
    private static final int  UAC_MASK   = SMASK << UAC_SHIFT;
    private static final int  UTC_MASK   = SMASK << UTC_SHIFT;
    private static final int  UAC_UNIT   = 1 << UAC_SHIFT;
    private static final int  UTC_UNIT   = 1 << UTC_SHIFT;
    // masks and units for dealing with e = (int)ctl
    private static final int  E_MASK     = 0x7fffffff; // no STOP_BIT
    private static final int  EC_UNIT    = 1 << EC_SHIFT;

       ctl是ForkJoinPool中最重要的,也是设计最精密的域,它是整个ForkJoinPool的总控信息。所有信息包含在一个long(64bit)中,这些信息包括:当前活动的工作线程数量、当前总的工作线程数量、ForkJoinPool的关闭标志、在Treiber stack(由全部等待工作线程组成的一个链)顶端等待的工作线程的等待次数、Treiber stack(由全部等待工作线程组成的一个链)顶端等待的工作线程的ID信息(工作线程的下标取反)。        ctl还有一个相对不重要的作用就是,某些非volatile域会依赖ctl来保证可见性。   

    ForkJoinWorkerThread[] workers;

       workers是ForkJoinPool中保存工作线程的数组,它的更新会由一个锁(scanGuard)来保护。   

    volatile int scanGuard;

       scanGuard是另外一个比较重要的域,它有两个作用:1.作为更新工作线程数组是使用的(顺序)锁;2.作为扫描工作线程数组时使用的边界值来避免一些没用的扫描(当数组过大时)。   

    private static final int INITIAL_QUEUE_CAPACITY = 8;

    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    private ForkJoinTask<?>[] submissionQueue;

    private final ReentrantLock submissionLock;

       ForkJoinPool中也有一个队列-submissionQueue,这个队列里存放的是有外部(非ForkJoin工作线程)提交到ForkJoinPool中的任务。

    volatile int queueBase;

    int queueTop;

       这两个域分别表示submissionQueue的顶部和底部。    

    final boolean locallyFifo;

       locallyFifo域也比较重要,它有ForkJoinPool的构造方法的参数asyncMode来指定。如果locallyFifo为true,表示内部将才用FIFO的方式来调度任务队列中的任务,而且这些任务可以分裂(fork),但最好不要合并(join),这种模式很适合来处理事件形式(event-style)的异步任务。默认locallyFifo为false。          ForkJoinPool中一些比较重要的域就简单介绍到这里,后续的源码解析中还会详细分析。          
再看下ForkJoinWorkerThread,这个类是ForkJoin框架中负责执行具体任务的工作线程,简单看下内部一些重要的域: 

    private static final int INITIAL_QUEUE_CAPACITY = 1 << 13;

    private static final int MAXIMUM_QUEUE_CAPACITY = 1 << 24; // 16M

    ForkJoinTask<?>[] queue;

    int queueTop;

    volatile int queueBase;

       queue就是ForkJoinWorkerThread中的任务队列,当从其他工作线程中窃取任务时,就是从这个队列中进行窃取。   

    final int poolIndex;

       工作线程在ForkJoinPool中工作线程数组中的下标。  

    int stealHint;

       stealHint保存了最近的窃取者(来窃取任务的工作线程)的下标(poolIndex)。注意这个值不准确,因为可能同时有很多窃取者来窃取任务,这个值只能记录其中之一。    

    int nextWait;

       nextWait算是比较难理解的一个域。首先所有的等待工作线程组成了一个隐式的单链(代码中也叫Treiber stack,由于行为类似于栈),链顶端的等待工作线程的信息保存在Pool的ctl中,新来的等待工作线程会将ctl中之前的等待工作线程信息保存到nextWait上,然后将自己的信息设置到ctl上。   

    final boolean locallyFifo;

       这个和Pool中的locallyFifo一致。   

    ForkJoinTask<?> currentSteal;

       当前工作线程最新窃取的任务。注意可以从其他工作线程的任务队列或者从Pool中的提交任务队列(submissionQueue)中窃取任务。   

    ForkJoinTask<?> currentJoin;

       当前工作线程正在合并的任务。        
 最后看下ForkJoinTask,这个类表示ForkJoin框架中执行的任务,简单看下内部域: 

    /** The run status of this task */
    volatile int status; // accessed directly by pool and workers
    private static final int NORMAL      = -1;
    private static final int CANCELLED   = -2;
    private static final int EXCEPTIONAL = -3;
    private static final int SIGNAL      =  1;

       ForkJoinTask中只有一个表示运行状态的域。初始为0;1表示在等待被唤醒;负数都表示执行完毕,-1表示正常完成、-2表示被取消、-3表示异常结束。  

  • 最后

       本篇的内容到此结束,由于本系列侧重于源码分析,所以这里只是简单的介绍了一下ForkJoin框架。        后续的文章会进行源码细节的分析,ForkJoin框架为了性能的提升,源码中充斥着耦合、内联、位操作、细节优化等情况,再加上框架本身比较精密的设计,所以源码读起来会有一点点蛋疼,所以请务必了解下本篇介绍的ForkJoinPool大体的工作模式和组件类中一些重要域的作用。        后续分析文章不会按照类为单位来分析(由于太耦合,这样分析会屎),而是按照一些运行过程来分析,请先了解下大体的执行过程。     

    原文作者:JUC
    原文地址: https://blog.csdn.net/iteye_11160/article/details/82643771
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞