c# – TPL数据流循环完成

我在确定如何在循环TPL数据流中检测完成时遇到问题.

我在数据流的一部分中有一个反馈循环,它向远程服务器发出GET请求并处理数据响应(用更多数据流转换这些数据流然后提交结果).

数据源将其结果拆分为1000条记录的页面,并不会告诉我它有多少页面可供我使用.我必须继续阅读,直到我得到不到一整页的数据.

通常页数是1,经常是10,每次我们都有1000.

我有很多要求在开始时提取.
我希望能够使用一个线程池来处理这个,所有这些都很好,我可以排队多个数据请求并同时请求它们.如果我偶然遇到需要获取大量页面的实例,我想要使用我的所有线程.我不想留下一个线索,而其他人已经完成了.

我遇到的问题是当我将此逻辑放入数据流时,例如:

//generate initial requests for activity
var request = new TransformManyBlock<int, DataRequest>(cmp => QueueRequests(cmp));

//fetch the initial requests and feedback more requests to our input buffer if we need to
TransformBlock<DataRequest, DataResponse> fetch = null;
fetch = new TransformBlock<DataRequest, DataResponse>(async req =>
{
    var resp = await Fetch(req);

    if (resp.Results.Count == 1000)
        await fetch.SendAsync(QueueAnotherRequest(req));

    return resp;
}
, new ExecutionDataflowBlockOptions {  MaxDegreeOfParallelism = 10 });

//commit each type of request
var commit = new ActionBlock<DataResponse>(async resp => await Commit(resp));

request.LinkTo(fetch);
fetch.LinkTo(commit);

//when are we complete?

QueueRequests产生IEnumerable< DataRequest>.我立刻将下一个N页请求排队,接受这意味着我发送的呼叫数量比我需要的多一些. DataRequest实例共享一个LastPage计数器,以避免无意中发出我们知道在最后一页之后的请求.这一切都很好.

问题:
如果我通过将更多请求反馈到fetch的输入缓冲区来循环,就像我在本例中所示,那么我对如何发信号(甚至检测)完成有疑问.我无法在请求提取时设置完成,因为一旦设置完成,我就无法再反馈了.

我可以在fetch上监视输入和输出缓冲区是否为空,但我认为当我设置完成时,我仍然冒着获取仍然忙于请求的风险,因此阻止了对其他页面的排队请求.

我可以通过某种方式知道提取繁忙(输入或忙于处理输入).

我错过了解决这个问题的明显/直接的方法吗?

>我可以在fetch中循环,而不是排队更多的请求.问题是我希望能够使用设置的最大线程数来限制我正在对远程服务器做的事情.块内的并行循环是否可以与块本身共享调度程序,并通过调度程序控制生成的线程数?
>我可以为fetch创建一个自定义转换块来处理完成信号.对于这样一个简单的场景,似乎有很多工作要做.

非常感谢您提供的任何帮助!

最佳答案 在TPL Dataflow中,您可以使用
DataflowLinkOptions
link the blocks指定
propagation of completion of the block

request.LinkTo(fetch, new DataflowLinkOptions { PropagateCompletion = true });
fetch.LinkTo(commit, new DataflowLinkOptions { PropagateCompletion = true });

之后,您只需为请求块调用Complete()方法,就完成了!

// the completion will be propagated to all the blocks
request.Complete();

你应该使用的最后一件事是最后一个块的Completion任务属性:

commit.Completion.ContinueWith(t =>
    {
        /* check the status of the task and correctness of the requests handling */
    });
点赞