并行处理 – 采取的功能!并把!和朱莉娅的频道

我正在尝试运行看起来像这样的东西:

y = @parallel (min) for i in collection
    f(i)
end

其中f(i)是一个函数,它本质上是一个while循环,它计算完成条件所需的迭代次数.首先,终止条件之一是预定的迭代次数n.但是,如果f(i)返回小于n,那么理想情况下我想用f(i)的值替换n(例如,因为我正在寻找最小f(i),如果f(j)是m我希望所有其他循环停止检查它们是否达到m次迭代).

我是并行计算的新手,所以我可能误解了documentation,但我认为我应该可以这样做:

x = Channel{Int64}(1)
put!(x,n)

y = @parallel (min) for i in collection
    f(i,x)
end

close(x)

我修改了f以获取Channel参数,现在它看起来像这样:

@everywhere function f(item,chan)
    going = true
    count = 0
    while (going)
        going = false
        # perform some operations
        if (count < fetch(chan) && !conditions_met())
            # conditions_met checks the other termination conditions
            going = true
            count += 1
        end
    end

    count += 1

    if (count < fetch(chan))
        take!(chan)
        put!(chan,count)
    end

    return count
end

如果我替换第一个计数< fetch(chan)with count< n并删除其他if块/通道代码,脚本运行正常.但是,由于n将比最小f(i)大几个数量级,如果我可以像我所描述的那样做,它会显着加快计算速度.这是我应该做的事情,如果是这样,我是否正确接近这个? 现在我遇到以下错误(运行4个进程):

ERROR (unhandled task failure): On worker 3:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
ERROR: LoadError: On worker 2:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in preduce at multi.jl:1523
 [inlined code] from multi.jl:1532
 in anonymous at expr.jl:113
 [inlined code] from /home/michael/Documents/julia/script.jl:125
 in anonymous at no file:0
while loading /home/michael/Documents/julia/script.jl, in expression starting on line 121
ERROR (unhandled task failure): On worker 4:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519
ERROR (unhandled task failure): On worker 5:
cannot resize array with shared data
 in shift! at array.jl:501
 in take! at channels.jl:54
 in f at /home/michael/Documents/julia/script.jl:98
 [inlined code] from /home/michael/Documents/julia/script.jl:126
 in anonymous at no file:0
 in anonymous at multi.jl:913
 in run_work_thunk at multi.jl:651
 [inlined code] from multi.jl:913
 in anonymous at task.jl:63
 in remotecall_fetch at multi.jl:737
 in remotecall_fetch at multi.jl:740
 in anonymous at multi.jl:1519

其中第98行是函数定义中的take!(chan)语句,第126行是并行for循环内的f(i,x).

最佳答案 通道为异步通信实现类似CSP的语义,但它们没有跨并行进程共享的自动机制.您需要将RemoteRef用于此目的:
http://docs.julialang.org/en/release-0.4/manual/parallel-computing/#remoterefs-and-abstractchannels

点赞