我有一个简单的控制台应用程序测试一些代码.
我的代码有一个包含1000个数字的列表,并将每个数字/ int放到Azure队列中.
现在,我这样做是异步的,而且效果很好.这是我库中的代码:
var tasks = stagedFileIds.Select(stagedFileId => QueueFileToProcessAsync(stagedFileId));
await Task.WhenAll(tasks)
.ConfigureAwait(false);
效果很好.
但是……这是件坏事吗?我应该把它分成50或25或者什么?但最重要的是……批量吗?
执行上述代码的“成本”是多少?
请记住,这是一个控制台应用程序.我将在某个时候将其移动到Azure功能.
最佳答案 您应该以异步方式限制它们,以确保您没有并行地进行太多的QueueFileToProcessAsync操作,除非您确定它是无害的.我推荐你 Stephen Cleary introduction to TPL Dataflow,其中 part 3和他的另一个帖子 Async Producer/Consumer Queue using Dataflow地址限制.
如果您正在调用端点,那么可能会被@Gerino指出的ServicePointManager.DefaultConnectionLimit限制.
只是为了这个问题,如果你必须在没有TPL Dataflow的情况下自己实现,你可以使用.NET Concurrent Collections:
// prototype code
static class TaskThrottlingExtension
{
public static async Task ThrottleProcessingAsync<T>(this IEnumerable<T> inputs, int parallel, Func<T, Task> process)
{
var queues = new BlockingCollection<T>[parallel];
var tasks = new Task[parallel];
for (int i = 0; i < parallel; i++)
{
var queue = queues[i] = new BlockingCollection<T>(1);
tasks[i] = Task.Run( async () =>
{
foreach (var input in queue.GetConsumingEnumerable())
{
await process(input).ConfigureAwait(false);
}
});
}
try
{
foreach (var input in inputs)
{
BlockingCollection<T>.AddToAny(queues, input);
}
foreach (var queue in queues)
{
queue.CompleteAdding();
}
await Task.WhenAll(tasks).ConfigureAwait(false);
}
finally
{
foreach (var queue in queues)
{
queue.Dispose();
}
}
}
}