前言
spark应用程序的调度体现在两个地方,第一个是Yarn对spark应用间的调度,第二个是spark应用内(同一个SparkContext)的多个TaskSetManager的调度,这里暂时只对应用内部调度进行分析。
spark的调度模式分为两种:FIFO(先进先出)和FAIR(公平调度)。默认是FIFO,即谁先提交谁先执行,而FAIR支持在调度池中再进行分组,可以有不同的权重,根据权重、资源等来决定谁先执行。spark的调度模式可以通过spark.scheduler.mode进行设置。
调度池初始化
在DAGScheluer对job划分好stage并以TaskSet的形式提交给TaskScheduler后,TaskScheduler的实现类会为每个TaskSet创建一个TaskSetMagager对象,并将该对象添加到调度池中:
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
schedulableBuilder是SparkContext 中newTaskSchedulerImpl(sc)在创建TaskSchedulerImpl的时候通过scheduler.initialize(backend)的initialize方法对schedulableBuilder进行了实例化。
def initialize(backend: SchedulerBackend) {
this.backend = backend
// temporarily set rootPool name to empty
rootPool = new Pool("", schedulingMode, 0, 0)
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported spark.scheduler.mode: $schedulingMode")
}
}
schedulableBuilder.buildPools()
}
可以看到程序会根据配置来创建不同的调度池,schedulableBuilder有两种实现,分别是FIFOSchedulableBuilder和FairSchedulableBuilder,接着后面调用了schedulableBuilder.buildPools(),我们来看两者都是怎么实现的。
override def buildPools() {
// nothing
}
FIFOSchedulableBuilder啥也没干。
override def buildPools() {
var is: Option[InputStream] = None
try {
is = Option {
schedulerAllocFile.map { f =>
new FileInputStream(f)
}.getOrElse {
Utils.getSparkClassLoader.getResourceAsStream(DEFAULT_SCHEDULER_FILE)
}
}
//根据配置文件创建buildFairSchedulerPool
is.foreach { i => buildFairSchedulerPool(i) }
} finally {
is.foreach(_.close())
}
// finally create "default" pool
buildDefaultPool()
}
可以看到FairSchedulableBuilder的buildPools方法中会先去读取FAIR模式的配置文件默认位于SPARK_HOME/conf/fairscheduler.xml,也可以通过参数spark.scheduler.allocation.file设置用户自定义配置文件。 模板如下:
<allocations>
<pool name="production">
<schedulingMode>FAIR</schedulingMode>
<weight>1</weight>
<minShare>2</minShare>
</pool>
<pool name="test">
<schedulingMode>FIFO</schedulingMode>
<weight>2</weight>
<minShare>3</minShare>
</pool>
</allocations>
其中:
- name:调度池名字,可在程序中根据spark.scheduler.pool来指定使用某个调度池,未指定则使用名字为default的调度池。
- schedulingMode:调度模式
- weigt:权重(weight为2的分配到的资源为weight为1的两倍),如果设置为1000,该调度池一有任务就会马上运行,默认为1
- minShare:调度池所需最小资源数(cores),默认为0
FAIR可以配置多个调度池,即rootPool里面还是一组Pool,Pool中包含了TaskSetMagager。
FairSchedulableBuilder会根据配置文件创建buildFairSchedulerPool。
private def buildFairSchedulerPool(is: InputStream) {
val xml = XML.load(is)
for (poolNode <- (xml \\ POOLS_PROPERTY)) {
val poolName = (poolNode \ POOL_NAME_PROPERTY).text
var schedulingMode = DEFAULT_SCHEDULING_MODE
var minShare = DEFAULT_MINIMUM_SHARE
var weight = DEFAULT_WEIGHT
val xmlSchedulingMode = (poolNode \ SCHEDULING_MODE_PROPERTY).text
if (xmlSchedulingMode != "") {
try {
schedulingMode = SchedulingMode.withName(xmlSchedulingMode)
} catch {
case e: NoSuchElementException =>
logWarning(s"Unsupported schedulingMode: $xmlSchedulingMode, " +
s"using the default schedulingMode: $schedulingMode")
}
}
val xmlMinShare = (poolNode \ MINIMUM_SHARES_PROPERTY).text
if (xmlMinShare != "") {
minShare = xmlMinShare.toInt
}
val xmlWeight = (poolNode \ WEIGHT_PROPERTY).text
if (xmlWeight != "") {
weight = xmlWeight.toInt
}
val pool = new Pool(poolName, schedulingMode, minShare, weight)
rootPool.addSchedulable(pool)
logInfo("Created pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
poolName, schedulingMode, minShare, weight))
}
}
根据每个字段值(未设置则为默认值)来实例化一个Pool对象,并添加到rootPool中。
一个spark应用程序包含一个TaskScheduler,一个TaskScheduler包含一个唯一的RootPool,FIFO只有一层Pool,包含TaskSetMagager,而FARI包含两层Pool,RootPool包含子Pool,子Pool包含TaskSetMagager,RootPool都是在实例化SchedulableBuilder的时候创建的。
private def buildDefaultPool() {
if (rootPool.getSchedulableByName(DEFAULT_POOL_NAME) == null) {
val pool = new Pool(DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE,
DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT)
rootPool.addSchedulable(pool)
logInfo("Created default pool %s, schedulingMode: %s, minShare: %d, weight: %d".format(
DEFAULT_POOL_NAME, DEFAULT_SCHEDULING_MODE, DEFAULT_MINIMUM_SHARE, DEFAULT_WEIGHT))
}
}
若根据配置文件创建的调度池中没有一个名字为default的调度池,则会创建一个所有参数都是默认值的名字为default的调度池。
调度池添加TaskSetMagager
两种调度模式的最终实现都是一样,不过FAIR会在添加之前会获取需要使用的调度池,默认为名字为default的调度池。
override def addSchedulable(schedulable: Schedulable) {
require(schedulable != null)
schedulableQueue.add(schedulable)
schedulableNameToSchedulable.put(schedulable.name, schedulable)
schedulable.parent = this
}
添加一个TaskSetMagager的时候会添加到队列的尾部,获取是从头部获取。对于FIFO而言,parentPool都是RootPool,而FAIR,TaskSetMagager的parentPool都是RootPool的子Pool。
调度池对TaskSetMagager排序算法
TaskScheduler通过SchedulerBackend拿到的executor资源后,会对所有TaskSetMagager进行调度。通过rootPool.getSortedTaskSetQueue来获取排序后的TaskSetMagager。
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
val sortedSchedulableQueue =
schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
for (schedulable <- sortedSchedulableQueue) {
sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
}
sortedTaskSetQueue
}
可见排序核心的算法在taskSetSchedulingAlgorithm.comparator里,而两种模式的taskSetSchedulingAlgorithm对应的实现也不一样:
var taskSetSchedulingAlgorithm: SchedulingAlgorithm = {
schedulingMode match {
case SchedulingMode.FAIR =>
new FairSchedulingAlgorithm()
case SchedulingMode.FIFO =>
new FIFOSchedulingAlgorithm()
case _ =>
val msg = "Unsupported scheduling mode: $schedulingMode. Use FAIR or FIFO instead."
throw new IllegalArgumentException(msg)
}
}
FIFO模式的算法类是FIFOSchedulingAlgorithm,FAIR模式的算法实现类是FairSchedulingAlgorithm。下面看两种模式下的比较函数的实现,FIFO:
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val priority1 = s1.priority
val priority2 = s2.priority
var res = math.signum(priority1 - priority2)
if (res == 0) {
val stageId1 = s1.stageId
val stageId2 = s2.stageId
res = math.signum(stageId1 - stageId2)
}
res < 0
}
- 先比较priority,在FIFO中该优先级实际上是Job ID,越早提交的job的jobId越小,priority越小,优先级越高。
- 若priority相同,则说明是同一个job里的TaskSetMagager,则比较StageId,StageId越小优先级越高。
下面看FAIR的排序算法:
override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
val minShare1 = s1.minShare
val minShare2 = s2.minShare
val runningTasks1 = s1.runningTasks
val runningTasks2 = s2.runningTasks
val s1Needy = runningTasks1 < minShare1
val s2Needy = runningTasks2 < minShare2
val minShareRatio1 = runningTasks1.toDouble / math.max(minShare1, 1.0)
val minShareRatio2 = runningTasks2.toDouble / math.max(minShare2, 1.0)
val taskToWeightRatio1 = runningTasks1.toDouble / s1.weight.toDouble
val taskToWeightRatio2 = runningTasks2.toDouble / s2.weight.toDouble
var compare = 0
if (s1Needy && !s2Needy) {
return true
} else if (!s1Needy && s2Needy) {
return false
} else if (s1Needy && s2Needy) {
compare = minShareRatio1.compareTo(minShareRatio2)
} else {
compare = taskToWeightRatio1.compareTo(taskToWeightRatio2)
}
if (compare < 0) {
true
} else if (compare > 0) {
false
} else {
s1.name < s2.name
}
}
- 调度池运行的task数小于minShare的优先级比不小于的优先级要高。
- 若两者运行的task个数都比minShare小,则比较minShare使用率,使用率约低优先级越高。
- 若两者的minShare使用率相同,则比较权重使用率,使用率约低优先级越高。
- 若权重也相同,则比较名字。
在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,都是用的是FairSchedulingAlgorithm.FairSchedulingAlgorithm算法。