Real-Time MySQL Data Collection

0. Cluster environment

10.20.201.51     namenode   resourcemanager   hmaster         spark

10.20.201.52     namenode   resourcemanager   hmaster         spark

10.20.201.53     datanode   nodemanager       hregionserver   spark   zookeeper   kafka   kafkaconnect

10.20.201.54     datanode   nodemanager       hregionserver   spark   zookeeper   kafka   kafkaconnect

10.20.201.55     datanode   nodemanager       hregionserver   spark   zookeeper   kafka   kafkaconnect

10.20.201.56     mysql      hive

192.168.112.201  flume      kafkaconnect

1. ZooKeeper cluster

Start:   zkServer.sh start

Stop:    zkServer.sh stop

Status:  zkServer.sh status
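
For a quick liveness probe without the client script, ZooKeeper's built-in four-letter-word commands can be sent over the client port (assuming nc is installed; the hostname is taken from the ZooKeeper connect string used in the Kafka commands below):

echo ruok | nc hdfsaha 2181    # replies "imok" when the server is up

echo stat | nc hdfsaha 2181    # shows mode (leader/follower) and client connections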

2. Kafka cluster

Start (run the same command on nodes 53, 54, and 55):

/bdapps/kafka/bin/kafka-server-start.sh /bdapps/kafka/config/server.properties &
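
Alternatively, the broker script can daemonize itself, which avoids the trailing & and keeps the broker running after logout:

/bdapps/kafka/bin/kafka-server-start.sh -daemon /bdapps/kafka/config/server.properties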

Stop (run the same command on nodes 53, 54, and 55):

/bdapps/kafka/bin/kafka-server-stop.sh

Topic operations

/bdapps/kafka/bin/kafka-topics.sh --list --zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181

/bdapps/kafka/bin/kafka-topics.sh --describe --zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181 --topic mysqltopic

/bdapps/kafka/bin/kafka-topics.sh --delete --zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181 --topic mysqltopic

/bdapps/kafka/bin/kafka-topics.sh --create --zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181 --replication-factor 3 --partitions 5 --topic mysqltopic

/bdapps/kafka/bin/kafka-console-producer.sh --broker-list hdfsaha:9092,hdfsbha:9092,hdfscha:9092 --topic mysqltopic

/bdapps/kafka/bin/kafka-console-consumer.sh --bootstrap-server hdfsaha:9092,hdfsbha:9092,hdfscha:9092 --topic mysqltopic --from-beginning
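
To check which consumers are attached and whether they are keeping up, the consumer-group tool can be used (the group name below is hypothetical; Kafka Connect sinks register under connect-<connector name>):

/bdapps/kafka/bin/kafka-consumer-groups.sh --bootstrap-server hdfsaha:9092,hdfsbha:9092,hdfscha:9092 --list

/bdapps/kafka/bin/kafka-consumer-groups.sh --bootstrap-server hdfsaha:9092,hdfsbha:9092,hdfscha:9092 --describe --group connect-test-mysql-sink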

3. Data collection (Kafka producer side, 192.168.112.201)

3.1 Collecting MySQL data with the Flume JDBC plugin

Flume startup command:

/bdapps/flume/bin/flume-ng agent --conf /bdapps/flume/conf/ --conf-file /bdapps/flume/conf/flume-kafka-query.conf --name producer3 -Dflume.root.logger=INFO,console
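
flume-kafka-query.conf itself is not shown above; a minimal sketch of what the producer3 agent might look like, assuming the open-source keedio flume-ng-sql-source plugin on the JDBC side (the table name is hypothetical; the MySQL host and credentials reuse the ones from the Connect sink example later in this document):

# JDBC source -> memory channel -> Kafka sink
producer3.sources = sql-source
producer3.channels = mem-channel
producer3.sinks = kafka-sink

# keedio flume-ng-sql-source: polls MySQL and tracks its position in a status file
producer3.sources.sql-source.type = org.keedio.flume.source.SQLSource
producer3.sources.sql-source.hibernate.connection.url = jdbc:mysql://10.20.201.56:3306/test
producer3.sources.sql-source.hibernate.connection.user = test
producer3.sources.sql-source.hibernate.connection.password = Abc123456
producer3.sources.sql-source.table = historysignal
producer3.sources.sql-source.run.query.delay = 10000
producer3.sources.sql-source.status.file.path = /bdapps/flume
producer3.sources.sql-source.status.file.name = sql-source.status
producer3.sources.sql-source.channels = mem-channel

producer3.channels.mem-channel.type = memory
producer3.channels.mem-channel.capacity = 10000

# standard Flume Kafka sink pointing at the brokers on 53/54/55
producer3.sinks.kafka-sink.type = org.apache.flume.sink.kafka.KafkaSink
producer3.sinks.kafka-sink.kafka.bootstrap.servers = hdfsaha:9092,hdfsbha:9092,hdfscha:9092
producer3.sinks.kafka-sink.kafka.topic = mysqltopic
producer3.sinks.kafka-sink.channel = mem-channel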

3.2 Collecting MySQL data with Kafka Connect JDBC

cd /bdapps/confluent-4.0.0

nohup ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-jdbc/connect-mysql-source.properties > connect.out 2>&1 &
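
connect-mysql-source.properties is likewise not shown above; a minimal sketch, assuming the Confluent JDBC source connector in timestamp mode (table and column names are assumptions; host and credentials reuse the sink example below):

# Confluent JDBC source: polls MySQL and publishes one topic per table
name=connect-mysql-source
connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
tasks.max=1
connection.url=jdbc:mysql://10.20.201.56:3306/test?user=test&password=Abc123456
# timestamp mode re-queries rows whose timestamp column advanced since the last poll
mode=timestamp
timestamp.column.name=RECORDTIME
table.whitelist=historysignal
topic.prefix=sg_bx_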

4. Data consumption

4.1 Kafka Connect cluster

Start (run the same command on nodes 53, 54, and 55):

/bdapps/confluent/bin/connect-distributed /bdapps/confluent/etc/kafka/connect-distributed.properties
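
The settings that matter most in connect-distributed.properties are the ones that make the three workers join the same cluster; a sketch, assuming the default internal topic names:

bootstrap.servers=hdfsaha:9092,hdfsbha:9092,hdfscha:9092
# all workers sharing this group.id form one Connect cluster
group.id=connect-cluster
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
# internal topics where the cluster stores connector configs, offsets, and status
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
config.storage.replication.factor=3
offset.storage.replication.factor=3
status.storage.replication.factor=3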

Connect REST API examples

curl localhost:8083/connectors | jq

curl localhost:8083/connector-plugins | jq

curl localhost:8083/connectors/test-mysql-sink | jq

curl localhost:8083/connectors/test-mysql-sink/config | jq

curl localhost:8083/connectors/test-mysql-sink/status | jq

curl localhost:8083/connectors/local-file-sink/tasks | jq

curl localhost:8083/connectors/test-mysql-sink/tasks/0/status | jq

curl -X POST localhost:8083/connectors/test-mysql-sink/restart

curl -X POST localhost:8083/connectors/test-mysql-sink/tasks/0/restart

curl -X PUT localhost:8083/connectors/local-file-sink/pause

curl -X PUT localhost:8083/connectors/local-file-sink/resume

curl -X DELETE localhost:8083/connectors/local-file-sink

curl -X POST -H "Content-Type: application/json" --data '{"name": "test-mysql-sink-historysignaltemperature", "config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "tasks.max":"1", "topics":"sg_bx_historysignaltemperature", "connection.url":"jdbc:mysql://10.20.201.56:3306/test", "connection.user":"test", "connection.password":"Abc123456", "auto.create":"false", "insert.mode":"upsert", "pk.mode":"record_value", "pk.fields":"RECORDTIME,EQUIPMENTID,SIGNALID"}}' http://localhost:8083/connectors | jq
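
The same connector can also be created or updated idempotently by PUTting just the config object (no name wrapper) to /connectors/<name>/config, which avoids the conflict error when re-POSTing an existing name:

curl -X PUT -H "Content-Type: application/json" --data '{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","tasks.max":"1","topics":"sg_bx_historysignaltemperature","connection.url":"jdbc:mysql://10.20.201.56:3306/test","connection.user":"test","connection.password":"Abc123456","auto.create":"false","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"RECORDTIME,EQUIPMENTID,SIGNALID"}' http://localhost:8083/connectors/test-mysql-sink-historysignaltemperature/config | jq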

curl -X POST -H "Content-Type: application/json" -d @/bdapps/confluent-4.0.0/etc/kafka-connect-jdbc/connect-mysql-source-historysignal.json http://localhost:8083/connectors | jq

curl -X POST -H "Content-Type: application/json" -d @/bdapps/confluent-4.0.0/etc/kafka-connect-jdbc/connect-mysql-source-historysignalpower.json http://localhost:8083/connectors | jq
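
The two JSON files are not shown above; a sketch of what connect-mysql-source-historysignal.json plausibly contains, given that the sink above consumes a topic prefixed sg_bx_ (mode and column names are assumptions):

{
  "name": "connect-mysql-source-historysignal",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "tasks.max": "1",
    "connection.url": "jdbc:mysql://10.20.201.56:3306/test?user=test&password=Abc123456",
    "mode": "timestamp",
    "timestamp.column.name": "RECORDTIME",
    "table.whitelist": "historysignal",
    "topic.prefix": "sg_bx_"
  }
}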

4.2 Spark Streaming
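
A minimal sketch of a Spark Streaming consumer for mysqltopic, using the spark-streaming-kafka-0-10 direct API (the application name and consumer group id are placeholders):

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

object MysqlTopicConsumer {
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkConf().setAppName("mysqltopic-consumer"), Seconds(5))

    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "hdfsaha:9092,hdfsbha:9092,hdfscha:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "spark-mysqltopic",                 // hypothetical consumer group
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // direct stream: one RDD partition per Kafka partition, offsets tracked by Spark
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc, PreferConsistent, Subscribe[String, String](Array("mysqltopic"), kafkaParams))

    // print the first records of each 5-second batch
    stream.map(_.value).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

Submit with spark-submit, packaging the Kafka integration, e.g. --packages org.apache.spark:spark-streaming-kafka-0-10_2.11:2.2.0 (the Spark/Scala versions here are assumptions).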

Original author: 在路上很久了

Original article: https://www.jianshu.com/p/b9810740c5b5