1. 前言
Spark Streaming在处理不断流入的数据时通过每间隔一段时间(batch interval)将这段时间内的流入的数据积累为一个batch,然后以这个batch内的数据作为job DAG的输入rdd提交新的job运行。当一个batch的的处理时间大于batch interval时,意味着数据处理速度跟不上数据接收速度,此时在数据接收端(即Receiver一般数据接收端都运行在executor上)就会积累数据,而数据是通过BlockManager管理的,如果数据存储采取MEMORY_ONLY模式就会导致OOM,采取MEMORY_AND_DISK多余的数据保存到磁盘上反而会增加数据读取时间。
说到这里,反压实际就是一种根据当前系统的处理能力来动态调节接收数据速率的功能。
2. 反压
前言中提到数据接收端Receiver,可以参考文章Spark stream receiver,简单说就是stream job运行期间会有一个或者多个Receiver运行在Executor上专门接收数据,并以batch interval为时间间隔将流式数据分割为一个个batch,然后以一个batch的数据启动job。但是stream job中Receiver并不是必然存在的,例外的情况是当数据源是kafka时,spark内置了一种叫DirectKafkaInputDStream
的输入源(可以通过KafkaUtils.createDirectStream(...)
创建),这种类型的InputDStream(输入源会实现这个类)没有Receiver。对于那些带Receiver的InputDStream实现类,当从InputDStream创建RDD时,源头RDD中的数据就是Receiver接收的数据,而从DirectKafkaInputDStream
创建RDD时,数据实际上还没有从kafka读取过来,这个时候的RDD只包含了kafka的topic以及offset信息,等到rdd对应的task运行时才从kafka中获取数据。
由于存在有Receiver和没有两种情况,实际上反压的实现也不一样。有Receiver时控制Receiver接收数据的速率就可以了,没有Receiver的DirectKafkaInputDStream
时的实现会在后文单独提一下。
附
关于spark stream的原理可以参考:
- Spark Streaming(1) – 基本原理
- Spark Streaming(2) – JobScheduler、JobGenerator
- Spark Streaming(3) – Receiver和ReceiverTacker
2.1 开启反压
指定配置spark.streaming.backpressure.enabled
为true即可开启反压。
2.2 有Receiver时反压原理
反压的原理是根据之前系统的处理能力来调节未来系统接收数据速率,它的过程是下面这样的:
stream (stream id标志)里所有job完成后
-> 反馈运行信息(包括,开始结束时间、处理本次处理记录数等信息) 给JobScheduler
-> JobScheduler将信息交给RateController(通过一个job成功事件,下文会说到) ,RateController根据反馈信息计算接下来应该控制住Receiver接收多少条数据
-> RateController委托JobScheduler的receiverTracker将的计算结果通知给所有在Executor上运行的Receiver
-> Receiever控制接收数据速率
上面过程中JobScheduler处在核心的位置,由它来负责协调,接下来分别讲述RateController,以及Recceiver是如何控制速率的。
2.2.1 RateController
RateController实现了StreamingListener
, 它作为JobScheduler的lister,监听这个stream job的提交,开始运行,以及完成。其实它只关心StreamingListenerBatchCompleted
事件的发生(该事件表示任务成功执行),这个事件包含了如下信息:
case class BatchInfo(
batchTime: Time,
streamIdToInputInfo: Map[Int, StreamInputInfo],
submissionTime: Long,
processingStartTime: Option[Long],
processingEndTime: Option[Long],
outputOperationInfos: Map[Int, OutputOperationInfo]
)
- RateController从何而来?
RateContoller 是InputDStream的成员,并在子类ReceiverInputDStream
中初始化了具体的实例,如下:
abstract class ReceiverInputDStream {
override protected[streaming] val rateController: Option[RateController] = {
// 开启反压的情况下创建了了ReceiverRateController
if (RateController.isBackPressureEnabled(ssc.conf)) {
Some(new ReceiverRateController(id, RateEstimator.create(ssc.conf, ssc.graph.batchDuration)))
} else {
None
}
}
...
}
本节中接下来关注ReceiverRateController
的实现。
- RateController工作原理
上面说到RateController实现了StreamingListener
,并且只关注StreamingListenerBatchCompleted
,该事件发生时,会调用RateController # onBatchCompleted方法,方法体如下:
override def onBatchCompleted(batchCompleted: StreamingListenerBatchCompleted) {
val elements = batchCompleted.batchInfo.streamIdToInputInfo
for {
processingEnd <- batchCompleted.batchInfo.processingEndTime
workDelay <- batchCompleted.batchInfo.processingDelay
waitDelay <- batchCompleted.batchInfo.schedulingDelay
elems <- elements.get(streamUID).map(_.numRecords)
} computeAndPublish(processingEnd, elems, workDelay, waitDelay)
}
}
上面方法拿到stream job结束时间processingEnd, job真实运行时间workDelay, job从提交到结束时间waitDelay, 本次job处理记录数elems,
然后调用computeAndPublish计算,computeAndPublish方法如下:
private def computeAndPublish(time: Long, elems: Long, workDelay: Long, waitDelay: Long): Unit =
Future[Unit] {
// 使用rateEstimator来计算接下来的接收速率,
val newRate = rateEstimator.compute(time, elems, workDelay, waitDelay)
newRate.foreach { s =>
rateLimit.set(s.toLong)
// publish是一个抽象方法,也就是将计算出来的速率通知出处
publish(getLatestRate())
}
}
下面是ReceiverRateController
的publish的实现:
override def publish(rate: Long): Unit =
// 通过JobScheduler的receiverTracker将计算出来的速率通知给所有的Receiver
ssc.scheduler.receiverTracker.sendRateUpdate(id, rate)
附:ReceiverTracker
这里顺带提一下ReceiverTracker中涉及的反压过程的成员或方法吧:
(这里涉及到spark rpc,可以参考spark rpc)
class ReceiverTracker{
// 这个rpc一端是运行在JobScheduler上的,负责接收Receiver传过来的消息,
// 它是类ReceiverTrackerEndpoint(ReceiverTracker的内部类)的实例
private var endpoint: RpcEndpointRef = null
// stream id到Receiver的信息,其中就包含了receiver的rpc通信信息
private val receiverTrackingInfos = new HashMap[Int, ReceiverTrackingInfo]
// 上面ReceiverRateController # push就是调用这个方法去通知receiver新的速率的
def sendRateUpdate(streamUID: Int, newRate: Long): Unit = synchronized {
if (isTrackerStarted) {
// 这个enpoint是运行在JobScheduler上的,也就是给自己发了一个UpdateReceiverRateLimit更新速率的事件
endpoint.send(UpdateReceiverRateLimit(streamUID, newRate))
}
}
}
下面是ReceiverTrackerEndpoint接收到UpdateReceiverRateLimit消息时的处理:
def receive{
...
case UpdateReceiverRateLimit(streamUID, newRate) =>
// 拿到receiver的rpc信息eP, 然后发送UpdateRateLimit(newRate)更新速率
for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
eP.send(UpdateRateLimit(newRate))
}
...
}
RateController更新速率以及通知新接收速率过程就是这样,接下来是Receiver如何去根据新的速率newRate控制接收速率。
2.2.2 Receiver控制速率
Receiver上运行了很多组件:
- Receiver负责接收数据
- 接收的数据上报给ReceiverSupervisorImpl
- 如果接受的数据是一条条上报(调用方法ReceiverSupervisorImpl # putSingle), 则ReceiverSupervisorImpl 使用BlockGenerator用来将一条条的记录汇聚成block(如果Receiver一次接收并上报一批数据就不会使用BlockGenerator)
上面BlockGenerator将一条条数据汇聚成block, Receiver上控制接受速率就是通过BlockGenerator处理速度来实现的,BlockGenerator阻塞了也就相当于间接阻塞了Receiver接受速率。
但是上面说只有一条条接受的数据会走BlockGenerator,如果Receiver不使用
ReceiverSupervisorImpl # putSingle
而是使用其他方法一次上报一批数据,其实反压是不起作用的。
- ReceiverSupervisorImpl接收UpdateRateLimit消息
下面是ReceiverSupervisorImpl接受消息的rpc端处理代码:
private val endpoint = env.rpcEnv.setupEndpoint(
"Receiver-" + streamId + "-" + System.currentTimeMillis(), new ThreadSafeRpcEndpoint {
override val rpcEnv: RpcEnv = env.rpcEnv
override def receive: PartialFunction[Any, Unit] = {
case StopReceiver =>
logInfo("Received stop signal")
ReceiverSupervisorImpl.this.stop("Stopped by driver", None)
case CleanupOldBlocks(threshTime) =>
logDebug("Received delete old batch signal")
cleanupOldBlocks(threshTime)
case UpdateRateLimit(eps) =>
logInfo(s"Received a new rate limit: $eps.")
// 通知所有的BlockGenerator去更新速率
registeredBlockGenerators.asScala.foreach { bg =>
bg.updateRate(eps)
}
}
})
- BlockGenerator控制速率
BlockGenerator采用令牌桶算法实现速率控制,原理简介如下:
假设想把接收速率控制在m条记录每秒, 那么生产者只需要以恒定的速度每秒向桶中放m个令牌,数据接收者接收数据之前需要从桶中拿走一个令牌才能接收一条数据,显然数据接收速率不会超过m
来看看BlockGenerator是怎么实现的,BlockGenerator实现了RateLimiter抽象类,下面是RateLimiter的部分实现:
// 最大接收速率,接收数据速率由maxRateLimit和动态更新的反压速率共同控制
private val maxRateLimit = conf.getLong("spark.streaming.receiver.maxRate", Long.MaxValue)
// rateLimiter负责产生令牌
private lazy val rateLimiter = GuavaRateLimiter.create(getInitialRateLimit().toDouble)
// BlockGenerator在存储数据之前会调用这个方法,相当于取令牌。
def waitToPush() {
rateLimiter.acquire()
}
// 更新速率,也就是令牌产生的速率
private[receiver] def updateRate(newRate: Long): Unit =
if (newRate > 0) {
if (maxRateLimit > 0) {
rateLimiter.setRate(newRate.min(maxRateLimit))
} else {
rateLimiter.setRate(newRate)
}
}
BlockGenerator的父类RateLimiter实现了令牌桶速率控制算法,接下来就是BlockGenerator在接收Receiver传递过来的数据时调用waitToPush
去获取令牌了,没有令牌是,BlockGenerator阻塞,那么Receiver也会阻塞下去。
2.3 DirectKafkaInputDStream时的反压实现
DirectKafkaInputDStream是没有Receiver的。
有Receiver的stream job, 数据从由Receiver接收,然后组装成block然后汇报到JobScheduler,stream job提交运行前从JobScheduler拿一个block运行。
但是对于DirectKafkaInputDStream而言,不存在独立运行的Receiver接收数据,而是在从DirectKafkaInputDStream创建出
KafkaRDD
然后提交stream job运行时,在KafkaRDD
的compute方法中才开始从kafka读取数据。此时由于接收数据是在job开始运行后在task中进行的,因此反压实现也是通过控制本次task从kafka中读取多少数据来实现的。
1. 计算接收速率
不贴代码了,DirectKafkaInputDStream
使用IDRateEstimator去评估每秒接收的数据量R,同时由于DirectKafkaInputDStream
可以同时从一个topic的n个分区接收数据,这个R是整个全部分区的数据接收速度,在DirectKafkaInputDStream
中还有一个重要的参数spark.streaming.kafka.maxRatePerPartition
(每秒)控制每个分区的最大接收速度(假设是maxR)。
评估出R之后,就是计算n个分区每个分区的速度了,这个不是简单的R/n分的,
而是根据之前每一个分区消费到的offset(假设是prefOffset)和现在每个分区的最新offset(假设是curOffset)的差值/ 总差值的比例计算出来的,
假设全部n个分区从上一batch消费之后到现在的整个未消费记录数是totalLag,
分区n1未消费数是lagN1, 那么本次batch从分区n1应该消费的记录数就是:
(min( (lagN1 / totalLag) * R, maxR)) * batchDuration(换算成秒)
2. 控制速率
「1」中计算出每个分区最大消费记录数m,接下来从DirectKafkaInputDStream
生成KafkaRDD
(KafkaRDD的分区个数也就是topic的分区个数), 然后提交job运行后KafkaRDD
的compute方法开始从kafka消费m条记录。