文章目录
Java 8 Stream入门概述
今天,我们将对Java 8
的新特性Stream
进行深入概述,当我第一次阅读Stream API
的时候,被其名所困惑,容易让人将其与Java I/O
的InputStream
和 OutputStream
相关联在一起,Java8
的 Stream
流是完全不同的东西。具有链操作的含义, 或将该类型的函数嵌套在一起。
本文将介绍如何使用Java 8 流以及如何利用不同类型操作。可以了解流的处理顺序,以及不用顺序对性能的影响。更强大的流操作符 reduce,collect 和 faltMap ,文末将介绍并行流(parallel streams)。
Streams流如何工作
一个流由一组支持不同操作的元素序列组成,并在这些元素上执行计算。
List<String> myList = Arrays.asList("a1", "a2", "b1", "c2", "c1");
myList.stream().filter(s -> s.startsWith("c"))
.map(String::toUpperCase)
.sorted()
.forEach(System.out::println);
流操作要么是中间的, 要么是终端。中间操作返回一个流, 这样我们就可以在不使用分号的情况下链接多个中间操作。终端操作要么无效, 要么返回非流结果。上面的例子中,filter
,map
,sorted
是中间操作符,forEach
是终端操作符。上例中的一串流操作也称为操作管线operation pipeline
。
大多数流操作都接受某种 lambda 表达式参数, 该函数接口指定操作的确切行为。这些操作中的大多数必须既不干涉(non-Interfering)又无状态(stateless)。那是什么意思?
即:
在不修改流的基础数据源时, 函数不受干扰, 例如, 在上面的示例中, 没有 lambda 表达式通过添加或移除集合中的元素来修改 myList。
当操作的执行的结果是确定性的时候, 就说函数是无状态的, 例如, 在上面的示例中, 没有 lambda 表达式依赖于在执行过程中可能更改的外部范围内的任何可变变量或状态。
不同的Streams流
Streams
可以通过不同的数据源创建出来,尤其是通过collections
集合。Lists
和Sets
支持新的方法stream()
和parallelStream()
来创建一个顺序或者并行的流。并行流可以在多个线程上运行, 我们将在后面在做讨论。
Arrays.asList("a1", "a2", "a3")
.stream()
.findFirst()
.ifPresent(System.out::println); // a1
在对象列表中调用方法stream () 将返回一个常规对象流Stream。但是, 不必创建集合来处理流, 正如下一个代码示例:
Stream.of("a1", "a2", "a3")
.findFirst()
.ifPresent(System.out::println); // a1
使用Stream.of()
从一串对象的引用中创建一个stram流。除了常规的对象流 , Java 8 中特定stream流类型的还对基础的数据类型进行支持,包括int
,long
和double
类型。一一对应的,可以猜到会有 IntStream
, LongStream
and DoubleStream
。
IntStreams
可以利用IntStream.range()
来替代常规的for循环。
IntStream.range(1, 4).forEach(System.out::println);
//输出:
1
2
3
所有这些简单的流基本和常规的对象流是一样的,不过有以下区别:
IntFunction
替代Function
,使用IntPredicate
替代Predicate
。并且,这些简单流还支持附加的终端聚合操作sum()
和average()
。
Arrays.stream(new int[]{1, 2, 3, 4}).map(n -> 2 * n + 1)
.average()
.ifPresent(System.out::println); // 6.0
有时需要将常规对象流转换为简单流或者相反,例如特定对象流支持mapToInt
,mapToLong()
和mapToDouble
:
Stream.of("a1","a2","a3","a4").map(s->s.substring(1))
.mapToInt(Integer::parseInt)
.max()
.ifPresent(System.out::println); //4
同样,简单流可以通过mapToObj()
转换成常规流对象:
IntStream.range(1,4)
.mapToObj(i-> "a" + i)
.forEach(System.out::println);
//输出:
a1
a2
a3
下面的例子结合上述两类函数:
Stream.of(1.0, 2.0, 3.0)
.mapToInt(Double::intValue)
.mapToObj(n -> "a" + n)
.forEach(System.out::println);
//输出:
a1
a2
a3
操作处理顺序
现在,已经学会了如何创建和使用不同类型的流;接下来, 继续深入了解如何处理流操作。
- 中间(intermediate)操作是懒惰的(lazyness)
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
});
执行上面这个代码片段,控制台上什么都不会打印。这是因为只有在终端操作出现时, 中间操作才会执行。
让我们使用forEach()
来扩展上面的代码片段:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return true;
})
.forEach(s -> System.out.println("forEach: " + s));
//输出:
filter: d2
forEach: d2
filter: a2
forEach: a2
filter: b1
forEach: b1
filter: b3
forEach: b3
filter: c
forEach: c
一个简单的方法就是流的所有元素在水平方向上执行操作,纵向上,字符串“d2”只有通过了filter
操作,才会到达forEach
操作,接下来才是”a2″。
下面的示例,可以减少对每个元素执行的实际操作数:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.anyMatch(s -> {
System.out.println("anyMatch: " + s);
return s.startsWith("A");
});
//输出:
map: d2
anyMatch: D2
map: a2
anyMatch: A2
anyMatch
操作返回true
当输入满足以”A”开头的条件的时候,当第二个元素输入为”A2″的时候,anyMatch返回true,由于流链垂直执行, 在这种情况下, 映射只执行两次。因此,避免映射流的所有元素,尽可能少地调用映射。
为什么操作顺序重要?
下面的例子包含两个中间操作map
和filter
,还有一个终端操作forEach
,接下来看看这些操作是如何被执行:
Stream.of("d2", "a2", "b1", "b3", "c")
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.filter(s -> {
System.out.println("filter: " + s );
return s.startsWith("A");
}).forEach(s->{System.out.println("forEach: " + s);});
//输出:
map: d2
filter: D2
map: a2
filter: A2
forEach: A2
map: b1
filter: B1
map: b3
filter: B3
map: c
filter: C
从控制台输出可以看到,map
和filter
各被调用5次,forEach
只被调用了一次,请重点关注第二个元素’a2’,只有在filter
中返回为true
时候,才会调用forEach
。
现在,我们可以通过尝试调整操作的顺序来减少实际执行的次数;将filter
调整至操作链的起始处:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s );
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s->{System.out.println("forEach: " + s);});
//输出:
filter: d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
这里map
映射只被执行了一次,所以当有大量的输入的时候,第二种的处理速度将会更快。
接下来,对上面的代码片段进行扩充,使用排序sorted
操作:
Stream.of("d2", "a2", "b1", "b3", "c")
.sorted((s1,s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.filter(s -> {
System.out.println("filter: " + s );
return s.startsWith("a");
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s->{System.out.println("forEach: " + s);});
//输出:
sort: a2; d2
sort: b1; a2
sort: b1; d2
sort: b1; a2
sort: b3; b1
sort: b3; d2
sort: c; b3
sort: c; d2
filter: a2
map: a2
forEach: A2
filter: b1
filter: b3
filter: c
filter: d2
控制台输出可以看出,sorted
在整个元素集合上都执行了一遍;换句话说, 排序是水平执行的。因此, 在这种情况下, 对输入集合中的每个元素进行多个组合的排序,共进行8次对比。
这里我们可以再次通过调整操作的顺序来优化性能:
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
//输出:
filter: d2
filter: a2
filter: b1
filter: b3
filter: c
map: a2
forEach: A2
在本例中,sorted
并没有被调用,是因为 filter
已经将元素集合过滤到只剩一个元素。
如果扩充一个”a5″元素在”a2″前面:
Stream.of("d2", "a5" ,"a2", "b1", "b3", "c")
.filter(s -> {
System.out.println("filter: " + s);
return s.startsWith("a");
})
.sorted((s1, s2) -> {
System.out.printf("sort: %s; %s\n", s1, s2);
return s1.compareTo(s2);
})
.map(s -> {
System.out.println("map: " + s);
return s.toUpperCase();
})
.forEach(s -> System.out.println("forEach: " + s));
}
//输出:
filter: d2
filter: a5
filter: a2
filter: b1
filter: b3
filter: c
sort: a2; a5
map: a2
forEach: A2
map: a5
forEach: A5
可以观察到filter
过滤出”a2″和”a5″两个元素,然后sorted
操作进行排序比较,接着分别执行中间操作map
映射和forEach
终端操作。
复用流(Reusing Streams)
无法重用 Java 8 流。一旦调用任何终端操作, 流就会关闭:
Stream<String> stream =
Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s -> s.startsWith("a"));
stream.anyMatch(s -> true); // ok
stream.noneMatch(s -> true); // exception
在同一个流上调用anyMatch
之后,再尝试调用noneMatch
报出如下异常:
java.lang.IllegalStateException: stream has already been operated upon or closed
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:229)
at java.util.stream.ReferencePipeline.noneMatch(ReferencePipeline.java:459)
To overcome this limitation we have to to create a new stream chain for every terminal operation we want to execute, e.g. we could create a stream supplier to construct a new stream with all intermediate operations already set up:
可以为每一个中断操作都创建一个新的流链,例如:可以创建一个流提供者(stream supplier),里面包含所有的公共中间操作:
Supplier<Stream<String>> streamSupplier =
() -> Stream.of("d2", "a2", "b1", "b3", "c")
.filter(s->s.startsWith("c"));
streamSupplier.get().anyMatch(s -> true); // ok
streamSupplier.get().noneMatch(s -> true); // ok
每次调用 get()
都会创建一个新的流,并执行后面的终端操作。
高级操作
流支持大量不同的操作。我们已经了解了最重要的操作, 如过滤器filter
或映射map
,接下来,我们将着重介绍collect
,flatMap
和 reduce
。
本节中的大多数代码示例使用以下人员列表进行演示:
class Person {
String name;
int age;
Person(String name, int age) {
this.name = name;
this.age = age;
}
@Override
public String toString() {
return name;
}
}
List<Person> persons =
Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
Collect
Collect是一个很有用的 终端操作 ,可以将流中的元素转换成不同的结果,比如:List,Set,或者 Map.
/** * Collect * */
List<Person> filtered = persons
.stream()
.filter(p -> p.name.startsWith("P"))
.collect(Collectors.toList());
System.out.println(filtered); // [Peter, Pamela]
可以看出从流元素生成一个List
列表是很容易的,如果要生成Set
集合的话,可以使用 Collectors.toSet()
- 分组聚合
按照年龄进行分组聚合:
Map<Integer, List<Person>> personsByAge = persons
.stream()
.collect(Collectors.groupingBy(p -> p.age));
personsByAge.forEach((age, p) -> System.out.format("age %s: %s\n", age, p));
//输出:
age 18: [Max]
age 23: [Peter, Pamela]
age 12: [David]
Collectors是非常灵活的,可以对流中的每个元素进行聚合,例如:所有人的平均年龄。
Double averageAge = persons
.stream()
.collect(Collectors.averagingInt(p -> p.age));
System.out.println(averageAge); // 19.0
Collectors汇总收集器将返回一个特殊的内置摘要统计对象,可以简单计算最小年龄、最大年龄、算术平均年龄、总和和数量。
IntSummaryStatistics ageSummary = persons
.stream()
.collect(Collectors.summarizingInt(p -> p.age));
System.out.println(ageSummary);
//输出:
IntSummaryStatistics{count=4, sum=76, min=12, average=19.000000, max=23}
以上完成与2018年10月22日
下面的例子将所有人连接为一个字符串:
String phrase = persons
.stream()
.filter(p -> p.age >= 18)
.map(p -> p.name)
.collect(Collectors.joining(" and ", "In Germany ", " are of legal age."));
System.out.println(phrase);
// In Germany Max and Peter and Pamela are of legal age.
连接收集器接受分隔符,以及可选的前缀和后缀。
为了将数据流中的元素转换为映射,我们需要指定键和值如何被映射。要记住键必须是唯一的,否则会抛出IllegalStateException
异常。你可以选择传递一个合并函数作为额外的参数来避免这个异常。
既然我们知道了一些最强大的内置收集器,让我们来尝试构建自己的特殊收集器吧。我们希望将流中的所有人转换为一个字符串,包含所有大写的名称,并以|
分割。为了完成它,我们通过Collector.of()
创建了一个新的收集器。我们需要传递一个收集器的四个组成部分:供应器、累加器、组合器和终止器。
Collector<Person, StringJoiner, String> personNameCollector =
Collector.of(
() -> new StringJoiner(" | "), // supplier
(j, p) -> j.add(p.name.toUpperCase()), // accumulator
(j1, j2) -> j1.merge(j2), // combiner
StringJoiner::toString); // finisher
String names = persons
.stream()
.collect(personNameCollector);
System.out.println(names); // MAX | PETER | PAMELA | DAVID
由于Java中的字符串是不可变的,我们需要一个助手类StringJointer
。让收集器构造我们的字符串。供应器最开始使用相应的分隔符构造了这样一个StringJointer
。累加器用于将每个人的大写名称加到StringJointer
中。组合器知道如何把两个StringJointer
合并为一个。最后一步,终结器从StringJointer
构造出预期的字符串。
flatMap
我们已经了解了如何通过使用map
操作,将流中的对象转换为另一种类型。map
有时十分受限,因为每个对象只能映射为一个其它对象。但如何我希望将一个对象转换为多个或零个其他对象呢?flatMap
这时就会派上用场。
flatMap
将流中的每个元素,转换为其它对象的流。所以每个对象会被转换为零个、一个或多个其它对象,以流的形式返回。这些流的内容之后会放进flatMap
所返回的流中。
在我们了解flatMap
如何使用之前,我们需要相应的类型体系:
class Foo {
String name;
List<Bar> bars = new ArrayList<>();
Foo(String name) {
this.name = name;
}
}
class Bar {
String name;
Bar(String name) {
this.name = name;
}
}
下面,我们使用我们自己的关于流的知识来实例化一些对象:
List<Foo> foos = new ArrayList<>();
// create foos
IntStream
.range(1, 4)
.forEach(i -> foos.add(new Foo("Foo" + i)));
// create bars
foos.forEach(f ->
IntStream
.range(1, 4)
.forEach(i -> f.bars.add(new Bar("Bar" + i + " <- " + f.name))));
现在我们拥有了含有三个foo
的列表,每个都含有三个bar
。
flatMap
接受返回对象流的函数。所以为了处理每个foo
上的bar
对象,我们需要传递相应的函数:
foos.stream()
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
// Bar1 <- Foo1
// Bar2 <- Foo1
// Bar3 <- Foo1
// Bar1 <- Foo2
// Bar2 <- Foo2
// Bar3 <- Foo2
// Bar1 <- Foo3
// Bar2 <- Foo3
// Bar3 <- Foo3
像你看到的那样,我们成功地将含有三个foo
对象中的流转换为含有九个bar
对象的流。
最后,上面的代码示例可以简化为流式操作的单一流水线:
IntStream.range(1, 4)
.mapToObj(i -> new Foo("Foo" + i))
.peek(f -> IntStream.range(1, 4)
.mapToObj(i -> new Bar("Bar" + i + " <- " + f.name))
.forEach(f.bars::add))
.flatMap(f -> f.bars.stream())
.forEach(b -> System.out.println(b.name));
flatMap
也可用于Java8引入的Optional
类。Optional
的flatMap
操作返回一个Optional
或其他类型的对象。所以它可以用于避免烦人的null
检查。
考虑像这样更复杂的层次结构:
class Outer {
Nested nested;
}
class Nested {
Inner inner;
}
class Inner {
String foo;
}
为了处理外层示例上的内层字符串foo
,你需要添加多个null
检查来避免潜在的NullPointerException
:
Outer outer = new Outer();
if (outer != null && outer.nested != null && outer.nested.inner != null) {
System.out.println(outer.nested.inner.foo);
}
可以使用Optional
的flatMap
操作来完成相同的行为:
Optional.of(new Outer())
.flatMap(o -> Optional.ofNullable(o.nested))
.flatMap(n -> Optional.ofNullable(n.inner))
.flatMap(i -> Optional.ofNullable(i.foo))
.ifPresent(System.out::println);
如果存在的话,每个flatMap
的调用都会返回预期对象的Optional
包装,否则为null
的Optional
包装。
reduce
归约操作将所有流中的元素组合为单一结果。Java8支持三种不同类型的reduce
方法。第一种将流中的元素归约为流中的一个元素。让我们看看我们如何使用这个方法来计算出最老的人:
persons
.stream()
.reduce((p1, p2) -> p1.age > p2.age ? p1 : p2)
.ifPresent(System.out::println); // Pamela
reduce
方法接受BinaryOperator
积累函数。它实际上是两个操作数类型相同的BiFunction
。BiFunction
就像是Function
,但是接受两个参数。示例中的函数比较两个人的年龄,来返回年龄较大的人。
第二个reduce
方法接受一个初始值,和一个BinaryOperator
累加器。这个方法可以用于从流中的其它Person
对象中构造带有聚合后名称和年龄的新Person
对象。
Person result =
persons
.stream()
.reduce(new Person("", 0), (p1, p2) -> {
p1.age += p2.age;
p1.name += p2.name;
return p1;
});
System.out.format("name=%s; age=%s", result.name, result.age);
// name=MaxPeterPamelaDavid; age=76
第三个reduce
对象接受三个参数:初始值,BiFunction
累加器和BinaryOperator
类型的组合器函数。由于初始值的类型不一定为Person
,我们可以使用这个归约函数来计算所有人的年龄总和。:
Integer ageSum = persons
.stream()
.reduce(0, (sum, p) -> sum += p.age, (sum1, sum2) -> sum1 + sum2);
System.out.println(ageSum); // 76
你可以看到结果是76。但是背后发生了什么?让我们通过添加一些调试输出来扩展上面的代码:
Integer ageSum = persons
.stream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Max
// accumulator: sum=18; person=Peter
// accumulator: sum=41; person=Pamela
// accumulator: sum=64; person=David
你可以看到,累加器函数做了所有工作。它首先使用初始值0
和第一个人Max来调用累加器。接下来的三步中sum
会持续增加,直到76。
等一下。好像组合器从来没有调用过?以并行方式执行相同的流会揭开这个秘密:
Integer ageSum = persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s\n", sum, p);
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s\n", sum1, sum2);
return sum1 + sum2;
});
// accumulator: sum=0; person=Pamela
// accumulator: sum=0; person=David
// accumulator: sum=0; person=Max
// accumulator: sum=0; person=Peter
// combiner: sum1=18; sum2=23
// combiner: sum1=23; sum2=12
// combiner: sum1=41; sum2=35
这个流的并行执行行为会完全不同。现在实际上调用了组合器。由于累加器被并行调用,组合器需要用于计算部分累加值的总和。
下一节我们会深入了解并行流。
并行流
流可以并行执行,在大量输入元素上可以提升运行时的性能。并行流使用公共的ForkJoinPool
,由ForkJoinPool.commonPool()
方法提供。底层线程池的大小最大为五个线程 – 取决于CPU的物理核数。
ForkJoinPool commonPool = ForkJoinPool.commonPool();
System.out.println(commonPool.getParallelism()); // 3
在我的机器上,公共池默认初始化为3。这个值可以通过设置下列JVM参数来增减:
-Djava.util.concurrent.ForkJoinPool.common.parallelism=5
集合支持parallelStream()
方法来创建元素的并行流。或者你可以在已存在的数据流上调用衔接方法parallel()
,将串行流转换为并行流。
为了描述并行流的执行行为,下面的例子向sout
打印了当前线程的信息。
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
通过分析调试输出,我们可以对哪个线程用于执行流式操作拥有更深入的理解:
filter: b1 [main]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: c2 [ForkJoinPool.commonPool-worker-3]
map: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: A2 [ForkJoinPool.commonPool-worker-1]
map: b1 [main]
forEach: B1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-3]
map: a1 [ForkJoinPool.commonPool-worker-3]
forEach: A1 [ForkJoinPool.commonPool-worker-3]
forEach: C1 [ForkJoinPool.commonPool-worker-2]
就像你看到的那样,并行流使用了所有公共的ForkJoinPool
中的可用线程来执行流式操作。在连续的运行中输出可能有所不同,因为所使用的特定线程是非特定的。
让我们通过添加额外的流式操作sort
来扩展这个示例:
Arrays.asList("a1", "a2", "b1", "c2", "c1")
.parallelStream()
.filter(s -> {
System.out.format("filter: %s [%s]\n",
s, Thread.currentThread().getName());
return true;
})
.map(s -> {
System.out.format("map: %s [%s]\n",
s, Thread.currentThread().getName());
return s.toUpperCase();
})
.sorted((s1, s2) -> {
System.out.format("sort: %s <> %s [%s]\n",
s1, s2, Thread.currentThread().getName());
return s1.compareTo(s2);
})
.forEach(s -> System.out.format("forEach: %s [%s]\n",
s, Thread.currentThread().getName()));
结果起初可能比较奇怪:
filter: c2 [ForkJoinPool.commonPool-worker-3]
filter: c1 [ForkJoinPool.commonPool-worker-2]
map: c1 [ForkJoinPool.commonPool-worker-2]
filter: a2 [ForkJoinPool.commonPool-worker-1]
map: a2 [ForkJoinPool.commonPool-worker-1]
filter: b1 [main]
map: b1 [main]
filter: a1 [ForkJoinPool.commonPool-worker-2]
map: a1 [ForkJoinPool.commonPool-worker-2]
map: c2 [ForkJoinPool.commonPool-worker-3]
sort: A2 <> A1 [main]
sort: B1 <> A2 [main]
sort: C2 <> B1 [main]
sort: C1 <> C2 [main]
sort: C1 <> B1 [main]
sort: C1 <> C2 [main]
forEach: A1 [ForkJoinPool.commonPool-worker-1]
forEach: C2 [ForkJoinPool.commonPool-worker-3]
forEach: B1 [main]
forEach: A2 [ForkJoinPool.commonPool-worker-2]
forEach: C1 [ForkJoinPool.commonPool-worker-1]
sort
看起来只在主线程上串行执行。实际上,并行流上的sort
在背后使用了Java8中新的方法Arrays.parallelSort()
。如javadoc所说,这个方法会参照数据长度来决定以串行或并行来执行。
如果指定数据的长度小于最小粒度,它使用相应的
Arrays.sort
方法来排序。
返回上一节中reduce
的例子。我们已经发现了组合器函数只在并行流中调用,而不在串行流中调用。让我们来观察实际上涉及到哪个线程:
List<Person> persons = Arrays.asList(
new Person("Max", 18),
new Person("Peter", 23),
new Person("Pamela", 23),
new Person("David", 12));
persons
.parallelStream()
.reduce(0,
(sum, p) -> {
System.out.format("accumulator: sum=%s; person=%s [%s]\n",
sum, p, Thread.currentThread().getName());
return sum += p.age;
},
(sum1, sum2) -> {
System.out.format("combiner: sum1=%s; sum2=%s [%s]\n",
sum1, sum2, Thread.currentThread().getName());
return sum1 + sum2;
});
控制台的输出表明,累加器和组合器都在所有可用的线程上并行执行:
accumulator: sum=0; person=Pamela; [main]
accumulator: sum=0; person=Max; [ForkJoinPool.commonPool-worker-3]
accumulator: sum=0; person=David; [ForkJoinPool.commonPool-worker-2]
accumulator: sum=0; person=Peter; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=18; sum2=23; [ForkJoinPool.commonPool-worker-1]
combiner: sum1=23; sum2=12; [ForkJoinPool.commonPool-worker-2]
combiner: sum1=41; sum2=35; [ForkJoinPool.commonPool-worker-2]
总之,并行流对拥有大量输入元素的数据流具有极大的性能提升。但是要记住一些并行流的操作,例如reduce
和collect
需要额外的计算(组合操作),这在串行执行时并不需要。
此外我们已经了解,所有并行流操作都共享相同的JVM相关的公共ForkJoinPool
。所以你可能需要避免实现又慢又卡的流式操作,因为它可能会拖慢你应用中严重依赖并行流的其它部分。
参考资源
https://winterbe.com/posts/2014/07/31/java8-stream-tutorial-examples/