Message Bus Overview
The message bus is the framework Spark uses internally to pass messages and trigger events. At its core are three abstractions:
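- Event: describes something that happened;
- Listener: the entity that responds to an Event;
- Bus: the channel that routes Events to Listeners. It accepts Events and registers Listeners, so the Bus is the connector between Listeners and Events.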
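To make the three roles concrete, here is a minimal, self-contained sketch in plain Scala. It does not use Spark; the names `JobFinished`, `JobListener` and `SimpleBus` are hypothetical and only illustrate how an Event, a Listener and a Bus relate.

```scala
import scala.collection.mutable.ArrayBuffer

// Event: an immutable value describing something that happened (hypothetical type).
final case class JobFinished(jobId: Int, succeeded: Boolean)

// Listener: the entity that reacts to an event.
trait JobListener {
  def onJobFinished(event: JobFinished): Unit
}

// Bus: registers listeners and routes every posted event to each of them.
final class SimpleBus {
  private val listeners = ArrayBuffer.empty[JobListener]

  def addListener(listener: JobListener): Unit = listeners += listener

  def post(event: JobFinished): Unit = listeners.foreach(_.onJobFinished(event))
}

// Usage: a listener that records the ids of failed jobs.
val failedJobs = ArrayBuffer.empty[Int]
val simpleBus = new SimpleBus
simpleBus.addListener(new JobListener {
  def onJobFinished(event: JobFinished): Unit =
    if (!event.succeeded) failedJobs += event.jobId
})
simpleBus.post(JobFinished(1, succeeded = true))
simpleBus.post(JobFinished(2, succeeded = false))
```

After the two posts, `failedJobs` holds only job 2: the bus knows nothing about what listeners do, it only delivers events.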
ListenerBus
In Spark, the top-level abstraction of the message bus is ListenerBus. Its source code is:
```scala
package org.apache.spark.util

import java.util.concurrent.CopyOnWriteArrayList

import scala.collection.JavaConverters._
import scala.reflect.ClassTag
import scala.util.control.NonFatal

import com.codahale.metrics.Timer

import org.apache.spark.internal.Logging

/**
 * An event bus which posts events to its listeners.
 */
private[spark] trait ListenerBus[L <: AnyRef, E] extends Logging {

  private[this] val listenersPlusTimers = new CopyOnWriteArrayList[(L, Option[Timer])]

  // Marked `private[spark]` for access in tests.
  private[spark] def listeners = listenersPlusTimers.asScala.map(_._1).asJava

  /**
   * Returns a CodaHale metrics Timer for measuring the listener's event processing time.
   * This method is intended to be overridden by subclasses.
   */
  protected def getTimer(listener: L): Option[Timer] = None

  /**
   * Add a listener to listen events. This method is thread-safe and can be called in any thread.
   */
  final def addListener(listener: L): Unit = {
    listenersPlusTimers.add((listener, getTimer(listener)))
  }

  /**
   * Remove a listener and it won't receive any events. This method is thread-safe and can be called
   * in any thread.
   */
  final def removeListener(listener: L): Unit = {
    listenersPlusTimers.asScala.find(_._1 eq listener).foreach { listenerAndTimer =>
      listenersPlusTimers.remove(listenerAndTimer)
    }
  }

  /**
   * Post the event to all registered listeners. The `postToAll` caller should guarantee calling
   * `postToAll` in the same thread for all events.
   */
  def postToAll(event: E): Unit = {
    // JavaConverters can create a JIterableWrapper if we use asScala.
    // However, this method will be called frequently. To avoid the wrapper cost, here we use
    // Java Iterator directly.
    val iter = listenersPlusTimers.iterator
    while (iter.hasNext) {
      val listenerAndMaybeTimer = iter.next()
      val listener = listenerAndMaybeTimer._1
      val maybeTimer = listenerAndMaybeTimer._2
      val maybeTimerContext = if (maybeTimer.isDefined) {
        maybeTimer.get.time()
      } else {
        null
      }
      try {
        doPostEvent(listener, event)
      } catch {
        case NonFatal(e) =>
          logError(s"Listener ${Utils.getFormattedClassName(listener)} threw an exception", e)
      } finally {
        if (maybeTimerContext != null) {
          maybeTimerContext.stop()
        }
      }
    }
  }

  /**
   * Post an event to the specified listener. `onPostEvent` is guaranteed to be called in the same
   * thread for all listeners.
   */
  protected def doPostEvent(listener: L, event: E): Unit

  private[spark] def findListenersByClass[T <: L : ClassTag](): Seq[T] = {
    val c = implicitly[ClassTag[T]].runtimeClass
    listeners.asScala.filter(_.getClass == c).map(_.asInstanceOf[T]).toSeq
  }
}
```
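listenersPlusTimers is a thread-safe list (a CopyOnWriteArrayList) that stores every Listener registered on the bus, each paired with an optional metrics Timer. The trait also defines addListener to register a Listener and removeListener to unregister one.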
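The most important method in this trait is postToAll, which broadcasts the incoming Event to every Listener in listenersPlusTimers by calling doPostEvent once per Listener. A non-fatal exception thrown by one Listener is logged and does not stop delivery to the others. doPostEvent has no implementation in this trait; a custom message bus is built by extending ListenerBus and implementing doPostEvent.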
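The split between postToAll (generic iteration) and doPostEvent (type-specific delivery) is a template-method design. Because ListenerBus is `private[spark]`, code outside Spark cannot extend it, so the sketch below re-creates the idea in a hypothetical `MiniListenerBus` trait, dropping the Timer and logging parts. It keeps the detail that one failing listener must not block the others.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.util.control.NonFatal

// Hypothetical, stripped-down copy of the ListenerBus idea: the trait iterates over
// listeners; subclasses decide how one listener handles one event via doPostEvent.
trait MiniListenerBus[L <: AnyRef, E] {
  private val listeners = new CopyOnWriteArrayList[L]

  final def addListener(listener: L): Unit = listeners.add(listener)

  final def removeListener(listener: L): Unit = listeners.remove(listener)

  def postToAll(event: E): Unit = {
    val iter = listeners.iterator()
    while (iter.hasNext) {
      val listener = iter.next()
      try doPostEvent(listener, event)
      catch {
        // As in ListenerBus, a failing listener is reported and skipped.
        case NonFatal(e) => println(s"listener threw: $e")
      }
    }
  }

  protected def doPostEvent(listener: L, event: E): Unit
}

// A concrete bus whose listeners are plain String => Unit callbacks.
final class StringBus extends MiniListenerBus[String => Unit, String] {
  override protected def doPostEvent(listener: String => Unit, event: String): Unit =
    listener(event)
}

val receivedMessages = new CopyOnWriteArrayList[String]
val stringBus = new StringBus
stringBus.addListener(_ => throw new RuntimeException("boom"))
stringBus.addListener(s => receivedMessages.add(s))
stringBus.postToAll("hello")
```

The second listener still receives `"hello"` even though the first one threw, which is the behavior the `NonFatal` catch in postToAll is there to guarantee.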
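ListenerBus has many implementations in Spark. Direct implementations include SparkListenerBus and ExternalCatalog, and each of these is further specialized for particular scenarios. SparkListenerBus is the most important message bus in Spark and is the focus of the rest of this article.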
SparkListenerBus
The source code of SparkListenerBus is:
```scala
package org.apache.spark.scheduler

import org.apache.spark.util.ListenerBus

/**
 * A [[SparkListenerEvent]] bus that relays [[SparkListenerEvent]]s to its listeners
 */
private[spark] trait SparkListenerBus
  extends ListenerBus[SparkListenerInterface, SparkListenerEvent] {

  protected override def doPostEvent(
      listener: SparkListenerInterface,
      event: SparkListenerEvent): Unit = {
    event match {
      case stageSubmitted: SparkListenerStageSubmitted =>
        listener.onStageSubmitted(stageSubmitted)
      case stageCompleted: SparkListenerStageCompleted =>
        listener.onStageCompleted(stageCompleted)
      case jobStart: SparkListenerJobStart =>
        listener.onJobStart(jobStart)
      case jobEnd: SparkListenerJobEnd =>
        listener.onJobEnd(jobEnd)
      case taskStart: SparkListenerTaskStart =>
        listener.onTaskStart(taskStart)
      case taskGettingResult: SparkListenerTaskGettingResult =>
        listener.onTaskGettingResult(taskGettingResult)
      case taskEnd: SparkListenerTaskEnd =>
        listener.onTaskEnd(taskEnd)
      case environmentUpdate: SparkListenerEnvironmentUpdate =>
        listener.onEnvironmentUpdate(environmentUpdate)
      case blockManagerAdded: SparkListenerBlockManagerAdded =>
        listener.onBlockManagerAdded(blockManagerAdded)
      case blockManagerRemoved: SparkListenerBlockManagerRemoved =>
        listener.onBlockManagerRemoved(blockManagerRemoved)
      case unpersistRDD: SparkListenerUnpersistRDD =>
        listener.onUnpersistRDD(unpersistRDD)
      case applicationStart: SparkListenerApplicationStart =>
        listener.onApplicationStart(applicationStart)
      case applicationEnd: SparkListenerApplicationEnd =>
        listener.onApplicationEnd(applicationEnd)
      case metricsUpdate: SparkListenerExecutorMetricsUpdate =>
        listener.onExecutorMetricsUpdate(metricsUpdate)
      case executorAdded: SparkListenerExecutorAdded =>
        listener.onExecutorAdded(executorAdded)
      case executorRemoved: SparkListenerExecutorRemoved =>
        listener.onExecutorRemoved(executorRemoved)
      case executorBlacklisted: SparkListenerExecutorBlacklisted =>
        listener.onExecutorBlacklisted(executorBlacklisted)
      case executorUnblacklisted: SparkListenerExecutorUnblacklisted =>
        listener.onExecutorUnblacklisted(executorUnblacklisted)
      case nodeBlacklisted: SparkListenerNodeBlacklisted =>
        listener.onNodeBlacklisted(nodeBlacklisted)
      case nodeUnblacklisted: SparkListenerNodeUnblacklisted =>
        listener.onNodeUnblacklisted(nodeUnblacklisted)
      case blockUpdated: SparkListenerBlockUpdated =>
        listener.onBlockUpdated(blockUpdated)
      case speculativeTaskSubmitted: SparkListenerSpeculativeTaskSubmitted =>
        listener.onSpeculativeTaskSubmitted(speculativeTaskSubmitted)
      case _ => listener.onOtherEvent(event)
    }
  }
}
```
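As the code shows, SparkListenerBus fixes the Listener type to SparkListenerInterface and the Event type to SparkListenerEvent, and overrides ListenerBus's doPostEvent. doPostEvent pattern-matches on the runtime type of the event and hands each recognized event to the corresponding SparkListenerInterface method; any event that matches none of the listed cases is passed to onOtherEvent.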
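The routing in doPostEvent can be reproduced on a toy event hierarchy. The sketch below is self-contained and does not use Spark; `AppEvent`, `AppListener` and `route` are hypothetical names. It shows the two properties of the real method: each known event type maps to its own callback, and anything else falls through to a catch-all, as `onOtherEvent` does.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical event hierarchy standing in for SparkListenerEvent and its subclasses.
trait AppEvent
final case class StageSubmitted(stageId: Int) extends AppEvent
final case class StageCompleted(stageId: Int) extends AppEvent
final case class CustomEvent(payload: String) extends AppEvent

// One callback per known event type plus a catch-all, mirroring the shape of
// SparkListenerInterface and its onOtherEvent method.
trait AppListener {
  def onStageSubmitted(e: StageSubmitted): Unit
  def onStageCompleted(e: StageCompleted): Unit
  def onOtherEvent(e: AppEvent): Unit
}

// The routing step: match on the event's runtime type and call the matching callback.
def route(listener: AppListener, event: AppEvent): Unit = event match {
  case s: StageSubmitted => listener.onStageSubmitted(s)
  case c: StageCompleted => listener.onStageCompleted(c)
  case other             => listener.onOtherEvent(other)
}

val routedCalls = ArrayBuffer.empty[String]
val recorder = new AppListener {
  def onStageSubmitted(e: StageSubmitted): Unit = routedCalls += s"submitted ${e.stageId}"
  def onStageCompleted(e: StageCompleted): Unit = routedCalls += s"completed ${e.stageId}"
  def onOtherEvent(e: AppEvent): Unit = routedCalls += s"other $e"
}
Seq(StageSubmitted(0), StageCompleted(0), CustomEvent("x")).foreach(route(recorder, _))
```

The catch-all case is what lets new event types be posted on the same bus without changing the match.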
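SparkListenerInterface is a trait that declares a handler method for every kind of SparkListenerEvent. Most listeners do not implement it directly; they extend SparkListener, an abstract class that implements every method as a no-op, and override only the callbacks they need. One such listener is HeartbeatReceiver, which lives in the Spark driver process and maintains the heartbeats between the driver and the executors; it will be covered in detail later.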
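A listener usually cares about only a few callbacks, which is why an adapter with no-op defaults is convenient. The sketch below shows that adapter pattern on a hypothetical three-callback interface; `ClusterListener`, `ClusterListenerAdapter` and `LiveExecutorTracker` are illustrative names, and the tracker only loosely resembles the executor bookkeeping HeartbeatReceiver does (in Spark, heartbeats themselves arrive over RPC, not through the bus).

```scala
import scala.collection.mutable

// Hypothetical listener interface with several callbacks.
trait ClusterListener {
  def onExecutorAdded(executorId: String): Unit
  def onExecutorRemoved(executorId: String): Unit
  def onJobEnd(jobId: Int): Unit
}

// Adapter with no-op defaults: the role SparkListener plays for SparkListenerInterface.
abstract class ClusterListenerAdapter extends ClusterListener {
  override def onExecutorAdded(executorId: String): Unit = ()
  override def onExecutorRemoved(executorId: String): Unit = ()
  override def onJobEnd(jobId: Int): Unit = ()
}

// Overrides only the executor callbacks and keeps the set of live executors.
final class LiveExecutorTracker extends ClusterListenerAdapter {
  val live = mutable.Set.empty[String]
  override def onExecutorAdded(executorId: String): Unit = live += executorId
  override def onExecutorRemoved(executorId: String): Unit = live -= executorId
}

val tracker = new LiveExecutorTracker
tracker.onExecutorAdded("1")
tracker.onExecutorAdded("2")
tracker.onExecutorRemoved("1")
tracker.onJobEnd(0) // inherited no-op
```

New callbacks can be added to the interface with a default in the adapter, and existing subclasses keep compiling.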
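AsyncEventQueue: an asynchronous message bus
SparkListenerBus's doPostEvent routes a message to a specific method of the Listener, but it does not define in what form (for example, on which thread) the Listener responds to the Event.
AsyncEventQueue extends SparkListenerBus to define a framework in which Events are handled asynchronously. It works as follows:
1. Define a FIFO, thread-safe queue, eventQueue, that holds Events waiting to be dispatched. Its capacity is capped by configuration so that a persistent backlog produces an explicit error instead of an OOM.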
```scala
// Cap the capacity of the queue so we get an explicit error (rather than an OOM exception) if
// it's perpetually being added to more quickly than it's being drained.
private val eventQueue = new LinkedBlockingQueue[SparkListenerEvent](
  conf.get(LISTENER_BUS_EVENT_QUEUE_CAPACITY))
```
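The choice of a bounded `java.util.concurrent.LinkedBlockingQueue` matters because of how its methods behave when full. This small sketch uses a capacity of 2 as a stand-in for the configured value (the setting behind `LISTENER_BUS_EVENT_QUEUE_CAPACITY`, `spark.scheduler.listenerbus.eventqueue.capacity`): `offer` returns `false` instead of blocking, and `take` returns elements in FIFO order.

```scala
import java.util.concurrent.LinkedBlockingQueue

// Capacity 2 stands in for the configured listener-bus queue capacity.
val boundedQueue = new LinkedBlockingQueue[String](2)

// offer() never blocks: it returns false once the queue is full. This is what lets
// AsyncEventQueue.post drop an event instead of stalling the thread that posts it.
val accepted = Seq("e1", "e2", "e3").map(e => boundedQueue.offer(e))

// take() blocks until an element is available and returns elements in FIFO order.
val first = boundedQueue.take()
```

`accepted` is `List(true, true, false)` and `first` is `"e1"`; one element remains in the queue.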
2. Define a thread, dispatchThread, that repeatedly takes Events from eventQueue and dispatches them to the Listeners.
```scala
private val dispatchThread = new Thread(s"spark-listener-group-$name") {
  setDaemon(true)
  override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
    dispatch()
  }
}
```
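tryOrStopSparkContext ensures that if dispatch throws an uncaught error, the SparkContext is stopped automatically instead of the application continuing to run with a dead dispatch thread.
dispatch() is implemented as follows: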
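The wrapper pattern can be sketched without Spark. `tryOrStop` below is a hypothetical, simplified stand-in for `Utils.tryOrStopSparkContext`: it only handles non-fatal errors, and an `AtomicBoolean` stands in for "the SparkContext has been stopped". The thread is a daemon, like dispatchThread, so it does not keep the JVM alive by itself.

```scala
import java.util.concurrent.atomic.AtomicBoolean
import scala.util.control.NonFatal

// Hypothetical, simplified stand-in for Utils.tryOrStopSparkContext: run the body
// and, if it throws a non-fatal error, call a stop callback instead of letting the
// thread die silently.
def tryOrStop(stop: () => Unit)(body: => Unit): Unit =
  try body
  catch {
    case NonFatal(e) =>
      println(s"uncaught error in thread ${Thread.currentThread().getName}: $e")
      stop()
  }

// Stands in for "the SparkContext has been stopped".
val contextStopped = new AtomicBoolean(false)

val dispatchLikeThread = new Thread("demo-listener-group") {
  setDaemon(true) // a daemon thread does not keep the JVM alive on its own
  override def run(): Unit = tryOrStop(() => contextStopped.set(true)) {
    throw new IllegalStateException("listener loop failed")
  }
}
dispatchLikeThread.start()
dispatchLikeThread.join()
```

After `join()`, `contextStopped` is `true`: the failure in the loop was turned into an explicit shutdown signal rather than a silently dead thread.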
```scala
private def dispatch(): Unit = LiveListenerBus.withinListenerThread.withValue(true) {
  try {
    var next: SparkListenerEvent = eventQueue.take()
    while (next != POISON_PILL) {
      val ctx = processingTime.time()
      try {
        super.postToAll(next)
      } finally {
        ctx.stop()
      }
      eventCount.decrementAndGet()
      next = eventQueue.take()
    }
    eventCount.decrementAndGet()
  } catch {
    case ie: InterruptedException =>
      logInfo(s"Stopping listener queue $name.", ie)
  }
}
```
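The method loops, taking Events from eventQueue and dispatching them to the Listeners registered on the bus. The bus defines a sentinel Event, POISON_PILL, to mark a stop: when stop is called, POISON_PILL is appended to the tail of eventQueue, and when the loop takes it out, the loop ends and the dispatch thread exits. stop also counts POISON_PILL in eventCount, so the final decrementAndGet after the loop brings eventCount back to 0 when the thread exits.
3. Define post, which adds an Event to eventQueue.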
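The poison-pill shutdown can be reproduced with a plain consumer thread. In this sketch `Work`, `PoisonPill` and `pending` are hypothetical names; `pending` plays the role of eventCount, and the pill is counted when it is enqueued so that the decrement after the loop balances it, as described above.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

sealed trait Msg
final case class Work(n: Int) extends Msg
case object PoisonPill extends Msg // sentinel: no more events after this one

val mailbox = new LinkedBlockingQueue[Msg]()
val pending = new AtomicLong(0)        // plays the role of eventCount
val processed = ArrayBuffer.empty[Int] // only touched by the consumer thread

val consumer = new Thread("demo-dispatch") {
  override def run(): Unit = {
    var next = mailbox.take()
    while (next != PoisonPill) {
      next match {
        case Work(n)    => processed += n
        case PoisonPill => () // unreachable: the loop condition excludes it
      }
      pending.decrementAndGet()
      next = mailbox.take()
    }
    pending.decrementAndGet() // account for the pill itself
  }
}
consumer.start()

Seq(1, 2, 3).foreach { n => pending.incrementAndGet(); mailbox.put(Work(n)) }
pending.incrementAndGet() // "stop": the pill is counted like any other event
mailbox.put(PoisonPill)
consumer.join()
```

Because the pill sits behind every earlier event in the FIFO queue, all work posted before the stop is processed before the thread exits.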
```scala
def post(event: SparkListenerEvent): Unit = {
  if (stopped.get()) {
    return
  }

  eventCount.incrementAndGet()
  if (eventQueue.offer(event)) {
    return
  }

  eventCount.decrementAndGet()
  droppedEvents.inc()
  droppedEventsCounter.incrementAndGet()
  if (logDroppedEvent.compareAndSet(false, true)) {
    // Only log the following message once to avoid duplicated annoying logs.
    logError(s"Dropping event from queue $name. " +
      "This likely means one of the listeners is too slow and cannot keep up with " +
      "the rate at which tasks are being started by the scheduler.")
  }
  logTrace(s"Dropping event $event")

  val droppedCount = droppedEventsCounter.get
  if (droppedCount > 0) {
    // Don't log too frequently
    if (System.currentTimeMillis() - lastReportTimestamp >= 60 * 1000) {
      // There may be multiple threads trying to decrease droppedEventsCounter.
      // Use "compareAndSet" to make sure only one thread can win.
      // And if another thread is increasing droppedEventsCounter, "compareAndSet" will fail and
      // then that thread will update it.
      if (droppedEventsCounter.compareAndSet(droppedCount, 0)) {
        val prevLastReportTimestamp = lastReportTimestamp
        lastReportTimestamp = System.currentTimeMillis()
        val previous = new java.util.Date(prevLastReportTimestamp)
        // Log the number dropped since the last report; droppedEvents is a metrics Counter.
        logWarning(s"Dropped $droppedCount events from $name since $previous.")
      }
    }
  }
}
```
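The rate-limited drop report in post combines a non-blocking `offer` with a `compareAndSet` on an `AtomicLong`, so that exactly one producer reports and resets the counter. The sketch below isolates that logic in a hypothetical `DroppingQueue` class; the interval and the `report` callback are assumptions standing in for the one-minute window and `logWarning`.

```scala
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.atomic.AtomicLong
import scala.collection.mutable.ArrayBuffer

// Hypothetical simplification of the producer side of AsyncEventQueue.post:
// offer() never blocks, rejected events are counted, and the count is reported
// and reset at most once per interval.
final class DroppingQueue(capacity: Int, reportIntervalMs: Long, report: Long => Unit) {
  private val queue = new LinkedBlockingQueue[String](capacity)
  private val droppedSinceReport = new AtomicLong(0)
  @volatile private var lastReportMs = 0L

  def post(event: String): Unit = {
    if (queue.offer(event)) return
    val dropped = droppedSinceReport.incrementAndGet()
    val now = System.currentTimeMillis()
    // Only the producer whose compareAndSet succeeds reports and resets the counter.
    // If another producer incremented it in between, the CAS fails and the larger
    // count is left for that producer (or a later post) to report.
    if (now - lastReportMs >= reportIntervalMs &&
        droppedSinceReport.compareAndSet(dropped, 0)) {
      lastReportMs = now
      report(dropped)
    }
  }
}

val reportedCounts = ArrayBuffer.empty[Long]
val droppingQueue = new DroppingQueue(1, 60000L, n => reportedCounts += n)
droppingQueue.post("a") // accepted
droppingQueue.post("b") // dropped and reported (first report)
droppingQueue.post("c") // dropped, but inside the interval: only counted
```

Only one report is produced; the drop of `"c"` stays in the counter until the next interval has elapsed.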
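The whole design is a classic producer-consumer pattern: post is the producer that adds Events to eventQueue, and dispatchThread is the consumer that takes Events out of eventQueue and dispatches them to the Listeners. eventCount is an AtomicLong that tracks the number of Events separately from eventQueue. The reason for counting separately is that eventCount is decremented only after postToAll has returned (see dispatch), so eventCount reaching 0 guarantees that every Event has been processed, not merely taken out of eventQueue.
Once stop has been called on an AsyncEventQueue, post no longer accepts new Events.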
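The difference between "the queue is empty" and "every event has been processed" is easy to observe. In this sketch (plain Scala, hypothetical names; the latches exist only to freeze the consumer mid-event) the queue is already empty while the single event is still inside a slow listener, and only the separate counter reflects that.

```scala
import java.util.concurrent.{CountDownLatch, LinkedBlockingQueue}
import java.util.concurrent.atomic.{AtomicBoolean, AtomicLong}

val events = new LinkedBlockingQueue[String]()
val eventCount = new AtomicLong(0)
val stopped = new AtomicBoolean(false)
val inListener = new CountDownLatch(1) // consumer signals: the event has been taken
val release = new CountDownLatch(1)    // main thread lets the slow listener finish

def post(event: String): Unit =
  if (!stopped.get()) {
    eventCount.incrementAndGet()
    events.offer(event)
  }

val dispatcher = new Thread("demo-dispatch") {
  override def run(): Unit = {
    events.take()
    inListener.countDown()
    release.await() // simulate a slow listener
    eventCount.decrementAndGet()
  }
}
dispatcher.start()
post("jobStart")
inListener.await()
val queueLooksEmpty = events.isEmpty // true: the event has left the queue
val stillPending = eventCount.get    // 1: but it has not been processed yet
release.countDown()
dispatcher.join()
stopped.set(true)
post("late") // ignored after stop
```

A check based on `events.isEmpty` would have declared the bus idle too early; a check on the counter waits for the listener to finish.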
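LiveListenerBus: the container of AsyncEventQueues
Despite its name, LiveListenerBus is not itself a bus in the object-oriented sense: it does not extend ListenerBus. It stores AsyncEventQueues in a CopyOnWriteArrayList named queues and is responsible for adding AsyncEventQueues to and removing them from queues. Spark predefines four AsyncEventQueues: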
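- SHARED_QUEUE: listeners registered by user code, for example through SparkContext.addSparkListener;
- APP_STATUS_QUEUE: the listener that maintains application status for the web UI;
- EXECUTOR_MANAGEMENT_QUEUE: listeners that manage executors, such as HeartbeatReceiver and the listener used by dynamic allocation;
- EVENT_LOG_QUEUE: the listener that writes the event log.
Every event posted to LiveListenerBus is delivered to all of these queues; what distinguishes them is the set of listeners each one hosts. Because every queue has its own dispatch thread, a slow listener in one queue does not hold up the listeners in the others.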
LiveListenerBus is an important component of SparkContext.
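The container role can be sketched as follows. `NamedQueue` and `QueueContainer` are hypothetical names, and the queues here deliver synchronously to keep the example short (a real AsyncEventQueue hands events to its own thread). The container creates a named queue the first time a listener is added to it, and broadcasts every posted event to all queues; the queue names are illustrative only.

```scala
import java.util.concurrent.CopyOnWriteArrayList
import scala.collection.mutable.ArrayBuffer

// Hypothetical, synchronous stand-in for AsyncEventQueue: a named group of listeners.
final class NamedQueue(val name: String) {
  private val listeners = new CopyOnWriteArrayList[String => Unit]()

  def addListener(listener: String => Unit): Unit = listeners.add(listener)

  def post(event: String): Unit = {
    val it = listeners.iterator()
    while (it.hasNext) {
      val listener = it.next()
      listener(event)
    }
  }
}

// Hypothetical container in the spirit of LiveListenerBus: it owns the queues,
// creates a queue on first use, and hands every event to all of them.
final class QueueContainer {
  @volatile private var queues = Vector.empty[NamedQueue] // replaced, never mutated

  def addToQueue(listener: String => Unit, queueName: String): Unit = synchronized {
    val queue = queues.find(_.name == queueName).getOrElse {
      val created = new NamedQueue(queueName)
      queues = queues :+ created
      created
    }
    queue.addListener(listener)
  }

  def post(event: String): Unit = queues.foreach(_.post(event))

  def queueNames: Seq[String] = queues.map(_.name)
}

val seen = ArrayBuffer.empty[String]
val container = new QueueContainer
container.addToQueue(e => seen += s"appStatus:$e", "appStatus")
container.addToQueue(e => seen += s"eventLog:$e", "eventLog")
container.addToQueue(e => seen += s"appStatus2:$e", "appStatus") // reuses the queue
container.post("jobEnd")
```

Replacing an immutable `Vector` on each change gives readers in `post` a stable snapshot, the same trade-off a CopyOnWriteArrayList makes: cheap, lock-free reads on the hot path and more expensive, rare writes.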