多线程 – Python 3.5 asyncio在不同线程中的同步代码的事件循环上执行协程

我希望有人可以帮助我.

我有一个对象,能够具有返回协程对象的属性.这很好用,但是我需要在单独的线程中从同步代码获取coroutine对象的结果,而事件循环当前正在运行.我想出的代码是:

def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
    """
    Get an attribute synchronously and safely.

    Note:
        This does nothing special if an attribute is synchronous. It only
        really has a use for asynchronous attributes. It processes
        asynchronous attributes synchronously, blocking everything until
        the attribute is processed. This helps when running SQL code that
        cannot run asynchronously in coroutines.

    Args:
        key (str): The Config object's attribute name, as a string.
        default (Any): The value to use if the Config object does not have
            the given attribute. Defaults to None.

    Returns:
        Any: The vale of the Config object's attribute, or the default
        value if the Config object does not have the given attribute.
    """
    ret = self.get(key, default)

    if asyncio.iscoroutine(ret):
        if loop.is_running():
            loop2 = asyncio.new_event_loop()
            try:
                ret = loop2.run_until_complete(ret)

            finally:
                loop2.close()
        else:
            ret = loop.run_until_complete(ret)

    return ret

我正在寻找的是一种在多线程环境中同步获取协程对象结果的安全方法.对于我设置为提供它们的属性,self.get()可以返回一个协程对象.我发现的问题是:如果事件循环正在运行.在堆栈溢出和其他一些站点上搜索了几个小时之后,我的(破碎的)解决方案就在上面.如果循环正在运行,我创建一个新的事件循环并在新的事件循环中运行我的协同程序.这是有效的,除了代码永远挂在ret = loop2.run_until_complete(ret)行上.

现在,我有以下带有结果的场景:

> self.get()的结果不是协程

>返回结果. [好]

> self.get()的结果是一个协程&事件循环未运行(基本上与事件循环在同一个线程中)

>返回结果. [好]

> self.get()的结果是一个协程&事件循环正在运行(基本上在与事件循环不同的线程中)

>永远等待结果. [坏]

有谁知道如何修复坏结果,这样我才能得到我需要的价值?谢谢.

我希望我在这里有所了解.

我确实有一个好的,有效的理由来使用线程;特别是我使用的SQLAlchemy不是异步的,我将SQLAlchemy代码发送到ThreadPoolExecutor来安全地处理它.但是,我需要能够从这些线程中查询这些异步属性,以便SQLAlchemy代码安全地获取某些配置值.不,我不会为了完成我需要的东西而从SQLAlchemy转移到另一个系统,所以请不要提供它的替代品.该项目太过遥远,无法切换一些如此重要的东西.

我尝试使用asyncio.run_coroutine_threadsafe()和loop.call_soon_threadsafe(),但都失败了.到目前为止,它已经最大限度地使其工作,我觉得我只是遗漏了一些明显的东西.

当我有机会时,我会编写一些代码来提供问题的示例.

好的,我实现了一个示例案例,它按照我期望的方式工作.所以我的问题很可能是代码中的其他地方.保持开放状态,如果需要,将改变问题以适应我的真实问题.

有没有人有任何想法为什么来自asyncio.run_coroutine_threadsafe()的concurrent.futures.Future会永远挂起而不是返回结果?

不幸的是,我的示例代码不会重复我的错误,如下所示:

import asyncio
import typing

loop = asyncio.get_event_loop()

class ConfigSimpleAttr:
    __slots__ = ('value', '_is_async')

    def __init__(
        self,
        value: typing.Any,
        is_async: bool=False
    ):
        self.value = value
        self._is_async = is_async

    async def _get_async(self):
        return self.value

    def __get__(self, inst, cls):
        if self._is_async and loop.is_running():
            return self._get_async()
        else:
            return self.value

class BaseConfig:
    __slots__ = ()

    attr1 = ConfigSimpleAttr(10, True)
    attr2 = ConfigSimpleAttr(20, True)    

    def get(self, key: str, default: typing.Any=None) -> typing.Any:
        return getattr(self, key, default)

    def get_sync(self, key: str, default: typing.Any=None) -> typing.Any:
        ret = self.get(key, default)

        if asyncio.iscoroutine(ret):
            if loop.is_running():
                fut = asyncio.run_coroutine_threadsafe(ret, loop)
                print(fut, fut.running())
                ret = fut.result()
            else:
                ret = loop.run_until_complete(ret)

        return ret

config = BaseConfig()

def example_func():
    return config.get_sync('attr1')

async def main():
    a1 = await loop.run_in_executor(None, example_func)
    a2 = await config.attr2
    val = a1 + a2
    print('{a1} + {a2} = {val}'.format(a1=a1, a2=a2, val=val))
    return val

loop.run_until_complete(main())

这是我的代码正在做的精简版,并且该示例有效,即使我的实际应用程序没有.我被困在哪里寻找答案.欢迎建议在哪里尝试追踪我的“永远卡住”问题,即使我上面的代码实际上没有复制问题.

最佳答案 您不太可能同时运行多个事件循环,因此这部分看起来非常错误:

    if loop.is_running():
        loop2 = asyncio.new_event_loop()
        try:
            ret = loop2.run_until_complete(ret)

        finally:
            loop2.close()
    else:
        ret = loop.run_until_complete(ret)

即使测试循环是否正在运行似乎也不是正确的方法.最好将(仅)运行循环显式给get_sync并使用run_coroutine_threadsafe调度协程:

def get_sync(self, key, loop):
    ret = self.get(key, default)
    if not asyncio.iscoroutine(ret):
        return ret
    future = asyncio.run_coroutine_threadsafe(ret, loop)
    return future.result()

编辑:挂起问题可能与在错误循环中调度的任务有关(例如,在调用协程时忘记了可选的循环参数).使用PR 303(现在合并)应该更容易调试这种问题:当循环和未来不匹配时,会引发RuntimeError.因此,您可能希望使用最新版本的asyncio运行测试.

点赞