上一篇文章我们谈到,DAGScheduler将Job划分成由Stage组成的DAG后,就根据Stage的具体类型来生成ShuffleMapTask和ResultTask,然后使用TaskSet对其进行封装,最后调用TaskScheduler的submitTasks方法提交具体的TaskSet,而实际上是调用的TaskSchedulerImpl的submitTasks方法,下面我们就来分析具体Tasks提交的过程。
直接进入TaskSchedulerImpl的submitTasks方法:
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
// 使用synchronized来确保Task的同步提交
this.synchronized {
// 首先创建一个TaskSetManager,主要负责调度TaskSet中的Tasks
// 默认的最大失败重试次数是4次,可以通过spark.task.maxFailures进行配置
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
}
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
}
// 将TaskSetManager及TaskSet的属性信息添加到schedulableBuilder中
// SchedulableBuilder会确定TaskSetManager的调度顺序是FIFO还是FAIR,默认是FIFO
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
this.cancel()
}
}
}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
}
hasReceivedTask = true
}
backend.reviveOffers()
}
进入backend的reviveOffers()方法:
override def reviveOffers() {
driverEndpoint.send(ReviveOffers)
}
可以看到是给DriverEndpoint发送了一条ReviveOffers消息(我们在前面分析DriverEndpoint注册的时候就给自己发送了一条ReviveOffers消息),DriverEndpoint接收到该消息后的处理如下:
case ReviveOffers =>
makeOffers()
接下来看makeOffers():
private def makeOffers() {
// Filter out executors under killing
// 过滤出Alive的Executors
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
// 构建由WorkerOffer组成的集合,WorkerOffer就代表Executor上可用的计算资源
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
// 将Tasks提交到Executors上
launchTasks(scheduler.resourceOffers(workOffers))
}
在makeOffers()方法中首先准备好可以用于计算的workOffers(代表所有可用的ExecutorBackend中可以使用的cores的信息)。
在launchTasks之前,我们重点看一下scheduler.resourceOffers(workOffers)都做了什么工作:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
// 标记slave为alive并记录hostname的信息
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
}
// 跨机架的情况,我们这里不做考虑
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// 将offers打乱,为的就是负载均衡
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)
// Build a list of tasks to assign to each worker.
// 这里我们清楚的看见每个Executor上的Tasks的个数取决于该Executor上可用的cores的个数
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
// 每个executor上可用的cores的个数组成的数组
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
可以看到TaskSchedulerImpl的resourceOffers方法的主要作用就是为每个Task具体分配计算资源,输入的就是可用的资源,输出的是由TaskDescription组成的二维数组(Seq[Seq[TaskDescription]]),TaskDescription中保存了executorId,即每个Task具体运行在哪个ExecutorBackend上,下面我们就具体分析该方法的执行过程:
对可用的资源进行标记并记录hostname,同时判断是否有新的executor加入
即resourceOffers方法的如下部分:
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
这部分不是我们考虑的重点,所以大家只要知道他的作用就可以了。
将所有可用的计算资源随机打散
对应的源码:
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)
通过Random.shuffle的方法将所有的计算资源重新进行“洗牌”,以追求最大化的负载均衡
根据每个ExecutorBackend的cores的个数声明类型为TaskDescription的ArrayBuffer数组
对应的源码:
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
此处只是声明,并没有加入TaskDescription
获得根据具体的调度策略排序后的TaskSets
对应的源码:
val sortedTaskSets = rootPool.getSortedTaskSetQueue
调度策略即SchedulableBuilder,具体的实现分为FIFOSchedulableBuilder和FairSchedulableBuilder,而默认使用的就是FIFO的调度策略。
如果有新的Executor加入,此时会调用TaskSet的executorAdded方法来获取最新的完整的可用计算资源
对应的源码:
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
确定最高优先级本地性
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
这里的LocalityLevel从高到低依次为:PROCESS_LOCAL、NODE_LOCAL、NO_PREF、RACK_LOCAL、ANY,其中NO_PREF是指机器的本地性,因为一台机器上有可能有很多node。
下面我们来看这个resourceOfferSingleTaskSet方法:
private def resourceOfferSingleTaskSet(
taskSet: TaskSetManager,
maxLocality: TaskLocality,
shuffledOffers: Seq[WorkerOffer],
availableCpus: Array[Int],
tasks: Seq[ArrayBuffer[TaskDescription]]) : Boolean = {
var launchedTask = false
for (i <- 0 until shuffledOffers.size) {
val execId = shuffledOffers(i).executorId
val host = shuffledOffers(i).host
if (availableCpus(i) >= CPUS_PER_TASK) {
try {
for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
tasks(i) += task
val tid = task.taskId
taskIdToTaskSetManager(tid) = taskSet
taskIdToExecutorId(tid) = execId
executorIdToTaskCount(execId) += 1
executorsByHost(host) += execId
availableCpus(i) -= CPUS_PER_TASK
assert(availableCpus(i) >= 0)
launchedTask = true
}
} catch {
case e: TaskNotSerializableException =>
logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
// Do not offer resources for this task, but don't throw an error to allow other
// task sets to be submitted.
return launchedTask
}
}
}
return launchedTask
}
首先循环遍历判断每个Executor上可用的cores的个数是否满足每个Task所需要的CPU的个数,默认CPUS_PER_TASK的个数为1;然后通过调用TaskSetManager的resourceOffer方法最终确定每个Task具体运行在哪个ExecutorBackend的具体的Locality Level:
@throws[TaskNotSerializableException]
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
{
if (!isZombie) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
}
}
dequeueTask(execId, host, allowedLocality) match {
case Some((index, taskLocality, speculative)) => {
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
}
// Serialize and return the task
// 将task序列化
val startTime = clock.getTimeMillis()
val serializedTask: ByteBuffer = try {
Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
}
// 判断序列化后的大小是否超过了TaskSetManager的限制,默认是100k
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
}
addRunningTask(taskId)
// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, partition ${task.partitionId}," +
s"$taskLocality, ${serializedTask.limit} bytes)")
// 向DAGScheduler汇报Task开始
sched.dagScheduler.taskStarted(task, info)
// 返回TaskDescription
return Some(new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
taskName, index, serializedTask))
}
case _ =>
}
}
None
}
该方法传入的maxLocality就是上面TaskSchedulerImpl的resourceOffers方法中那个for循环中传入的maxLocality,该循环是按照上面提到的Locality Level的级别由高到低进行的,所以最优的计算本地性就是PROCESS_LOCAL,最后返回Some(TaskDescription)给resourceOfferSingleTaskSet方法中,然后的一系列操作是更新数据结构以及从可用的计算资源中减掉刚才Task使用的cores的个数(默认每个Task使用一个core),最后将分配好的Task(Seq[ArrayBuffer[TaskDescription]])返回给TaskSchedulerImpl的resourceOffers方法,而该方法又将最后的结果(Seq[Seq[TaskDescription]])返回给CoarseGrainedSchedulerBackend的makeOffers方法,最后执行launchTask:
private def makeOffers() {
// Filter out executors under killing
val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
val workOffers = activeExecutors.map { case (id, executorData) =>
new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
}.toSeq
launchTasks(scheduler.resourceOffers(workOffers))
}
这里需要补充一点:我们之前已经分析过DAGScheduler是从数据的层面,也就是RDD的层面考虑的preferredLocation,即DAGScheduler部分已经确定了Task要被发到哪个Executor上运行;而TaskScheduler是从具体计算Task的角度考虑计算的本地性,也就是说具体的计算是发生在内存中还是发生在本地磁盘等等(PROCESS_LOCAL、NODE_LOCAL…),由此也印证了DAGScheduler负责高层的调度任务,而TaskScheduler负责底层的调度任务。
接下来我们就进入launchTasks方法:
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
// 注意此处是对TaskDescription进行序列化操作
val serializedTask = ser.serialize(task)
// 判断序列化的大小是否超过限制
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
AkkaUtils.reservedSizeBytes)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
// 获得该TaskDescription要发送到的那个Executor的信息
val executorData = executorDataMap(task.executorId)
// 减掉使用的计算资源,即cores
executorData.freeCores -= scheduler.CPUS_PER_TASK
// 将Task发送到具体的Executor上
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
这里akkaFrameSize的默认大小是128M(Spark 1.6.3版本,可以通过spark.akka.frameSize进行配置),而AkkaUtils.reservedSizeBytes的大小是200k,也就是说序列化后的大小不能超过128MB-200k,最后如果小于该限制就会将task发送到具体的ExecutorBackend上。
ExecutorBackend(Standalone模式下就是CoarseGrainedExecutorBackend)接收到该消息后就会调用具体的executor的launchTask方法去执行task:
case LaunchTask(data) =>
if (executor == null) {
logError("Received LaunchTask command but executor was null")
System.exit(1)
} else {
val taskDesc = ser.deserialize[TaskDescription](data.value)
logInfo("Got assigned task " + taskDesc.taskId)
executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
taskDesc.name, taskDesc.serializedTask)
}
由此可见真正发送tasks的不是TaskSetManager,而是CoarseGrainedSchedulerBackend,TaskSetManager只是负责监控task的运行。
至此Tasks的提交过程执行完成,下一篇文章我们将继续分析Tasks的运行过程。
本文参照的是Spark 1.6.3版本的源码,同时给出Spark 2.1.0版本的连接:
本文为原创,欢迎转载,转载请注明出处、作者,谢谢!