如何在Python中并行处理输入,而不使用多进程?

我有一个输入数据列表,希望并行处理它.处理每条输入数据都需要一定时间,因为涉及网络I/O;CPU使用率不是问题.

我不希望有额外进程的开销,因为我一次要处理很多事情并且不想设置进程间通信.

# Sequential baseline: what is the parallel-execution equivalent of this?
import time
input_data = [1, 2, 3, 4, 5, 6, 7]
input_processor = time.sleep
# list() forces evaluation: in Python 3 map() returns a lazy iterator, so
# without it input_processor would never be called and no time would pass.
results = list(map(input_processor, input_data))

我正在使用的代码用到了twisted.internet.defer,因此基于它的解决方案也可以.

最佳答案:

您可以轻松定义若干个并行运行的工作线程,让它们不断从共享队列中取出数据进行处理,直到队列为空.

from threading import Thread
from collections import deque
import time


# Create a new class that inherits from Thread
class Worker(Thread):
    """Thread that drains a shared deque, applying ``func`` to each item.

    The worker pops items from the left of ``inqueue`` and appends
    ``func(item)`` to ``outqueue`` until ``inqueue`` is empty. Several
    workers may share the same pair of deques; results are appended in
    completion order, not input order.
    """

    def __init__(self, inqueue, outqueue, func):
        """
        Args:
            inqueue: deque of input items, consumed with ``popleft()``.
            outqueue: deque that receives each result via ``append()``.
            func: callable applied to every input item.
        """
        super().__init__()
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func

    # Overrides Thread.run; executed in the new thread when you call
    # worker.start().
    def run(self):
        while True:
            # EAFP instead of "while self.inqueue": when several workers
            # share the deque, another one can take the last item between
            # the emptiness check and popleft(), which then raises IndexError.
            try:
                data = self.inqueue.popleft()
            except IndexError:
                return
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    # Twelve calls of test(1) shared by three worker threads.
    jobs = deque(12 * [1, ])
    done = deque()

    crew = [Worker(jobs, done, test) for _ in range(3)]

    # Launch every thread first, then block until each one has exited.
    for member in crew:
        member.start()
    for member in crew:
        member.join()

    print(done)

正如预期,运行耗时约4秒:12个各需1秒的任务由3个线程分担.

还可以编写一个简单的Pool类,把这些样板代码从main函数中移出去:

from threading import Thread
from collections import deque
import time


class Pool:
    """Minimal thread pool offering an order-preserving ``map``."""

    def __init__(self, n_threads):
        """
        Args:
            n_threads: maximum number of worker threads ``map`` starts;
                must be >= 1.

        Raises:
            ValueError: if ``n_threads`` is less than 1 -- no worker would
                ever run, so ``map`` would silently drop every input.
        """
        if n_threads < 1:
            raise ValueError(
                'n_threads must be >= 1, got {!r}'.format(n_threads))
        self.n_threads = n_threads

    def map(self, func, data):
        """Apply ``func`` to every item of ``data`` using worker threads.

        Returns:
            A list of results in the same order as ``data`` (like the
            built-in ``map``), regardless of which thread finishes first.

        Raises:
            The exception raised by ``func`` for the earliest failing input,
            re-raised after all workers have finished.
        """
        # Tag each item with its position so results can be reordered later.
        inqueue = deque(enumerate(data))
        result = deque()

        def call(pair):
            # Runs in a worker thread. Capture failures instead of letting
            # them kill the thread and silently lose this item's result.
            index, item = pair
            try:
                return index, func(item), None
            except Exception as exc:
                return index, None, exc

        # No point starting more threads than there are items.
        n_workers = min(self.n_threads, len(inqueue))
        workers = [Worker(inqueue, result, call) for _ in range(n_workers)]

        for worker in workers:
            worker.start()

        for worker in workers:
            worker.join()

        # Workers append in completion order; restore input order.
        ordered = sorted(result, key=lambda entry: entry[0])
        for _, _, error in ordered:
            if error is not None:
                raise error
        return [value for _, value, _ in ordered]


class Worker(Thread):
    """Thread that drains a shared deque, applying ``func`` to each item.

    The worker pops items from the left of ``inqueue`` and appends
    ``func(item)`` to ``outqueue`` until ``inqueue`` is empty. Several
    workers may share the same pair of deques; results are appended in
    completion order, not input order.
    """

    def __init__(self, inqueue, outqueue, func):
        """
        Args:
            inqueue: deque of input items, consumed with ``popleft()``.
            outqueue: deque that receives each result via ``append()``.
            func: callable applied to every input item.
        """
        super().__init__()
        self.inqueue = inqueue
        self.outqueue = outqueue
        self.func = func

    # Overrides Thread.run; executed in the new thread when you call
    # worker.start().
    def run(self):
        while True:
            # EAFP instead of "while self.inqueue": when several workers
            # share the deque, another one can take the last item between
            # the emptiness check and popleft(), which then raises IndexError.
            try:
                data = self.inqueue.popleft()
            except IndexError:
                return
            print('start')
            result = self.func(data)
            self.outqueue.append(result)
            print('finished')


def test(x):
    time.sleep(x)
    return 2 * x


if __name__ == '__main__':
    # Twelve calls of test(1) fanned out over a six-thread pool.
    jobs = [1] * 12
    doubled = Pool(6).map(test, jobs)
    print(doubled)
点赞