这对
scala受过教育的人来说可能是一个简单的问题,但我还是初学者;)
我有一个基础角色,它将一个任务分派给多个工作者,并通过!回复阻止外部呼叫的结果!
a = new a
a.start
println(a !? "12345")
class a extends Actor {
def act = {
loop {
react {
case msg =>
val result = worker_actor_1 !? msg
result += worker_actor_2 !? msg
result += worker_actor_3 !? msg
// So I just have multiple workers who should do stuff in parallel and the aggregated result should be returned to the calling function
reply(result)
}
现在我不知道如何在阻塞调用中真正并行化worker actor,因为最后我必须回复().呼叫权利不是演员,只是普通的阶级.
最佳答案 您可以创建多个期货,然后生成一个单独的actor来等待其结果.因此,您的调度将为新请求做好准备.代码片段如下:
case msg =>
val invoker = sender
val flist =
worker_actor_1 !! task1 ::
worker_actor_2 !! task2 ::
worker_actor_3 !! task3 :: Nil
Scheduler.execute { invoker ! Futures.awaitAll(100, flist).map{ ..sum the results.. } }
请注意awaitAll返回List [Option [Any]],这样您就可以了解出现问题并且您的工作人员没有及时完成任务