concurrent.futures模块
该模块主要特色在于ThreadPoolExecutor 和 ProcessPoolExecutor 类,这两个类都继承自concurrent.futures._base.Executor类,它们实现的接口能分别在不同的线程或进程中执行可调用的对象,它们都在内部维护着一个工作线程或者进程池。
ThreadPoolExecutor 和 ProcessPoolExecutor 类是高级类,大部分情况下只要学会使用即可,无需关注其实现细节。
####ProcessPoolExecutor 类
>class ThreadPoolExecutor(concurrent.futures._base.Executor)
>| This is an abstract base class for concrete asynchronous executors.
>| Method resolution order:
>| ThreadPoolExecutor
| concurrent.futures._base.Executor
| builtins.object
|
| Methods defined here:
|
| init(self, max_workers=None, thread_name_prefix='')
| Initializes a new ThreadPoolExecutor instance.
|
| Args:
| max_workers: The maximum number of threads that can be used to
| execute the given calls.
| thread_name_prefix: An optional name prefix to give our threads.
|
| shutdown(self, wait=True)
| Clean-up the resources associated with the Executor.
|
| It is safe to call this method several times. Otherwise, no other
| methods can be called after this one.
|
| Args:
| wait: If True then shutdown will not return until all running
| futures have finished executing and the resources used by the
| executor have been reclaimed.
|
| submit(self, fn, *args, **kwargs)
| Submits a callable to be executed with the given arguments.
|
| Schedules the callable to be executed as fn(*args, **kwargs) and returns
| a Future instance representing the execution of the callable.
|
| Returns:
| A Future representing the given call.
|
| ----------------------------------------------------------------------
| Methods inherited from concurrent.futures._base.Executor:
|
| enter(self)
|
| exit(self, exc_type, exc_val, exc_tb)
|
| map(self, fn, *iterables, timeout=None, chunksize=1)
| Returns an iterator equivalent to map(fn, iter).
|
| Args:
| fn: A callable that will take as many arguments as there are
| passed iterables.
| timeout: The maximum number of seconds to wait. If None, then there
| is no limit on the wait time.
| chunksize: The size of the chunks the iterable will be broken into
| before being passed to a child process. This argument is only
| used by ProcessPoolExecutor; it is ignored by
| ThreadPoolExecutor.
|
| Returns:
| An iterator equivalent to: map(func, *iterables) but the calls may
| be evaluated out-of-order.
|
| Raises:
| TimeoutError: If the entire result iterator could not be generated
| before the given timeout.
| Exception: If fn(*args) raises for any values.
初始化可以指定一个最大进程数作为其参数 max_workers 的值,该值一般无需指定,默认为当前运行机器的核心数,可以由os.cpu_count()获取;类中含有方法:
- map()方法,与python内置方法map() 功能类似,也就是映射,参数为:
- 一个可调用函数 fn
- 一个迭代器 iterables
- 超时时长 timeout
- 块数chuncksize 如果大于1, 迭代器会被分块处理
—->> 该函数有一个特性:其返回结果与调用开始的顺序是一致的;在调用过程中不会产生阻塞,也就是说可能前者被调用执行结束之前,后者被调用已经执行结束了。
如果一定要获取到所有结果后再处理,可以选择采用submit()方法和futures.as_completed函数结合使用。
- shutdown()方法,清理所有与当前执行器(executor)相关的资源
- submit() 方法,提交一个可调用对象给fn使用
- 从concurrent.futures._base.Executor继承了__enter__() 和 __exit__()方法,这意味着ProcessPoolExecutor 对象可以用于with 语句。
from concurrent import futures
with futures.ProcessPoolExecutor(max_works=3) as executor:
executor.map()
ThreadPoolExecutor类
class ThreadPoolExecutor(concurrent.futures._base.Executor)
| This is an abstract base class for concrete asynchronous executors.
|
| Method resolution order:
| ThreadPoolExecutor
| concurrent.futures._base.Executor
| builtins.object
|
| Methods defined here:
|
| init(self, max_workers=None, thread_name_prefix='')
| Initializes a new ThreadPoolExecutor instance.
|
| Args:
| max_workers: The maximum number of threads that can be used to
| execute the given calls.
| thread_name_prefix: An optional name prefix to give our threads.
|
| shutdown(self, wait=True)
| Clean-up the resources associated with the Executor.
|
| It is safe to call this method several times. Otherwise, no other
| methods can be called after this one.
|
| Args:
| wait: If True then shutdown will not return until all running
| futures have finished executing and the resources used by the
| executor have been reclaimed.
|
| submit(self, fn, *args, **kwargs)
| Submits a callable to be executed with the given arguments.
|
| Schedules the callable to be executed as fn(*args, **kwargs) and returns
| a Future instance representing the execution of the callable.
|
| Returns:
| A Future representing the given call.
|
| ----------------------------------------------------------------------
| Methods inherited from concurrent.futures._base.Executor:
|
| enter(self)
|
| exit(self, exc_type, exc_val, exc_tb)
|
| map(self, fn, *iterables, timeout=None, chunksize=1)
| Returns an iterator equivalent to map(fn, iter).
|
| Args:
| fn: A callable that will take as many arguments as there are
| passed iterables.
| timeout: The maximum number of seconds to wait. If None, then there
| is no limit on the wait time.
| chunksize: The size of the chunks the iterable will be broken into
| before being passed to a child process. This argument is only
| used by ProcessPoolExecutor; it is ignored by
| ThreadPoolExecutor.
|
| Returns:
| An iterator equivalent to: map(func, *iterables) but the calls may
| be evaluated out-of-order.
|
| Raises:
| TimeoutError: If the entire result iterator could not be generated
| before the given timeout.
| Exception: If fn(*args) raises for any values.
与ProcessPoolExecutor 类十分相似,只不过一个是处理进程,一个是处理线程,可根据实际需要选择。
示例
from time import sleep, strftime
from concurrent import futures
def display(*args):
print(strftime('[%H:%M:%S]'), end="")
print(*args)
def loiter(n):
msg = '{}loiter({}): doing nothing for {}s'
display(msg.format('\t'*n, n, n))
sleep(n)
msg = '{}loiter({}): done.'
display(msg.format('\t'*n, n))
return n*10
def main():
display('Script starting')
executor = futures.ThreadPoolExecutor(max_workers=3)
results = executor.map(loiter, range(5))
display('results:', results)
display('Waiting for individual results:')
for i, result in enumerate(results):
display('result {} : {}'.format(i, result))
if __name__ == '__main__':
main()
运行结果:
[20:32:12]Script starting
[20:32:12]loiter(0): doing nothing for 0s
[20:32:12]loiter(0): done.
[20:32:12] loiter(1): doing nothing for 1s
[20:32:12] loiter(2): doing nothing for 2s
[20:32:12]results: <generator object Executor.map.<locals>.result_iterator at 0x00000246DB21BC50>
[20:32:12]Waiting for individual results:
[20:32:12] loiter(3): doing nothing for 3s
[20:32:12]result 0 : 0
[20:32:13] loiter(1): done.
[20:32:13] loiter(4): doing nothing for 4s
[20:32:13]result 1 : 10
[20:32:14] loiter(2): done.
[20:32:14]result 2 : 20
[20:32:15] loiter(3): done.
[20:32:15]result 3 : 30
[20:32:17] loiter(4): done.
[20:32:17]result 4 : 40
不同机器运行结果可能不同。
示例中设置max_workers=3,所以代码一开始运行,则有三个对象(0,1,2)被执行loiter() 操作; 三秒后,对象0运行结束,得到结果result 0之后,对象3才开始被执行,同理,对象4的执行时间在对象1执行结果result 1打印结束之后。