Kettle 增量导出MongoDB到Mysql表中

一、需求:

        将MongoDB表中的数据按照时间戳增量抽取到Mysql表中。

二、实现方式:

      1. kettle 

      2. python 脚本

三、遇到的问题:

         kettle 如何将增量时间作为变量传入到MongoDB Input 中?

         翻遍了百度,没有一篇文章能写明白的,本人将实现方式详尽记载下来,供大家参考!

四、具体实现:

        1. kettle实现

           实现思路:

                    step1: 先全量将MongoDB 导入到 Mysql 表中。(这个不难,百度中有实现。)

                    step2:创建第一个Transformation,获取Mysql 中最大时间字段 例如:Max(time),将其存入到Mysql的一张表中。

                        step2-1: 配置表输入(目的:为了取得全量表中时间字段最大值)

                        step2-2:设置变量

                        step2-3:将变量值输出到表T1中

                    step3:创建第二个Transformation,配置MongoDB Input -> Mysql

                            step3-1 表输入

                            step3-2 设置变量

                            step3-3 配置MongoDB Input

                            step3-4 配置字段选择

                            step3-5 表输出 

                    step4:创建作业执行step2 和 step3 两个转换。

            具体步骤如下:

            step1:先全量将MongoDB 导入到 Mysql 表中。(略)

mongo_test 表(全量表)

《Kettle 增量导出MongoDB到Mysql表中》 step1-1

            step2:创建第一个Transformation,获取Mysql 中最大时间字段 例如:Max(time),将其存入到Mysql的一张表中。

《Kettle 增量导出MongoDB到Mysql表中》 step2
《Kettle 增量导出MongoDB到Mysql表中》 step2-1 表输入
《Kettle 增量导出MongoDB到Mysql表中》 step2-2 设置变量
《Kettle 增量导出MongoDB到Mysql表中》 step2-3 将变量值保存到表t1中

《Kettle 增量导出MongoDB到Mysql表中》 step2 t1 表

step3 :创建第二个Transformation,配置MongoDB Input -> Mysql

《Kettle 增量导出MongoDB到Mysql表中》 step3

 step3-1 表输入

《Kettle 增量导出MongoDB到Mysql表中》

step3-2 设置变量

《Kettle 增量导出MongoDB到Mysql表中》

step3-3 配置MongoDB Input

《Kettle 增量导出MongoDB到Mysql表中》 注意写法

这里需要注意:STRAT_TIME是step3-2中的变量名,这样就可以获取到变量值

step3-4 配置字段选择

《Kettle 增量导出MongoDB到Mysql表中》

step3-5 表输出

《Kettle 增量导出MongoDB到Mysql表中》

step4:创建作业执行step2 和 step3 两个转换。

《Kettle 增量导出MongoDB到Mysql表中》

测试:

1. 当前t1 中的时间为:

        2018-05-19 00:00:00    

2. 向MongoDB中添加一条数据

> db.xk.insert({“name”:”发发发发发财”,”age”:”30″,”time”:”2018-05-20 00:00:00″})

3. 执行job

《Kettle 增量导出MongoDB到Mysql表中》

4. 查看结果:

《Kettle 增量导出MongoDB到Mysql表中》

以上结束:

python 脚本方式也很简单:

==============================================================

#coding=utf-8

# Python 实现 从MongoDB 想MySql 增量导数据

from pymongo import MongoClient

import pymysql

# mongodb

client = MongoClient(‘192.168.107.128’, 27017)

TempleSpider = client[‘xk’]

temple_comment_collect = TempleSpider[‘xk’]

# mysql

mysql = pymysql.connect(‘localhost’, ‘root’, ‘123456’, ‘test’, charset=”utf8″)

# insert sql

sql = ‘insert into mongo_test values(%s,%s,%s)’

# 批量提交

def batch_commit(connect, target_sql, result):

    cursor = connect.cursor()

    list = []

    i = 0

    try:

        for row in result:

            data = (row[‘name’].encode(‘utf-8’), row[‘age’], row[‘time’])

            print(row[‘name’])

            list.append(data)

            if i >= 1000:

                cursor.executemany(target_sql, list)

                connect.commit()

                print(‘1000 条插入成功…’)

                i = 0

                list.clear()

            i = i+1

        if i > 0:

            cursor.executemany(target_sql, list)

            connect.commit()

            print(‘%d 条插入成功……’ % i)

    except Exception as e:

        print(e)

        print(“增量数据导入失败!!!”)

        # 回滚

        connect.rollback()

records = temple_comment_collect.find({“time”: {“$gt”: “2018-05-15 00:00:00”}})

batch_commit(mysql, sql, records)

client.close()

mysql.close()

==============================================================

    原文作者:xukai5265
    原文地址: https://www.jianshu.com/p/6fa502b05e51
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞