from tornado.concurrent import Future
def async_call_method(fun, *args, **kwargs):
future = Future()
// 定义一个闭包 finish
def finish():
try:
result = fun(*args, **kwargs)
if future._callbacks:
IOLoop.current().add_callback(future.set_result, result)
else:
future.set_result(result)
except:
if future._callbacks:
IOLoop.current().add_callback(future.set_exc_info, sys.exc_info())
else:
future.set_exc_info(sys.exc_info())
child_gr = greenlet.greenlet(finish)
child_gr.switch()
return future
tornado 相关官方文档
Future 是一种用于并发编程的模式,首次引入是在 python 3.2 的 concurrent.futures 模块。
Future 对象是一个对于异步返回结果的占位符。
一个 Future 对象包含了一次异步操作的结果。在同步编程中,Futures 被用于等待从一个线程池或进程池里返回的结果;在 tornado 中,future 通常被用在 IOLoop.add_future 或者在一个 gen.coroutine 函数中 yielding 它们。
tornado.concurrent.Future 和 concurrent.futures.Future 相似,但是其不是线程安全的(因此,在单线程事件循环应用在速度更快)
async_call_method() 的来源
经过一番搜索,查询到 async_call_method()
这个函数来自于 github.com/snower/TorMySQL.
经过对该项目代码的仔细阅读,我发现了它是如何实现了 mysql 的异步操作。
tormysql.client.connect()
...
def connect(self):
# 入口函数
# 设置 future 占位符
future = Future()
# 定义回调函数
def on_connected(connection_future):
if connection_future._exc_info is None:
future.set_result(self)
else:
future.set_exc_info(connection_future.exc_info())
self._connection = Connection(defer_connect = True, *self._args, **self._kwargs)
self._connection.set_close_callback(self.connection_close_callback)
# 用 greenlet 包装 self._connection.connect 并返回 future
# 要使 async_call_method 包装后的函数有非阻塞的特性,必须达成以下要求
# 1. 函数可以访问 父greenlet
# 2. 函数中所有 IO 操作均支持非阻塞(比如: 非阻塞由 socket 的 non-blocking 特性支持)
# 3. 函数中执行 IO 操作后立即将运行权交还给主函数(父greenlet, 如:ioloop 时间循环)(greenlet.switch)
# 4. 函数中所有 IO 操作均返回 Future
# 5. Future.callback 运行后立即将运行权(greenlet.switch)返回给当前函数(greenlet.current),完成当前函数的剩余部分
connection_future = async_call_method(self._connection.connect)
# 当 connection_future 状态为 finished, 调用 on_connected()
# finished => 调用 connection_future.set_result()
IOLoop.current().add_future(connection_future, on_connected)
return future
...
self._connection.connect()
...
# IOStream 基于 tornado.iostream.IOStream
sock = IOStream(sock)
sock.set_close_callback(self.stream_close_callback)
# getcurrent() 返回包装了当前函数的 greenlet
child_gr = greenlet.getcurrent()
# main 是指 父greenlet(主函数, 时间循环?)
main = child_gr.parent
assert main is not None, "Execut must be running in child greenlet"
...
def connected(future):
if self._loop_connect_timeout:
self._loop.remove_timeout(self._loop_connect_timeout)
self._loop_connect_timeout = None
if future._exc_info is not None:
child_gr.throw(future.exception())
else:
self._sock = sock
# 将运行权交还给当前 greenlet
child_gr.switch()
# IOStream.connect 是 no-blocking 的 socket 操作
future = sock.connect(address)
# 给 sock.connect 操作添加回调函数
self._loop.add_future(future, connected)
# 然后把运行权交还给 父greenlet
# 直到连接成功,connected() 中会将运行权交还给 当前greenlet
main.switch()
...
结论
要使 async_call_method 包装后的函数有非阻塞的特性,必须达成以下要求
函数可以访问 父greenlet
函数中所有 IO 操作均支持非阻塞(比如: 非阻塞由 socket 的 non-blocking 特性支持)
函数中执行 IO 操作后立即将运行权交还给主函数(父greenlet, 如:ioloop 时间循环)(greenlet.switch)
函数中所有 IO 操作均返回 Future
Future.callback 运行后立即将运行权(greenlet.switch)返回给当前函数(greenlet.current),完成当前函数的剩余部分
async_call_method 包装后的函数要实现非阻塞,最终还是依赖于 socket 的非阻塞
=> socket.setblocking(False)
。
github.com/snower/TorMySQL
中于 mysql
的交互全部通过 IOStream()
的以下方法实现:
* def _handle_events(self, fd, events): # ioloop 在事件发生时调用 _handle_events
* def _handle_connect(self):
* def _handle_read(self): # 当事件为读取事件时,读取数据到 buffer, 然后 future.set_result(data)
* def _handle_write(self): # 当事件为写事件时,读取数据到 buffer, 然后 future.set_result(data)
* def read(self, num_bytes):
* def write(self, data):
通过对上述方法进行 设置 future 占位符
,并基于 non-blocking socket 实现上述方法的非阻塞。