【Python】multiprocessing: multi-process examples

The code below was tested by the author and runs under Python 3.5.

Example 1: using a multiprocessing Pool with map

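# coding:utf-8
import multiprocessing
import sys

def f(x):
    return x * x

if __name__ == "__main__":
    cores = multiprocessing.cpu_count()
    pool = multiprocessing.Pool(processes=cores)
    xs = range(5)

    # method 1: map (blocks until all results are ready; keeps input order)
    print(pool.map(f, xs))  # prints [0, 1, 4, 9, 16]

    # method 2: imap (lazy iterator; results arrive in input order)
    for y in pool.imap(f, xs):
        print(y)            # 0, 1, 4, 9, 16, respectively

    # method 3: imap_unordered (results arrive as each task finishes)
    for y in pool.imap_unordered(f, xs):
        print(y)            # may be in any order

    # imap_unordered also makes a simple progress counter easy
    cnt = 0
    for _ in pool.imap_unordered(f, xs):
        cnt += 1
        sys.stdout.write('done %d/%d\r' % (cnt, len(xs)))
    sys.stdout.write('\n')

    pool.close()  # no more tasks will be submitted
    pool.join()   # wait for the worker processes to exit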
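Since Python 3.3, Pool also works as a context manager, and starmap() handles functions that take several arguments, which plain map() cannot. A minimal sketch, assuming Python 3.3 or later; power() and run_starmap() are illustrative names, not part of the original code:

```python
import multiprocessing


def power(base, exp):
    """Return base ** exp (two arguments, so plain map() cannot call it)."""
    return base ** exp


def run_starmap(pairs, processes=2):
    # Leaving the with-block calls pool.terminate(), so every result must be
    # collected inside it; starmap() blocks until all results are ready.
    with multiprocessing.Pool(processes=processes) as pool:
        # starmap unpacks each tuple: power(2, 3), power(3, 2), ...
        return pool.starmap(power, pairs)


if __name__ == "__main__":
    print(run_starmap([(2, 3), (3, 2), (10, 0)]))  # prints [8, 9, 1]
```

Like map(), starmap() returns results in input order.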

Or, submitting the tasks one at a time with apply_async:

import multiprocessing
import time

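def func(msg):
    for i in range(3):
        print(msg)
        time.sleep(1)
    return "done " + msg

if __name__ == "__main__":
    pool = multiprocessing.Pool(processes=2)
    result = []
    for i in range(5):
        msg = "hello %d" % (i)
        # apply_async returns an AsyncResult immediately; it does not block
        result.append(pool.apply_async(func, (msg, )))
    pool.close()  # stop accepting new tasks
    pool.join()   # wait until every queued task has finished
    for res in result:
        print(res.get())  # the return value of func for that task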
    print("Sub-process(es) done.")
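The example above assumes every task succeeds. If a task raises, AsyncResult.get() re-raises that exception in the parent process, so errors are not lost silently. A sketch of that behaviour; risky_div() and collect() are hypothetical helpers:

```python
import multiprocessing


def risky_div(a, b):
    # Raises ZeroDivisionError inside the worker process when b == 0.
    return a / b


def collect(pairs):
    pool = multiprocessing.Pool(processes=2)
    handles = [pool.apply_async(risky_div, (a, b)) for a, b in pairs]
    pool.close()
    pool.join()
    out = []
    for h in handles:
        try:
            # get() returns the task's value, or re-raises the exception
            # the task raised in the worker
            out.append(h.get(timeout=5))
        except ZeroDivisionError as exc:
            out.append("error: " + type(exc).__name__)
    return out


if __name__ == "__main__":
    print(collect([(6, 3), (1, 0), (9, 3)]))  # prints [2.0, 'error: ZeroDivisionError', 3.0]
```

apply_async also accepts callback= and error_callback= arguments (the latter since Python 3.2); the pool calls them in the parent process as each result arrives.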


Example 2: using multiple processes (multiprocessing)

# Similarities and differences between multithreading and multiprocessing
# Written by Vamei

import os
import threading
import multiprocessing

# worker function: print a label and the PID of the process it runs in
def worker(sign, lock):
    with lock:  # equivalent to lock.acquire() ... lock.release()
        print(sign, os.getpid())

if __name__ == "__main__":
    # Main
    print('Main:', os.getpid())

    # Multi-thread: every thread reports the main process's PID
    record = []
    lock = threading.Lock()
    for i in range(5):
        thread = threading.Thread(target=worker, args=('thread', lock))
        thread.start()
        record.append(thread)

    for thread in record:
        thread.join()

    # Multi-process: every process reports its own PID
    record = []
    lock = multiprocessing.Lock()
    for i in range(5):
        process = multiprocessing.Process(target=worker, args=('process', lock))
        process.start()
        record.append(process)

    for process in record:
        process.join()
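The PIDs printed above hint at the key difference: threads run inside one process and share its memory, while each process has its own. A quick check is to let a thread and a process each increment the same module-level global and then read it back in the parent. A sketch; counter, bump() and run_in() are hypothetical names:

```python
import multiprocessing
import threading

counter = 0  # module-level global


def bump():
    global counter
    counter += 1


def run_in(kind):
    """Reset counter, run bump() in a thread or a process, return the parent's view."""
    global counter
    counter = 0
    if kind == "thread":
        worker = threading.Thread(target=bump)
    else:
        worker = multiprocessing.Process(target=bump)
    worker.start()
    worker.join()
    return counter


if __name__ == "__main__":
    # The thread updates the parent's counter; the process updates its own copy.
    print(run_in("thread"), run_in("process"))  # prints 1 0
```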

Notes:

multiprocessing mirrors much of the threading API, but when using these shared APIs keep the following points in mind:

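On UNIX platforms, when a process terminates, its parent must call wait() on it; otherwise it becomes a zombie process. It is therefore necessary to call join() on every Process object (join() effectively performs that wait). With multithreading there is only one process, so this is not necessary.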
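Besides reaping the child, join() makes its exit status available through Process.exitcode: None while the child is still running, the exit code once it has finished, and -N if it was killed by signal N. A sketch with hypothetical worker functions; note also that calling multiprocessing.active_children() has the side effect of joining any children that have already finished.

```python
import multiprocessing
import sys
import time


def finish_normally():
    time.sleep(0.1)


def finish_with_error():
    sys.exit(3)  # the SystemExit code becomes the process's exit code


def run_and_reap():
    procs = [multiprocessing.Process(target=finish_normally),
             multiprocessing.Process(target=finish_with_error)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()  # wait for the child to exit and reap it, so no zombie remains
    return [p.exitcode for p in procs], [p.is_alive() for p in procs]


if __name__ == "__main__":
    print(run_and_reap())  # prints ([0, 3], [False, False])
```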

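multiprocessing provides inter-process communication (IPC) mechanisms that the threading package lacks, such as Pipe and Queue, and these are the more efficient way to coordinate processes. Prefer Pipe and Queue, and avoid synchronization primitives such as Lock/Event/Semaphore/Condition where possible (the resources these primitives occupy belong to the operating system rather than to the user process).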
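Example 3 shows Queue; Pipe() is the simpler point-to-point channel. It returns a pair of Connection objects, one per end, and by default the pipe is duplex, so both ends can send() and recv(). A minimal round-trip sketch; child() and ping() are hypothetical names:

```python
import multiprocessing


def child(conn):
    # Receive one message, send back an upper-cased copy, then close this end.
    msg = conn.recv()
    conn.send(msg.upper())
    conn.close()


def ping(msg):
    parent_conn, child_conn = multiprocessing.Pipe()  # duplex=True by default
    p = multiprocessing.Process(target=child, args=(child_conn,))
    p.start()
    parent_conn.send(msg)       # objects are pickled on the way through
    reply = parent_conn.recv()
    p.join()
    return reply


if __name__ == "__main__":
    print(ping("hello"))  # prints HELLO
```

Unlike a Queue, a Pipe joins exactly two endpoints; the documentation warns that data may be corrupted if two processes read from or write to the same end at the same time.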

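Multiple processes should avoid sharing resources. With threads, sharing is easy, for example through global variables or passed arguments. With processes, each one has its own independent memory space, so those approaches do not work. Resources can instead be shared through shared memory or a Manager, but this makes the program more complex and, because access must be synchronized, less efficient.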
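Both mechanisms mentioned above can be sketched briefly: Value and Array place ctypes data in shared memory, while a Manager runs a separate server process and hands out proxy objects (here a dict) whose operations are forwarded to it. The lock around counter.value illustrates the synchronization cost. record() and share_state() are hypothetical names:

```python
import multiprocessing


def record(counter, squares, labels, key):
    with counter.get_lock():         # Value() carries its own lock by default
        counter.value += 1           # += is read-modify-write, so it needs the lock
    squares[key] = key * key         # each process writes a different slot
    labels[key] = "worker %d" % key  # forwarded to the Manager's server process


def share_state(n=4):
    counter = multiprocessing.Value("i", 0)  # one C int in shared memory
    squares = multiprocessing.Array("i", n)  # n C ints, zero-initialised
    with multiprocessing.Manager() as manager:
        labels = manager.dict()              # proxy to a dict held by the manager
        procs = [multiprocessing.Process(target=record,
                                         args=(counter, squares, labels, k))
                 for k in range(n)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()
        # Copy everything out while the manager is still running.
        return counter.value, squares[:], labels.copy()


if __name__ == "__main__":
    print(share_state(4))
```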

Process.pid holds the process ID; if the process has not been started with start() yet, pid is None.
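A small check of that behaviour: read p.pid before and after start(), and compare it with os.getpid() as reported from inside the child. A sketch; report_pid() and check_pid() are hypothetical names:

```python
import multiprocessing
import os


def report_pid(queue):
    queue.put(os.getpid())  # the PID as seen from inside the child


def check_pid():
    queue = multiprocessing.Queue()
    p = multiprocessing.Process(target=report_pid, args=(queue,))
    before = p.pid                       # None: start() has not been called yet
    p.start()
    after = p.pid                        # set by start()
    from_child = queue.get(timeout=10)   # read before join() so the child can exit
    p.join()
    return before, after, from_child


if __name__ == "__main__":
    before, after, from_child = check_pid()
    print(before, after == from_child)  # prints None True
```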

Example 3: using a multiprocessing Queue

# Written by Vamei
import os
import multiprocessing
import time
#==================
# input worker: put one string (PID + timestamp) into the queue
def inputQ(queue):
    info = str(os.getpid()) + '(put):' + str(time.time())
    queue.put(info)

# output worker: take one string from the queue and print it with its own PID
def outputQ(queue, lock):
    info = queue.get()
    with lock:
        print(str(os.getpid()) + '(get):' + info)
#===================
# Main (guarded so that child processes can safely re-import this module,
# which is required on Windows and with the "spawn" start method)
if __name__ == "__main__":
    record1 = []   # store input processes
    record2 = []   # store output processes
    lock = multiprocessing.Lock()     # To prevent messy print
    queue = multiprocessing.Queue(3)  # holds at most 3 items; put() blocks when full

    # input processes
    for i in range(10):
        process = multiprocessing.Process(target=inputQ, args=(queue,))
        process.start()
        record1.append(process)

    # output processes
    for i in range(10):
        process = multiprocessing.Process(target=outputQ, args=(queue, lock))
        process.start()
        record2.append(process)

    for p in record1:
        p.join()

    queue.close()  # the main process will put nothing more on this queue

    for p in record2:
        p.join()

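Some processes use put() to place a string containing their PID and a timestamp into the Queue. Other processes take strings out with get() and print their own PID along with the string they received.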
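In Example 3 each reader calls get() exactly once, which only works because there are exactly as many readers as items. A common alternative is a few long-lived consumers that loop until they receive a sentinel value. A sketch, assuming None is never a real work item; SENTINEL, producer(), consumer() and pipeline() are hypothetical names, and multiplying by 10 stands in for real work:

```python
import multiprocessing

SENTINEL = None  # hypothetical "no more work" marker; assumes None is never real data


def producer(queue, items):
    for item in items:
        queue.put(item)


def consumer(in_q, out_q):
    # Keep reading until the sentinel arrives.
    while True:
        item = in_q.get()
        if item is SENTINEL:
            break
        out_q.put(item * 10)


def pipeline(items, n_consumers=2):
    in_q = multiprocessing.Queue(maxsize=3)  # bounded, like Queue(3) in Example 3
    out_q = multiprocessing.Queue()
    consumers = [multiprocessing.Process(target=consumer, args=(in_q, out_q))
                 for _ in range(n_consumers)]
    for c in consumers:
        c.start()
    prod = multiprocessing.Process(target=producer, args=(in_q, items))
    prod.start()
    prod.join()
    for _ in consumers:
        in_q.put(SENTINEL)                   # one sentinel per consumer
    results = [out_q.get() for _ in items]   # drain before join() to avoid deadlock
    for c in consumers:
        c.join()
    return sorted(results)


if __name__ == "__main__":
    print(pipeline([1, 2, 3, 4, 5]))  # prints [10, 20, 30, 40, 50]
```

Results are drained from out_q before the consumers are joined because a process that has put items on a queue waits for them to be flushed to the underlying pipe before it exits; joining it first can deadlock.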

Reference: http://www.cnblogs.com/vamei/archive/2012/10/12/2721484.html

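    Original author: CS青雀
    Original URL: https://blog.csdn.net/ztf312/article/details/77851607
    This article was reposted from the web solely to share knowledge; if it infringes any rights, please contact the blogger to have it removed.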