学习主题:spark-streaming2
学习目标:
- 掌握Kafka的相关概念
- Kafka的分布式集群部署
- | | broker1 | broker2 | broker3 | | ——- | ——- | ——- | ——- | | deng001 | * | | | | deng002 | | * | | | deng003 | | | * | | deng004 | | | |
- 解压:tar -zvxf kafka_2.10-0.8.2.2.tgz
- 进入config目录下
- vim server.properties
- broker.id=0 (0,1,2,3 依次 不能重复, 唯一的)
- port=9092 (输入 端口)
- zookeeper.connect=deng002:2181,deng003:2181,deng004:2181(配置zookeeper)
- log.dirs=/kafka-logs(文件储存目录)
- scp -r kafka_2.10-0.8.2.2 deng003:`pwd`(拷贝解压文件到deng002,deng003)
- 修改broker.id=1,broker.id=2
- 启动zookeeper (zkServer.sh start)
- 启动kafka (./kafka-server-start.sh ../config/server.properties)
- 关掉kafka
- vim startKafka.sh(写脚本)
- chmod a+x startKafka.sh(修改权限)
- 启动脚本 (./startKafka.sh)(2185 Kafka)
- 把脚本发送到deng002,deng003( scp -r startKafka.sh deng003:
pwd
) - 分别启动
- kafka命令
- ./kafka-topics.sh –zookeeper deng002:2181,deng003:2181,deng004:2181 –list
创建topic:./kafka-topics.sh –zookeeper node3:2181,node4:2181,node5:2181 –create –topic topic2017 –partitions 3 –replication-factor 3
用一台节点控制台来当kafka的生产者:./kafka-console-producer.sh –topic topic2017
–broker-list node1:9092,node2:9092,node3:9092
用另一台节点控制台来当kafka的消费者:./kafka-console-consumer.sh –zookeeper node3:2181,node4:2181,node5:2181 –topic topic2017
查看kafka中topic列表:./kafka-topics.sh –list –zookeeper node3:2181,node4:2181,node5:2181
查看kafka中topic的描述:./kafka-topics.sh –describe –zookeeper node3:2181,node4:2181,node5:2181 –topic topic2017(:ISR是检查数据的完整性有哪些个节点。)
查看zookeeper中topic相关信息:
启动zookeeper客户端:
./zkCli.sh
查看topic相关信息:
ls /brokers/topics/
查看消费者相关信息:
ls /consumers
1) 删除kafka中的数据。
在kafka集群中删除topic,当前topic被标记成删除。
在kafka集群中删除topic,当前topic被标记成删除。
在每台broker节点上删除当前这个topic对应的真实数据。
① :进入zookeeper客户端,删除topic信息
rmr /brokers/topics/t1205
① :删除zookeeper中被标记为删除的topic信息
rmr /admin/delete_topics/t1205
- 掌握Kafka和Spark-Streaming的Direct模式
- Ø Direct模式理解
- SparkStreaming+kafka 的Driect模式就是将kafka看成存数据的一方,不是被动接收数据,而是主动去取数据。消费者偏移量也不是用zookeeper来管理,而是SparkStreaming内部对消费者偏移量自动来维护,默认消费偏移量是在内存中,当然如果设置了checkpoint目录,那么消费偏移量也会保存在checkpoint中。当然也可以实现用zookeeper来管理。
- 掌握Spark-Streaming使用Zookeeper来管理offset
- Spark-Streaming的相关参数
Ø Direct模式并行度设置Direct模式的并行度是由读取的kafka中topic的partition数决定的。预写日志:spark.streaming.receiver.writeAheadLog.enable 默认false没有开启blockInterval:spark.streaming.blockInterval 默认200ms反压机制spark.streaming.backpressure.enabled 默认false接收数据速率:spark.streaming.receiver.maxRate 默认没有设置
对应作业:
1、