Spark源码剖析(五):Task提交流程

# Task提交流程
在划分Stage之后,在对Task进行封装成为TaskSet然后提交给TaskScheduler。

Spark带注释源码

对于整个Spark源码分析系列,我将带有注释的Spark源码和分析的文件放在我的GitHub上Spark源码剖析

提交流程源码解析

提交TaskSet

查看TaskSchedulerImpl的160行,可以看到submitTasks()方法,主要代码如下:

//TODO 该方法提交TaskSet
  override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    this.synchronized {
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      activeTaskSets(taskSet.id) = manager
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  if (!isLocal && !hasReceivedTask) {
    starvationTimer.scheduleAtFixedRate(new TimerTask() {
      override def run() {
        if (!hasLaunchedTask) {
          logWarning("Initial job has not accepted any resources; " +
            "check your cluster UI to ensure that workers are registered " +
            "and have sufficient resources")
        } else {
          this.cancel()
        }
      }
    }, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
  }
  hasReceivedTask = true
}
//TODO  CoarseGrainedSchedulerBackend类的reviveOffers()
backend.reviveOffers()

这里主要的的方法是CoarseGrainedSchedulerBackend类的reviveOffers()。

CoarseGrainedSchedulerBackend的reviveOffers()

这里主要是向向DriverActor发送消息
//TODO 向DriverActor发送消息
  override def reviveOffers() {
    driverActor ! ReviveOffers
  }

CoarseGrainedSchedulerBackend中DriverActor的receiveWithLogging()

DriverActor类中的receiveWithLogging()进行模式匹配

//TODO 进行模式匹配,调用makeOffers()向Executor提交Task
case ReviveOffers =>
     makeOffers()

makeOffers()方法向Executor提交Task

Executor运行Task

makeOffers()方法的主要代码如下:

//TODO 调用launchTasks向Executor提交Task
    def makeOffers() {
      launchTasks(scheduler.resourceOffers(executorDataMap.map { case (id, executorData) =>
        new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
      }.toSeq))
    }

这里调用launchTasks(),代码主要的流程是:

 //TODO
    def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      //TODO Task是一个一个发送给Executor的
      for (task <- tasks.flatten) {
        //TODO 首先拿到序列化器
        val ser = SparkEnv.get.closureSerializer.newInstance()
        //TODO 将Task序列化用来进行网络传输
        val serializedTask = ser.serialize(task)
        //TODO  进行大小判断
        if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
          val taskSetId = scheduler.taskIdToTaskSetId(task.taskId)
          scheduler.activeTaskSets.get(taskSetId).foreach { taskSet =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
                "spark.akka.frameSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
                AkkaUtils.reservedSizeBytes)
              taskSet.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          //TODO 这是一个HashMap
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK
          //TODO 向Executor发送序列化好的Task
          executorData.executorActor ! LaunchTask(new SerializableBuffer(serializedTask))
        }
      }
    }

这里做的工作主要是迭代TaskSet然后一个一个的取出Task进行序列化之后向Executor发送序列化好的Task

Executor执行Task

CoarseGrainedExecutorBackend的模式匹配,主要是DriverActor发送数据给Executor的信息

 //TODO Driver 发送数据给Executor的信息
    case LaunchTask(data) =>
      if (executor == null) {
        logError("Received LaunchTask command but executor was null")
        System.exit(1)
      } else {
        //TODO 拿到序列化器
        val ser = env.closureSerializer.newInstance()
        //TODO 将Task反序列化
        val taskDesc = ser.deserializeTaskDescription        logInfo("Got assigned task " + taskDesc.taskId)
        //TODO 将反序列化的Task放入线程池执行
        executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
          taskDesc.name, taskDesc.serializedTask)
      }

这里做的工作是拿到序列化器,将Task反序列化,将反序列化的Task放入线程池执行

下面是Executor的launchTask()方法,主要的逻辑是将创建一个TaskRunner对象将Task的信息封装信息然后使用线程池执行

//TODO Executor执行Task
  def launchTask(
      context: ExecutorBackend,
      taskId: Long,
      attemptNumber: Int,
      taskName: String,
      serializedTask: ByteBuffer) {
    //TODO 创建一个TaskRunner对象将Task的信息封装信息
    val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
      serializedTask)
    //TODO runningTasks是一个ConcurrentHashMap保证线程安全
    runningTasks.put(taskId, tr)
    //TODO 使用线程池执行
    threadPool.execute(tr)
  }

总结

1.提交Task主要是迭代TaskSet一个一个的取出Task进行序列化之后向Executor发送序列化好的Task

2.Executor执行Task,创建一个TaskRunner对象将Task的信息封装信息然后使用线程池执行

    原文作者:Oeljeklaus
    原文地址: https://zhuanlan.zhihu.com/p/39167207
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞