C#中的生产者/混合消费者使用4.0框架类和Blocking Collection

我有一种生产者/消费者情景的情况.生产者永远不会停止,这意味着即使有时间BC中没有物品,也可以在以后添加更多物品.

从.NET Framework 3.5迁移到4.0,我决定使用BlockingCollection作为使用者和生产者之间的并发队列.我甚至添加了一些并行扩展,因此我可以使用带有Parallel.ForEach的BC.

问题是,在消费者线程中,我需要一种混合模型:

>我总是检查BC处理任何到达的项目
Parallel.ForEach(bc.GetConsumingEnumerable(),item =>等
>在这个foreach中,我执行彼此之间不相互依赖的所有任务.
>问题来了.在对前面的任务进行并行化之后,我需要按照它们在BC中的相同FIFO顺序来管理它们的结果.这些结果的处理应该在同步线程中进行.

伪代码中的一个小例子如下:

制片人:

//This event is triggered each time a page is scanned. Any batch of new pages can be added at any time at the scanner
private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
     //The object to add has a property with the sequence number
    _concurrentCollection.TryAdd(scannedPage);
}

消费者:

private void Init()
{
    _cancelTasks = false;
    _checkTask = Task.Factory.StartNew(() =>
            {
                while (!_cancelTasks)
                {
                    //BlockingCollections with Paralell ForEach
                    var bc = _concurrentCollection;
                    Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
                    {
                        ScannedPage currentPage = item;
                        // process a batch of images from the bc and check if an image has a valid barcode. T
                    });
                    //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.

                }
            });
}

显然,这不能正常工作,因为.GetConsumingEnumerable()会阻塞,直到BC中有另一个项目.我认为我可以完成任务,只需在同一批次中激活4或5个任务,但是:

>我怎么能用任务执行此操作,并且在阻止任务开始之前仍然有一个等待点,直到BC中有一个项目被消耗(如果没有任何内容,我不想开始处理.一旦有BC中的东西我只是开始批量的4个任务,并在每个任务中使用一个TryTake,所以如果没有什么可以采取它们不会阻止,因为我不知道我是否总能达到来自的项目数量BC作为一批任务,例如BC中只留下一个项目和一批4个任务)?
>我怎么能这样做并利用Parallel.For提供的效率?
>我如何以从BC中提取项目的相同FIFO顺序保存任务结果?
>是否还有其他并发类更适合消费者对这类项目的混合处理?
>此外,这是我在StackOverflow中做的第一个问题,所以如果您需要更多数据,或者您认为我的问题不正确,请告诉我.

最佳答案 我想我遵循你的要求,为什么不创建一个ConcurrentBag并在处理时添加到它:

while (!_cancelTasks)
{
   //BlockingCollections with Paralell ForEach
   var bc = _concurrentCollection;
   var q = new ConcurrentBag<ScannedPage>();
   Parallel.ForEach(bc.GetConsumingEnumerable(), item =>
   {
      ScannedPage currentPage = item;
      q.Add(item);
      // process a batch of images from the bc and check if an image has a valid barcode. T
   });
 //Here should go the code that takes the results from each tasks, process them in the same FIFO order in which they entered the BC and save each image to a file, all of this in this same thread.


  //process items in your list here by sorting using some sequence key
  var items = q.OrderBy( o=> o.SeqNbr).ToList();
  foreach( var item in items){
     ...
  }
}

这显然不会按照它们添加到BC的确切顺序排列它们,但是您可以像Alex建议的那样向ScannedPage对象添加一些序列nbr,然后对结果进行排序.

这是我如何处理序列:

将其添加到ScannedPage类:

public static int _counter;  //public because this is just an example but it would work.

获取序列nbr并在此处指定:

private void Current_OnPageScanned(object sender, ScannedPage scannedPage)
{          
    lock( this){   //to single thread this process.. not necessary if it's already single threaded of course.
    System.Threading.Interlocked.Increment( ref ScannedPage._counter);
    scannedPage.SeqNbr = ScannedPage._counter;
    ...
    }
}
点赞