通过前面的博客的介绍,我们知道Stream有一个源,0个或者多个中间操作,以及一个终止操作。Stream只有遇到终止操作,它的源才开始执行遍历操作,而且只会进行一次遍历,而不是每个操作都执行一次遍历。今天,我们就从源码的层面来分析一下JDK这一块是怎么实现的。
首先看下面一段代码,下面将以这一段代码来进行分析:
List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
list.stream().filter(i -> i % 2 == 0)
.limit(3)
.map(String::valueOf)
.forEach(System.out::println);
上面这段代码,中间操作,短路操作,终止操作全部都有了。
首先list.stream()
会返回一个Stream对象。我们可以跟进去,看看返回的到底是个什么对象。
default Stream<E> stream() {
return StreamSupport.stream(spliterator(), false);
}
继续跟到StreamSupport.stream
:
public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
Objects.requireNonNull(spliterator);
return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);
}
可以看到返回的是一个ReferencePipeline.Head
的实例。再跟进去,可以发现Head是ReferencePipeline类的一个静态内部类,并且Head还继承了ReferencePipeline,JDK注释说它表示的是ReferencePipeline的一个源阶段。
再来看看ReferencePipeline这个类。它是一个抽象类,继承了AbstractPipeline这个类并且实现了Stream接口。我们将上面的操作跟进去看实现,会发现filter,limit,map,forEach这些方法都是在这个类中实现的。也就是说ReferencePipeline这个类提供了大量的对流的操作的实现。
继续分析ReferencePipeline这个类,它的父类AbstractPipeline中定义了三个个AbstractPipeline类型的变量:sourceStage(源阶段),previousStage(上游pipeline,前一阶段),nextStage(下一阶段)。根据JDK文档的对着三个属性的说明可以知道:ReferencePipeline实际上是一个双向链表的数据结构。而ReferencePipeline对Stream的操作做了实现,每一个中间操作都会返回一个Stream对象,实际上就是ReferencePipeline对象,因此可以得到结论:Stream底层是通过双向链表来实现的。
经过上面的分析,我们就知道了流的源阶段返回的Stream和中间阶段返回的Stream到底是什么了。源阶段返回的是ReferencePipeline.Head对象,而中间操作阶段返回的是ReferencePipeline对象。在流的源阶段和中间阶段仅仅只是返回了ReferencePipeline对象,并没有做其他的方法调用操作,这也是为什么流在执行中间操作并不会有任何的输出或者结果产生。
那么这些中间操作在执行终止操作的时候是怎么被调用的呢?还有流的短路是怎么实现的呢?
接着上面,跟进filter方法的实现:
public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
Objects.requireNonNull(predicate);
return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
@Override
Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
@Override
public void begin(long size) {
downstream.begin(-1);
}
@Override
public void accept(P_OUT u) {
if (predicate.test(u))
downstream.accept(u);
}
};
}
};
}
可以看到仅仅只是new一个StatelessOp对象进行返回。跟进去StatelessOp发现它也是ReferencePipeline的一个静态内部类,并且也继承了ReferencePipeline,也印证了上面说的中间操作返回的都是ReferencePipeline对象。上面的代码还可以看到,StatelessOp对象对opWrapSink这个方法做了实现:返回一个Sink.ChainedReference对象。Sink.ChainedReference又是一个静态内部类,并且继承了Sink。并且Sink.ChainedReference中有一个Sink类型的属性downstream,可以知道:Sink.ChainedReference是一个单链表的数据结构。
跟到父类里面查看JDK对于opWrapSink这个方法的说明:接受一个用于接受操作结果的Sink对象并且返回一个接受当前操作输入类型元素的Sink。有点迷惑,查看一下这个Sink到底是什么。跟进去看JDK说明:Sink是Cosumer的一个子类,用来引导值来通过流管道的各个阶段,通过提供一些额外的方法实现来管理大小信息、控制流动等等。可以看到里面提供了一个accept方法,参数接受的是流中的值,用来定义一种行为,这个行为也就是流的操作。因此可以总结出的是:对于流的各种操作,底层其实是被封装成一个个的Sink(对于引用类型其实是Sink.ChainedReference)对象来进行操作的。而Sink.ChainedReference又是一种单链表的数据结构,所以,流中的操作会以单链表的形式被链接起来。而且查看各种中间操作的实现,可以发现opWrapSink这个方法返回的Sink.ChainedReference对象的accept方法实现中,都会调用下游Sink的accept方法。这无疑就是行为的一种串联。这也就解释了,流在遇到终止操作过后,是怎么实现的遍历的时候是每个元素经历一连串的操作,然后再遍历下一个元素。
最后在分析一下流的终止操作是什么样的以及流的短路是怎么实现的。
最后的forEach方法跟进去,这里肯定进入的是ReferencePipeline的forEach方法,因为Head是源的Pipeline。
public void forEach(Consumer<? super P_OUT> action) {
evaluate(ForEachOps.makeRef(action, false));
}
看下ForEachOps.makeRef的实现:
public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
Objects.requireNonNull(action);
return new ForEachOp.OfRef<>(action, ordered);
}
看到最终返回一个ForEachOp.OfRef对象,跟进去发现ForEachOp.OfRef是ForEachOp的一个静态内部类并且继承了ForEachOp。并且可以查看到ForEachOp这个类有一个祖先是Sink。那说明ForEachOps.makeRef这个方法的作用就是把forEach操作封装成Sink对象。
在查看evaluate方法实现:
final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
assert getOutputShape() == terminalOp.inputShape();
if (linkedOrConsumed)
throw new IllegalStateException(MSG_STREAM_LINKED);
linkedOrConsumed = true;
return isParallel()
? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
: terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}
在跟到串行流的实现:
public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
return helper.wrapAndCopyInto(this, spliterator).get();
}
这里的第一个参数PipelineHelper类型其实是AbstractPipeline的父类,而AbstractPipeline又是ReferencePipeline的父类。再跟进helper.wrapAndCopyInto方法:
final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
return sink;
}
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
Objects.requireNonNull(sink);
for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
}
return (Sink<P_IN>) sink;
}
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
Objects.requireNonNull(wrappedSink);
if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
wrappedSink.begin(spliterator.getExactSizeIfKnown());
spliterator.forEachRemaining(wrappedSink);
wrappedSink.end();
}
else {
copyIntoWithCancel(wrappedSink, spliterator);
}
}
首先看wrapSink方法,可以看到通过ReferencePipeline的双向链表,从最后一个操作(也就是终止操作)往前遍历,将所有的操作都串联起来,最终返回一个指向第一个操作的Sink引用。这里的p.opWrapSink调用的其实就是每个操作返回的ReferencePipeline对象的opWrapSink方法。前面对这个方法做过了分析。
再来分析copyInto方法。可以看到逻辑分为短路操作和非短路操作,如果有短路操作就会执行下面的copyIntoWithCancel方法,否则指向上面的逻辑,最终会对封装好的sink执行accept,而每一个sink的accept方法里优惠调用下游sink的accept方法,来实现操作串联。
最后分析下短路:
final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
@SuppressWarnings({"rawtypes","unchecked"})
AbstractPipeline p = AbstractPipeline.this;
while (p.depth > 0) {
p = p.previousStage;
}
wrappedSink.begin(spliterator.getExactSizeIfKnown());
p.forEachWithCancel(spliterator, wrappedSink);
wrappedSink.end();
}
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
}
可以看到短路流在执行遍历的时候会调用Sink封装的cancellationRequested方法,如果返回出就不会进行后面的操作。也就是说短路流在操作的时候都要重写这个方法,例如limit操作,肯定是对传入的参数m进行自减– 操作,等到m<=0,cancellationRequested这个方法肯定会返回true。
可以跟进去看下limit操作的实现:
public final Stream<P_OUT> limit(long maxSize) {
if (maxSize < 0)
throw new IllegalArgumentException(Long.toString(maxSize));
return SliceOps.makeRef(this, 0, maxSize);
}
下面是SliceOps.makeRef返回的ReferencePipeline.StatefulOp对象对于opWrapSink方法的实现:
Sink<T> opWrapSink(int flags, Sink<T> sink) {
return new Sink.ChainedReference<T, T>(sink) {
long n = skip;
long m = limit >= 0 ? limit : Long.MAX_VALUE;
@Override
public void begin(long size) {
downstream.begin(calcSize(size, skip, m));
}
@Override
public void accept(T t) {
if (n == 0) {
if (m > 0) {
m--;
downstream.accept(t);
}
}
else {
n--;
}
}
@Override
public boolean cancellationRequested() {
return m == 0 || downstream.cancellationRequested();
}
};
}