我正在使用Rx来确保我们的后端服从某些第三方API的请求限制.
下面的实现使用简单的主题< T>.作为输入队列,然后使用James World’s custom Pace operator进行驯服.
只有在主线程上没有观察到由ObserveOn(TaskPoolScheduler.Default)强制执行的throttledRequests时,这才有效.
一旦我注释掉这一行(第61行),该程序就像完全没有使用Pace运算符一样,并且请求再次按照排队的速度再次处理.谁能解释这种行为?
using System;
using System.Linq;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Reactive.Threading.Tasks;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
public static class ObservableExtensions
{
/// <summary>
/// James World's Pace operater (see https://stackoverflow.com/a/21589238/88513)
/// </summary>
public static IObservable<T> Pace<T>(this IObservable<T> source, TimeSpan interval)
{
return source.Select(i => Observable.Empty<T>()
.Delay(interval)
.StartWith(i))
.Concat();
}
}
class Program
{
ISubject<int> requests;
IObservable<int> throttledRequests;
private Task<T> QueueRequest<T>(int work, Func<int, Task<T>> doWork)
{
var task = throttledRequests
.Where(x => x == work)
.Take(1)
.SelectMany(doWork)
.ToTask();
// queue it
requests.OnNext(work);
return task;
}
private Task<int> DoRequest(int x)
{
Console.WriteLine("{0:T}: DoRequest({1}) on TID {2}", DateTime.UtcNow, x, Thread.CurrentThread.ManagedThreadId);
return Task.FromResult(x);
}
private void Run()
{
// initialize request queue
requests = new Subject<int>();
// create a derived rate-limited queue
throttledRequests = requests
.Pace(TimeSpan.FromMilliseconds(1000))
.Publish()
.RefCount()
.ObserveOn(TaskPoolScheduler.Default);
Console.WriteLine("Main TID: {0}", Thread.CurrentThread.ManagedThreadId);
int i = 0;
while (true)
{
// Queue a number of requests
var tasks = Enumerable.Range(i * 10, 10)
.Select(x => QueueRequest(x, DoRequest))
.ToArray();
Task.WaitAll(tasks);
Console.ReadLine();
i++;
}
}
static void Main(string[] args)
{
new Program().Run();
}
}
}
最佳答案 我无法完全回答问题(不确定为什么它在ThreadPoolScheduler上运行)但我会给你我的想法,并展示如何修复它以使用或不使用ThreadPoolScheduler按预期运行.
首先你可能会注意到,即使在ThreadPoolScheduler上它也无法正常工作 – 通常前1-3个项目会得到处理而不会有任何延迟.之后为什么他们开始延迟处理仍然不清楚.现在到了原因.请考虑以下示例代码:
var result = Observable.Range(0, 10).Delay(TimeSpan.FromSeconds(10)).StartWith(1).Take(1).ToTask().Result;
在这里,不会有任何延迟,任务将立即完成.为什么?因为StartWith立即在序列的开头注入“1”,然后Take(1)获取该值并完成 – 没有理由继续序列,因此永远不会执行延迟.例如,如果您使用Take(2) – 它将在完成之前延迟10秒.
出于同样的原因,您的代码永远不会进入延迟(您可以通过在延迟后选择并记录到控制台来验证调试器).要修复,只需删除Take(1)(或将其更改为Take(2)) – 无论如何,每个键始终只有一个项目.执行此操作时,无论是否使用ThreadPoolScheduler,代码都将正确运行.