0、集群环境介绍
10.20.201.51 namenode resourcemanager hmaster spark
10.20.201.52 namenode resourcemanager hmaster spark
10.20.201.53 datanode nodemanager hregionserver spark zookeeper kafka kafkaconnect
10.20.201.54 datanode nodemanager hregionserver spark zookeeper kafka kafkaconnect
10.20.201.55 datanode nodemanager hregionserver spark zookeeper kafka kafkaconnect
10.20.201.56 mysql hive
192.168.112.201 flume kafkaconnect
1.zookeeper集群
启动 zkServer.sh start
停止 zkServer.sh stop
察看状态 zkServer.sh status
2.kafka集群
启动 53 54 55三台执行同样的命令
/bdapps/kafka/bin/kafka-server-start.sh /bdapps/kafka/config/server.properties &
停止 53 54 55三台执行同样的命令
/bdapps/kafka/bin/kafka-server-stop.sh
topic操作
/bdapps/kafka/bin/kafka-topics.sh –list –zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181
/bdapps/kafka/bin/kafka-topics.sh –describe –zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181 –topic mysqltopic
/bdapps/kafka/bin/kafka-topics.sh –delete –zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181 –topic mysqltopic
/bdapps/kafka/bin/kafka-topics.sh –create –zookeeper hdfsaha:2181,hdfsbha:2181,hdfscha:2181 –replication-factor 3 –partitions 5 –topic mysqltopic
/bdapps/kafka/bin/kafka-console-productor.sh –bootstrap-server hdfsaha:9092,hdfsaha:9092,hdfsaha:9092 –topic mysqltopic
/bdapps/kafka/bin/kafka-console-consumer.sh –bootstrap-server hdfsaha:9092,hdfsaha:9092,hdfsaha:9092 –topic mysqltopic –from-beginning
3.数据采集(kafka生产端 192.168.112.201)
3.1 flume jdbc 插件采集mysql数据
flume启动命令
/bdapps/flume/bin/flume-ng agent –conf /bdapps/flume/conf/ –conf-file /bdapps/flume/conf/flume-kafka-query.conf –name producer3 -Dflume.root.logger=INFO,console
3.2 kafka connect jdbc采集mysql数据
cd /bdapps/confluent-4.0.0
nohup ./bin/connect-standalone ./etc/kafka/connect-standalone.properties ./etc/kafka-connect-jdbc/connect-mysql-source.properties > connect.out 2>&1
4.数据消费
4.1 kafka connect集群
启动 53 54 55三台执行同样的命令
/bdapps/confluent/bin/connect-distributed ./etc/kafka/connect-distributed.properties
connect操作示例
curl localhost:8083/connectors | jq
curl localhost:8083/connector-plugins | jq
curl localhost:8083/connectors/test-mysql-sink | jq
curl localhost:8083/connectors/test-mysql-sink/config | jq
curl localhost:8083/connectors/test-mysql-sink/status | jq
curl localhost:8083/connectors/local-file-sink/tasks | jq
curl localhost:8083/connectors/test-mysql-sink/tasks/0/status | jq
curl -X POST localhost:8083/connectors/test-mysql-sink/restart
curl -X POST localhost:8083/connectors/test-mysql-sink/tasks/0/restart
curl -X PUT localhost:8083/connectors/local-file-sink/pause
curl -X PUT localhost:8083/connectors/local-file-sink/resume
curl -X DELETE localhost:8083/connectors/local-file-sink
curl -X POST -H “Content-Type: application/json” –data ‘{“name”: “test-mysql-sink-historysignaltemperature”, “config”: {“connector.class”:”io.confluent.connect.jdbc.JdbcSinkConnector”, “tasks.max”:”1″, “topics”:”sg_bx_historysignaltemperature”,”connection.url”:”jdbc:mysql://10.20.201.56:3306/test”,”connection.user”=”test”,”connection.password”=”Abc123456″,”auto.create”=”false”,”insert.mode”=”upsert”,”pk.mode=”record_value”,”pk.fields”=”RECORDTIME,EQUIPMENTID,SIGNALID” }}’ http://localhost:8083/connectors | jq
curl -X POST -H “Content-Type: application/json” -d @/bdapps/confluent-4.0.0/etc/kafka-connect-jdbc/connect-mysql-source-historysignal.json http://localhost:8083/connectors | jq
curl -X POST -H “Content-Type: application/json” -d @/bdapps/confluent-4.0.0/etc/kafka-connect-jdbc/connect-mysql-source-historysignalpower.json http://localhost:8083/connectors | jq