[SPARK-19680] Fixing OffsetOutOfRangeException

When data in Kafka has been lost (for example, deleted by retention), a Spark program consuming from Kafka may fail with the following exception:

Lost task 12.0 in stage 398.0 (TID 2311, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {abc_2018-0=151260}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

Root cause

When Spark creates the Kafka RDD, it forcibly overrides auto.offset.reset in kafkaParams to none. As a result, when the offsets obtained from ZooKeeper (or the committed offsets) fall outside the valid offset range of the Kafka topic, this exception is thrown. It typically occurs after data in Kafka has been lost or has expired past the retention period.
Relevant Spark source code:

DirectKafkaInputDStream.scala:218
DirectKafkaInputDStream.scala:63
KafkaUtils.scala:205
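For reference, the override happens in KafkaUtils.fixKafkaParams. The sketch below paraphrases what the Spark 2.x spark-streaming-kafka-0-10 module does there; it is not a verbatim copy, and the exact body varies by Spark version:

```scala
// Paraphrase of KafkaUtils.fixKafkaParams (spark-streaming-kafka-0-10, Spark 2.x).
// Consult your Spark version's source for the exact code.
private[kafka010] def fixKafkaParams(kafkaParams: java.util.HashMap[String, Object]): Unit = {
  logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
  kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)

  // This override is what turns a stale offset into OffsetOutOfRangeException
  // instead of a silent reset to earliest/latest:
  logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
  kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
}
```

With auto.offset.reset set to none, the consumer throws rather than resetting on its own, so the driver must hand the executors offsets that are already within the valid range.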

Solution

When creating the Kafka RDD, pass in offsets that have already been validated against the range available on the broker. The code is as follows:

import scala.collection.JavaConversions._
import scala.collection.mutable

import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.slf4j.{Logger, LoggerFactory}

/**
  * Kafka offset helper utilities.
  */
object MyKafkaUtils {
  private val logger: Logger = LoggerFactory.getLogger(this.getClass)
  /**
    * Returns the earliest (lowest) available offsets for the given partitions.
    *
    * @param consumer   the Kafka consumer
    * @param partitions topic partitions
    * @return
    */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Returns the earliest (lowest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    consumer.poll(0) // assignment() is empty until the first poll
    val parts = consumer.assignment()
    consumer.seekToBeginning(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * Returns the latest (highest) available offsets for the given partitions.
    *
    * @param consumer   the Kafka consumer
    * @param partitions topic partitions
    * @return
    */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Returns the latest (highest) available offsets, taking new partitions into account.
    *
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    */
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    consumer.poll(0) // assignment() is empty until the first poll
    val parts = consumer.assignment()
    consumer.seekToEnd(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }

  /**
    * Returns the consumer's current position for the given partitions.
    *
    * @param consumer   the Kafka consumer
    * @param partitions topic partitions
    * @return
    */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }

  /**
    * Returns the consumer's committed offsets for the given topics, validated
    * against (and clamped into) the range currently available in Kafka.
    *
    * @param kafkaParams Kafka client configuration
    * @param topics      topics to fetch offsets for
    * @return
    */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()
    try {
      consumer.poll(0)
    } catch {
      case ex: NoOffsetForPartitionException =>
        logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }
    val parts = consumer.assignment().toSet
    consumer.pause(parts)
    val topicPartition = parts.diff(notOffsetTopicPartition)
    // collect the current committed offsets
    val currentOffset = mutable.Map[TopicPartition, Long]()
    topicPartition.foreach(x => {
      try {
        currentOffset.put(x, consumer.position(x))
      } catch {
        case ex: NoOffsetForPartitionException =>
          logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
          notOffsetTopicPartition.add(ex.partition())
      }
    })
    // clamp up to the earliest available offsets
    val earliestOffset = getEarliestOffsets(consumer, parts)
    earliestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty) {
        currentOffset(x._1) = x._2
      } else if (value.get < x._2) {
        logger.warn(s"kafka data is lost from partition:${x._1} offset ${value.get} to ${x._2}")
        currentOffset(x._1) = x._2
      }
    })
    // clamp down to the latest available offsets
    val latestOffset = if (offsetResetConfig.equalsIgnoreCase("earliest")) {
      getLatestOffsets(consumer, topicPartition)
    } else {
      getLatestOffsets(consumer, parts)
    }
    latestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty || value.get > x._2) {
        currentOffset(x._1) = x._2
      }
    })
    consumer.unsubscribe()
    consumer.close()
    currentOffset.toMap
  }
}
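Stripped of the consumer plumbing, the validation in getCurrentOffset boils down to clamping each stored offset into the broker's currently available range. The following is a minimal sketch of that rule in isolation; the function name clampOffset is mine, not part of the code above, and the None case simplifies the handling of partitions without a committed offset:

```scala
// Clamp a possibly-stale stored offset into the valid [earliest, latest] range.
def clampOffset(stored: Option[Long], earliest: Long, latest: Long): Long =
  stored match {
    case Some(off) if off < earliest => earliest // data before `off` was lost or expired
    case Some(off) if off > latest   => latest   // stored offset is past the log end
    case Some(off)                   => off      // still valid, resume where we left off
    case None                        => earliest // no committed offset for this partition
  }
```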

Creating the direct stream with the validated offsets:

val offset = MyKafkaUtils.getCurrentOffset(kafkaParams.toMap, topics)
val kafkaStreams: DStream[ConsumerRecord[String, ObjectNode]] = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe(topics, kafkaParams, offset))

This largely resolves the problem. However, when the job recovers from a checkpoint the exception can still occur, because the checkpointed offsets are used as-is and bypass the validation above. In that case the processed offsets also need to be committed back to Kafka:

// commit the processed offsets back to Kafka
kafkaStreams.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStreams.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
Original author: 竹子平江
Original article: https://www.jianshu.com/p/157fd4a73b3a
This article is reposted from the web to share knowledge; in case of infringement, please contact the blogger for removal.