java8学习第三篇:forkjoin

forkjoin最初出现在java7中,在java8的demo里依然可以看到forkjoin的身影。

如果用简单的话来描述forkjoin,就是拆解子任务,并发运行,并最后统一运行结果。

fork:叉

join:连接

先分叉再连接,大概就是这么个意思。

forkjoin适用于如下场景:

如果有一个很大的任务需要被执行多次才能完成,而这个大任务可以以递归的方式划分为若干个独立的小任务,

而这些小任务的运行结果最终能通过某种方式汇合计算出大任务的结果。

如果有以上场景的话,就适用于forkjoin框架。

举个最简单的例子,求1-100的和。这个任务非常适合拆解,我可以把它拆成若个数字区间段,比如1-10,11-20…91-100。

然后每一个区间段分开运行,最终汇总每个区间段的和,加起来就是总和。

那么其实这个问题,我们就算不利用java中提供的forkjoin也可以实现,大概的代码就是利用Futrue和Callable来完成。

这里我就不写了,因为主要想讲的是forkjoin框架。

这里简单介绍java中提供的三个类

1. abstract class java.util.concurrent.ForkJoinTask<V>

2. abstract class java.util.concurrent.RecursiveAction extends ForkJoinTask<Void>

     需要实现的抽象方法:protected abstract void compute();

3. abstract class java.util.concurrent.RecursiveTask<V> extends ForkJoinTask<V>

    需要实现的抽象方法: protected abstract V compute();

ForkJoinTask是实现forkjoin的核心功能的抽象类。包括管理线程池,实例化线程运行子任务这些工作都在这个类里完成。

RecursiveAction ,我们一般直接继承的类,父类是ForkJoinTask,提供抽象方法void compute()供我们实现。主要处理无需返回值的子任务拆解场景。

RecursiveTask,我们一般直接继承的类,父类是ForkJoinTask,提供抽象方法V compute()供我们实现,可以返回泛型指定的数据对象,处理有返回的任务拆解场景。

那么一般我们如何实现compute呢?

首先要定义递归算法的临界值,也就是当条件不满足或低于临界值时,就不再拆解子任务,而是直接运算结果。

然后,在满足可以拆解的情况下,我们就拆解任务。

那么一次可以拆解几个任务呢?理论上拆解几个都行,但是要考虑到每个任务都是一个线程,所以拆解任务的粒度不宜过小,当然为了提高并发效率,也不宜过大,适中就好。

继续回到我们的问题上来,之前我们的举例是讲计算1-100的总和这个任务拆解成若干个子任务。

那么我们可以做一个简单的拆解,比如每次求和时,如果求和的起始和终止数字的差值大于20,就拆解

拆解的方案是从中间拆解。比如 1-100求和,差值>20,因此拆解为1-50求和和51-100求和两个子任务。

子任务运行时,如果发现差值>20,则继续拆解,直到满足差值<=20的情况,则调用求和算法求和。

然后将子任务的统计结果循环相加,最终得到1-100的和。

现在开始编码

public class ForkJoinTaskTest {
    private static AtomicInteger seq = new AtomicInteger(0);
    private static Object lock = new Object();

    private static final class Sum extends RecursiveTask<Integer> {
        private static final int THRESHOLD = 20;
        int start;
        int end;
        String name;
        int level;
        int id;
        int parentId;
        String parentName;

        public Sum(int id, Sum parent, String name, int start, int end, int level) {
            this.id = id;
            this.start = start;
            this.end = end;
            this.level = level;
            this.parentId = parent == null ? 0 : parent.id;
            this.parentName = parent == null ? "no Parent" : parent.name;
            this.name = parentId+"_"+name;
        }

        @Override
        protected Integer compute() {
            System.out.println("begin compute:" + this);
            int allsum = 0;
            if ((end - start) < THRESHOLD) {
                int sum = getSum(start, end);
                System.out.println("return sum:" + this + ",sum" + sum);
                return sum;
            } else {
                int middle = (start + end) / 2;
                Sum left = new Sum(seq.addAndGet(1), this, "left", start, middle, level + 1);
                Sum right = new Sum(seq.addAndGet(1), this, "right", middle + 1, end, level + 1);
                System.out.println("split:start=" + start + ",middle=" + middle + ",end=" + end+",left:" + left+",right"+right);
                left.fork();
                right.fork();
                allsum = left.join() + right.join();
                System.out.println("split end, allsum=" + allsum);
            }
            System.out.println("end compute:" + this);
            return allsum;
        }

        @Override
        public String toString() {
            return "{" +
                    "id=" + id +
                    ", name='" + name + '\'' +
                    ", level=" + level +
                    ", parentId=" + parentId +
                    ", parentName=" + parentName +
                    ", start=" + start +
                    ", end=" + end +
                    ", diff=" + (end-start) +
                    "}";
        }

        private int getSum(int start, int end) {
            int sum = 0;
            for (int i = start; i <= end; i++) {
                sum += i;
            }
            return sum;
        }
    }

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        Sum sum= new Sum(seq.addAndGet(1), null, "base", 0, 100, 1);
        Future<Integer> result = forkJoinPool.submit(sum);
        System.out.println(result.get());
    }
}

运行的结果是

begin compute:{id=1, name='0_base', level=1, parentId=0, parentName=no Parent, start=0, end=100, diff=100}
split:start=0,middle=50,end=100,left:{id=2, name='1_left', level=2, parentId=1, parentName=0_base, start=0, end=50, diff=50},right{id=3, name='1_right', level=2, parentId=1, parentName=0_base, start=51, end=100, diff=49}
begin compute:{id=2, name='1_left', level=2, parentId=1, parentName=0_base, start=0, end=50, diff=50}
begin compute:{id=3, name='1_right', level=2, parentId=1, parentName=0_base, start=51, end=100, diff=49}
split:start=0,middle=25,end=50,left:{id=4, name='2_left', level=3, parentId=2, parentName=1_left, start=0, end=25, diff=25},right{id=5, name='2_right', level=3, parentId=2, parentName=1_left, start=26, end=50, diff=24}
split:start=51,middle=75,end=100,left:{id=6, name='3_left', level=3, parentId=3, parentName=1_right, start=51, end=75, diff=24},right{id=7, name='3_right', level=3, parentId=3, parentName=1_right, start=76, end=100, diff=24}
begin compute:{id=6, name='3_left', level=3, parentId=3, parentName=1_right, start=51, end=75, diff=24}
split:start=51,middle=63,end=75,left:{id=8, name='6_left', level=4, parentId=6, parentName=3_left, start=51, end=63, diff=12},right{id=9, name='6_right', level=4, parentId=6, parentName=3_left, start=64, end=75, diff=11}
begin compute:{id=8, name='6_left', level=4, parentId=6, parentName=3_left, start=51, end=63, diff=12}
return sum:{id=8, name='6_left', level=4, parentId=6, parentName=3_left, start=51, end=63, diff=12},sum741
begin compute:{id=9, name='6_right', level=4, parentId=6, parentName=3_left, start=64, end=75, diff=11}
return sum:{id=9, name='6_right', level=4, parentId=6, parentName=3_left, start=64, end=75, diff=11},sum834
split end, allsum=1575
begin compute:{id=4, name='2_left', level=3, parentId=2, parentName=1_left, start=0, end=25, diff=25}
end compute:{id=6, name='3_left', level=3, parentId=3, parentName=1_right, start=51, end=75, diff=24}
begin compute:{id=7, name='3_right', level=3, parentId=3, parentName=1_right, start=76, end=100, diff=24}
split:start=0,middle=12,end=25,left:{id=10, name='4_left', level=4, parentId=4, parentName=2_left, start=0, end=12, diff=12},right{id=11, name='4_right', level=4, parentId=4, parentName=2_left, start=13, end=25, diff=12}
split:start=76,middle=88,end=100,left:{id=12, name='7_left', level=4, parentId=7, parentName=3_right, start=76, end=88, diff=12},right{id=13, name='7_right', level=4, parentId=7, parentName=3_right, start=89, end=100, diff=11}
begin compute:{id=10, name='4_left', level=4, parentId=4, parentName=2_left, start=0, end=12, diff=12}
begin compute:{id=12, name='7_left', level=4, parentId=7, parentName=3_right, start=76, end=88, diff=12}
return sum:{id=10, name='4_left', level=4, parentId=4, parentName=2_left, start=0, end=12, diff=12},sum78
return sum:{id=12, name='7_left', level=4, parentId=7, parentName=3_right, start=76, end=88, diff=12},sum1066
begin compute:{id=11, name='4_right', level=4, parentId=4, parentName=2_left, start=13, end=25, diff=12}
begin compute:{id=13, name='7_right', level=4, parentId=7, parentName=3_right, start=89, end=100, diff=11}
return sum:{id=11, name='4_right', level=4, parentId=4, parentName=2_left, start=13, end=25, diff=12},sum247
return sum:{id=13, name='7_right', level=4, parentId=7, parentName=3_right, start=89, end=100, diff=11},sum1134
split end, allsum=325
split end, allsum=2200
end compute:{id=4, name='2_left', level=3, parentId=2, parentName=1_left, start=0, end=25, diff=25}
end compute:{id=7, name='3_right', level=3, parentId=3, parentName=1_right, start=76, end=100, diff=24}
begin compute:{id=5, name='2_right', level=3, parentId=2, parentName=1_left, start=26, end=50, diff=24}
split end, allsum=3775
split:start=26,middle=38,end=50,left:{id=14, name='5_left', level=4, parentId=5, parentName=2_right, start=26, end=38, diff=12},right{id=15, name='5_right', level=4, parentId=5, parentName=2_right, start=39, end=50, diff=11}
end compute:{id=3, name='1_right', level=2, parentId=1, parentName=0_base, start=51, end=100, diff=49}
begin compute:{id=14, name='5_left', level=4, parentId=5, parentName=2_right, start=26, end=38, diff=12}
begin compute:{id=15, name='5_right', level=4, parentId=5, parentName=2_right, start=39, end=50, diff=11}
return sum:{id=15, name='5_right', level=4, parentId=5, parentName=2_right, start=39, end=50, diff=11},sum534
return sum:{id=14, name='5_left', level=4, parentId=5, parentName=2_right, start=26, end=38, diff=12},sum416
split end, allsum=950
end compute:{id=5, name='2_right', level=3, parentId=2, parentName=1_left, start=26, end=50, diff=24}
split end, allsum=1275
end compute:{id=2, name='1_left', level=2, parentId=1, parentName=0_base, start=0, end=50, diff=50}
split end, allsum=5050
end compute:{id=1, name='0_base', level=1, parentId=0, parentName=no Parent, start=0, end=100, diff=100}
5050

结果有点乱,但是这么多信息是为了帮助我们看清整个程序运行的情况。

我们可以根据日志大概想到程序运行的情况如下《java8学习第三篇:forkjoin》

根据上图我们可以分析出几个问题

1. 每个子任务因为是并发运行的,所有并没有绝对的先创建的任务(left)一定比后创建的任务(right)先运行。从ID号可以看出来,4,5早于6先创建

但是6分解(fork)的两个子任务8,9却比4,5fork出来的子任务10,11,14,15先运行。

2. 每一个父任务都会等待两个子任务都返回(join)后,再返回它自身给上层父任务。

最终最外层的任务拿到汇总后的结果,跟单独运行的计算结果是一样的。

那么我们再来看一下效率呢

数值forkjoin (THRESHOLD= size/5)normal
10000000501
500000005610
2000000008476
1000000000217381

这里可以看出,当运算量小时,直接运算的效率是要高于forkjoin。因为forkjoin会消耗着创建线程,线程切换的资源和时间。

而当运算量大时,forkjoin由于其多线程特性,更好的利用了CPU资源,而线程资源创建切换的损耗比起大量的计算时间来说也可以被忽略了。

因此forkjoin的表现更为出色。

    原文作者:java吧
    原文地址: https://blog.csdn.net/java8cn/article/details/22819735
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞