c# – 在Parallel.ForEach()循环中等待的行为是什么?

我有一个计算密集型程序,我试图并行化,但其中一个限制步骤是I / O操作,它由一个非常低效的API控制,我无法控制,但别无选择,只能使用.我的并行化必须不会增加I / O操作的数量,否则任何好处都可能很快消失.

布局是这样的:我有两个类,Foo和Bar,并且为了计算Foo,它涉及不小的计算量,我必须传递一个实例,或者我从一些导入的Bar的一些实例非常昂贵的I / O操作中的其他文件.我需要大量的Foo和Bar实例,并且许多这些Bar实例将用于计算多个Foo实例.因此,在计算每个Foo之后我不想丢弃我的Bar实例,并且我不想每次导入它们多次.可能值得注意的是,为了使问题更复杂,API是32位,而我的程序必须是64位以避免MemoryException,因此由本地托管的服务器处理,我使用WCF进行通信.

这是我提出的解决方案,但我对并行化非常陌生,特别是我不确定如何在ForEach循环内部处理await,从而释放处理器:

ConcurrentDictionary<string, Task<Bar>> barList = new ConcurrentDictionary<string, Task<Bar>>();

Parallel.ForEach(fooList, foo =>
{
    if (!barList.ContainsKey(this.RequiredBarName))
    {
        Task<Bar> importBar = Task.Run(() => Import.BarByName(this.RequiredBarName));
        barList.Add(this.RequiredBarName,importBar);
    }
    this.RequiredBarTask = barList.TryGetValue(this.RequiredBarName);
    foo.CalculateStuff();
}

// where foo.CalculateStuff() looks something like this
async public void CalculateStuff()
{
    // do some stuff...
    Bar requiredBar = await this.RequiredBarTask;
    // do some more stuff with requiredBar
}

当代码遇到等待时会发生什么? ThreadPool会选择一个不同的Task,还是处理器会闲置?如果我然后在Parallel.ForEach()之外安排某种WaitAll(),我能够有效地并行化所有这些吗?有没有人对我如何实现这个有更好的想法?

编辑以提供MCVE:

我无法满足此版本的可验证组件,因为我无法向您提供API,我当然无法为您提供API可能访问的任何数据,但是我会尝试为您提供一些直到调用服务器的内容. .

程序可以有效地在处理事物的方式上进行无限深入,更容易将其视为特定指令的解析器,允许客户端使用GUI构建一组“砖块”.通过这种方式,Dataflow看起来可以提供一个像样的解决方案.

在这个例子中,我没有处理循环引用或一个Channel计算已经由Parallel.ForEach()方法调用的另一个Channel;在我的代码中,这由一些逻辑和并发列表处理,以检查何时调用各种事物.

public abstract class Class
{
    public string Name {get;set;}
    public float[] Data {get;set;}

    async public Task CalculateData(IsampleService proxy){}
}

public class Channel : Class
{
    public Class[] ChildClasses {get;set;}

    async public override Task CalculateData(IsampleService proxy)
    {
        foreach(Class childClass in ChildClasses)
        {
            // not the real processing but this step could be anything. There is a class to handle what happens here, but it is unnecessary for this post.
            if(childClass.Data==null) await childClass.CalculateData(proxy);
            this.Data = childClass.Data;
        }
    }
}

public class Input : Class
{
    async public override Task CalculateData(IsampleService proxy)
    {
            this.Data = await proxy.ReturnData(this.Name);
    }
}

async public static Task ProcessDataForExport(Channel[] channelArray)
{
ChannelFactory<IsampleService> factory = new ChannelFactory<IsampleService>(new NetNamedPipeBinding(), new EndpointAddress(baseAddress));

IsampleService proxy = factory.CreateChannel();

Parallel.ForEach(channelArray, channel =>
    {
        channel.CalculateData();
    });
// Task.WhenAll() might be a better alternative to the Parallel.ForEach() here.
}

最佳答案

What will happen when the code runs into that await?

对于任何await语句都会发生同样的事情:在评估了任何表达式或语句检索要等待的任务之后,该方法将返回.对于所有意图和目的,这是方法的结束.

Will the ThreadPool pick up a different Task, or will the processor just idle?

这取决于还有什么.例如,你在等什么?如果它是一个排队到线程池的计算任务,并且它还没有被分配一个线程池线程,那么确定……线程池可能会选择它并开始处理它.

如果您正在等待I / O操作,那么这不一定会使处理器忙,但线程池队列中可能还有其他任务(例如来自Parallel.ForEach()调用的其他任务).这样就可以让处理器有所作为.

当然,使用await通常不会导致处理器空闲.事实上,使用它的主要原因是避免(*).由于await语句导致当前方法返回,所以让当前线程继续运行,这意味着如果没有足够的线程来保持处理器忙,那么现在它有事可做. 🙂

(*)(好吧,有点……真的,主要的原因是避免阻塞当前线程,但这有副作用,有更多的工作可供处理器处理:))

If I then arrange some sort of WaitAll() outside of the Parallel.ForEach() will I be able to parallelize through all of this efficiently? Does anyone have any better ideas of how I might implement this?

我没有在你的问题中看到足够有用的细节来回答这个问题.坦率地说,虽然我不能把手指放在它上面,但是从一个Parallel.ForEach()委托中使用await对我来说似乎有点可疑.一旦调用await,代理的方法就会返回.

因此,就Parallel.ForEach()而言,您已完成枚举中的该项,但当然您不是.它必须在其他地方完成.至少,这似乎会妨碍Parallel类能够充分了解它正在做的工作,以便最有效地安排它.

但也许没关系.或者它可能不是很好,但鉴于你所依赖的框架,它是你将要实现的最佳目标.很难说.

我鼓励您提供评论者Scott Chamberlain要求的MCVE.如果他是对的并且您的问题可通过数据流API解决,那么您最好给他一个机会来为您提供显示该问题的答案.

点赞