【Spark】SparkStreaming 新增Metrics 支持Kafka数据消费堆积监控

在SparkStreaming任务运行的过程中,由于数据流量或者网络的抖动,任务的batch很可能出现delay,所以就出现了一个需求:实时监控任务对kafka消息的消费,及时了解堆积情况。

这个需求应该有很多种解决方案,我这边提供的思路是基于Spark Metrics System的。SparkStreaming任务在运行过程中,会产生很多Metrics信息,不断地推送到Sink上面,我们这里使用到的是MetricsServlet。

打开Spark UI,我们能够很方便地通过RestAPI方式请求任务Metrics信息,接口如下:

http://ClusterHostName:8088/proxy/AppID/metrics/json

返回的Metrics信息如下:

  • 《【Spark】SparkStreaming 新增Metrics 支持Kafka数据消费堆积监控》 Metrics_json.png

这里应用的方案就是在这些Metrics里面添加一个新Metrics,这个Metrics应该能够向监控应用程序提供任务batch对records的消费情况。

我们知道,SparkStreaming应用消费Kafka数据有两种API:Reciever模式和Direct模式。所以针对使用的不同的API,需要提供不同的Metrics信息,其格式可以如下设置:

  • Reciever-Metrics
    kafka.consumer.$zkQuorum.$topic.$groupId
  • Direcct-Metrics: kafka.direct.$kafkaBrokerList.$topic.lastCompletedBatch_sumOffsets

注意其中带“$”号 的为变量,需要根据实际情况赋值的,其它为常量字符串。
上面两个Metrics我们使用registerGauge方法分别向MetricsSystem注册就可以了。

根据上面Metrics的信息可以解读到,对于Reciever-Metrics,只向监控应用提供Kafka集群的连接信息,包括ZK,topics和groupId,注意对于多个topic的情况,要注册多个Metrics,然后需要监控应用自己调kafka的API去获取该consumer的offset和logsize,从而计算出堆积量;而对于Direcct-Metrics,需要Spark计算出每个batch消费的最新offset之和(实际上是计算消费的每个topic下所有partition的最新offset之和)。

针对具体使用来说,首先根据应用创建DStream时传递给API的参数获取到
1)对Reciever模式:zookeeper.connectgroup.idtopics
2)对Direcct模式:metadata.broker.list或者bootstrap.serverstopics
等信息,并将信息配置在StreamingContext新建的结构里面(以便于StreamingSource获取)。

这样对于Reciever-Metrics来说,使用获取的信息构造对应的Metrics并注册,就可以了,对于value设置为0;对于Direcct-Metrics来说,需要在DirectKafkaInputDStream里面每一次compute计算时,将offsetRanges里面的元数据计算后推送到StreamingJobProgressListener里面(其中配置一个topic->sumOffsets的HashMap结构即可,每次compute向里面更新最新的计算结果)。最后在StreamingSource中registerGauge时根据topic就可以获取到sumOffset。

实现下来需要修改的Spark源码文件可能包括:
1)StreamingJobProgressListener.scala
2)DirectKafkaInputDStream.scala
3)KafkaInputDStream.scala
4)StreamingSource.scala
5)StreamingContext.scala

    原文作者:PowerMe
    原文地址: https://www.jianshu.com/p/f757ac43770e
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞