kafka位置偏移量写到zookeeper的三种方式

导读

  1. 为什么要将kafka位置偏移量写到zookkerper?
  2. 方式一:使用Spark API
  3. 方式二:使用kafka API
  4. 方式三:使用zookeeper API

为什么要将kafka位置偏移量写到zookeeper

kafka作为一个消息队列,每次读取消息时,需要指定从哪里读取,否则就会从默认位置读取。

那么为什么不将位置偏移量储存在kafka中呢?原因是,如果在位置偏移量记录在kafka, 当kafka组件故障重启时,就无法获取位置偏移量。

zookeeper作为常用组件管理工具,成为记录kafka位置偏移量首选。

使用Spark api记录kafka offset

思路

  1. 初始化kafkaparam
  2. 使用kafkaparam创建一个kafkacluster对象
  3. 使用kafkacluster对象的setoffset方法

代码如下

使用情景

此方法适合在spark环境中运行的,本地运行的代码中不适合使用该方法。(但是该方法特别简单)

使用kafka api记录kafka offset

思路

  1. 初始化一个SimpleConsumer
  2. 新建一个OffsetCommitRequest对象
  3. 使用SimpleConsumer的commitOffsets方法

代码如下

使用情景

本地运行的代码中。比较麻烦

使用zookeeper api记录

思路

  1. 声明与新建一个zookeeper对象
  2. 使用zookeeper对象setdata方法将kafka中每个topic的每个particiton对应节点值设为offset.
    原文作者:君子月满楼
    原文地址: https://www.jianshu.com/p/f35ff9353b51
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞