system.reactive – Rx-几个生产者/一个消费者

一直试图谷歌这个但是有点卡住了.

假设我们有一个触发事件的类,并且该事件可能同时由多个线程触发.

使用Observable.FromEventPattern,我们创建一个Observable,并订阅该事件. Rx究竟如何管理多个同时被触发的事件?假设我们在不同的线程上快速连续发射了3个事件.它是否在内部排队,然后为每个队列同步调用Subscribe委托?假设我们订阅了一个线程池,我们是否仍然可以保证订阅将在时间上单独处理?

接下来,让我们说对于每个事件,我们想要执行一个动作,但它是一个可能不是线程安全的方法,所以我们一次只希望一个线程在这个方法中.现在我看到我们可以使用EventLoop Scheduler,可能我们不需要对代码实现任何锁定?

另外,观察当前线程是一种选择吗?当前线程是事件被触发的线程,还是订阅设置的事件?即当前线程是否保证始终相同,或者可能有2个线程同时运行在方法中?

谢谢

PS:我把一个例子放在一起,但我似乎总是在我的订阅方法中使用相同的线程,即使我在ObserveOn线程池中,这是令人困惑的:S

PSS:通过做一些实验,似乎如果没有指定Scheduler,那么RX将只在事件被触发的任何线程上执行,这意味着它同时处理几个.一旦我引入了一个调度程序,无论调度程序的类型是什么,它总是连续运行.奇怪:S

最佳答案 根据
Rx Design Guidelines,观察者不应同时将观察者称为OnNext.在进行下一次呼叫之前,它将始终等待当前呼叫完成.所有Rx方法都遵循这一惯例.更重要的是,他们认为你也尊重这个惯例.当您违反此条件时,您可能会遇到Observable行为的微妙错误.

对于有源数据不符合此约定的那些时间(即它可以同时生成数据),它们提供Synchronize.

Observable.FromEventPattern假设您不会触发并发事件,因此不会阻止并发下游通知.如果您计划从多个线程触发事件,有时同时触发,则使用Synchronize()作为您在FromEventPattern之后执行的第一个操作:

// this will get you in trouble if your event source might fire events concurrently.
var events = Observable.FromEventPattern(...).Select(...).GroupBy(...);

// this version will protect you in that case.
var events = Observable.FromEventPattern(...).Synchronize().Select(...).GroupBy(...);

现在,正如Rx设计指南所承诺的那样,所有下游操作符(以及最终的观察者)都受到保护,不会受到并发通知的影响.使用简单的互斥锁(也称为锁定语句)进行同步.没有奇特的排队或任何东西.如果一个线程尝试引发事件而另一个线程已经引发它,则第二个线程将阻塞,直到第一个线程完成.

点赞