java – ConnectableObservable vs flatMap()自引用?

我很好奇ConnectableObservable的用例,并认为从冷可观察(如数据库查询)中转换昂贵的排放可能会有所帮助,并将它们作为热点发出.这样可以避免昂贵的重放,并且将一组排放推送给所有操作符和订户.

然而,经过一些思考实验,我有一些担心,flatMaps中的自引用可能会导致问题.

例如,假设我通过ConnectableObservable发出值1到10.但是我将flatMap()的每个值都计算为所有值的总和,然后减去当前值.

    ConnectableObservable<Integer> source = Observable.range(1,10)
            .doOnNext(System.out::println)
            .publish();

    source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
            .subscribe(sum -> System.out.println("SUM - i: " + sum));

    source.connect();

我希望我会得到这个输出.

1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45

但相反,我得到了这个.

1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10

正如我所担心的那样,flatMap()看起来需要重放值,因为它无法处理源的热顺序性质.因此,如果我使用cache()运算符,那么一切正常,因为缓存的值将为每个flatMap()运算符重放.

    Observable<Integer> source = Observable.range(1,10)
            .doOnNext(System.out::println)
            .cache();

    source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
            .subscribe(sum -> System.out.println("SUM - i: " + sum));

这些是我的问题:

>这个ConnectableObservable流程究竟发生了什么?它看起来是确定性的,那么它是如何产生这些价值的呢?
>在任何使用它的运算符中,ConnectableObervable可能对自引用有危险吗?在这些情况下,cache()应该是go-to hot操作符吗?

最佳答案

What exactly happened with this ConnectableObservable process? It looks to be deterministic so how did it come up with those values?

这种设置是非直观的,但是发生的事情是内部总和在它们各自的起始值被创建之前不存在,并且它们中的每一个仅在它们创建之后看到原始序列的一个元素.例如,对于1,内部总和将仅从2到10获得事件.

Is it safe to say that ConnectableObervable can be dangerous to self-reference in any operators that use it? And cache() should be the go-to hot operator in these circumstances?

问题不在于ConnectableObservable,而是对时间敏感且订阅者敏感的发布:谁在那里接收事件,谁不会在那里追溯得到任何东西.

点赞