Spark Source Code Analysis: TaskSetManager

TaskSetManager: the task-set management module in detail

As noted earlier, once the DAGScheduler has handed a set of tasks to the TaskScheduler, its scheduling work for that task set is done. The scheduling logic inside the task set is then carried out by the TaskSetManager.

/**
 * Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
 * each task, retries tasks if they fail (up to a limited number of times), and
 * handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
 * to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
 * and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
 *
 * THREADING: This class is designed to only be called from code with a lock on the
 * TaskScheduler (e.g. its event handlers). It should not be called from other threads.
 *
 * @param sched           the TaskSchedulerImpl associated with the TaskSetManager
 * @param taskSet         the TaskSet to manage scheduling for
 * @param maxTaskFailures if any particular task fails this number of times, the entire
 *                        task set will be aborted
 */
private[spark] class TaskSetManager(
    sched: TaskSchedulerImpl,
    val taskSet: TaskSet,
    val maxTaskFailures: Int,
    blacklistTracker: Option[BlacklistTracker] = None,
    clock: Clock = new SystemClock()) extends Schedulable with Logging

This class implements the task-set management module described above.

Key fields:

// Set of pending tasks for each executor.
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]

// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]

// Set containing pending tasks with no locality preferences.
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]

// Set containing all pending tasks (also used as a stack, as above).
private val allPendingTasks = new ArrayBuffer[Int]

// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
private[scheduler] val speculatableTasks = new HashSet[Int]
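
Most of these fields map a location key (executor ID, host, or rack) to an ArrayBuffer of pending task indices; the last two are flat index collections. A minimal, self-contained sketch (toy code, not from Spark) of the getOrElseUpdate pattern used throughout this class:

import scala.collection.mutable.{ArrayBuffer, HashMap}

object PendingIndexSketch {
  // Toy version of pendingTasksForHost: host name -> indices of pending tasks
  val pendingForHost = new HashMap[String, ArrayBuffer[Int]]

  def register(host: String, taskIndex: Int): Unit = {
    // getOrElseUpdate creates the buffer on first use, just as TaskSetManager does
    pendingForHost.getOrElseUpdate(host, new ArrayBuffer) += taskIndex
  }

  def main(args: Array[String]): Unit = {
    register("host-a", 0)
    register("host-a", 1)
    register("host-b", 2)
    println(pendingForHost)  // e.g. Map(host-b -> ArrayBuffer(2), host-a -> ArrayBuffer(0, 1))
  }
}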

1. Adding tasks to the pending lists

// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
  // Walk through this task's preferred locations
  for (loc <- tasks(index).preferredLocations) {
    loc match {
      // ExecutorCacheTaskLocation: add the index straight to that executor's pending list
      case e: ExecutorCacheTaskLocation =>
        pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
      // HDFSCacheTaskLocation: first check whether any executors are alive on that host
      case e: HDFSCacheTaskLocation =>
        val exe = sched.getExecutorsAliveOnHost(loc.host)
        // exe is an Option[Set[String]]; Some(set) means executors are alive on the host
        exe match {
          case Some(set) =>
            for (e <- set) {
              pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
            }
            logInfo(s"Pending task $index has a cached location at ${e.host} " +
              ", where there are executors " + set.mkString(","))
          // No executors alive on that host
          case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
              ", but there are no executors alive there.")
        }
      case _ =>
    }
    pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
    for (rack <- sched.getRackForHost(loc.host)) {
      pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
    }
  }

  if (tasks(index).preferredLocations == Nil) {
    pendingTasksWithNoPrefs += index
  }

  allPendingTasks += index  // No point scanning this whole list to find the old task there
}
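
Note the reverse-order loop above: the pending lists are scanned from the tail when tasks are dequeued (they are "also used as a stack", as the comment on allPendingTasks says), so adding indices in reverse makes low indices come out first. A small self-contained sketch of that effect (toy code, not from Spark):

import scala.collection.mutable.ArrayBuffer

object ReverseOrderSketch {
  def main(args: Array[String]): Unit = {
    val numTasks = 4
    val pending = new ArrayBuffer[Int]

    // Same pattern as TaskSetManager: append indices in reverse order
    for (i <- (0 until numTasks).reverse) {
      pending += i                       // pending is now ArrayBuffer(3, 2, 1, 0)
    }

    // Consume from the tail, as the dequeue logic does
    while (pending.nonEmpty) {
      val index = pending.remove(pending.size - 1)
      println(s"launch task $index")     // prints 0, 1, 2, 3 in that order
    }
  }
}
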
2. Core interfaces

resourceOffer: given a single resource offer (executor and host) from the TaskScheduler, it returns a task, if any, that matches the offer and the task's host/executor locality preferences.

/**
 * Respond to an offer of a single executor from the scheduler by finding a task
 *
 * NOTE: this function is either called with a maxLocality which
 * would be adjusted by delay scheduling algorithm or it will be with a special
 * NO_PREF locality which will be not modified
 *
 * @param execId the executor Id of the offered resource
 * @param host  the host Id of the offered resource
 * @param maxLocality the maximum locality we want to schedule the tasks at
 */
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality)
  : Option[TaskDescription] =
{
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // Found a task; do some bookkeeping and return a task description
      val task = tasks(index)
      val taskId = sched.newTaskId()
      // Do various bookkeeping
      copiesRunning(index) += 1
      val attemptNum = taskAttempts(index).size
      val info = new TaskInfo(taskId, index, attemptNum, curTime,
        execId, host, taskLocality, speculative)
      taskInfos(taskId) = info
      taskAttempts(index) = info :: taskAttempts(index)
      // Update our locality level for delay scheduling
      // NO_PREF will not affect the variables related to delay scheduling
      if (maxLocality != TaskLocality.NO_PREF) {
        currentLocalityIndex = getLocalityIndex(taskLocality)
        lastLaunchTime = curTime
      }
      // Serialize and return the task
      val serializedTask: ByteBuffer = try {
        ser.serialize(task)
      } catch {
        // If the task cannot be serialized, then there's no point to re-attempt the task,
        // as it will always fail. So just abort the whole task-set.
        case NonFatal(e) =>
          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
          logError(msg, e)
          abort(s"$msg Exception during serialization: $e")
          throw new TaskNotSerializableException(e)
      }
      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
        !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }
      addRunningTask(taskId)

      // We used to log the time it takes to serialize the task, but task size is already
      // a good proxy to task serialization time.
      // val timeTaken = clock.getTime() - startTime
      val taskName = s"task ${info.id} in stage ${taskSet.id}"
      logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
        s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")

      sched.dagScheduler.taskStarted(task, info)
      new TaskDescription(
        taskId,
        attemptNum,
        execId,
        taskName,
        index,
        sched.sc.addedFiles,
        sched.sc.addedJars,
        task.localProperties,
        serializedTask)
    }
  } else {
    None
  }
}
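
Inside resourceOffer, getAllowedLocalityLevel implements delay scheduling: the manager keeps offering tasks at a better locality level until the configured wait (spark.locality.wait and its per-level variants) expires without a launch at that level, then it falls back to the next level. The following is a minimal sketch of the idea only; it is not the real implementation (which, among other things, skips levels that have no pending tasks), and the names and values in it are illustrative:

object DelaySchedulingSketch {
  // Locality levels from best to worst, as in TaskLocality
  val levels = Seq("PROCESS_LOCAL", "NODE_LOCAL", "RACK_LOCAL", "ANY")

  // Illustrative per-level waits (spark.locality.wait defaults to 3s)
  val waitMs = Map("PROCESS_LOCAL" -> 3000L, "NODE_LOCAL" -> 3000L, "RACK_LOCAL" -> 3000L)

  var currentIndex = 0       // index into `levels`
  var lastLaunchTime = 0L    // time of the last launch at the current level

  // Return the best locality level we are currently allowed to schedule at
  def allowedLevel(now: Long): String = {
    while (currentIndex < levels.size - 1 &&
           now - lastLaunchTime >= waitMs(levels(currentIndex))) {
      // Waited long enough at this level without launching anything:
      // give up some locality and fall back to the next (worse) level.
      lastLaunchTime += waitMs(levels(currentIndex))
      currentIndex += 1
    }
    levels(currentIndex)
  }

  def main(args: Array[String]): Unit = {
    println(allowedLevel(1000L))   // PROCESS_LOCAL: still within the first 3s wait
    println(allowedLevel(7000L))   // RACK_LOCAL: two waits have expired by now
  }
}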

Scheduling pools and scheduling modes

As described earlier, the DAGScheduler builds the task sets and the dependencies between them, the TaskSetManager schedules tasks within a single task set, and the TaskScheduler hands resources to the TaskSetManagers as the basis for that scheduling. However, a SparkContext may have several runnable task sets at the same time, and how those task sets are scheduled against one another is decided by the scheduling pool (Pool). The objects a pool manages are either lower-level pools or TaskSetManager objects.

1. TaskScheduler

/**
 * Low-level task scheduler interface, currently implemented exclusively by
 * [[org.apache.spark.scheduler.TaskSchedulerImpl]].
 * This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
 * for a single SparkContext. These schedulers get sets of tasks submitted to them from the
 * DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
 * them, retrying if there are failures, and mitigating stragglers. They return events to the
 * DAGScheduler.
 */
private[spark] trait TaskScheduler {

  private val appId = "spark-application-" + System.currentTimeMillis

  def rootPool: Pool

  def schedulingMode: SchedulingMode

  def start(): Unit

  // Invoked after system has successfully initialized (typically in spark context).
  // Yarn uses this to bootstrap allocation of resources based on preferred locations,
  // wait for slave registrations, etc.
  def postStartHook() { }

  // Disconnect from the cluster.
  def stop(): Unit

  // Submit a sequence of tasks to run.
  def submitTasks(taskSet: TaskSet): Unit

  // Cancel a stage.
  def cancelTasks(stageId: Int, interruptThread: Boolean): Unit

  /**
   * Kills a task attempt.
   *
   * @return Whether the task was successfully killed.
   */
  def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean

  // Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
  def setDAGScheduler(dagScheduler: DAGScheduler): Unit

  // Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
  def defaultParallelism(): Int

  /**
   * Update metrics for in-progress tasks and let the master know that the BlockManager is still
   * alive. Return true if the driver knows about the given block manager. Otherwise, return false,
   * indicating that the block manager should re-register.
   */
  def executorHeartbeatReceived(
      execId: String,
      accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
      blockManagerId: BlockManagerId): Boolean

  /**
   * Get an application ID associated with the job.
   *
   * @return An application ID
   */
  def applicationId(): String = appId

  /**
   * Process a lost executor
   */
  def executorLost(executorId: String, reason: ExecutorLossReason): Unit

  /**
   * Get an application's attempt ID associated with the job.
   *
   * @return An application's Attempt ID
   */
  def applicationAttemptId(): Option[String]

}

2. TaskSchedulerImpl

During initialization, a root pool (rootPool) is created, and then, according to the scheduling mode (schedulingMode) configured by the user, the matching SchedulableBuilder object is created.

val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)

override def setDAGScheduler(dagScheduler: DAGScheduler) {
  this.dagScheduler = dagScheduler
}

def initialize(backend: SchedulerBackend) {
  this.backend = backend
  schedulableBuilder = {
    schedulingMode match {
      case SchedulingMode.FIFO =>
        new FIFOSchedulableBuilder(rootPool)
      case SchedulingMode.FAIR =>
        new FairSchedulableBuilder(rootPool, conf)
      case _ =>
        throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
        s"$schedulingMode")
    }
  }
  schedulableBuilder.buildPools()
}
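
For completeness, schedulingMode itself is derived from the spark.scheduler.mode setting (FIFO by default). Roughly, in the Spark 2.x source (lightly paraphrased; exact wording varies by version):

// How TaskSchedulerImpl picks the scheduling mode from configuration
private val schedulingModeConf = conf.get("spark.scheduler.mode", SchedulingMode.FIFO.toString)
val schedulingMode: SchedulingMode =
  try {
    SchedulingMode.withName(schedulingModeConf.toUpperCase)
  } catch {
    case e: java.util.NoSuchElementException =>
      throw new SparkException(s"Unrecognized spark.scheduler.mode: $schedulingModeConf")
  }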

The concrete SchedulableBuilder's buildPools method then builds the whole scheduling-pool hierarchy on top of rootPool.
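
The FIFO builder is the trivial case: buildPools has nothing to build, and addTaskSetManager hangs each TaskSetManager directly under the root pool (lightly simplified from the Spark source):

private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // Nothing to build: FIFO schedules TaskSetManagers directly under rootPool
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    rootPool.addSchedulable(manager)
  }
}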

From the code above we can see that there are two kinds of scheduling pools.

FIFO: a FIFO pool manages TaskSetManagers directly. Each TaskSetManager records the stageId (and priority, i.e. the job ID) of its task set when it is created, and the FIFO pool schedules the managers in that order, so earlier stages of earlier jobs run first.
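
Concretely, the FIFO comparator looks roughly like this (lightly simplified from FIFOSchedulingAlgorithm in the Spark source):

// Order two Schedulables by priority (the job ID), breaking ties by stage ID
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }
}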

FAIR: a fair pool manages either lower-level pools or TaskSetManagers. The basic principle of fair scheduling is to decide priority from the number of tasks currently running in each managed pool or manager (together with each pool's weight and minimum share). With fair scheduling the pool hierarchy has two levels: the root pool manages a set of child pools, and each child pool manages the TaskSetManagers assigned to it.
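
From the application side, a hedged usage sketch (the pool name and file path below are illustrative, not from the original article): enable FAIR mode, optionally describe the pools in a fairscheduler.xml referenced by spark.scheduler.allocation.file, and route jobs to a pool with the spark.scheduler.pool local property.

import org.apache.spark.{SparkConf, SparkContext}

object FairPoolExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[2]")
      .setAppName("fair-pool-example")
      .set("spark.scheduler.mode", "FAIR")
      // Optional: pool definitions (schedulingMode, weight, minShare) live in this file
      .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml")

    val sc = new SparkContext(conf)

    // Jobs submitted from this thread go to the "production" pool (illustrative name);
    // without this property they fall into the default pool.
    sc.setLocalProperty("spark.scheduler.pool", "production")
    sc.parallelize(1 to 1000).count()

    sc.stop()
  }
}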

The task scheduler

Last time we saw that DAGScheduler.submitMissingTasks ultimately calls taskScheduler.submitTasks to submit the tasks.

In this part we start from taskScheduler.submitTasks and walk through how the TaskScheduler actually runs.

Submitting tasks

  • TaskSchedulerImpl.submitTasks
    • CoarseGrainedSchedulerBackend.reviveOffers
  • CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers
    • TaskSchedulerImpl.resourceOffers
      • TaskSchedulerImpl.resourceOfferSingleTaskSet (see the sketch below)
    • CoarseGrainedSchedulerBackend.DriverEndpoint.launchTasks
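
The step in the middle of this chain, TaskSchedulerImpl.resourceOfferSingleTaskSet, loops over the shuffled WorkerOffers and, for every offer with enough free CPUs, asks the TaskSetManager for a task at the given locality level. A simplified sketch (blacklist checks and error handling omitted):

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality.TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      // Ask the TaskSetManager for a task that fits this offer
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        availableCpus(i) -= CPUS_PER_TASK
        launchedTask = true
      }
    }
  }
  launchedTask
}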

submitTasks:

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Create the TaskSetManager for this task set
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    // Add the manager to the scheduling pool via the SchedulableBuilder
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  // Ask the backend to offer resources
  backend.reviveOffers()
}

2. backend

var backend: SchedulerBackend = null

backend is a SchedulerBackend; in cluster deployments this interface is implemented by the CoarseGrainedSchedulerBackend class.
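
On that backend, reviveOffers itself does very little: it sends a ReviveOffers message to the driver endpoint, whose receive handler responds by calling makeOffers, which in turn calls TaskSchedulerImpl.resourceOffers. Lightly simplified from CoarseGrainedSchedulerBackend:

override def reviveOffers() {
  // Poke the driver endpoint; its receive handler matches ReviveOffers and calls makeOffers()
  driverEndpoint.send(ReviveOffers)
}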

    Original author: raincoffee
    Original source: https://www.jianshu.com/p/ec7f3cc1bfd8