提高spark任务稳定性1 - Blacklist 机制

场景

一个 spark 应用的产生过程: 获取需求 -> 编写spark代码 -> 测试通过 -> 扔上平台调度。

往往应用会正常运行一段时间,突然有一天运行失败,或是失败了一次才运行成功。

从开发者的角度看,我的代码没问题,测试也通过了,之前一段都运行好好的,怎么突然就失败了呢?为什么我重新调度又能正常运行了,是不是你们平台不稳定?

是什么导致了上述问题?

分布式集群中,特别是高负载的情况下,就会引发很多意想不到的问题,例如:

  1. 坏盘/硬盘满将会 导致 /path/to/usercache 目录创建失败,一个stage中任务失败次数达到一定次数(spark.task.maxFailures)会导致整个job失败。
  2. executor 注册 external shuffle service 超时。
  3. executor 从 external shuffle service 获取数据超时,task 反复失败后导致了整 个stage 的失败。
  4. 环境依赖问题,例如 xxx 包不存在, xxx 包没有安装。
  5. dns 没有配置,网络不通。
  6. etc.

为什么 task 失败后还会被 schedular 重新调度在原来的 node 或是 executor上?

数据本地性(spark会优先把task调度在有相应数据的节点上)导致。

是否只能听天由命,每次失败后重新调度? 如果任务有SLA的限制怎么办?

介绍

spark 2.1 中增加了 blacklist 机制,当前(2.3.0)还是试验性质的功能,黑名单机制允许你设置 task 在 executor / node 上失败次数的阈值, 从而避免了一路走到黑的情况出现。 :)

相关参数

配置默认值描述
spark.blacklist.enabledfalse是否开启黑名单机制
spark.blacklist.timeout1h对于被加入 application 黑名单的 executor/节点 ,多长时间后无条件的移出黑名单以运行新任务
spark.blacklist.task.maxTaskAttemptsPerExecutor1对于同一个 task 在某个 executor 中的失败重试阈值。达到阈值后,在执行这个 task 时,该 executor 将被加入黑名单
spark.blacklist.task.maxTaskAttemptsPerNode2对于同一个 task 在某个节点上的失败重试阈值。达到阈值后,在执行这个 task 时,该节点将被加入黑名单
spark.blacklist.stage.maxFailedTasksPerExecutor2一个 stage 中,不同的 task 在同一个 executor 的失败阈值。达到阈值后,在执行这个 stage 时该 executor 将会被加入黑名单
spark.blacklist.stage.maxFailedExecutorsPerNode2一个 stage 中,不同的 executor 加入黑名单的阈值。达到阈值后,在执行这个 stage 时该节点将会被加入黑名单
spark.blacklist.application.maxFailedTasksPerExecutor2在同一个 executor 中,不同的 task的失败阈值 。达到阈值后,在整个 appliction 运行期间,该 executor 都会被加入黑名单,加入时间超过 spark.blacklist.timeout 后,自动从黑名单中移除。值得注意的是,如果开启了 dynamic allocation,这些 executor 可能会由于空闲时间过长被回收。
spark.blacklist.application.maxFailedExecutorsPerNode2在一个节点中,不同 executor 加入 application 黑名单的阈值。达到这个阈值后,该节点会进入 application 黑名单,加入时间超过 spark.blacklist.timeout 后,自动从黑名单中移除。值得注意的是,如果开启了 dynamic allocation,该节点上的 executor 可能会由于空闲时间过长被回收。
spark.blacklist.killBlacklistedExecutorsfalse如果开启该配置,spark 会自动关闭并重启加入黑名单的 executor,如果整个节点都加入了黑名单,则该节点上的所有 executor 都会被关闭。
spark.blacklist.application.fetchFailure.enabledfalse如果开启该配置,当发生 fetch failure时,立即将该 executor 加入到黑名单。要是开启了 external shuffle service,整个节点都会被加入黑名单。

实现细节

因为是实验性质的功能,所以代码可能会随时变动。

只贴出部分核心代码。

TaskSetBlacklist

黑名单账本:

//k:executor v:该executor上每个 task 的失败情况(task失败的次数和最近一次失败时间)
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()

//k:节点,v:该节点上有失败任务的 executor
private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
//k:节点, v:该节点上加入黑名单的 taskId
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
  
//加入黑名单的 executor 
private val blacklistedExecs = new HashSet[String]()
//加入黑名单的 node
private val blacklistedNodes = new HashSet[String]()
// 判断 executor 是否加入了给定 task 的黑名单
def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
    execToFailures.get(executorId).exists { execFailures =>
      execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
    }
}

//判断 node 是否加入了给定 task 的黑名单
def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
    nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
}

当有task失败时,TaskSetManager 会调用更新黑名单的操作:

  1. 根据 taskid 更新 excutor 上该 task 的失败次数和失败时间
  2. 判断 task 是否在该节点其他 executor 上有失败记录,如果有,将重试次数相加,如果 >= MAX_TASK_ATTEMPTS_PER_NODE ,则将该 node 加入这个 taskId 的黑名单
  3. 判断在这个stage中,一个executor中失败的任务次数是否 >= MAX_FAILURES_PER_EXEC_STAGE,如果是,则将该 executor 加入这个 stageId 的黑名单
  4. 判断在这个stage中,同一个 node 的 executor 的失败记录是否 >= MAX_FAILED_EXEC_PER_NODE_STAGE,如果是,则将该 node 加入这个 stageId 的黑名单

阈值参数:

  • MAX_TASK_ATTEMPTS_PER_EXECUTOR:每个 executor 上最大的任务重试次数
  • MAX_TASK_ATTEMPTS_PER_NODE:每个 node 上最大的任务重试次数
  • MAX_FAILURES_PER_EXEC_STAGE:一个 stage 中,每个executor 上最多任务失败次数
  • MAX_FAILED_EXEC_PER_NODE_STAGE:一个 stage 中,每个节点上 executor 的最多失败次数
  private[scheduler] def updateBlacklistForFailedTask(
      host: String,
      exec: String,
      index: Int,
      failureReason: String): Unit = {
    latestFailureReason = failureReason
    val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
    execFailures.updateWithFailure(index, clock.getTimeMillis())

    val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
    execsWithFailuresOnNode += exec
    val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
      execToFailures.get(exec).map { failures =>
        failures.getNumTaskFailures(index)
      }
    }.sum
    if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
      nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
    }

    if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
      if (blacklistedExecs.add(exec)) {
        logInfo(s"Blacklisting executor ${exec} for stage $stageId")
        val blacklistedExecutorsOnNode =
          execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
        if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
          if (blacklistedNodes.add(host)) {
            logInfo(s"Blacklisting ${host} for stage $stageId")
          }
        }
      }
    }
  }

BlacklistTracker

实现原理和TaskSetBlacklist,下文就不再贴出黑名单判断,黑名单对象等代码。

TaskSetBlacklist 不同的是,在一个 taskSet 完全成功之前,BlacklistTracker 无法获取到任务失败的情况。

核心代码:

当一个 taskSet 执行成功时会调用以下代码,流程如下:

  1. 将每个 executor 上的 task 失败次数进行累计,如果 executor 最后一次 task 失败的时间超过 BLACKLIST_TIMEOUT_MILLIS,则移除该失败任务。
  2. 如果 executor 上失败次数大于等于设定的阈值并且不在黑名单中
    • executor 及其对应的到期时间加入到 application 的黑名单中,从executor失败列表中移除该 executor,并更新 nextExpiryTime,用于下次启动任务的时候判断黑名单是否已到期
    • 根据 spark.blacklist.killBlacklistedExecutors 判断是否要杀死 executor
    • 更新 node 上的 executor 失败次数
    • 如果一个节点上的 executor 的失败次数大于等于阈值并且不在黑名单中
      • node 及其对应的到期时间加入到 application 的黑名单中
      • 如果开启了 spark.blacklist.killBlacklistedExecutors,则将此 node 上的所有 executor 杀死
  • BLACKLIST_TIMEOUT_MILLIS:加入黑名单后的过期时间
  • MAX_FAILURES_PER_EXEC:每个executor上最多的task失败次数
  • MAX_FAILED_EXEC_PER_NODE: 每个节点上加入黑名单的executor的最大数量
def updateBlacklistForSuccessfulTaskSet(
      stageId: Int,
      stageAttemptId: Int,
      failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
    val now = clock.getTimeMillis()
    failuresByExec.foreach { case (exec, failuresInTaskSet) =>
      val appFailuresOnExecutor =
        executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
      appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
      appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
      val newTotal = appFailuresOnExecutor.numUniqueTaskFailures

      val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
      if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
        logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
          s" task failures in successful task sets")
        val node = failuresInTaskSet.node
        executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
        listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
        executorIdToFailureList.remove(exec)
        updateNextExpiryTime()
        killBlacklistedExecutor(exec)

        val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
        blacklistedExecsOnNode += exec
        if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
            !nodeIdToBlacklistExpiryTime.contains(node)) {
          logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
            s"executors blacklisted: ${blacklistedExecsOnNode}")
          nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
          listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
          _nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
          killExecutorsOnBlacklistedNode(node)
        }
      }
    }
  }

什么时候进行黑名单的判断

一个 stage 提交的调用链:

TaskSchedulerImpl.submitTasks ->
CoarseGrainedSchedulerBackend.reviveOffers ->
CoarseGrainedSchedulerBackend.makeOffers ->
TaskSchedulerImpl.resourceOffers ->
TaskSchedulerImpl.resourceOfferSingleTaskSet ->
CoarseGrainedSchedulerBackend.launchTasks

appliaction 级别的黑名单在 TaskSchedulerImpl.resourceOffers 中完成判断,stage/task 级别的黑名单在 TaskSchedulerImpl.resourceOfferSingleTaskSet 中完成判断。

如果所有的节点都被加入了黑名单?

如果将task的重试次数设置的比较高,有可能会出现这个问题,这个时候。将会中断这个 stage 的执行

TaskSchedulerImpl.resourceOffers

if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}

结语

简单的来说,对于一个 application ,提供了三种级别的黑名单可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist

通过这些黑名单的设置可以避免由于 task 反复调度在有问题的 executor/node (坏盘,磁盘满了,shuffle fetch 失败,环境错误等)上,进而导致整个 Application 运行失败的情况。

tip: BlacklistTracker.updateBlacklistForFetchFailure 在当前版本(2.3.0)存在BUG SPARK-24021,将在 2.3.1 进行修复。如果打开了 spark.blacklist.application.fetchFailure.enabled 配置将会受到影响。

    原文作者:breeze_lsw
    原文地址: https://www.jianshu.com/p/e665dfe1c811
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞