可重现的错误
我尝试在在线REPL here中重现错误.但是,它与我的真实代码(我在position_stream()中的响应异步,而不是在count()中的位置实现(并因此行为)不完全相同在REPL中).
关于我实际实施的更多细节
我在某处定义了一个协程:
async def position(self):
request = telemetry_pb2.SubscribePositionRequest()
position_stream = self._stub.SubscribePosition(request)
try:
async for response in position_stream:
yield Position.translate_from_rpc(response)
finally:
position_stream.cancel()
其中position_stream是无限的(或者可能是非常持久的).我从这样的示例代码中使用它:
async def print_altitude():
async for position in drone.telemetry.position():
print(f"Altitude: {position.relative_altitude_m}")
和print_altitude()在循环上运行:
asyncio.ensure_future(print_altitude())
asyncio.get_event_loop().run_forever()
这很好用.现在,在某些时候,我想关闭来自调用者的流.我以为我可以运行asyncio.ensure_future(loop.shutdown_asyncgens())并等待我的最后关闭以上被调用,但它不会发生.
相反,我收到一个未经检测的异常的警告:
Task exception was never retrieved
future: <Task finished coro=<print_altitude() done, defined at [...]
为什么会这样,我怎样才能使所有异步生成器实际关闭(并运行它们的finally子句)?
最佳答案 首先,如果你停止循环,你的协同程序都没有机会正常关闭.呼叫关闭基本上意味着不可逆转地破坏循环.
如果您不关心那些正在运行的任务会发生什么,您可以简单地取消它们,这也将停止异步生成器:
import asyncio
from contextlib import suppress
async def position_stream():
while True:
await asyncio.sleep(1)
yield 0
async def print_position():
async for position in position_stream():
print(f'position: {position}')
async def cleanup_awaiter():
await asyncio.sleep(3)
print('cleanup!')
if __name__ == '__main__':
loop = asyncio.get_event_loop()
try:
asyncio.ensure_future(print_position())
asyncio.ensure_future(print_position())
loop.run_until_complete(cleanup_awaiter())
# get all running tasks:
tasks = asyncio.gather(*asyncio.Task.all_tasks())
# schedule throwing CancelledError into the them:
tasks.cancel()
# allow them to process the exception and be cancelled:
with suppress(asyncio.CancelledError):
loop.run_until_complete(tasks)
finally:
print('closing loop')
loop.close()