支撑异步的Generator“递归”

Python的generator最经常运用的体式格局就是作为迭代器运用,在Python中,可迭代对象是异常的有用。然则generator远比迭代器来得壮大,从某版本最先,generator就具有send要领了,这使得generator具有了在实行历程当中吸收外部输入的值后继续实行的才能。

当我们持有一个generator以后,我们可以做什么呢?

我们可以获得它的输出,而且给它传值。这里,隐含了一种迥殊的形式,generator输出使命,而我们传入使命的实行效果。这个历程看起来就像一次函数挪用一样:)

这里最症结的一点是,我们持有了使命的处置惩罚历程,而generator并不体贴使命是怎样被处置惩罚的。实际上,我们可以直接实行,或许丢给异步引擎,或许丢到线程池,或许痛快提交到长途机械上实行。我们把这个分配使命的部份叫做调理器吧:)

递归

假如generator天生的使命自身也是generator呢?我们应该怎样让generator完成递归呢?

怎样返回值?

实在我们直接用yield返回值即可。然则我们须要把使命和值辨别开来,由于python并没有供应什么有用的体式格局能把这两者星散。所以我们经由过程yield出的值的范例推断即可,我们可以定义一个接口,一切持有某个要领的对象都是使命。

调理器的逆境

这里有一个主要的题目是,也就是generator的下一次操纵,取决于使命是不是完成。而这个操纵是由使命决议的,调理器没法做到这一点。这致使一个题目,调理器不能直接掌握generator的实行,它须要把掌握的操纵下传给使命,让使命在完毕后自动完成这个操纵。只要如许,调理器才不须要自力的线程或许分外的体式格局举行掌握,由于它的触发是被动的。

实行使命

使命的实行须要接收一个callback和一个error_callback函数,当使命完成的时刻,它执callback,涌现毛病则实行error_callback

我们须要把关于generator的掌握封装到一个callback中,使得使命可以挪用这个函数,完成它的功用。我们可以把重试的函数作为error_callback传入使命,这使得使命在失利以后可以被重试。

完成

下面是一个完成。包括了以下的一些特征。

  • 调理器可以作为装潢器运用(假如疏忽毛病和返回值)
  • 关于一般的函数,它可以直接返回效果(传递给callback)
  • 关于一般的generator输出,它能把generator的输出作为参数传递给callback
  • 一切的使命对象都包括一个dojob要领,它接收callbackerr_callback,用于完成使命
  • 使命包括了重试机制,当使命失利次数到达限额以后,全部使命会直接失利(全部递归历程)

RecTask使命

  • 默许的RecTask范例,是通例的python函数挪用,包括函数和它的参数。
  • RecTask要领包括一个transform要领,用于对使命函数举行变更(完全是一个商定)。这个要领不会修正原始的使命函数,由于一个可变对象在反复挪用的历程当中会涌现难以估计的题目。
  • RecTaskrun要领,用于接收新的使命函数(假如是None则直接实行原始的函数)
  • 经由过程继续RecTask,可以组织别的的使命实行体式格局。比方经由过程异步引擎来实行使命。这也使得异步架构可以完成一些递归使命:)

import sys def rec_gen(func, callback=None, err_callback=None): ''' callback: run after func finish ''' def trans_func(*args, **kwargs): def error_do(e): print('@rec_func_error:', e, file=sys.stderr) if err_callback is not None: err_callback() try: g = func(*args, **kwargs) except Exception as e: error_do(e) return if not isinstance(g, types.GeneratorType): #return if g is not generator if callback is not None: callback(g) return ans = [] def go_through(it=None): try: em = g.send(it) if not hasattr(em, 'dojob'): ans.append(em) go_through(None) else: try_count = 0 def todo_next(try_limit=10): ''' child proc with retry ''' nonlocal try_count if try_count < try_limit: try_count += 1 if try_count > 1: print('@retry:# try_count: %s' % try_count, file=sys.stderr) em.dojob(callback=go_through, err_callback=todo_next) else: #raise Exception('can not recover error after %s retrys' % try_limit) print('@rec_recover_failed: after try %s time at one node' % try_limit, file=sys.stderr) print('@task is complete failed:(') if err_callback is not None: #let the task tree fail g.close() err_callback(0) todo_next(10) except StopIteration as st: if callback is not None: callback(*ans) return except Exception as e: g.close() error_do(e) return go_through() return trans_func from functools import partial class RecTask(object): def __init__(self, func, *args, **kwargs): self.func = func self.args = args self.kwargs = kwargs def dojob(self, callback=None, err_callback=None): self.run(self.transform(partial(rec_gen, callback=callback, err_callback=err_callback))) def transform(self, f): return f(self.func) def run(self, func=None): if func is None: func = self.func return func(*self.args, **self.kwargs) if __name__ == '__main__': sys.setrecursionlimit(10000000) def fib(n): if n <= 1: yield n else: yield (yield RecTask(fib, n-1)) + (yield RecTask(fib, n-2)) pfib = rec_gen(fib, lambda x: print(x)) for i in range(15): pfib(i)

送上一段fibonacci序列的代码。你可以纯真的把yield RecTask算作apply。这个顺序的一个题目是,它对栈空间斲丧迥殊大:)

sys.setrecursionlimit(10000000)

def fib(n):
    if n <= 1:
        yield n
    else:
        yield (yield RecTask(fib, n-1)) + (yield RecTask(fib, n-2))

pfib = rec_gen(fib, lambda x: print(x))
for i in range(15):
    pfib(i)

一个典范的异步HTTPTask

我们假定sender(request, callback)是一个异步接口。那末我们的异步Task使命以下:)

class HTTPTask(RecTask):
    def __init__(self, sender, req, callback):
        self.sender = sender
        self.req = req
        self.callback = callback

    def transform(self, f):
        return f(self.callback)

    def run(self, callback=None):
        if callback is None:
            callback = self.callback
        self.sender(self.req, callback)


    原文作者:Earthson_Lu
    原文地址: https://segmentfault.com/a/1190000000341143
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞