我正在开发一个脚本,我随机创建对象,但我不想重复.它们被存储起来,每次我创建一个新的,我都会检查现有的.正如我想为大量对象做的那样,我现在正在尝试并行化它,但到目前为止还没有成功.我尝试了在网上找到的一些解决方案(主要在这里),但仍然无法正常工作.
我的想法是启动一个池并将我的功能映射到它.当进程找到匹配时,它将值设置为1.此值可由所有进程读取,它们可以使用锁写入它,我需要在最后返回.因此,我创建了一个Lock和一个Value,以便所有进程都可以读取该值(因此lock = False)并检查是否在另一个进程中找到了匹配项.然后我尝试了一些与事件不同的东西,并检查它是否已设置但是仍然无法工作……然后我尝试提出一个特殊的异常,但仍未成功使代码成功.
拜托,我更喜欢编写OOP,所以我会避免直到我的最后一个资源来定义一个全局变量,因为我认为它们是不确定的(个人意见).
这是一个MWE,我用int替换我的复杂对象,用范围(10000)替换我存储的对象以帮助你理解.
#!/usr/bin/env python3
import multiprocessing as muproc
def ParallelCheck(me):
print(" Testing {}...".format(me))
#manager = muproc.Manager()
#lock = manager.Lock()
lock = muproc.Lock()
back = muproc.Value("i", 0, lock=False)
ParChild = ParallelChild(me, lock, back)
with muproc.Pool() as pool:
try:
pool.map(ParChild.run, range(10000))
except AbortPool:
pool.terminate()
print("pool")
return back.value
def Computation(me, neighbour):
return me == neighbour
class ParallelChild(object):
def __init__(self, me, lock, back):
self.abort = muproc.Event()
self.lock = lock
self.me = me
self.back = back
def run(self, neighbour):
print("run")
if self.abort.is_set():
print("Aborting")
pass
else:
if Computation(self.me, neighbour):
self.lock.acquire()
self.abort.set()
self.back.value = 1
print("GOTCHA")
self.lock.release()
raise AbortPool
else:
print("...")
class AbortPool(Exception):
#pass
def __init__(self):
## Just to check
print("AbortPool raised!")
if __name__ == "__main__":
values = [12000, 13, 7]
for v in values:
print("value={} match={}".format(v, ParallelCheck(v)))
现在它产生一个RunTimeError:
me@stigepc4$python3 mwe.py
Testing 12000...
Traceback (most recent call last):
File "mwe.py", line 63, in <module>
print("value={} match={}".format(v, ParallelCheck(v)))
File "mwe.py", line 16, in ParallelCheck
pool.map(ParChild.run, range(10000))
File "/usr/lib/python3.4/multiprocessing/pool.py", line 260, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.4/multiprocessing/pool.py", line 599, in get
raise self._value
File "/usr/lib/python3.4/multiprocessing/pool.py", line 383, in _handle_tasks
put(task)
File "/usr/lib/python3.4/multiprocessing/connection.py", line 206, in send
self._send_bytes(ForkingPickler.dumps(obj))
File "/usr/lib/python3.4/multiprocessing/reduction.py", line 50, in dumps
cls(buf, protocol).dump(obj)
File "/usr/lib/python3.4/multiprocessing/sharedctypes.py", line 128, in reduce_ctype
assert_spawning(obj)
File "/usr/lib/python3.4/multiprocessing/context.py", line 347, in assert_spawning
' through inheritance' % type(obj).__name__
RuntimeError: c_int objects should only be shared between processes through inheritance
我想它与Lock有关(虽然评论经理但是这不起作用)或者与Value有关但现在想法如何摆脱它…
编辑
当我继续尝试改变我的代码以我想要的方式工作时,我意识到我没有提到我的主要问题是什么.我真正的困难是如果找到匹配项,则停止池中的所有进程.这就是我所需要的,因此并行运行优于串行运行.现在我可以让一个事件告诉孩子是否已经找到匹配,但它会不断循环数据,即使我引发异常……
编辑2
简单地说,我有以下……
for o in objects:
if too_close(o, existing_objects):
return 1
return 0
…我希望在CPU之间分配……
for o in objects:
if too_close(o, some_existing_objects):
return 1 and abort other processes
return 0
最佳答案 通过寻找答案,我的脚本太复杂了.
我试着从接近原始文档的东西开始
多处理模块.
然后没有成功我寻找一种方法来修复它并添加了一些东西.
我不是python多处理的专家,但经过一段时间的尝试,
我发现在第一场比赛中中止pool.map的唯一方法是使用一个事件
所以所有进程都知道它已经发生,然后它们都会抛出一个特殊的异常
流产自己
我可以摆脱价值和锁定,它们对我的情况毫无用处.
但是我做这些事情的方式可能效率不高.
产生这些过程将耗费大量的计算时间,
并且每个进程都会将需要运行的数据复制到自己的内存中.
我试图产生较少的进程,但每个都有较少的数据和
他们将自己迭代的数据集(不让池处理这部分).
所以我可以选择哪个数据进入哪个进程.
在我的例子中,我将范围(10000)分成例如4个过程
每个都有2500的范围.
我只想知道是否有匹配,因此我可以进一步简化.
我可以设置当找到匹配时,设置事件并返回函数以使其停止.
另一个进程测试事件的状态,一旦设置它们就会返回到自己停止状态.
现在回到主要过程,最后我只看一下这个事件
(当然,不要忘记在开头清除它).
如果已设置,则找到匹配并且就这么简单.
缺点是我必须声明multiprocessing.Event全局…
否则,当生成进程时,每个子进程都会复制它
他们无法在他们之间和主要过程之间进行沟通.
但正如bj0已经提到的那样,并行化这个问题可能不是更好……
在实现这两种方法之后,我将它们与串行问题进行了比较,这是我的结果
对于同一台机器的给定案例:
>序列号:7s
>游泳池:910s
> 3个进程拥有自己的数据集:97s
所以没有什么比这更好了…我会坚持我的串行实现,并寻找其他方法来加速事情,像其他方法,而不是完全随机…
这是我的MWE的最后一个工作版本:
#!/usr/bin/env python3
import multiprocessing as muproc
def ParallelCheck(me):
print(" Testing {}...".format(me))
global abort
abort.clear()
ParChild = ParallelChild(me)
jobs = []
N = 4
for i in range(N):
jobs.append(muproc.Process(target = ParChild.run, args=(range(i * 2500, (i+1) * 2500),)))
for p in jobs:
p.start()
for p in jobs:
p.join()
if abort.is_set():
print("MATCH FOUND")
return 1
else:
print(" no match...")
return 0
def Computation(me, neighbour):
return me == neighbour
class ParallelChild(object):
def __init__(self, me):
self.me = me
def run(self, neighbours):
global abort
for neighbour in neighbours:
print("{} vs {} by {}".format(self.me, neighbour, self.CurProc()))
if abort.is_set():
print("Aborting {}".format(self.CurProc()))
return 0
else:
if Computation(self.me, neighbour):
abort.set()
print("GOTCHA {}".format(self.CurProc()))
return 1
def CurProc(self):
return muproc.current_process()._identity[0]
if __name__ == "__main__":
abort = muproc.Event()
values = [12000, 130, 7]
for v in values:
print("value={} match={}".format(v, ParallelCheck(v)))