我很好奇ConnectableObservable的用例,并认为从冷可观察(如数据库查询)中转换昂贵的排放可能会有所帮助,并将它们作为热点发出.这样可以避免昂贵的重放,并且将一组排放推送给所有操作符和订户.
然而,经过一些思考实验,我有一些担心,flatMaps中的自引用可能会导致问题.
例如,假设我通过ConnectableObservable发出值1到10.但是我将flatMap()的每个值都计算为所有值的总和,然后减去当前值.
ConnectableObservable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.publish();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
source.connect();
我希望我会得到这个输出.
1
2
3
4
5
6
7
8
9
10
SUM - i: 54
SUM - i: 53
SUM - i: 52
SUM - i: 51
SUM - i: 50
SUM - i: 49
SUM - i: 48
SUM - i: 47
SUM - i: 46
SUM - i: 45
但相反,我得到了这个.
1
2
3
4
5
6
7
8
9
10
SUM - i: 53
SUM - i: 50
SUM - i: 46
SUM - i: 41
SUM - i: 35
SUM - i: 28
SUM - i: 20
SUM - i: 11
SUM - i: 1
SUM - i: -10
正如我所担心的那样,flatMap()看起来需要重放值,因为它无法处理源的热顺序性质.因此,如果我使用cache()运算符,那么一切正常,因为缓存的值将为每个flatMap()运算符重放.
Observable<Integer> source = Observable.range(1,10)
.doOnNext(System.out::println)
.cache();
source.flatMap(i -> source.reduce(0,(x,y) -> x + y).map(sum -> sum - i))
.subscribe(sum -> System.out.println("SUM - i: " + sum));
这些是我的问题:
>这个ConnectableObservable流程究竟发生了什么?它看起来是确定性的,那么它是如何产生这些价值的呢?
>在任何使用它的运算符中,ConnectableObervable可能对自引用有危险吗?在这些情况下,cache()应该是go-to hot操作符吗?
最佳答案
What exactly happened with this ConnectableObservable process? It looks to be deterministic so how did it come up with those values?
这种设置是非直观的,但是发生的事情是内部总和在它们各自的起始值被创建之前不存在,并且它们中的每一个仅在它们创建之后看到原始序列的一个元素.例如,对于1,内部总和将仅从2到10获得事件.
Is it safe to say that ConnectableObervable can be dangerous to self-reference in any operators that use it? And cache() should be the go-to hot operator in these circumstances?
问题不在于ConnectableObservable,而是对时间敏感且订阅者敏感的发布:谁在那里接收事件,谁不会在那里追溯得到任何东西.