# Task Submission Flow
After the stages have been divided, the tasks are wrapped into a TaskSet and submitted to the TaskScheduler.
## Annotated Spark Source
For the whole Spark source-analysis series, I keep the annotated Spark source and the analysis files on my GitHub, in the Spark源码剖析 repository.
## Source Walkthrough of the Submission Flow
### Submitting the TaskSet
Look at line 160 of TaskSchedulerImpl, where the submitTasks() method is defined; its main code is as follows:
```scala
// TODO: this method submits a TaskSet
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    activeTaskSets(taskSet.id) = manager
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
    }
    hasReceivedTask = true
  }
  // TODO: reviveOffers() of the CoarseGrainedSchedulerBackend class
  backend.reviveOffers()
}
```
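The registration step above can be sketched as follows. This is a minimal sketch, not Spark's implementation: `TaskSet`, `TaskSetManager`, and the FIFO queue below are simplified stand-ins for the real classes (Spark's schedulableBuilder builds a FIFO pool by default), and the starvation timer is omitted.

```scala
import scala.collection.mutable

// Simplified stand-ins for Spark's TaskSet and TaskSetManager
case class TaskSet(id: String, tasks: Seq[String])
class TaskSetManager(val taskSet: TaskSet, val maxTaskFailures: Int)

object MiniScheduler {
  // Live TaskSets by id, plus a FIFO pool standing in for the schedulableBuilder
  private val activeTaskSets = mutable.HashMap[String, TaskSetManager]()
  private val fifoPool = mutable.Queue[TaskSetManager]()

  def submitTasks(taskSet: TaskSet): Unit = this.synchronized {
    val manager = new TaskSetManager(taskSet, maxTaskFailures = 4)
    activeTaskSets(taskSet.id) = manager // track the live TaskSet
    fifoPool.enqueue(manager)            // addTaskSetManager analogue
  }

  // Hand back managers in submission (FIFO) order
  def nextTaskSet(): Option[TaskSetManager] = this.synchronized {
    if (fifoPool.isEmpty) None else Some(fifoPool.dequeue())
  }
}
```

Submitting two TaskSets and draining the pool returns them in submission order, which is the essence of FIFO scheduling.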
The key call here is the reviveOffers() method of the CoarseGrainedSchedulerBackend class.
### reviveOffers() in CoarseGrainedSchedulerBackend
All this method does is send a message to the DriverActor:
```scala
// TODO: send a message to the DriverActor
override def reviveOffers() {
  driverActor ! ReviveOffers
}
```
### receiveWithLogging() of the DriverActor in CoarseGrainedSchedulerBackend
The receiveWithLogging() method in the DriverActor class pattern-matches on the incoming message:
```scala
// TODO: on a match, call makeOffers() to submit tasks to the executors
case ReviveOffers =>
  makeOffers()
```
The makeOffers() method then submits the tasks to the executors.
### Running Tasks on the Executors
The main code of makeOffers() is as follows:
```scala
// TODO: call launchTasks() to submit the tasks to the executors
def makeOffers() {
  launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq))
}
```
This builds a WorkerOffer from every registered executor's host and free cores, lets the scheduler assign tasks to those offers, and passes the result to launchTasks(). The main flow of launchTasks() is:
```scala
// TODO: send the tasks to the executors
def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  // TODO: tasks are sent to the executors one at a time
  for (task <- tasks.flatten) {
    // TODO: first get a serializer instance
    val ser = SparkEnv.get.closureSerializer.newInstance()
    // TODO: serialize the task for network transfer
    val serializedTask = ser.serialize(task)
    // TODO: check the serialized size
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
      scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSet.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    } else {
      // TODO: executorDataMap is a HashMap keyed by executor id
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      // TODO: send the serialized task to the executor
      executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
    }
  }
}
```
The work here is to iterate over the scheduled tasks, serialize each one, and send the serialized tasks to their executors one at a time; a task whose serialized size exceeds the Akka frame size (minus the reserved bytes) aborts its whole TaskSet instead.
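The size guard can be sketched as follows. This sketch uses plain Java serialization in place of Spark's closureSerializer, and the decision is returned as a string instead of sending a message or aborting a TaskSet; the frame-size numbers are illustrative, not Spark defaults.

```scala
import java.io.{ByteArrayOutputStream, ObjectOutputStream}

// Serialize any Serializable value to bytes, standing in for closureSerializer
def serialize(task: java.io.Serializable): Array[Byte] = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(task)
  out.close()
  bytes.toByteArray
}

// Mirror launchTasks(): send if the task fits in a frame, otherwise abort
def launchDecision(task: java.io.Serializable, frameSize: Int, reserved: Int): String = {
  val serialized = serialize(task)
  if (serialized.length >= frameSize - reserved)
    s"abort: task is ${serialized.length} bytes, max allowed ${frameSize - reserved}"
  else
    "send"
}
```

A small payload goes through, while an oversized one trips the same `size >= frameSize - reserved` comparison that makes Spark abort the TaskSet and suggest broadcast variables.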
### The Executor Executes the Task
The pattern match in CoarseGrainedExecutorBackend handles the LaunchTask message that the DriverActor sends to the executor:
```scala
// TODO: the message the Driver sends to the executor
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    // TODO: get a serializer instance
    val ser = env.closureSerializer.newInstance()
    // TODO: deserialize the task
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // TODO: hand the deserialized task to the thread pool for execution
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
```
The work here is to get a serializer, deserialize the task, and hand the deserialized task to a thread pool for execution.
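The driver-to-executor round trip can be sketched with plain Java serialization. `TaskInfo` below is a simplified, hypothetical stand-in for Spark's TaskDescription; what matters is that what the executor deserializes equals what the driver serialized.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

// Simplified stand-in for TaskDescription; case classes are Serializable by default
case class TaskInfo(taskId: Long, attemptNumber: Int, name: String)

// Driver side serializes, executor side deserializes the same bytes
def roundTrip(info: TaskInfo): TaskInfo = {
  val bytes = new ByteArrayOutputStream()
  val out = new ObjectOutputStream(bytes)
  out.writeObject(info)
  out.close()
  val in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray))
  in.readObject().asInstanceOf[TaskInfo]
}
```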
Below is the launchTask() method of Executor. Its main logic is to wrap the task information in a TaskRunner object and then execute it with a thread pool.
```scala
// TODO: the executor executes the task
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer) {
  // TODO: create a TaskRunner object that wraps the task information
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // TODO: runningTasks is a ConcurrentHashMap, so it is thread-safe
  runningTasks.put(taskId, tr)
  // TODO: execute it with the thread pool
  threadPool.execute(tr)
}
```
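The launchTask() pattern can be sketched with the standard library. The names are simplified stand-ins for Executor's fields, and the `CountDownLatch` is added only so the sketch can wait for the task to finish; the real TaskRunner reports status back through the ExecutorBackend instead.

```scala
import java.util.concurrent.{ConcurrentHashMap, CountDownLatch, Executors}

// Thread-safe registry of running tasks, like Executor.runningTasks
val runningTasks = new ConcurrentHashMap[Long, Runnable]()
val threadPool = Executors.newCachedThreadPool()
val done = new CountDownLatch(1)

def launchTask(taskId: Long, body: () => Unit): Unit = {
  // Wrap the work in a Runnable, playing the role of TaskRunner
  val runner = new Runnable {
    override def run(): Unit = {
      try body() finally {
        runningTasks.remove(taskId) // mirror TaskRunner's cleanup when the task ends
        done.countDown()
      }
    }
  }
  runningTasks.put(taskId, runner) // register before handing to the pool
  threadPool.execute(runner)
}

launchTask(1L, () => println("task 1 running"))
done.await()            // wait until the task has finished and been removed
threadPool.shutdown()
```

Registering the runner in the map before `execute` means the task is visible as "running" for its whole lifetime, which is what lets the executor report or kill in-flight tasks by id.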
## Summary
1. Submitting tasks means iterating over the TaskSet, serializing each task one at a time, and sending the serialized tasks to the executors.
2. To execute a task, the executor wraps the task information in a TaskRunner object and then runs it with a thread pool.