java8——并行处理与性能

绪论

之前的几章中,我们已经看到了新的Stream接口可以让你以声明性方式处理数据集。我们还解释了将外部迭代换为内部迭代能够让原生Java库控制流元素的处理。这种方法让Java程序员无需显示实现优化来为数据集的处理加速。到目前为止,最重要的好处是可以对这些集合执行操作流水线,能够自动利用计算机上的多个内核。

在本篇中,你将了解Stream接口如何让你不用太费力气就能对数据执行并行操作。它允许你声明性的将顺序流变为并行流。此外,你讲看到Java是如何变戏法的,或者更实际地来说,流是如何在幕后应用java7引入的分支/合并框架的。你还会发现,了解并行流内部是如何工作的很重要,因为如果你忽视这一方面,就可能因误用而得到意外(很可能是错的)的结果。

为此,会特别显示,在并行处理数据块之前,并行流被划分为数据块的方式在某些情况下恰恰是这些错误且无法解释的结果的根源。因此,你将会学习到如何通过实现和使用你自己的Spliterator来控制这个划分过程。

并行流

可以通过对收集源调用parallelStream方法来把集合转换为并行流。并行流就是一个把内容分成多个数据块,并用不同的线程分别处理每个数据块的流。这样一来,你就可以自动把给定操作的工作负荷分配给多核处理器的所有内核,让他们都忙起来。

假设要写一个方法,接受数字n作为参数,并返回从1到给定参数的的所有数字的和。一个直接的方法是生成一个无穷大的数字流,把它限制到给定的数目,然后用对两个数字求和的BinaryOperater来归约这个流

public static long sequentialSum(long n){
    return Stream.iterate(1L,i ->i + 1)
    .limit(n).reduce(0L,long :: sum);
}

这似乎是利用并行处理的好机会,特别是n很大的时候。那怎么入手呢?你要对结果变量进行同步吗?用多少个线程呢?谁负责生成数呢?谁来做加法呢?

根本不用担心。用并行流的话,这问题就简单多了。

将顺序流转化为并行流

可以把流转化成并行流,从而让前面的函数归约过程(也就是求和)并行运行——对顺序流调用parallel方法:

public static long parallelSum(long n){
    return Stream.iterate(1L,i -> i +1)
    .limit(n)
    .parallel()
    .reduce(0L,Long::sum);
}

上面的代码的不同之处在于Stream在内部分成了几块。因此可以对不同的快独立并行进行归纳操作。

《java8——并行处理与性能》 image

请注意,在现实中,对顺序流调用parallel方法并不意味着流本身有任何实际的变化。它在内部实际上就是设了一个boolean标志,表示你想让调用parallel之后进行的所有操作都并行执行。类似的,你只需要对并行流调用sequential方法就可以刻把它变成顺序流。请注意,你可能以为吧这两个方法都结合起来,就可以更细化地控制在遍历流时那些操作要并行执行,哪些要顺序执行。例如:

stream.parallel().filter(...).sequential().map(...).parallel().reduce();

但是最后一次parallel或者sequential调用会影响整个流水线。在本例中,流水线会并行执行,因为最后调用的是它。

那么,调用parallel方法,你可能会想,并行流用的线程是从哪儿 来的呢?有多少个?怎么定义这个过程?
并行流内部使用了默认的ForkJoinPool,它默认的是线程数量就是处理器的数量,这个值是Runtime.getRuntime().availableProcessors()得到的。

但是你可一个通过系统属性来改变。System.setProperty();这是一个全局设置,因此它将影响代码中所有的的并行流。反过来说,目前还无法专为某个并行流指定这个值。一般而言,让ForkJoinPool的大小等于处理器数量是个不错的默认值,强烈不建议修改它。

测量流性能

我们声称并行求和方法应该比顺序求和迭代方法性能好。然而在软件工程上,靠猜绝对不是什么好办法!特别是在优化性能时,要测量,测量,再测量。
例子:测量对前n个自然数求和的函数的性能

public long measureSumPerf(Function<Long ,Long> adder,long n){
    long fastest = Long.MAX_VALUE;
    for(int i= 0;i<10;i++){
        long start = System.nanoTime();
        long sum = adder.apply(n);
        long duration = (System.nanoTime()-start)/1000000;
        System.out.println("Result"+sum);
        if(duration<fastest) fastest = duration;
    }
    return fastest;
}

现在就可以把先前开发的所有方法都放进了一个名为ParallelStreams的类,你就可以用这个框架来测试书序加法器函数对前已前往个自然数求和要多久:

System.out.println(meadureSumPerf(ParallelStreams::sequentialSum,10000000))

//结果:97

用传统for循环的迭代版本执行起来应该会快很多,因为它更为底层,更重要的是不需要对原始类型做任何装箱和拆箱的工作。

System.out.println(measureSumPerf(ParallelStreams::iterativeSum,10000000));
//结果: 2

现在我们对函数的并行版本做测试:

System.out.println(measureSumPerf(ParallelStreams::parallelSum,10000000));
//结果:164

令人相当的失望,求和方法的并行版本比顺序执行版本要慢的多。这里有两个实际问题:

  1. iterate生成的是装箱的对象,必须拆箱成数字才能求和;
  2. 我们很难吧iterate分成多个独立块来并行执行。

iterate很难分割成能够独立执行的小块,因为每次应用这个函数都要依赖前一次用用的结果:

《java8——并行处理与性能》 image

这意味着,在这个特定的情况下,归纳进程不是像将顺序流转化成并行流那样进行的;整张数字列表在归纳过程开始时没有准备好,因而无法有效地把流划分为小块来并行处理。把流标记成并行,你其实是个顺序处理增加了开销,他还要八二每次求和操作分到一个不同的线程上。

这就说明了并行编程可能很复杂,有时候甚至有点违反直觉。如果用的不对(比如才用了一个不易并行化的操作,如iterate),它甚至可能让程序的整体性能更差,所以在调用那个看似神奇的aprallel操作时,了解背后到底发生了什么是很有必要的。

使用更有针对性的方法

那到底要怎么利用多核处理器,用流来高效地并行求和呢?我们在第5章中讨论了一个叫LongStream.rangeClosed的方法。这个方法与iterate相比有两个优点。

  • LongStream.rangeClosed直接产生原始类型的long数字,没有装箱拆箱的操作
  • LongStream.rangeClosed会生成数字范围,很容易拆分为独立的小块。列如,范围120可分为15,610,1115,16~20。
    先看一下它用于顺序流时的性能如何,看看拆箱的开销:
public static long rangedSum(long n){
    return LongStream.rangeClosed(1,n).reduce(0L,Long::sum);
}
//输出 17

这个数值流比前面那个用iterate工厂方法生成数字的顺序执行版本要快得多,因为数值流避免了非针对想流那些没必要的自动装箱和拆箱的操作。由此可见,选择适当的数据结构往往比并行化算法更为有效。使用并行的效果呢?

public static long rangedSum(long n){
    return LongStream.rangeClosed(1,n).parallel().reduce(0L,Long::sum);
}
//输出 1

终于有了一个比顺序执行更快的并行归纳。这也表明,使用正确的数据结构然后使其并行工作能够保证最佳的性能。

尽管如此,请记住,并行化并不是没有代价的。并行化过程本身需要对流做递归划分,把每个子流的归纳操作分配到不同的线程,然后把这些操作的结果合并成一个值。但在多个内核之间移动数据的代价也可能比你想要的要大,所以很重要的一点是要办证在内核中并行执行工作的时间比在内核之间传输数据的时间长。

高效使用并行流

  • 如果有疑问,测量。把顺序转成并行流轻而易举,但却比一定是好事。因为并行流并不总是比顺序快。
  • 留意装箱。自动装箱和拆箱操作会大大降低性能。java8中有原始类型流(IntStream,LongStream,DoubleStream)来避免这种操作,但凡有可能都要用这些流。
  • 有些操作本身在并行流上的性能就比顺序流查。特别是limit和findFirst等依赖于元素顺序的操作,他们在并行流上执行的代价非常大。findAny会比findFirst性能好,因为它不一定要按顺序执行。
  • 对于较小的数据量,选择并行流几乎从来都不是一个好的决定。并行处理少数几个元素的好处还抵不上并行化造成额外的开销。
  • 看流的内容是否可拆分

    《java8——并行处理与性能》 image

    流背后使用的基础架构是java7中引入的分支/合并框架。并行汇总的实例证明了要想正确使用并行流,了解它的内部原理至关重要。

分支/合并框架

分支/合并框架的目的是以递归方式将可以并行的任务拆分成更小的任务,然后将每个子任务的结果合并起来生成整体结果。它是ExecutorService接口的一个实现,它把子任务分配给线程池(称为ForkJoinPool)中的工作线程。

    原文作者:浔它芉咟渡
    原文地址: https://www.jianshu.com/p/23a013bb0d81
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞