c# – 拍摄IObservable的快照

假设我有一项服务:

public interface ICustomersService
{
    IObservable<ICustomer> Customers
    {
        get;
    }
}

Customers属性的实现首先抓住所有现有客户并将其传递给观察者,之后它只传递稍后添加到系统中的客户.因此,它永远不会完成.

现在假设我想获取当前客户的快照(作为List< ICustomer>),忽略将来可能添加的任何快照.我怎么做?任何ToList()或其亲属的调用都将永远阻塞,因为序列永远不会完成.

我想我可以写自己的扩展,所以我尝试了这个:

public static class RxExtensions
{
    public static List<T> ToSnapshot<T>(this IObservable<T> @this)
    {
        var list = new List<T>();

        using (@this.Subscribe(x => list.Add(x)));

        return list;
    }
}

这似乎有效.例如:

var customers = new ReplaySubject<string>();

// snapshot has nothing in it
var snapshot1 = customers.ToSnapshot();

customers.OnNext("A");
customers.OnNext("B");

// snapshot has just the two customers in it
var snapshot2 = customers.ToSnapshot();

customers.OnNext("C");

// snapshot has three customers in it
var snapshot3 = customers.ToSnapshot();

我意识到当前的实现取决于调度程序是当前线程,否则ToSnapshot可能会在收到项目之前关闭其订阅.但是,我怀疑我还可以包含一个ToSnapshot覆盖,该覆盖采用IScheduler并确保在结束快照之前收到任何已安排的项目.

我找不到Rx内置的这种快照功能.我错过了什么吗?

最佳答案 您可以尝试在您的observable上使用超时

source.Customers().TakeUntil(DateTime.Now).ToEnumerable();
点赞