system.reactive – RX故障的解决方法?

我正在各种平台上试验Reactive Extensions,有一件让我烦恼的事情就是毛刺.

即使对于UI代码这些毛刺可能是not be that problematic,并且通常可以找到一个可以解决它们的运算符,我仍然发现在出现毛刺时调试代码更难:中间结果对于调试并不重要,但是我的头脑不知道当结果是中间或“最终”时.

在Haskell和同步数据流系统中使用了纯功能FRP,它也“感觉”错了,但这当然是主观的.

但是当将RX连接到非UI执行器(如电机或开关)时,我认为故障更成问题.如何确保只将正确的值发送到外部执行器?

也许这可以通过一些’调度员’解决,该调度员知道某些“外部传感器”何时触发了启动事件,以便在将最终结果转发给执行器之前处理所有内部事件.像flapjax paper中描述的东西.

我希望得到答案的问题是:

> RX中有什么东西可以解决同步通知的修复故障吗?
>如果没有,是否存在修复同步毛刺的RX(最好是生产质量)库或方法?特别是对于单线程Javascript,这可能有意义吗?
>如果不存在通用解决方案,RX如何用于控制外部传感器/执行器而不会在执行器上出现毛刺?

让我举个例子

假设我想打印一份合同所在的元组序列(a,b)

a=n    b=10 * floor(n/10)

n是自然数流= 0,1,2 ….

所以我期待以下顺序

(a=0, b=0)
(a=1, b=0)
(a=2, b=0)
...
(a=9, b=0)
(a=10, b=10)
(a=11, b=10)
...

在RX中,为了使事情更有趣,我将使用过滤器来计算b流

var n = Observable
        .Interval(TimeSpan.FromSeconds(1))
        .Publish()
        .RefCount();

var a = n.Select(t => "a=" + t);

var b = n.Where(t => t % 10 == 0)
        .Select(t => "b=" + t);

var ab = a.CombineLatest(b, Tuple.Create);

ab.Subscribe(Console.WriteLine);

这给了我认为是一个小故障(临时违反不变/合同):

(a=0, b=0)
(a=1, b=0)
(a=2, b=0)
...
(a=10, b=0) <-- glitch?
(a=10, b=10)
(a=11, b=10)

我意识到这是CombineLatest的正确行为,但我也认为这被称为故障,因为在真正的纯FRP系统中,你不会得到这些中间不变的违规结果.

请注意,在此示例中,我将无法使用Zip,并且WithLatestFrom也会提供不正确的结果.

当然我可以简单地将这个例子简化为一个monadic计算,永远不会对n个流的出现进行多播(这意味着无法过滤而只是映射),但这不是重点:在RX中的IMO总是会出现’故障’ ‘每当你拆分并重新加入一个可观察的流时:

    s
   / \
  a   b
   \ /
    t

例如,在FlapJAX中,您不会遇到这些问题.

这有什么意义吗?

非常感谢,
彼得

最佳答案 更新:让我尝试在RX上下文中回答我自己的问题.

首先,似乎我对“故障”的理解是错误的.从纯粹的FRP角度来看,RX中看起来像毛刺的故障似乎在RX中实际上是正确的行为.

所以我想在RX中我们需要明确我们期望从传感器中激活值的“时间”.

在我自己的例子中,执行器是控制台,传感器是间隔n.

所以,如果我改变我的代码

ab.Subscribe(Console.WriteLine);

ab.Sample(n).Subscribe(Console.WriteLine);

然后只打印“正确”值.

这意味着当我们得到一个组合来自传感器的值的可观察序列时,我们必须知道所有原始传感器,将它们全部合并,并在向执行器发送任何值之前使用该合并信号对值进行采样…

因此,另一种方法是将IObservable“提升”为“Sensed”结构,记住并合并原始传感器,例如:

public struct Sensed<T>
{
    public IObservable<T> Values;
    public IObservable<Unit> Sensors;

    public Sensed(IObservable<T> values, IObservable<Unit> sensors)
    {
        Values = values;
        Sensors = sensors;
    }

    public IObservable<Unit> MergeSensors(IObservable<Unit> sensors)
    {
        return sensors == Sensors ? Sensors : Sensors.Merge(sensors);
    }

    public IObservable<T> MergeValues(IObservable<T> values)
    {
        return values == Values ? Values : Values.Merge(values);
    }
}

然后我们必须将所有RX方法转移到这个“Sensed”结构:

public static class Sensed
{
    public static Sensed<T> Sensor<T>(this IObservable<T> source)
    {
        var hotSource = source.Publish().RefCount();
        return new Sensed<T>(hotSource, hotSource.Select(_ => Unit.Default));
    }

    public static Sensed<long> Interval(TimeSpan period)
    {
        return Observable.Interval(period).Sensor();
    }

    public static Sensed<TOut> Lift<TIn, TOut>(this Sensed<TIn> source, Func<IObservable<TIn>, IObservable<TOut>> lifter)
    {
        return new Sensed<TOut>(lifter(source.Values), source.Sensors);
    }

    public static Sensed<TOut> Select<TIn, TOut>(this Sensed<TIn> source, Func<TIn, TOut> func)
    {
        return source.Lift(values => values.Select(func));
    }

    public static Sensed<T> Where<T>(this Sensed<T> source, Func<T, bool> func)
    {
        return source.Lift(values => values.Where(func));
    }

    public static Sensed<T> Merge<T>(this Sensed<T> source1, Sensed<T> source2)
    {
        return new Sensed<T>(source1.MergeValues(source2.Values), source1.MergeSensors(source2.Sensors));
    }

    public static Sensed<TOut> CombineLatest<TIn1, TIn2, TOut>(this Sensed<TIn1> source1, Sensed<TIn2> source2, Func<TIn1, TIn2, TOut> func)
    {
        return new Sensed<TOut>(source1.Values.CombineLatest(source2.Values, func), source1.MergeSensors(source2.Sensors));
    }

    public static IDisposable Actuate<T>(this Sensed<T> source, Action<T> next) 
    {
        return source.Values.Sample(source.Sensors).Subscribe(next);
    }
}

我的例子变成了:

var n = Sensed.Interval(TimeSpan.FromMilliseconds(100));
var a = n.Select(t => "a=" + t);
var b = n.Where(t => t % 10 == 0).Select(t => "b=" + t);
var ab = a.CombineLatest(b, Tuple.Create);
ab.Actuate(Console.WriteLine);

并且仅将“期望的”值传递给致动器,但是通过这种设计,在感测结构中记住始发传感器.

我不确定这是否有任何“感觉”(双关语),也许我应该放弃对纯粹玻璃钢的渴望,并与之共存.毕竟,时间是相对的;-)

彼得

点赞