我正在尝试运行看起来像这样的东西:
y = @parallel (min) for i in collection
f(i)
end
其中f(i)是一个函数,它本质上是一个while循环,它计算完成条件所需的迭代次数.首先,终止条件之一是预定的迭代次数n.但是,如果f(i)返回小于n,那么理想情况下我想用f(i)的值替换n(例如,因为我正在寻找最小f(i),如果f(j)是m我希望所有其他循环停止检查它们是否达到m次迭代).
我是并行计算的新手,所以我可能误解了documentation,但我认为我应该可以这样做:
x = Channel{Int64}(1)
put!(x,n)
y = @parallel (min) for i in collection
f(i,x)
end
close(x)
我修改了f以获取Channel参数,现在它看起来像这样:
@everywhere function f(item,chan)
going = true
count = 0
while (going)
going = false
# perform some operations
if (count < fetch(chan) && !conditions_met())
# conditions_met checks the other termination conditions
going = true
count += 1
end
end
count += 1
if (count < fetch(chan))
take!(chan)
put!(chan,count)
end
return count
end
如果我替换第一个计数< fetch(chan)with count< n并删除其他if块/通道代码,脚本运行正常.但是,由于n将比最小f(i)大几个数量级,如果我可以像我所描述的那样做,它会显着加快计算速度.这是我应该做的事情,如果是这样,我是否正确接近这个? 现在我遇到以下错误(运行4个进程):
ERROR (unhandled task failure): On worker 3:
cannot resize array with shared data
in shift! at array.jl:501
in take! at channels.jl:54
in f at /home/michael/Documents/julia/script.jl:98
[inlined code] from /home/michael/Documents/julia/script.jl:126
in anonymous at no file:0
in anonymous at multi.jl:913
in run_work_thunk at multi.jl:651
[inlined code] from multi.jl:913
in anonymous at task.jl:63
in remotecall_fetch at multi.jl:737
in remotecall_fetch at multi.jl:740
in anonymous at multi.jl:1519
ERROR: LoadError: On worker 2:
cannot resize array with shared data
in shift! at array.jl:501
in take! at channels.jl:54
in f at /home/michael/Documents/julia/script.jl:98
[inlined code] from /home/michael/Documents/julia/script.jl:126
in anonymous at no file:0
in anonymous at multi.jl:913
in run_work_thunk at multi.jl:651
[inlined code] from multi.jl:913
in anonymous at task.jl:63
in preduce at multi.jl:1523
[inlined code] from multi.jl:1532
in anonymous at expr.jl:113
[inlined code] from /home/michael/Documents/julia/script.jl:125
in anonymous at no file:0
while loading /home/michael/Documents/julia/script.jl, in expression starting on line 121
ERROR (unhandled task failure): On worker 4:
cannot resize array with shared data
in shift! at array.jl:501
in take! at channels.jl:54
in f at /home/michael/Documents/julia/script.jl:98
[inlined code] from /home/michael/Documents/julia/script.jl:126
in anonymous at no file:0
in anonymous at multi.jl:913
in run_work_thunk at multi.jl:651
[inlined code] from multi.jl:913
in anonymous at task.jl:63
in remotecall_fetch at multi.jl:737
in remotecall_fetch at multi.jl:740
in anonymous at multi.jl:1519
ERROR (unhandled task failure): On worker 5:
cannot resize array with shared data
in shift! at array.jl:501
in take! at channels.jl:54
in f at /home/michael/Documents/julia/script.jl:98
[inlined code] from /home/michael/Documents/julia/script.jl:126
in anonymous at no file:0
in anonymous at multi.jl:913
in run_work_thunk at multi.jl:651
[inlined code] from multi.jl:913
in anonymous at task.jl:63
in remotecall_fetch at multi.jl:737
in remotecall_fetch at multi.jl:740
in anonymous at multi.jl:1519
其中第98行是函数定义中的take!(chan)语句,第126行是并行for循环内的f(i,x).
最佳答案 通道为异步通信实现类似CSP的语义,但它们没有跨并行进程共享的自动机制.您需要将RemoteRef用于此目的:
http://docs.julialang.org/en/release-0.4/manual/parallel-computing/#remoterefs-and-abstractchannels