C# Parallel Programming: Using Barrier to Synchronize Concurrent Tasks

Overview

Barrier is a synchronization mechanism provided by .NET that lets multiple tasks coordinate their concurrent work across different phases.

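The key points here are "multiple tasks" and "different phases".
Suppose there are 4 identical tasks (Task), each with 4 phases (Phase). When they run concurrently, no task may start the next phase until every task has finished the current one.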

As shown in the figure:

[Figure: C# parallel programming, using Barrier to synchronize concurrent tasks]

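The Barrier here is a mechanism provided by the .NET Framework. It acts like a fence (a barrier) that separates the phases of all the tasks: the next phase does not start until the current phase is complete.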

Code example:

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
    class Program
    {
        private static void Phase0Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ===== Phase 0", TaskID);
        }

        private static void Phase1Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ***** Phase 1", TaskID);
        }

        private static void Phase2Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID);
        }

        private static void Phase3Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID);
        }

        private static int _TaskNum = 4;
        private static Task[] _Tasks;
        private static Barrier _Barrier;


        static void Main(string[] args)
        {
            _Tasks = new Task[_TaskNum];
            // The post-phase action runs once each time all participants reach the barrier.
            _Barrier = new Barrier(_TaskNum, (barrier) =>
            {
                Console.WriteLine("-------------------------- Current Phase:{0} --------------------------",
                                  barrier.CurrentPhaseNumber);
            });

            for (int i = 0; i < _TaskNum; i++)
            {
                _Tasks[i] = Task.Factory.StartNew((num) =>
                {
                    var taskid = (int)num;

                    Phase0Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase1Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase2Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase3Doing(taskid);
                    _Barrier.SignalAndWait();

                }, i);
            }

            var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
                {
                    Task.WaitAll(_Tasks);
                    Console.WriteLine("========================================");
                    Console.WriteLine("All Phase is completed");

                    _Barrier.Dispose();
                });

            finalTask.Wait();

            Console.ReadLine();
        }
    }
}

Test result:
[Screenshot: program output]
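
The phase numbers printed by the post-phase action can be checked with a smaller sketch. The class name PhaseNumberSketch and the method Run are hypothetical names introduced here for illustration. The sketch assumes dedicated threads (TaskCreationOptions.LongRunning) so that blocked participants do not starve the thread pool. It records Barrier.CurrentPhaseNumber each time the post-phase action runs, which is the number of the phase that has just finished.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

static class PhaseNumberSketch
{
    // Hypothetical helper: runs `participants` tasks through `phases` barrier
    // phases and returns the phase numbers seen by the post-phase action.
    public static List<long> Run(int participants, int phases)
    {
        var observed = new List<long>();

        // The post-phase action runs once per phase, on a single thread, while
        // the other participants are still blocked, so no lock is used here.
        using (var barrier = new Barrier(participants, b => observed.Add(b.CurrentPhaseNumber)))
        {
            var tasks = new Task[participants];
            for (int i = 0; i < participants; i++)
            {
                tasks[i] = Task.Factory.StartNew(() =>
                {
                    for (int p = 0; p < phases; p++)
                        barrier.SignalAndWait();   // wait for everyone at the end of each phase
                }, TaskCreationOptions.LongRunning);
            }
            Task.WaitAll(tasks);
        }
        return observed;
    }

    static void Main()
    {
        Console.WriteLine(string.Join(", ", Run(4, 3)));   // prints: 0, 1, 2
    }
}
```

The numbering starts at 0 and advances only after the post-phase action returns, which matches the "Current Phase" lines in the output above.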

Exception handling when using a barrier

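If the code that runs once the tasks have entered the barrier (the post-phase action) throws an exception, that exception is wrapped in a BarrierPostPhaseException, and every participating task can catch it from its SignalAndWait call. The original exception is available through the InnerException property of the BarrierPostPhaseException object.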

Code example: note where the exception is thrown and where it is caught.

using System;
using System.Threading;
using System.Threading.Tasks;

namespace Sample5_1_barrier
{
    class Program
    {
        private static void Phase0Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ===== Phase 0", TaskID);
        }

        private static void Phase1Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ***** Phase 1", TaskID);

        }

        private static void Phase2Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID);
        }

        private static void Phase3Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID);
        }

        private static int _TaskNum = 4;
        private static Task[] _Tasks;
        private static Barrier _Barrier;


        static void Main(string[] args)
        {
            _Tasks = new Task[_TaskNum];
            _Barrier = new Barrier(_TaskNum, (barrier) =>
            {
                Console.WriteLine("-------------------------- Current Phase:{0} --------------------------", 
                                  _Barrier.CurrentPhaseNumber);
                // Thrown once phase 1 has completed; each participant sees it from SignalAndWait.
                if (_Barrier.CurrentPhaseNumber == 1)
                    throw new InvalidOperationException("Phase 2 needs to be TERMINATED!!!!!");
            });

            for (int i = 0; i < _TaskNum; i++)
            {
                _Tasks[i] = Task.Factory.StartNew((num) =>
                {
                    var taskid = (int)num;

                    Phase0Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase1Doing(taskid);
                    try
                    {
                        _Barrier.SignalAndWait();
                    }
                    catch (BarrierPostPhaseException bpp_ex)
                    {
                        Console.WriteLine("Got an Exception in Phase1: " + bpp_ex.InnerException);
                    }

                    Phase2Doing(taskid);
                    _Barrier.SignalAndWait();

                    Phase3Doing(taskid);
                    _Barrier.SignalAndWait();

                }, i);
            }

            var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
                {
                    Task.WaitAll(_Tasks);
                    Console.WriteLine("========================================");
                    Console.WriteLine("All Phase is completed");

                    _Barrier.Dispose();
                });

            finalTask.Wait();

            Console.ReadLine();
        }
    }
}
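
The propagation described above can be reduced to a minimal sketch. The names PostPhaseExceptionSketch and Run are hypothetical, and the post-phase action here always throws, which is a simplification of the sample. The sketch counts how many participants catch a BarrierPostPhaseException whose InnerException is the original InvalidOperationException.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class PostPhaseExceptionSketch
{
    // Hypothetical helper: returns the number of participants that caught the
    // wrapped exception thrown by the post-phase action.
    public static int Run(int participants)
    {
        int caught = 0;

        using (var barrier = new Barrier(participants, b =>
        {
            // Assumption for illustration: the post-phase action fails every time.
            throw new InvalidOperationException("post-phase action failed");
        }))
        {
            var tasks = new Task[participants];
            for (int i = 0; i < participants; i++)
            {
                tasks[i] = Task.Factory.StartNew(() =>
                {
                    try
                    {
                        barrier.SignalAndWait();
                    }
                    catch (BarrierPostPhaseException ex)
                    {
                        // The original exception is carried in InnerException.
                        if (ex.InnerException is InvalidOperationException)
                            Interlocked.Increment(ref caught);
                    }
                }, TaskCreationOptions.LongRunning);
            }
            Task.WaitAll(tasks);
        }
        return caught;
    }

    static void Main()
    {
        Console.WriteLine(Run(4));   // prints: 4
    }
}
```

Because each participant catches the exception inside its own task, none of the tasks fault and Task.WaitAll returns normally.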

Handling timeouts with Barrier

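This section uses Barrier.SignalAndWait(TIMEOUT) to detect timeouts. In the sample code, task 3 waits 10 seconds in phase 2, which exceeds the 2-second timeout. The tasks already waiting at the barrier see SignalAndWait return false, so the timeout of phase 2 is detected and reported as an error.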

Example code:

using System;
using System.Threading;
using System.Threading.Tasks;


namespace Sample5_1_barrier
{
    class Program
    {

        private static void Phase0Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ===== Phase 0", TaskID);
        }

        private static void Phase1Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} ***** Phase 1", TaskID);
        }

        private static void Phase2Doing(int TaskID)
        {
            int i = 0;
            Console.WriteLine("Task : #{0} ^^^^^ Phase 2", TaskID);
            if (TaskID == 3)
                while (i < 10)
                {
                    System.Threading.Thread.Sleep(1000);
                    i++;
                }
        }

        private static void Phase3Doing(int TaskID)
        {
            Console.WriteLine("Task : #{0} $$$$$ Phase 3", TaskID);
        }

        private static int _TaskNum = 4;
        private static Task[] _Tasks;
        private static Barrier _Barrier;
        private static int TIMEOUT = 2000;


        static void Main(string[] args)
        {
            var cts = new System.Threading.CancellationTokenSource();
            var ct = cts.Token;

            _Tasks = new Task[_TaskNum];
            _Barrier = new Barrier(_TaskNum, (barrier) =>
            {
                Console.WriteLine("-------------------------- Current Phase:{0} --------------------------",
                                  _Barrier.CurrentPhaseNumber);
                //if (_Barrier.CurrentPhaseNumber == 1)
                //    throw new InvalidOperationException("Phase 2 need to be TERMINTED!!!!!");
            });

            for (int i = 0; i < _TaskNum; i++)
            {
                _Tasks[i] = Task.Factory.StartNew((num) =>
                {
                    var taskid = (int)num;

                    Phase0Doing(taskid);
                    if (!_Barrier.SignalAndWait(TIMEOUT))
                    {
                        Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber);
                        throw new OperationCanceledException("Phase 0 canceled: ", ct);
                    }

                    Phase1Doing(taskid);
                    if (!_Barrier.SignalAndWait(TIMEOUT))
                    {
                        Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber);
                        throw new OperationCanceledException("Phase 1 canceled: ", ct);
                    }

                    Phase2Doing(taskid);
                    if (!_Barrier.SignalAndWait(TIMEOUT))
                    {
                        Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber);
                        throw new OperationCanceledException("Phase 2 canceled: ", ct);
                    }

                    Phase3Doing(taskid);
                    if (!_Barrier.SignalAndWait(TIMEOUT))
                    {
                        Console.WriteLine("`````````````` This Phase {0} TIMEOUT ``````````````", _Barrier.CurrentPhaseNumber);
                        throw new OperationCanceledException("Phase 3 canceled: ", ct);
                    }

                }, i, ct);
            }

            var finalTask = Task.Factory.ContinueWhenAll(_Tasks, (tasks) =>
                {
                    Task.WaitAll(_Tasks);
                    Console.WriteLine("========================================");
                    Console.WriteLine("All Phase is completed");
                }, ct);

            try
            {
                finalTask.Wait();
            }
            catch (AggregateException aex)
            {
                Console.WriteLine("Task failed And Canceled" + aex.ToString());
            }
            finally
            {
                _Barrier.Dispose();
            }

            Console.ReadLine();
        }
    }
}
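
The return value that the timeout check relies on can be seen in isolation with a minimal sketch. The names BarrierTimeoutSketch and SignalAlone are hypothetical, and the 200 ms timeout is an arbitrary value chosen for illustration. The barrier expects two participants but only one ever arrives, so SignalAndWait gives up and returns false.

```csharp
using System;
using System.Threading;

static class BarrierTimeoutSketch
{
    // Hypothetical helper: signals a two-participant barrier from a single
    // caller and reports whether the phase completed within the timeout.
    public static bool SignalAlone(int timeoutMs)
    {
        using (var barrier = new Barrier(2))
        {
            // The second participant never signals, so this call waits for
            // timeoutMs milliseconds and then returns false.
            return barrier.SignalAndWait(timeoutMs);
        }
    }

    static void Main()
    {
        Console.WriteLine(SignalAlone(200));   // prints: False
    }
}
```

A false result only says that the phase did not complete in time. What to do next (retry, log, or throw as the sample does) is left to the caller.
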
    Original author: zy__
    Original URL: https://blog.csdn.net/wangzhiyu1980/article/details/45688075