python – 使用dask作为任务调度来并行运行机器学习模型

基本上我想要的是并行运行ML Pipelines.

我一直在使用scikit-learn,我决定使用DaskGridSearchCV.

我有一个gridSearchCV = DaskGridSearchCV(管道,网格,评分=评估者)对象的列表,我按顺序运行每个对象:

for gridSearchCV in list:
    gridSearchCV.fit(train_data, train_target)
    predicted = gridSearchCV.predict(test_data)

如果我有N个不同的GridSearch对象,我希望尽可能多地利用所有可用资源.如果有资源同时并行运行2,3,4,…或N,我想这样做.

所以我开始尝试基于dask文档的一些东西.首先我尝试了dask.threaded和dask.multiprocessing,但它最终变慢了,我不断得到:

/Library/Python/2.7/site-packages/sklearn/externals/joblib/parallel.py:540:UserWarning:多处理支持的并行循环不能嵌套在线程下面,设置n_jobs = 1

这是代码片段:

def run_pipeline(self, gs, data):

    train_data, test_data, train_target, expected = train_test_split(data, target, test_size=0.25, random_state=33)

    model = gs.fit(train_data, train_target)
    predicted = gs.predict(test_data)


values = [delayed(run_pipeline)(gs, df) for gs in gs_list]
compute(*values, get=dask.threaded.get)

也许我接近这个错误的方式,你对我有什么建议吗?

最佳答案

Yes, but I have a list of GridSearch objects, for example one using DecisionTree and another with RandomForest. And I wanna run them in parallel as long as there are resources for it.

如果这是你的目标,我会将它们全部合并到同一个网格中. Scikit-Learn Pipelines支持跨步骤的网格搜索,这将允许您仅在单个GridSearchCV对象中进行搜索(有关此示例,请参阅scikit-learn文档,请参阅here).如果您只有一个估算器(而不是管道),则可以使用具有单个步骤的管道作为代理.例如:

from sklearn.tree import DecisionTreeClassifier
from sklearn.ensemble import RandomForestClassifier
from sklearn.pipeline import Pipeline
import dask_searchcv as dcv

pipeline = Pipeline([('est', DecisionTreeClassifier())])

grid = [
    {'est': [DecisionTreeClassifier()],
     'max_features': ['sqrt', 'log2'],
     # more parameters for DecisionTreeClassifier
    },
    {'est': [RandomForestClassifier()],
     'max_features': ['sqrt', 'log2'],
     # more parameters for RandomForesetClassifier
    },
    # more estimator/parameter subsets
]

gs = dcv.GridSearchCV(pipeline, grid)
gs.fit(train_data, train_target)
gs.predict(test_data)

请注意,对于此特定情况(所有估算器共享相同的参数,您可以合并网格:

grid = {'est': [DecisionTreeClassifier(), RandomForestClassifier()],
        'max_features': ['sqrt', 'log2'],
        # more parameters for all estimators}

至于为什么你的延迟示例不起作用 – dask.delayed用于包装不调用dask代码的函数.由于您在延迟函数(也使用dask进行计算)中调用了dask_searchcv.GridSearchCV对象(使用dask进行计算),因此您将调用dask调度程序,这可能会导致性能最差,最糟糕的是奇怪的错误.

点赞