* Schedules the tasks within a single TaskSet in the TaskSchedulerImpl. This class keeps track of
* each task, retries tasks if they fail (up to a limited number of times), and
* handles locality-aware scheduling for this TaskSet via delay scheduling. The main interfaces
* to it are resourceOffer, which asks the TaskSet whether it wants to run a task on one node,
* and statusUpdate, which tells it that one of its tasks changed state (e.g. finished).
* THREADING: This class is designed to only be called from code with a lock on the
* TaskScheduler (e.g. its event handlers). It should not be called from other threads.
* @param sched the TaskSchedulerImpl associated with the TaskSetManager
* @param taskSet the TaskSet to manage scheduling for
* @param maxTaskFailures if any particular task fails this number of times, the entire
* task set will be aborted
private[spark] class TaskSetManager(
sched: TaskSchedulerImpl,
val taskSet: TaskSet,
val maxTaskFailures: Int,
blacklistTracker: Option[BlacklistTracker] = None,
clock: Clock = new SystemClock()) extends Schedulable with Logging
private val pendingTasksForExecutor = new HashMap[String, ArrayBuffer[Int]]
// Set of pending tasks for each host. Similar to pendingTasksForExecutor,
// but at host level.
private val pendingTasksForHost = new HashMap[String, ArrayBuffer[Int]]
// Set of pending tasks for each rack -- similar to the above.
private val pendingTasksForRack = new HashMap[String, ArrayBuffer[Int]]
// Set containing pending tasks with no locality preferences.
private[scheduler] var pendingTasksWithNoPrefs = new ArrayBuffer[Int]
// Set containing all pending tasks (also used as a stack, as above).
private val allPendingTasks = new ArrayBuffer[Int]
// Tasks that can be speculated. Since these will be a small fraction of total
// tasks, we'll just hold them in a HashSet.
private[scheduler] val speculatableTasks = new HashSet[Int]
1.add tasks
// Add all our tasks to the pending lists. We do this in reverse order
// of task index so that tasks with low indices get launched first.
for (i <- (0 until numTasks).reverse) {
/** Add a task to all the pending-task lists that it should be on. */
private def addPendingTask(index: Int) {
for (loc <- tasks(index).preferredLocations) {
loc match {
//ExecutorCacheTaskLocation类型 直接添加到executor队列中
case e: ExecutorCacheTaskLocation =>
pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
//HDFSCacheTaskLocation 先判断是否有可用的executors
case e: HDFSCacheTaskLocation =>
val exe = sched.getExecutorsAliveOnHost(loc.host)
exe match {
case Some(set) =>
for (e <- set) {
pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
logInfo(s"Pending task $index has a cached location at ${e.host} " +
", where there are executors " + set.mkString(","))
case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
", but there are no executors alive there.")
case _ =>
pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
for (rack <- sched.getRackForHost(loc.host)) {
pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
if (tasks(index).preferredLocations == Nil) {
pendingTasksWithNoPrefs += index
allPendingTasks += index // No point scanning this whole list to find the old task there
resourceoffer:根据taskscheduler所提供的单个resource资源以及任务的host executor locality本地行的要求返回一个合适的任务。
* Respond to an offer of a single executor from the scheduler by finding a task
* NOTE: this function is either called with a maxLocality which
* would be adjusted by delay scheduling algorithm or it will be with a special
* NO_PREF locality which will be not modified
* @param execId the executor Id of the offered resource
* @param host the host Id of the offered resource
* @param maxLocality the maximum locality we want to schedule the tasks at
def resourceOffer(
execId: String,
host: String,
maxLocality: TaskLocality.TaskLocality)
: Option[TaskDescription] =
val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
blacklist.isNodeBlacklistedForTaskSet(host) ||
if (!isZombie && !offerBlacklisted) {
val curTime = clock.getTimeMillis()
var allowedLocality = maxLocality
if (maxLocality != TaskLocality.NO_PREF) {
allowedLocality = getAllowedLocalityLevel(curTime)
if (allowedLocality > maxLocality) {
// We're not allowed to search for farther-away tasks
allowedLocality = maxLocality
dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
// Found a task; do some bookkeeping and return a task description
val task = tasks(index)
val taskId = sched.newTaskId()
// Do various bookkeeping
copiesRunning(index) += 1
val attemptNum = taskAttempts(index).size
val info = new TaskInfo(taskId, index, attemptNum, curTime,
execId, host, taskLocality, speculative)
taskInfos(taskId) = info
taskAttempts(index) = info :: taskAttempts(index)
// Update our locality level for delay scheduling
// NO_PREF will not affect the variables related to delay scheduling
if (maxLocality != TaskLocality.NO_PREF) {
currentLocalityIndex = getLocalityIndex(taskLocality)
lastLaunchTime = curTime
// Serialize and return the task
val serializedTask: ByteBuffer = try {
} catch {
// If the task cannot be serialized, then there's no point to re-attempt the task,
// as it will always fail. So just abort the whole task-set.
case NonFatal(e) =>
val msg = s"Failed to serialize task $taskId, not attempting to retry it."
logError(msg, e)
abort(s"$msg Exception during serialization: $e")
throw new TaskNotSerializableException(e)
if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
!emittedTaskSizeWarning) {
emittedTaskSizeWarning = true
logWarning(s"Stage ${task.stageId} contains a task of very large size " +
s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
// We used to log the time it takes to serialize the task, but task size is already
// a good proxy to task serialization time.
// val timeTaken = clock.getTime() - startTime
val taskName = s"task ${info.id} in stage ${taskSet.id}"
logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
sched.dagScheduler.taskStarted(task, info)
new TaskDescription(
} else {
前面已经介绍,dagscheduler负责构建具有依赖关系的任务集。 tasksetmanager负责在具体任务集的内部调度任务。而taskscheduler负责将资源提供给tasksetmanager供其作为调度任务的依据。可是sparkcontext可能同时存在多个可运行的任务集,这些任务集之间有时如何调度的,则是由调度池来决定的。调度池管理的对象是下一季的调度池或者tasksetmanagr对象。
1. Taskscheduler
* Low-level task scheduler interface, currently implemented exclusively by
* [[org.apache.spark.scheduler.TaskSchedulerImpl]].
* This interface allows plugging in different task schedulers. Each TaskScheduler schedules tasks
* for a single SparkContext. These schedulers get sets of tasks submitted to them from the
* DAGScheduler for each stage, and are responsible for sending the tasks to the cluster, running
* them, retrying if there are failures, and mitigating stragglers. They return events to the
* DAGScheduler.
private[spark] trait TaskScheduler {
private val appId = "spark-application-" + System.currentTimeMillis
def rootPool: Pool
def schedulingMode: SchedulingMode
def start(): Unit
// Invoked after system has successfully initialized (typically in spark context).
// Yarn uses this to bootstrap allocation of resources based on preferred locations,
// wait for slave registrations, etc.
def postStartHook() { }
// Disconnect from the cluster.
def stop(): Unit
// Submit a sequence of tasks to run.
def submitTasks(taskSet: TaskSet): Unit
// Cancel a stage.
def cancelTasks(stageId: Int, interruptThread: Boolean): Unit
* Kills a task attempt.
* @return Whether the task was successfully killed.
def killTaskAttempt(taskId: Long, interruptThread: Boolean, reason: String): Boolean
// Set the DAG scheduler for upcalls. This is guaranteed to be set before submitTasks is called.
def setDAGScheduler(dagScheduler: DAGScheduler): Unit
// Get the default level of parallelism to use in the cluster, as a hint for sizing jobs.
def defaultParallelism(): Int
* Update metrics for in-progress tasks and let the master know that the BlockManager is still
* alive. Return true if the driver knows about the given block manager. Otherwise, return false,
* indicating that the block manager should re-register.
def executorHeartbeatReceived(
execId: String,
accumUpdates: Array[(Long, Seq[AccumulatorV2[_, _]])],
blockManagerId: BlockManagerId): Boolean
* Get an application ID associated with the job.
* @return An application ID
def applicationId(): String = appId
* Process a lost executor
def executorLost(executorId: String, reason: ExecutorLossReason): Unit
* Get an application's attempt ID associated with the job.
* @return An application's Attempt ID
def applicationAttemptId(): Option[String]
val rootPool: Pool = new Pool("", schedulingMode, 0, 0)
// This is a var so that we can reset it for testing purposes.
private[spark] var taskResultGetter = new TaskResultGetter(sc.env, this)
override def setDAGScheduler(dagScheduler: DAGScheduler) {
this.dagScheduler = dagScheduler
def initialize(backend: SchedulerBackend) {
this.backend = backend
schedulableBuilder = {
schedulingMode match {
case SchedulingMode.FIFO =>
new FIFOSchedulableBuilder(rootPool)
case SchedulingMode.FAIR =>
new FairSchedulableBuilder(rootPool, conf)
case _ =>
throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
FAIR:公平调度,公平调度池管理的事下一级的调度池。或者tasksetmanager。公平调度的基本原则是根据所管理的调度池活着manager中正在运行的任务的数量来判断优先级。当采用公调度。,所构建的调度池市两级结构。即根调度池管理一组子调度池。 子调度池管理术语该调度池的tasksetmanager。
- TaskSchedulerImpl.submitTasks
- CoarseGrainedSchedulerBackend.reviveOffers
- CoarseGrainedSchedulerBackend.DriverEndpoint.makeOffers
- TaskSchedulerImpl.resourceOffers
TaskSchedulerImpl.resourceOfferSingleTaskSet - CoarseGrainedSchedulerBackend.DriverEndpoint.launchTasks
- TaskSchedulerImpl.resourceOffers
override def submitTasks(taskSet: TaskSet) {
val tasks = taskSet.tasks
logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
this.synchronized {
//create manager 任务集管理器
val manager = createTaskSetManager(taskSet, maxTaskFailures)
val stage = taskSet.stageId
val stageTaskSets =
taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
stageTaskSets(taskSet.stageAttemptId) = manager
val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
ts.taskSet != taskSet && !ts.isZombie
if (conflictingTaskSet) {
throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
if (!isLocal && !hasReceivedTask) {
starvationTimer.scheduleAtFixedRate(new TimerTask() {
override def run() {
if (!hasLaunchedTask) {
logWarning("Initial job has not accepted any resources; " +
"check your cluster UI to ensure that workers are registered " +
"and have sufficient resources")
} else {
hasReceivedTask = true
var backend: SchedulerBackend = null