使用NiFi将数据从Mysql导入至HBase

1. 启动并登录NiFi

1.1 在本机启动nifi

./bin/nifi.sh start

1.2 登录nifi

打开浏览器,访问xxx.xxx.xxx.xxx:8080(默认端口是8080)

2. 构建Processor

2.1 ExecuteSQL

拖一个Processor到面板中,选择ExecuteSQL类型

在processor上点击右键–>选择configure–>选择properties

《使用NiFi将数据从Mysql导入至HBase》 image.png

选择Database Connection Pooling Service属性值为:DBCPConnectionPool,然后再点击右侧的箭头,进入项目的Configuration界面,配置数据库连接的具体信息。

《使用NiFi将数据从Mysql导入至HBase》 image.png

在项目的Configuration页面中,选择新生成的DBCPConnectionPool条目,点击右侧的齿轮按钮,进行相关配置。

《使用NiFi将数据从Mysql导入至HBase》 image.png

依次填写:

  • Database Connection URL:

    数据库连接字符串,如:
    jdbc:mysql://localhost:3306/test?useUnicode=true&characterEncoding=utf8&useSSL=true

  • Database Driver Class Name:

    数据库连接驱动类,这里是:com.mysql.jdbc.Driver

  • Database Driver Location(s):

    数据库连接驱动jar包的位置,如:/home/hadoop/nifi/mysql-connector-java-5.1.39-bin.jar

  • Database User:

    数据库用户名,如:root

  • Password:

    数据库密码,如:root

回到ExecuteSQL的properties,设置SQL select query为想要查询的SQL语句,设置Max Wait Time为10 seconds。

《使用NiFi将数据从Mysql导入至HBase》 image.png

2.2 ConvertAvroToJson

从ExecuteSQL里出来的是avro格式的数据,要先将其转化成json格式,再导入HBase。拖一个ConvertAvroToJson Processor到界面。然后,从ExecuteSQL连一条线到ConvertAvroToJson,关系为success。

《使用NiFi将数据从Mysql导入至HBase》 image.png

2.3 SplitJson

从上一步输出的数据是由多条记录构成的整体,需要将其分割成独立的单条数据,再依次导入HBase。

拖入一个SplitJson processor到界面中,然后从ConvertAvroToJson连一条线到SplitJson,关系为success。

配置SplitJson,在properties页,将JsonPath Expression设置为*

《使用NiFi将数据从Mysql导入至HBase》 image.png

2.4 PutHbaseJson

这一步将分割好的json格式的Mysql记录添加到HBase中。

拖入一个PutHbaseJson processor到界面中,右键点击,选择configure,进行配置。

在properties页,配置HBase Client Service为 HBase_1_1_2_ClientService

《使用NiFi将数据从Mysql导入至HBase》 image.png

点击右侧的小箭头,增加HbaseClientService,然后点击齿轮按钮,在properites中依次设置:

  • ZooKeeper Quorum :

    Zookeeper的地址,如:192.168.2.xxx

  • Zookeeper client port:

    Zookeeper的端口,一般是:2181

  • Zookeeper znode parent:

    Zookeeper中的znode,一般是:/hbase

《使用NiFi将数据从Mysql导入至HBase》 image.png
《使用NiFi将数据从Mysql导入至HBase》 image.png

配置完成后,点击小闪电,让配置生效

《使用NiFi将数据从Mysql导入至HBase》 image.png

回到PutHbaseJson的properties,配置RowIdentifier。依次设置:

  • Table Name:

    数据导入到HBase的表名称,需要是已存在的表

  • Row Identifier Field Name:

    作为HBase表行键的字段名称,一般设置为mysql中的主键

  • Row Identified Encoding Strategy:

    行键编码方式,根据行键类型选择String或Binary

  • Column Famlily:

    数据导入到HBase表的列族名称

《使用NiFi将数据从Mysql导入至HBase》 image.png

从SplitJosn连一条线到PutHbaseJson,关系为split

2.5 LogAttribute

拖入一个LogAttribute到界面中,从其他所有processor连一条线到LogAttribute,关系全部选则为failure,其中splitJson的original关系也连接到LogAttribute。

《使用NiFi将数据从Mysql导入至HBase》 image.png

接下来定义数据流的终止方式:

(1)点击LogAttribute的confiure,在setting标签中,勾选右侧Automatically Terminate Relationships的success选项。

《使用NiFi将数据从Mysql导入至HBase》 image.png

(2)点击PutHbaseJson的confiure,在setting标签中,勾选右侧Automatically Terminate Relationships的success选项。

《使用NiFi将数据从Mysql导入至HBase》 image.png

3. 检查并开始导入

完成以上步骤后,整体结构图类似于下图所示:

《使用NiFi将数据从Mysql导入至HBase》 image.png

点击空白处,再点击左侧Operate面板的齿轮按钮,检查数据库连接

《使用NiFi将数据从Mysql导入至HBase》 image.png

确保mysql与hbase的连接都处于enable状态,如果没有,则可以查看左侧的错误提示,根据提示修改错误。

《使用NiFi将数据从Mysql导入至HBase》 image.png

最后,根据需要设置ExecuteSQL processor的scheduling选项,默认的执行间隔是0秒,即不间断的执行SQL语句,会导致从Mysql中读出大量重复数据。如果仅仅需要将一次SQL查询的结果导入HBase,建议将该值设置大一些,等待执行完毕后手动结束即可;如果需要定期执行,则应设置合适的执行间隔时间。

《使用NiFi将数据从Mysql导入至HBase》 image.png

以上检查完成后,在Operate面板中点击run按钮,开始进行从mysql到hbase的转换,转换完成后,点击stop按钮停止转换。也可单独控制每一个Processor的启动与停止状态,以便调试。

《使用NiFi将数据从Mysql导入至HBase》 image.png

    原文作者:涛O_O
    原文地址: https://www.jianshu.com/p/0ff537f93450
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞