我想创建一个Thread或一个在While True循环中永远运行的进程.
我需要以队列的形式向工作人员发送和接收数据,可以是multiprocessing.Queue()或collections.deque().我更喜欢使用collections.deque(),因为它明显更快.
我还需要能够最终杀死这个工作者(因为它运行了一段时间的True循环.这里有一些测试代码,我把它们放在一起试图理解线程,进程,队列和deque之间的区别.
import time
from multiprocessing import Process, Queue
from threading import Thread
from collections import deque
class ThreadingTest(Thread):
def __init__(self, q):
super(ThreadingTest, self).__init__()
self.q = q
self.toRun = False
def run(self):
print("Started Thread")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Thread deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Thread Queue: " + str(i))
def stop(self):
print("Trying to stop Thread")
self.toRun = False
while self.isAlive():
time.sleep(0.1)
print("Stopped Thread")
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.toRun = False
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.toRun = False
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
q = Queue()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
if type(q) == type(deque()):
q.append(i)
elif type(q) == type(Queue()):
q.put_nowait(i)
time.sleep(1)
t1.stop()
t1.join()
if type(q) == type(deque()):
print(q)
elif type(q) == type(Queue()):
while q.qsize() > 0:
print(str(q.get_nowait()))
如您所见,t1可以是ThreadingTest,也可以是ProcessTest.此外,传递给它的队列可以是multiprocessing.Queue或collections.deque.
ThreadingTest适用于Queue或deque().当调用stop()方法时,它还会正确杀死run().
Started Thread
Thread deque: 0
Thread deque: 1
Thread deque: 2
Thread deque: 3
Thread deque: 4
Thread deque: 5
Thread deque: 6
Thread deque: 7
Thread deque: 8
Thread deque: 9
Trying to stop Thread
Stopped Thread
deque([])
如果ProcessTest类型为multiprocessing.Queue,则它只能从队列中读取.它不适用于collections.deque.此外,我无法使用stop()终止进程.
Process Queue: 0
Process Queue: 1
Process Queue: 2
Process Queue: 3
Process Queue: 4
Process Queue: 5
Process Queue: 6
Process Queue: 7
Process Queue: 8
Process Queue: 9
Trying to stop Process
我想弄明白为什么?另外,在进程中使用双端队列的最佳方法是什么?而且,我将如何使用某种stop()方法杀死进程.
最佳答案 您不能使用collections.deque在两个multiprocessing.Process实例之间传递数据,因为collections.deque不支持进程. multiprocessing.Queue在内部将其内容写入multiprocessing.Pipe,这意味着其中的数据可以在一次进程中排队并在另一个进程中检索. collections.deque没有那种管道,所以它不起作用.当您在一个进程中写入双端队列时,另一个进程中的双端队列实例根本不会受到影响;他们是完全独立的实例.
您的stop()方法也会遇到类似的问题.您正在主进程中更改toRun的值,但这根本不会影响子进程.它们是完全独立的实例.结束孩子的最好方法是将一些哨兵送到队列.当你获得孩子的哨兵时,突破无限循环:
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if type(self.q) == type(deque()):
if self.q:
i = self.q.popleft()
print("Process deque: " + str(i))
elif type(self.q) == type(Queue()):
if not self.q.empty():
i = self.q.get_nowait()
if i is None:
break # Got sentinel, so break
print("Process Queue: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.put(None) # Send sentinel
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
编辑:
如果您确实需要在两个进程之间使用deque语义,则可以使用custom multiprocessing.Manager()
在Manager进程中创建共享双端队列,并且每个Process实例都将获得一个Proxy:
import time
from multiprocessing import Process
from multiprocessing.managers import SyncManager
from collections import deque
SyncManager.register('deque', deque)
def Manager():
m = SyncManager()
m.start()
return m
class ProcessTest(Process):
def __init__(self, q):
super(ProcessTest, self).__init__()
self.q = q
self.ctr = 0
def run(self):
print("Started Process")
self.toRun = True
while self.toRun:
if self.q._getvalue():
i = self.q.popleft()
if i is None:
break
print("Process deque: " + str(i))
def stop(self):
print("Trying to stop Process")
self.q.append(None)
while self.is_alive():
time.sleep(0.1)
print("Stopped Process")
if __name__ == '__main__':
m = Manager()
q = m.deque()
t1 = ProcessTest(q)
t1.start()
for i in range(10):
q.append(i)
time.sleep(1)
t1.stop()
t1.join()
print(q)
请注意,这可能不会比multiprocessing.Queue更快,因为每次访问双端队列时都会有IPC成本.它也是一种不那么自然的数据结构,可以按照您的方式传递消息.