进程
进程(process)是正在运行的程序的实例,但一个程序可能会产生多个进程。比如,打开 Chrome 浏览器程序,它可能会产生多个进程,主程序需要一个进程,一个网页标签需要一个进程,一个插件也需要一个进程,等等。
每个进程都有自己的地址空间,内存,数据栈以及其他记录其运行状态的辅助数据,不同的进程只能使用消息队列、共享内存等进程间通讯(IPC)方法进行通信,而不能直接共享信息。
fork()
在介绍 Python 的进程编程之前,让我们先看看 Unix/Linux 中的 fork
函数。在 Unix/Linux 系统中,fork
函数被用于创建进程。这个函数很特殊,对于普通的函数,调用它一次,返回一次,但是调用 fork
一次,它返回两次。事实上,fork
函数创建了新的进程,我们把它称为子进程,子进程几乎是当前进程(即父进程)的一个拷贝:它会复制父进程的代码段,堆栈段和数据段。
对于父进程,fork
函数返回了子进程的进程号 pid,对于子进程,fork
函数则返回 0
,这也是 fork
函数返回两次的原因,根据返回值,我们可以判断进程是父进程还是子进程。
下面我们看一段 C 代码,它展示了 fork 的基本使用:
#include <unistd.h>
#include <stdio.h>
int main(int argc, char const *argv[])
{
int pid;
pid = fork(); // 使用 fork 函数
if (pid < 0) {
printf("Fail to create process\n");
}
else if (pid == 0) {
printf("I am child process (%d) and my parent is (%d)\n", getpid(), getppid());
}
else {
printf("I (%d) just created a child process (%d)\n", getpid(), pid);
}
return 0;
}
其中,getpid
用于获取当前进程号,getppid
用于获取父进程号。
事实上,Python 的 os 模块包含了普遍的操作系统功能,该模块也提供了 fork
函数,把上面的代码改成用 Python 来实现,如下:
import os
pid = os.fork()
if pid < 0:
print 'Fail to create process'
elif pid == 0:
print 'I am child process (%s) and my parent is (%s).' % (os.getpid(), os.getppid())
else:
print 'I (%s) just created a child process (%s).' % (os.getpid(), pid)
运行上面的代码,产生如下输出:
I (86645) just created a child process (86646).
I am child process (86646) and my parent is (86645).
需要注意的是,虽然子进程复制了父进程的代码段和数据段等,但是一旦子进程开始运行,子进程和父进程就是相互独立的,它们之间不再共享任何数据。
多进程
Python 提供了一个 multiprocessing 模块,利用它,我们可以来编写跨平台的多进程程序,但需要注意的是 multiprocessing 在 Windows 和 Linux 平台的不一致性:一样的代码在 Windows 和 Linux 下运行的结果可能不同。因为 Windows 的进程模型和 Linux 不一样,Windows 下没有 fork。
我们先来看一个简单的例子,该例子演示了在主进程中启动一个子进程,并等待其结束,代码如下:
import os
from multiprocessing import Process
# 子进程要执行的代码
def child_proc(name):
print 'Run child process %s (%s)...' % (name, os.getpid())
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Process(target=child_proc, args=('test',))
print 'Process will start.'
p.start()
p.join()
print 'Process end.'
在上面的代码中,我们从 multiprocessing 模块引入了 Process,Process 是一个用于创建进程对象的类,其中,target 指定了进程要执行的函数,args 指定了参数。在创建了进程实例 p 之后,我们调用 start 方法开始执行该子进程,接着,我们又调用了 join 方法,该方法用于阻塞子进程以外的所有进程(这里指父进程),当子进程执行完毕后,父进程才会继续执行,它通常用于进程间的同步。
可以看到,用上面这种方式来创建进程比直接使用 fork
更简单易懂。现在,让我们看下输出结果:
Parent process 7170.
Process will start.
Run child process test (10075)...
Process end.
multiprocessing 与平台有关
import random
import os
from multiprocessing import Process
num = random.randint(0, 100)
def show_num():
print("pid:{}, num is {}".format(os.getpid(), num))
if __name__ == "__main__":
print("pid:{}, num is {}".format(os.getpid(), num))
p = Process(target=show_num)
p.start()
p.join()
在 Windows 下运行以上代码,输出的结果如下(你得到不一样的结果也是对的):
pid:6504, num is 25
pid:6880, num is 6
我们发现,num 的值是不一样的!
在 Linux 下运行以上代码,可以看到 num 的值是一样的:
pid:11747, num is 13
pid:11748, num is 13
使用进程池创建多个进程
在上面,我们只是创建了一个进程,如果要创建多个进程呢?Python 提供了进程池的方式,让我们批量创建子进程,让我们看一个简单的示例:
import os, time
from multiprocessing import Pool
def foo(x):
print 'Run task %s (pid:%s)...' % (x, os.getpid())
time.sleep(2)
print 'Task %s result is: %s' % (x, x * x)
if __name__ == '__main__':
print 'Parent process %s.' % os.getpid()
p = Pool(4) # 设置进程数
for i in range(5):
p.apply_async(foo, args=(i,)) # 设置每个进程要执行的函数和参数
print 'Waiting for all subprocesses done...'
p.close()
p.join()
print 'All subprocesses done.'
在上面的代码中,Pool 用于生成进程池,对 Pool 对象调用 apply_async 方法可以使每个进程异步执行任务,也就说不用等上一个任务执行完才执行下一个任务,close 方法用于关闭进程池,确保没有新的进程加入,join 方法会等待所有子进程执行完毕。
看看执行结果:
Parent process 7170.
Run task 1 (pid:10320)...
Run task 0 (pid:10319)...
Run task 3 (pid:10322)...
Run task 2 (pid:10321)...
Waiting for all subprocesses done...
Task 1 result is: 1
Task 0 result is: 0
Run task 4 (pid:10320)...
Task 3 result is: 9
Task 2 result is: 4
Task 4 result is: 16
All subprocesses done.
进程间通信
进程间的通信可以通过管道(Pipe),队列(Queue)等多种方式来实现。Python 的 multiprocessing 模块封装了底层的实现机制,让我们可以很容易地实现进程间的通信。
下面以队列(Queue)为例,在父进程中创建两个子进程,一个往队列写数据,一个从对列读数据,代码如下:
# -*- coding: utf-8 -*-
from multiprocessing import Process, Queue
# 向队列中写入数据
def write_task(q):
try:
n = 1
while n < 5:
print "write, %d" % n
q.put(n)
time.sleep(1)
n += 1
except BaseException:
print "write_task error"
finally:
print "write_task end"
# 从队列读取数据
def read_task(q):
try:
n = 1
while n < 5:
print "read, %d" % q.get()
time.sleep(1)
n += 1
except BaseException:
print "read_task error"
finally:
print "read_task end"
if __name__ == "__main__":
q = Queue() # 父进程创建Queue,并传给各个子进程
pw = Process(target=write_task, args=(q,))
pr = Process(target=read_task, args=(q,))
pw.start() # 启动子进程 pw,写入
pr.start() # 启动子进程 pr,读取
pw.join() # 等待 pw 结束
pr.join() # 等待 pr 结束
print "DONE"
执行结果如下:
write, 1
read, 1
write, 2
read, 2
write, 3
read, 3
write, 4
read, 4
write_task end
read_task end
DONE
小结
- 进程是正在运行的程序的实例。
- 由于每个进程都有各自的内存空间,数据栈等,所以只能使用进程间通讯(Inter-Process Communication, IPC),而不能直接共享信息。
- Python 的 multiprocessing 模块封装了底层的实现机制,让我们可以更简单地编写多进程程序。