导读
- 为什么要将kafka位置偏移量写到zookkerper?
- 方式一:使用Spark API
- 方式二:使用kafka API
- 方式三:使用zookeeper API
为什么要将kafka位置偏移量写到zookeeper
kafka作为一个消息队列,每次读取消息时,需要指定从哪里读取,否则就会从默认位置读取。
那么为什么不将位置偏移量储存在kafka中呢?原因是,如果在位置偏移量记录在kafka, 当kafka组件故障重启时,就无法获取位置偏移量。
zookeeper作为常用组件管理工具,成为记录kafka位置偏移量首选。
使用Spark api记录kafka offset
思路
- 初始化kafkaparam
- 使用kafkaparam创建一个kafkacluster对象
- 使用kafkacluster对象的setoffset方法
代码如下
使用情景
此方法适合在spark环境中运行的,本地运行的代码中不适合使用该方法。(但是该方法特别简单)
使用kafka api记录kafka offset
思路
- 初始化一个SimpleConsumer
- 新建一个OffsetCommitRequest对象
- 使用SimpleConsumer的commitOffsets方法
代码如下
使用情景
本地运行的代码中。比较麻烦
使用zookeeper api记录
思路
- 声明与新建一个zookeeper对象
- 使用zookeeper对象setdata方法将kafka中每个topic的每个particiton对应节点值设为offset.