如何使用Future与dask.distrubuted(Python库)中的Executor的map方法?

我正在运行
dask.distributed集群.

我的任务包括链式计算,其中最后一步是使用Executor.map方法对先前步骤创建的列表进行并行处理.列表的长度不是预先知道的,因为它是在计算期间从中间结果生成的.

代码如下所示:

from distributed import Executor, progress


def process():
    e = Executor('{address}:{port}'.format(address=config('SERVER_ADDR'),
                                           port=config('SERVER_PORT')))
    futures = []
    gen_list1 = get_list_1()
    gen_f1 = e.map(generate_1, gen_list1)
    futures.append(gen_f1)

    gen_list2 = get_list_2()
    gen_f2 = e.map(generate_2, gen_list2)
    futures.append(gen_f2)

    m_list = e.submit(create_m_list)  # m_list is created from gen_list1 and gen_list2
                                      # some results of processing are stored in the database
                                      # and create_m_list doesn't need additional arguments
    futures.append(m_list)

    m_result = e.map(process_m_list, m_list)
    futures.append(m_result)

    return futures

if __name__ == '__main__':
    r = process()
    progress(r)

但是,我收到错误TypeError:zip参数#1必须支持迭代:

File "F:/wl/under_development/database/jobs.py", line 366, in start_job
  match_result = e.map(process_m_list, m_list)
File "C:\Anaconda\lib\site-packages\distributed\executor.py", line 672, in map
  iterables = list(zip(*zip(*iterables)))
TypeError: zip argument #1 must support iteration

gen_list1和gen_list2是独立计算的,但m_list是从gen_list1和gen_list2创建的,因此取决于它们.

我也尝试调用m_list的.result()方法,但是,它已经阻止了函数进程,直到gen_list1和gen_list2的计算完成.

我也尝试过调用m_list的异步方法._result,但它产生了同样的错误“zip参数#1必须支持迭代”.使用dask.delayed(m_result = e.map(process_m_list,delayed(m_list)))获得了相同的错误.

dask.distributed的文档在这方面含糊不清,示例仅提及已存在的实际列表对象.但是,SO中的其他帖子以及Google都认为它应该是可行的.

这是我的Python发行版的版本字符串

Python 2.7.11 | Anaconda custom(64位)| (默认情况下,2016年2月16日,09:58:36)[winv上的MSC v.1500 64位(AMD64)]

最佳答案 问题的症结似乎在这里:

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list)

你是不对的,你不能将功能映射到个人的未来.你需要传递一个序列.在不了解您的数据的情况下,Dask不知道要提交多少函数.在将来调用.result()将是一个很好的解决方案:

m_list = e.submit(create_m_list)
m_result = e.map(process_m_list, m_list.result())

I’ve also tried calling .result() method of m_list, however, it has blocked the function process until computations of gen_list1 and gen_list2 have finished.

那是对的.没有任何附加信息,调度程序将更喜欢先前提交的计算.您可以先提交create_m_list函数,然后提交额外的comptuations,然后等待create_m_list结果来解决此问题.

m_list = e.submit(create_m_list)                   # give this highest priority
f1 = e.map(generate_1, get_list_1())
f2 = e.map(generate_2, gen_list_2())

L = m_list.result()                                # block on m_list until done
m_result = e.map(process_m_list, L)                # submit more tasks

return [f1, f2, m_result]
点赞