When data in Kafka has been lost (for example, deleted by retention), a Spark Streaming job consuming that topic may fail with an exception like the following:
Lost task 12.0 in stage 398.0 (TID 2311, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {abc_2018-0=151260}
    at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
    at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
    at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
    at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
    at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
    at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Cause analysis
When Spark creates the Kafka RDD, it forcibly overrides auto.offset.reset in kafkaParams to none. As a result, when the offset read back from ZooKeeper (or from the committed offsets) falls outside the valid offset range of the topic, the consumer has no reset policy to fall back on and throws this exception. It typically happens after data in Kafka has been lost or has expired past the retention period.
Relevant source code:
DirectKafkaInputDStream.scala:218
DirectKafkaInputDStream.scala:63
KafkaUtils.scala:205
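To see what that override means in practice, here is a small stand-alone sketch (not from the article; the broker address and group id are placeholders, while the topic and offset are taken from the error message above): with auto.offset.reset set to none, a consumer positioned at an offset Kafka no longer holds fails on poll() with OffsetOutOfRangeException instead of silently resetting.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.common.TopicPartition

object OffsetOutOfRangeRepro {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")   // placeholder broker
    props.put("group.id", "repro-group")                // placeholder group id
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "none")              // the value Spark forces
    props.put("enable.auto.commit", "false")

    val tp = new TopicPartition("abc_2018", 0)
    val consumer = new KafkaConsumer[String, String](props)
    consumer.assign(Collections.singletonList(tp))
    consumer.seek(tp, 151260L)   // an offset that has already been deleted from the topic
    consumer.poll(1000)          // throws OffsetOutOfRangeException, as in the stack trace above
    consumer.close()
  }
}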
Solution
When creating the Kafka RDD, pass in start offsets that have already been validated against the range actually available in the topic. The helper below does that:
import scala.collection.JavaConversions._
import scala.collection.mutable

import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
import org.apache.kafka.common.TopicPartition
import org.slf4j.{Logger, LoggerFactory}

/**
 * Helper for reading and validating Kafka offsets.
 */
object MyKafkaUtils {

  private val logger: Logger = LoggerFactory.getLogger(this.getClass)

  /**
   * Get the earliest available offsets.
   *
   * @param consumer   the Kafka consumer
   * @param partitions topic partitions
   * @return
   */
  def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToBeginning(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }
  /**
   * Get the earliest available offsets.
   * Returns the earliest (lowest) available offsets, taking new partitions into account.
   *
   * @param kafkaParams Kafka client configuration
   * @param topics      topics to fetch offsets for
   */
  def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    // assignment() is empty until the first poll, so poll once to get partitions assigned
    consumer.poll(0)
    val parts = consumer.assignment()
    consumer.seekToBeginning(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }
  /**
   * Get the latest available offsets.
   *
   * @param consumer   the Kafka consumer
   * @param partitions topic partitions
   * @return
   */
  def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    consumer.seekToEnd(partitions)
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }
  /**
   * Get the latest available offsets.
   * Returns the latest (highest) available offsets, taking new partitions into account.
   *
   * @param kafkaParams Kafka client configuration
   * @param topics      topics to fetch offsets for
   */
  def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")

    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    // assignment() is empty until the first poll, so poll once to get partitions assigned
    consumer.poll(0)
    val parts = consumer.assignment()
    consumer.seekToEnd(parts)
    consumer.pause(parts)
    val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
    consumer.unsubscribe()
    consumer.close()
    offsets
  }
  /**
   * Get the consumer's current position for the given partitions.
   *
   * @param consumer   the Kafka consumer
   * @param partitions topic partitions
   * @return
   */
  def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
    partitions.map(tp => tp -> consumer.position(tp)).toMap
  }
  /**
   * Get validated start offsets: the committed position of the consumer group,
   * clamped into the offset range that is actually available in Kafka.
   *
   * @param kafkaParams Kafka client configuration
   * @param topics      topics to fetch offsets for
   * @return
   */
  def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
    val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
    val newKafkaParams = mutable.Map[String, Object]()
    newKafkaParams ++= kafkaParams
    newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

    val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
    consumer.subscribe(topics)
    val notOffsetTopicPartition = mutable.Set[TopicPartition]()
    try {
      consumer.poll(0)
    } catch {
      case ex: NoOffsetForPartitionException =>
        logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
        notOffsetTopicPartition.add(ex.partition())
    }
    val parts = consumer.assignment().toSet
    consumer.pause(parts)
    val topicPartition = parts.diff(notOffsetTopicPartition)

    // current committed position of every partition that has one
    val currentOffset = mutable.Map[TopicPartition, Long]()
    topicPartition.foreach(x => {
      try {
        currentOffset.put(x, consumer.position(x))
      } catch {
        case ex: NoOffsetForPartitionException =>
          logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
          notOffsetTopicPartition.add(ex.partition())
      }
    })

    // raise any position below the earliest available offset (that data has been lost or expired)
    val earliestOffset = getEarliestOffsets(consumer, parts)
    earliestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty) {
        currentOffset(x._1) = x._2
      } else if (value.get < x._2) {
        logger.warn(s"kafka data is lost from partition:${x._1} offset ${value.get} to ${x._2}")
        currentOffset(x._1) = x._2
      }
    })

    // cap any position above the latest available offset
    val latestOffset = if (offsetResetConfig.equalsIgnoreCase("earliest")) {
      getLatestOffsets(consumer, topicPartition)
    } else {
      getLatestOffsets(consumer, parts)
    }
    latestOffset.foreach(x => {
      val value = currentOffset.get(x._1)
      if (value.isEmpty || value.get > x._2) {
        currentOffset(x._1) = x._2
      }
    })

    consumer.unsubscribe()
    consumer.close()
    currentOffset.toMap
  }
}
Creating the Spark Kafka stream with the validated offsets:
val offset = MyKafkaUtils.getCurrentOffset(kafkaParams.toMap, topics)
val kafkaStreams: DStream[ConsumerRecord[String, ObjectNode]] = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent,
  ConsumerStrategies.Subscribe[String, ObjectNode](topics, kafkaParams, offset))
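For reference, a minimal sketch of the surrounding setup these snippets assume, including the imports they rely on; the broker address, group id, topic name, batch interval, and the ObjectNode value deserializer class are placeholders, not part of the original article:

import com.fasterxml.jackson.databind.node.ObjectNode
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.DStream
import org.apache.spark.streaming.kafka010._

import scala.collection.mutable

val sparkConf = new SparkConf().setAppName("kafka-offset-demo").setMaster("local[2]")
val streamingContext = new StreamingContext(sparkConf, Seconds(10))

val topics = List("abc_2018")
val kafkaParams = mutable.Map[String, Object](
  ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
  ConsumerConfig.GROUP_ID_CONFIG -> "my-consumer-group",
  ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
  // hypothetical deserializer that turns the message value into a Jackson ObjectNode
  ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "com.example.ObjectNodeDeserializer",
  ConsumerConfig.AUTO_OFFSET_RESET_CONFIG -> "latest",
  // offsets are committed explicitly with commitAsync (next snippet), so disable auto-commit
  ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
)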
At this point the problem is basically solved. If the job is recovered from a checkpoint, however, the same exception can still appear, because the offsets stored in the checkpoint may themselves have fallen out of range by the time the job restarts; in that case the processed offsets also need to be committed back to Kafka:
// commit the offsets of each processed batch back to Kafka
kafkaStreams.foreachRDD { rdd =>
  val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
  kafkaStreams.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
}
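With the offsets committed after every batch, a restarted job (including one recovered from a checkpoint) calls MyKafkaUtils.getCurrentOffset, finds the committed positions, clamps them into the range that is still available, and resumes cleanly. Note that commitAsync must be given the batch's offsetRanges, not the start offsets computed earlier, and that the commit happens asynchronously, so the committed position can lag slightly behind what has actually been processed; the validation in getCurrentOffset also covers that gap on restart.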