我有一个计算密集型程序,我试图并行化,但其中一个限制步骤是I / O操作,它由一个非常低效的API控制,我无法控制,但别无选择,只能使用.我的并行化必须不会增加I / O操作的数量,否则任何好处都可能很快消失.
布局是这样的:我有两个类,Foo和Bar,并且为了计算Foo,它涉及不小的计算量,我必须传递一个实例,或者我从一些导入的Bar的一些实例非常昂贵的I / O操作中的其他文件.我需要大量的Foo和Bar实例,并且许多这些Bar实例将用于计算多个Foo实例.因此,在计算每个Foo之后我不想丢弃我的Bar实例,并且我不想每次导入它们多次.可能值得注意的是,为了使问题更复杂,API是32位,而我的程序必须是64位以避免MemoryException,因此由本地托管的服务器处理,我使用WCF进行通信.
这是我提出的解决方案,但我对并行化非常陌生,特别是我不确定如何在ForEach循环内部处理await,从而释放处理器:
ConcurrentDictionary<string, Task<Bar>> barList = new ConcurrentDictionary<string, Task<Bar>>();
Parallel.ForEach(fooList, foo =>
{
if (!barList.ContainsKey(this.RequiredBarName))
{
Task<Bar> importBar = Task.Run(() => Import.BarByName(this.RequiredBarName));
barList.Add(this.RequiredBarName,importBar);
}
this.RequiredBarTask = barList.TryGetValue(this.RequiredBarName);
foo.CalculateStuff();
}
// where foo.CalculateStuff() looks something like this
async public void CalculateStuff()
{
// do some stuff...
Bar requiredBar = await this.RequiredBarTask;
// do some more stuff with requiredBar
}
当代码遇到等待时会发生什么? ThreadPool会选择一个不同的Task,还是处理器会闲置?如果我然后在Parallel.ForEach()之外安排某种WaitAll(),我能够有效地并行化所有这些吗?有没有人对我如何实现这个有更好的想法?
编辑以提供MCVE:
我无法满足此版本的可验证组件,因为我无法向您提供API,我当然无法为您提供API可能访问的任何数据,但是我会尝试为您提供一些直到调用服务器的内容. .
程序可以有效地在处理事物的方式上进行无限深入,更容易将其视为特定指令的解析器,允许客户端使用GUI构建一组“砖块”.通过这种方式,Dataflow看起来可以提供一个像样的解决方案.
在这个例子中,我没有处理循环引用或一个Channel计算已经由Parallel.ForEach()方法调用的另一个Channel;在我的代码中,这由一些逻辑和并发列表处理,以检查何时调用各种事物.
public abstract class Class
{
public string Name {get;set;}
public float[] Data {get;set;}
async public Task CalculateData(IsampleService proxy){}
}
public class Channel : Class
{
public Class[] ChildClasses {get;set;}
async public override Task CalculateData(IsampleService proxy)
{
foreach(Class childClass in ChildClasses)
{
// not the real processing but this step could be anything. There is a class to handle what happens here, but it is unnecessary for this post.
if(childClass.Data==null) await childClass.CalculateData(proxy);
this.Data = childClass.Data;
}
}
}
public class Input : Class
{
async public override Task CalculateData(IsampleService proxy)
{
this.Data = await proxy.ReturnData(this.Name);
}
}
async public static Task ProcessDataForExport(Channel[] channelArray)
{
ChannelFactory<IsampleService> factory = new ChannelFactory<IsampleService>(new NetNamedPipeBinding(), new EndpointAddress(baseAddress));
IsampleService proxy = factory.CreateChannel();
Parallel.ForEach(channelArray, channel =>
{
channel.CalculateData();
});
// Task.WhenAll() might be a better alternative to the Parallel.ForEach() here.
}
最佳答案
What will happen when the code runs into that await?
对于任何await语句都会发生同样的事情:在评估了任何表达式或语句检索要等待的任务之后,该方法将返回.对于所有意图和目的,这是方法的结束.
Will the ThreadPool pick up a different Task, or will the processor just idle?
这取决于还有什么.例如,你在等什么?如果它是一个排队到线程池的计算任务,并且它还没有被分配一个线程池线程,那么确定……线程池可能会选择它并开始处理它.
如果您正在等待I / O操作,那么这不一定会使处理器忙,但线程池队列中可能还有其他任务(例如来自Parallel.ForEach()调用的其他任务).这样就可以让处理器有所作为.
当然,使用await通常不会导致处理器空闲.事实上,使用它的主要原因是避免(*).由于await语句导致当前方法返回,所以让当前线程继续运行,这意味着如果没有足够的线程来保持处理器忙,那么现在它有事可做. 🙂
(*)(好吧,有点……真的,主要的原因是避免阻塞当前线程,但这有副作用,有更多的工作可供处理器处理:))
If I then arrange some sort of WaitAll() outside of the Parallel.ForEach() will I be able to parallelize through all of this efficiently? Does anyone have any better ideas of how I might implement this?
我没有在你的问题中看到足够有用的细节来回答这个问题.坦率地说,虽然我不能把手指放在它上面,但是从一个Parallel.ForEach()委托中使用await对我来说似乎有点可疑.一旦调用await,代理的方法就会返回.
因此,就Parallel.ForEach()而言,您已完成枚举中的该项,但当然您不是.它必须在其他地方完成.至少,这似乎会妨碍Parallel类能够充分了解它正在做的工作,以便最有效地安排它.
但也许没关系.或者它可能不是很好,但鉴于你所依赖的框架,它是你将要实现的最佳目标.很难说.
我鼓励您提供评论者Scott Chamberlain要求的MCVE.如果他是对的并且您的问题可通过数据流API解决,那么您最好给他一个机会来为您提供显示该问题的答案.