使用Luigi來搭建Data Pipeline

为什么需要Luigi?

常见的资料处理流程可租略分成以下几步:

  1. 预处理: 整合不同来源的数据, 筛选相关的数据, 清洗数据, 标准化(normalization) 等处理
  2. 模型训练: 透过一些机器学习算法来建立模型
  3. 呈现或预测: 将训练得到的模型用于分析模式或是预测

刚开始你的资料处理流程可能是像这样依序地执行脚本

$ python get_data.py
$ python clean_data.py
$ python normalize.py
$ python train.py
$ python predict.py

稍微懒一点的人于是会把所有脚本打包进一个脚本run_all.py

if __name__ == '__main__':
    get_data()
    clean_data()
    normalize()
    train()
    predict()
$ python run_all.py

但是如果想要做好错误处理(error handling), 脚本可能就会变成像这样唠叨的代码

try:
    get_data()
    try:
        clean_data()
        try:
            # ...
        except MoreErrors:
            # ...
    except CleanDataError as e:
        # handle CleanDataError
except GetDataError as e:
    # handle GetDataError

一旦步骤切分得更细或是步骤越来越多时, 错误处理变成一项繁琐的工作, 这样的重複的结构也使得代码不易阅读. 一些框架如Luigi和Airflow能将程序员从这项劳动中解放出来, 提升开发的效率,

什么是Luigi?

Luigi是一套管理上述工作流程的python框架,让开发人员专注在数据处理的逻辑,而不必重複的撰写常用的代码, 例如前面举例的错误处理. 另外, 如果某个数据处理的步骤发生了错误或是更动,使用Luigi可以避免重新计算不被影响的结果.

安装Luigi

  1. 稳定版本 $ pip install luigi
  2. 最新版本 $ pip install git+https://github.com/spotify/luigi.git

运行Luigi

  1. 执行 Luigi daemon $ luigid
  2. 开启浏览器前往 http://localhost:8082
    原文作者:南郭先生
    原文地址: https://segmentfault.com/a/1190000015154667
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞