假设我有一项服务:
public interface ICustomersService
{
IObservable<ICustomer> Customers
{
get;
}
}
Customers属性的实现首先抓住所有现有客户并将其传递给观察者,之后它只传递稍后添加到系统中的客户.因此,它永远不会完成.
现在假设我想获取当前客户的快照(作为List< ICustomer>),忽略将来可能添加的任何快照.我怎么做?任何ToList()或其亲属的调用都将永远阻塞,因为序列永远不会完成.
我想我可以写自己的扩展,所以我尝试了这个:
public static class RxExtensions
{
public static List<T> ToSnapshot<T>(this IObservable<T> @this)
{
var list = new List<T>();
using (@this.Subscribe(x => list.Add(x)));
return list;
}
}
这似乎有效.例如:
var customers = new ReplaySubject<string>();
// snapshot has nothing in it
var snapshot1 = customers.ToSnapshot();
customers.OnNext("A");
customers.OnNext("B");
// snapshot has just the two customers in it
var snapshot2 = customers.ToSnapshot();
customers.OnNext("C");
// snapshot has three customers in it
var snapshot3 = customers.ToSnapshot();
我意识到当前的实现取决于调度程序是当前线程,否则ToSnapshot可能会在收到项目之前关闭其订阅.但是,我怀疑我还可以包含一个ToSnapshot覆盖,该覆盖采用IScheduler并确保在结束快照之前收到任何已安排的项目.
我找不到Rx内置的这种快照功能.我错过了什么吗?
最佳答案 您可以尝试在您的observable上使用超时
source.Customers().TakeUntil(DateTime.Now).ToEnumerable();