Python 期物之 concurrent.futures.Future

Python 期物用在异步编程,所谓期物指的是排定的执行事件。Python 3.4起

  • 总结
    • 1、期物处理并发只涉及到三个对象,一个是期物(concurrent.futures.Future),一个是执行器(concurrent.futures.Executor),还有一个是 _WorkItem
      • 1)期物对象:本身不涉及多线程,多进程或者yield等语法,其只是一个具有运行状态运行结果以及可添加回调函数的类
      • 2)_WorkItem 对象:真正被添加到任务队列的对象。将一个需执行的函数期物实例化成一个 _WorkItem 对象。通过 run 方法执行,run 方法负责标记期物的状态,执行函数,将执行结果赋值给期物。
      • 3)执行器:有两个方法 map, submit
        • submit 接收一个函数期物,生成 _WorkItem 对象,并将该对象添加到任务队列中。每调用 submit 方法都会调整处理队列的线程个数,如果当前运行线程数小于执行器设置的最大执行线程数,则新建一个线程去处理任务队列。返回值为期物对象
        • map 方法,使用 submit 迭代执行器要执行函数的参数列表,返回一个期物列表。遍历期物列表,使用 yield 去接收每个期物对象的result属性。
    • 2、任务队列的运行:
      • 1)每个线程均执行 _worker()方法
      • 2)任务队列 work_queue 使用 queue.Queue() 存储

循环执行 work_queue.get 得到的 _WorkItem 对象,直到获取的对象为 None

  • Future 源码
class Future(object):
    # 表征了异步计算的结果

    def __init__(self):
        # 初始化 future 实例,不应该通过用户端调用
        self._condition = threading.Condition()  # condition是条件锁
        self._state = PENDING
        self._result = None
        self._exception = None
        self._waiters = []
        self._done_callbacks = []

    # 回调
    def _invoke_callbacks(self):
        for callback in self._done_callbacks:
            try:
                callback(self)
            except Exception:
                LOGGER.exception('exception calling callback for %r', self)

    # 格式化输出对象
    def __repr__(self):
        with self._condition:
            if self._state == FINISHED:
                if self._exception:
                    return '<%s at %#x state=%s raised %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._exception.__class__.__name__)
                else:
                    return '<%s at %#x state=%s returned %s>' % (
                        self.__class__.__name__,
                        id(self),
                        _STATE_TO_DESCRIPTION_MAP[self._state],
                        self._result.__class__.__name__)
            return '<%s at %#x state=%s>' % (
                    self.__class__.__name__,
                    id(self),
                   _STATE_TO_DESCRIPTION_MAP[self._state])

    def cancel(self):
        # 取消期物的调用,取消成功返回 Ture,其余返回 False。
        # 如果期物已经运行或者已经结束,则该期物不可以被取消,返回 True。
        
        with self._condition:
            if self._state in [RUNNING, FINISHED]:
                return False

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                return True

            self._state = CANCELLED
            # 唤醒所有使用 _condition 条件阻塞的线程
            self._condition.notify_all()
        
        # 执行任务结束或cancel的回调
        self._invoke_callbacks()
        return True

    def cancelled(self):
        # 如果 future 已被 cancel,返回 True
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]

    def running(self):
        # 如果 future 正在运行,返回 True
        with self._condition:
            return self._state == RUNNING

    def done(self):
        # 如果 future 已被 cancel 或者 执行结束,返回 True
        with self._condition:
            return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]

    # 返回期物运行结果
    def __get_result(self):
        if self._exception:
            raise self._exception
        else:
            return self._result

    def add_done_callback(self, fn):
        # 期物运行结束调用的对象
        # fn: 期物运行结束或 cancel 后被调用,总会在所添加的进程内调用。如果期物已经结束或 cancel 则会立即调用;根据添加顺序进行调用
        with self._condition:
            if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
                self._done_callbacks.append(fn)
                return
        fn(self)

    def result(self, timeout=None):
        """
        Returns:
            期物的运行结果
        Raises:
            CanceledError: 期物被 cancell
            TimeoutError: 期物在给定的时间没有执行完毕
            Exception: 其他 Error
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()

            # 此处会阻塞,等待 notify 
            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self.__get_result()
            else:
                raise TimeoutError()

    def exception(self, timeout=None):
        """
        Returns:
            期物运行的异常
        Raises:
            CancelledError: 如果期物被 cancel
            TimeoutError: 如果期物在给定的时间没有执行完毕
        """
        with self._condition:
            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception

            self._condition.wait(timeout)

            if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
                raise CancelledError()
            elif self._state == FINISHED:
                return self._exception
            else:
                raise TimeoutError()

    # The following methods should only be used by Executors and in tests.
    def set_running_or_notify_cancel(self):
        """
        标记期物为 RUNNING 或者 CANCELLED_AND_NOTIFIED,
            1、如果期物已经 cancelled 则期物任何等待执行的线程都会被 notify 并且 return False。
            2、如果期物没有被 cancelled,则状态变更为 RUNNING,返回 True
            3、此方法应该在期物所关联的work执行前被调用,如果此方法返回 False,那么 work 不应该被执行。
        Returns:
            如果期物已经被 cancelled,返回 False;其他情况返回 True
        Raises:
            RuntimeError:如果此方法已经被调用或者 set_result() 或者 set_exception()被调用。
        """
        with self._condition:
            if self._state == CANCELLED:
                self._state = CANCELLED_AND_NOTIFIED
                for waiter in self._waiters:
                    waiter.add_cancelled(self)
                # self._condition.notify_all() is not necessary because
                # self.cancel() triggers a notification.
                return False
            elif self._state == PENDING:
                self._state = RUNNING
                return True
            else:
                LOGGER.critical('Future %s in unexpected state: %s',
                                id(self),
                                self._state)
                raise RuntimeError('Future in unexpected state')

    def set_result(self, result):
        """Sets the return value of work associated with the future.

        Should only be used by Executor implementations and unit tests.
        """
        """
        将期物关联 work 的返回值赋值给期物对象,并发送通知 notify
        """
        with self._condition:
            self._result = result
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_result(self)
            self._condition.notify_all()
        self._invoke_callbacks()

    def set_exception(self, exception):
        """
        使用给定的异常设置期物的 _exception 值
        """
        with self._condition:
            self._exception = exception
            self._state = FINISHED
            for waiter in self._waiters:
                waiter.add_exception(self)
            self._condition.notify_all()
        self._invoke_callbacks()
  • 单从 Future 类并无法获知期物何时运行,下面引入 ThreadPoolExecutor
class ThreadPoolExecutor(_base.Executor):
    def __init__(self, max_workers=None):
        """
        初始化一个 ThreadPoolExecutor 实例
        Args: max_workers 使用最大线程数
        """
        if max_workers is None:
            # Use this number because ThreadPoolExecutor is often
            # used to overlap I/O instead of CPU work.
            max_workers = (os.cpu_count() or 1) * 5
        if max_workers <= 0:
            raise ValueError("max_workers must be greater than 0")

        self._max_workers = max_workers
        self._work_queue = queue.Queue()  # _WorkItem 实例队列
        self._threads = set()  # 实例的线程数
        self._shutdown = False  # 设置为 True 不再接受事件提交
        self._shutdown_lock = threading.Lock()  # 锁

    # 事件提交
    def submit(self, fn, *args, **kwargs):
        with self._shutdown_lock:
            if self._shutdown:
                raise RuntimeError('cannot schedule new futures after shutdown')

            f = _base.Future()
            w = _WorkItem(f, fn, args, kwargs)  # 用以在线程中调用其 run 方法

            self._work_queue.put(w)
            self._adjust_thread_count()  # 用以开启最多 _max_workers 数量的线程,并且在每个线程中 while 循环执行 _work_queue 队列中的实例
            
            return f  # 返回期物
    submit.__doc__ = _base.Executor.submit.__doc__

    def _adjust_thread_count(self):
        # 用以唤醒 worker 线程
        def weakref_cb(_, q=self._work_queue):
            q.put(None)
        # TODO(bquinlan): Should avoid creating new threads if there are more
        # idle threads than items in the work queue.
        if len(self._threads) < self._max_workers:
            t = threading.Thread(target=_worker,
                                 args=(weakref.ref(self, weakref_cb),
                                       self._work_queue))
            t.daemon = True
            t.start()
            self._threads.add(t)
            _threads_queues[t] = self._work_queue

    def shutdown(self, wait=True):
        with self._shutdown_lock:
            self._shutdown = True
            self._work_queue.put(None)
        if wait:
            for t in self._threads:
                t.join()
    shutdown.__doc__ = _base.Executor.shutdown.__doc__
  • Executor
class Executor(object):
    # 异步调用的抽象基类

    def submit(self, fn, *args, **kwargs):
        raise NotImplementedError()

    def map(self, fn, *iterables, timeout=None, chunksize=1):
        """
        Returns:
            迭代器,等同于 map(fn, *iteravles) 但是不是按照顺序执行
        Args:
            fn: 可调用对象,参数在 iterable 对象中
            timeout: 最大等待时间
        Raises:
            TimeoutError: 所有的迭代器不能在给定的时间生成
            Exception: 任何其他异常错误
        """
        if timeout is not None:
            end_time = timeout + time.time()

        # submit 的作用是将 函数+期物 绑定生成_WorkItem 实例对象,并且创建线程去循环执行 _WorkItem 对象实例
        fs = [self.submit(fn, *args) for args in zip(*iterables)]

        def result_iterator():
            try:
                for future in fs:
                    if timeout is None:
                        yield future.result()
                    else:
                        yield future.result(end_time - time.time())
            finally:
                for future in fs:
                    future.cancel()
        return result_iterator()

    def shutdown(self, wait=True):
        # 清理所有关联 executor 对象的资源
        pass

    def __enter__(self):
        # return 的 self 是给 as 使用的
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        self.shutdown(wait=True)
        return False
  • _WorkItem
# 简单的工作类 
class _WorkItem(object):

    # 初始化,参数为 期物+函数+参数
    def __init__(self, future, fn, args, kwargs):
        self.future = future
        self.fn = fn
        self.args = args
        self.kwargs = kwargs

    # 标记期物为notify,非 True 直接返回。调用期物关联的fn方法。
    def run(self):
        if not self.future.set_running_or_notify_cancel():
            return

        try:
            result = self.fn(*self.args, **self.kwargs)
        except BaseException as e:
            self.future.set_exception(e)
        else:
            self.future.set_result(result)
  • _worker()
# _worker方法
def _worker(executor_reference, work_queue):
    """
    此方法在被调用的线程内 while True 执行
    """
    try:
        while True:
            work_item = work_queue.get(block=True)
            if work_item is not None:
                work_item.run()
                # Delete references to object. See issue16284
                del work_item
                continue
            executor = executor_reference()
            # Exit if:
            #   - The interpreter is shutting down OR
            #   - The executor that owns the worker has been collected OR
            #   - The executor that owns the worker has been shutdown.
            """
            1、编译器是否关闭
            2、executor 是否被回收
            3、executor._shutdown 被设置
            """
            if _shutdown or executor is None or executor._shutdown:
                # 通知其他线程的 worker
                work_queue.put(None)
                return
            del executor
    except BaseException:
        _base.LOGGER.critical('Exception in worker', exc_info=True)
备注

期物的使用标准流程

with futures.ThreadPoolExecutor(10) as executor:
    res = executor.map(func, para_data_list)  # func 是要调用的函数,para_data_list 是参数 list
  • 分析
  • ThreadPoolExecutorProcessPoolExecutor 均继承自 concurrent.futures.Executor,因其实现了 __enter__, __exit__方法,故可以使用 with语法
  • 使用 .map() 方法会调用 .submit() 方法;
  • .submit() 方法:
    • 将函数func + future绑定生成 _WorkItem 实例对象 w
    • w添加到队列 _work_queue
    • 并且创建线程执行 _worker 方法(此处的创建线程是指会最多创建如上例10个线程去执行);
  • _worker() 方法:
    • _work_queue队列中获取 _WorkItem对象 w,执行其 w.run()方法
  • 返回值 res 是生成器,使用迭代获取函数返回的值;
  • future.result() 会阻塞调用。
    原文作者:宝宝家的隔壁老王
    原文地址: https://www.jianshu.com/p/015eea593d28
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞