摘要
在使用Python开发某些高并发WEB应用后台时,为了提高处理能力,通常会采用多进程或者多线程的方式,但这同时也给异常处理带了一些麻烦。
背景
线程是一个轻型实体,只有由很少的支持其独立运行的资源。 对于Python,线程拥有自己独立的栈, 当线程运行出错,线程会直接结束运行,当需要进行错误处理时,一般都会在线程中进行处理,但是如果只能由主进程来处理异常,那么线程要怎么才能将异常通知给主进程呢?
对于进程,子进程的产生的异常如何让父进程去处理?
Multiprocessing Package
Multiprocessing
是Python的一个多进程库,其实现的API类似threading
. 其子模块dummy
实现了与Multiprocessing.process
相同的API,唯一的区别在于,dummy
其实只是对threading
做了一层封装而已. 这给不知道是CPU密集型或者是IO密集型的项目提供了便利,你可以自由地在多进程与多线程之间进行切换而只需更改一行代码。
调用error_callback函数
在python3中, Multiprocessing
对与apply_async()
进行了改进,增加了一个默认参数error_callback=None
,在这里你可以指定相应的错误处理函数,它将在子进程或者子线程运行出现异常是被调用. 与之对应的是callback=None
参数(Python2也支持),它将在子进程或者子线程顺利运行之后被调用.
其原理,在于子线程或者子进程的一个_success
属性, 其指出了运行状态.
def get(self, timeout=None):
self.wait(timeout)
if not self.ready():
raise TimeoutError
if self._success:
return self._value
else:
raise self._value
def _set(self, i, obj):
self._success, self._value = obj
if self._callback and self._success:
self._callback(self._value)
if self._error_callback and not self._success:
self._error_callback(self._value)
self._event.set()
del self._cache[self._job]
需要注意的是,raise self._value
语句抛出的是一个Exception.AttributeError
, 可以看出只会抛出两种错误,一种是超时错误,一种是AttributeError
. 而我们能利用的就是AttributeError
, 在线程或者进程执行过程种出现错误, 我们只需要将合适的错误信息使用格式化字符串
通过AttributeError传递出去,在error_callback
去获取这个错误信息,在进行进一步的处理即可, 如果你只需要告知主进程出错了,那么只需要raise AttributeError
即可.
进程间数据共享
不推荐pipe
和queue
,而它们都有可能堵塞进程,所以最好采用其他方式,这里介绍两种,共享内存变量,以及服务进程Manager. 你可以通过使用这两种方式在主进程中检测子线程或者子进程的状态. 至于访问速度,前者与和直接访问内存速度相差无几, 后者速度要低两个数量级
共享内存变量 Shared Ctypes Objects
multi提供了两种类型,Multiprocessing.Value
以及MultiProcessing.Array
, 这两种结果内部实现了Lock,默认是开启的, 因此这时,它们是process-safe
的. 共享内存变量在父进程中被创建,然后在创建子进程的时候传进去即可.
官方文档示例代码:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, x, s, A):
n.value **= 2
x.value **= 2
s.value = s.value.upper()
for a in A:
a.x **= 2
a.y **= 2
if __name__ == '__main__':
lock = Lock()
n = Value('i', 7)
x = Value(c_double, 1.0/3.0, lock=False)
s = Array('c', b'hello world', lock=lock)
A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)
p = Process(target=modify, args=(n, x, s, A))
p.start()
p.join()
print("Ounput:")
print(n.value)
print(x.value)
print(s.value)
print([(a.x, a.y) for a in A])
Output:
49
0.1111111111111111
HELLO WORLD
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]
服务进程 Manager
如果你需要支持更多类型数据的共享呢? Sever Process Manager 可以完成, 它支持很多数据类型,包括list
, dict
, Namespace
, Lock
, RLock
, Semaphore
, BoundedSemaphore
, Condition
, Event
, Queue
, Value
和Array
. 作为一个独立的服务进程存在,意味着可以远程共享数据,内部实现类似与RPC服务器,每个需要进行读写共享数据的进程,都需要通过proxies来访问服务进程进行操作. 一个Manager对象就控制一个用于管理共享数据的服务进程。由于访问是在网络之上进行的, 因此速度方面没有共享内存快.
Manager的具体使用方法
检查线程退出状态
由于线程出现异常退出和正常退出时的exitcode
时不一样的,而线程结束运行后,内存又不会立即被回收,基于此,我们可以通过检查exitcode
的方式来捕获线程中出现的错误,这种方法来自PengMeng’s Blog, 一般地,Python中进程的属性, 或许可以达到和exitcode
相同的效果. 这里不再深入讨论.