场景
一个 spark 应用的产生过程: 获取需求 -> 编写spark代码 -> 测试通过 -> 扔上平台调度。
往往应用会正常运行一段时间,突然有一天运行失败,或是失败了一次才运行成功。
从开发者的角度看,我的代码没问题,测试也通过了,之前一段都运行好好的,怎么突然就失败了呢?为什么我重新调度又能正常运行了,是不是你们平台不稳定?
是什么导致了上述问题?
分布式集群中,特别是高负载的情况下,就会引发很多意想不到的问题,例如:
- 坏盘/硬盘满将会 导致
/path/to/usercache
目录创建失败,一个stage中任务失败次数达到一定次数(spark.task.maxFailures
)会导致整个job失败。 - executor 注册 external shuffle service 超时。
- executor 从 external shuffle service 获取数据超时,task 反复失败后导致了整 个stage 的失败。
- 环境依赖问题,例如 xxx 包不存在, xxx 包没有安装。
- dns 没有配置,网络不通。
- etc.
为什么 task
失败后还会被 schedular
重新调度在原来的 node
或是 executor
上?
数据本地性(spark会优先把task调度在有相应数据的节点上)导致。
是否只能听天由命,每次失败后重新调度? 如果任务有SLA的限制怎么办?
介绍
spark 2.1 中增加了 blacklist
机制,当前(2.3.0)还是试验性质的功能,黑名单机制允许你设置 task 在 executor / node 上失败次数的阈值, 从而避免了一路走到黑的情况出现。 :)
相关参数
配置 | 默认值 | 描述 |
---|---|---|
spark.blacklist.enabled | false | 是否开启黑名单机制 |
spark.blacklist.timeout | 1h | 对于被加入 application 黑名单的 executor/节点 ,多长时间后无条件的移出黑名单以运行新任务 |
spark.blacklist.task.maxTaskAttemptsPerExecutor | 1 | 对于同一个 task 在某个 executor 中的失败重试阈值。达到阈值后,在执行这个 task 时,该 executor 将被加入黑名单 |
spark.blacklist.task.maxTaskAttemptsPerNode | 2 | 对于同一个 task 在某个节点上的失败重试阈值。达到阈值后,在执行这个 task 时,该节点将被加入黑名单 |
spark.blacklist.stage.maxFailedTasksPerExecutor | 2 | 一个 stage 中,不同的 task 在同一个 executor 的失败阈值。达到阈值后,在执行这个 stage 时该 executor 将会被加入黑名单 |
spark.blacklist.stage.maxFailedExecutorsPerNode | 2 | 一个 stage 中,不同的 executor 加入黑名单的阈值。达到阈值后,在执行这个 stage 时该节点将会被加入黑名单 |
spark.blacklist.application.maxFailedTasksPerExecutor | 2 | 在同一个 executor 中,不同的 task的失败阈值 。达到阈值后,在整个 appliction 运行期间,该 executor 都会被加入黑名单,加入时间超过 spark.blacklist.timeout 后,自动从黑名单中移除。值得注意的是,如果开启了 dynamic allocation ,这些 executor 可能会由于空闲时间过长被回收。 |
spark.blacklist.application.maxFailedExecutorsPerNode | 2 | 在一个节点中,不同 executor 加入 application 黑名单的阈值。达到这个阈值后,该节点会进入 application 黑名单,加入时间超过 spark.blacklist.timeout 后,自动从黑名单中移除。值得注意的是,如果开启了 dynamic allocation ,该节点上的 executor 可能会由于空闲时间过长被回收。 |
spark.blacklist.killBlacklistedExecutors | false | 如果开启该配置,spark 会自动关闭并重启加入黑名单的 executor,如果整个节点都加入了黑名单,则该节点上的所有 executor 都会被关闭。 |
spark.blacklist.application.fetchFailure.enabled | false | 如果开启该配置,当发生 fetch failure 时,立即将该 executor 加入到黑名单。要是开启了 external shuffle service ,整个节点都会被加入黑名单。 |
实现细节
因为是实验性质的功能,所以代码可能会随时变动。
只贴出部分核心代码。
TaskSetBlacklist
黑名单账本:
//k:executor v:该executor上每个 task 的失败情况(task失败的次数和最近一次失败时间)
val execToFailures = new HashMap[String, ExecutorFailuresInTaskSet]()
//k:节点,v:该节点上有失败任务的 executor
private val nodeToExecsWithFailures = new HashMap[String, HashSet[String]]()
//k:节点, v:该节点上加入黑名单的 taskId
private val nodeToBlacklistedTaskIndexes = new HashMap[String, HashSet[Int]]()
//加入黑名单的 executor
private val blacklistedExecs = new HashSet[String]()
//加入黑名单的 node
private val blacklistedNodes = new HashSet[String]()
// 判断 executor 是否加入了给定 task 的黑名单
def isExecutorBlacklistedForTask(executorId: String, index: Int): Boolean = {
execToFailures.get(executorId).exists { execFailures =>
execFailures.getNumTaskFailures(index) >= MAX_TASK_ATTEMPTS_PER_EXECUTOR
}
}
//判断 node 是否加入了给定 task 的黑名单
def isNodeBlacklistedForTask(node: String, index: Int): Boolean = {
nodeToBlacklistedTaskIndexes.get(node).exists(_.contains(index))
}
当有task失败时,TaskSetManager 会调用更新黑名单的操作:
- 根据
taskid
更新excutor
上该task
的失败次数和失败时间 - 判断
task
是否在该节点其他executor
上有失败记录,如果有,将重试次数相加,如果 >=MAX_TASK_ATTEMPTS_PER_NODE
,则将该node
加入这个taskId
的黑名单 - 判断在这个stage中,一个executor中失败的任务次数是否 >=
MAX_FAILURES_PER_EXEC_STAGE
,如果是,则将该executor
加入这个stageId
的黑名单 - 判断在这个stage中,同一个 node 的 executor 的失败记录是否 >=
MAX_FAILED_EXEC_PER_NODE_STAGE
,如果是,则将该node
加入这个stageId
的黑名单
阈值参数:
- MAX_TASK_ATTEMPTS_PER_EXECUTOR:每个 executor 上最大的任务重试次数
- MAX_TASK_ATTEMPTS_PER_NODE:每个 node 上最大的任务重试次数
- MAX_FAILURES_PER_EXEC_STAGE:一个 stage 中,每个executor 上最多任务失败次数
- MAX_FAILED_EXEC_PER_NODE_STAGE:一个 stage 中,每个节点上 executor 的最多失败次数
private[scheduler] def updateBlacklistForFailedTask(
host: String,
exec: String,
index: Int,
failureReason: String): Unit = {
latestFailureReason = failureReason
val execFailures = execToFailures.getOrElseUpdate(exec, new ExecutorFailuresInTaskSet(host))
execFailures.updateWithFailure(index, clock.getTimeMillis())
val execsWithFailuresOnNode = nodeToExecsWithFailures.getOrElseUpdate(host, new HashSet())
execsWithFailuresOnNode += exec
val failuresOnHost = execsWithFailuresOnNode.toIterator.flatMap { exec =>
execToFailures.get(exec).map { failures =>
failures.getNumTaskFailures(index)
}
}.sum
if (failuresOnHost >= MAX_TASK_ATTEMPTS_PER_NODE) {
nodeToBlacklistedTaskIndexes.getOrElseUpdate(host, new HashSet()) += index
}
if (execFailures.numUniqueTasksWithFailures >= MAX_FAILURES_PER_EXEC_STAGE) {
if (blacklistedExecs.add(exec)) {
logInfo(s"Blacklisting executor ${exec} for stage $stageId")
val blacklistedExecutorsOnNode =
execsWithFailuresOnNode.filter(blacklistedExecs.contains(_))
if (blacklistedExecutorsOnNode.size >= MAX_FAILED_EXEC_PER_NODE_STAGE) {
if (blacklistedNodes.add(host)) {
logInfo(s"Blacklisting ${host} for stage $stageId")
}
}
}
}
}
BlacklistTracker
实现原理和TaskSetBlacklist
,下文就不再贴出黑名单判断,黑名单对象等代码。
与 TaskSetBlacklist
不同的是,在一个 taskSet 完全成功之前,BlacklistTracker
无法获取到任务失败的情况。
核心代码:
当一个 taskSet 执行成功时会调用以下代码,流程如下:
- 将每个 executor 上的 task 失败次数进行累计,如果 executor 最后一次 task 失败的时间超过
BLACKLIST_TIMEOUT_MILLIS
,则移除该失败任务。 - 如果 executor 上失败次数大于等于设定的阈值并且不在黑名单中
- 将
executor
及其对应的到期时间加入到application
的黑名单中,从executor失败列表中移除该 executor,并更新nextExpiryTime
,用于下次启动任务的时候判断黑名单是否已到期 - 根据
spark.blacklist.killBlacklistedExecutors
判断是否要杀死executor
- 更新
node
上的 executor 失败次数 - 如果一个节点上的
executor
的失败次数大于等于阈值并且不在黑名单中- 将
node
及其对应的到期时间加入到application
的黑名单中 - 如果开启了
spark.blacklist.killBlacklistedExecutors
,则将此node
上的所有 executor 杀死
- 将
- 将
- BLACKLIST_TIMEOUT_MILLIS:加入黑名单后的过期时间
- MAX_FAILURES_PER_EXEC:每个executor上最多的task失败次数
- MAX_FAILED_EXEC_PER_NODE: 每个节点上加入黑名单的executor的最大数量
def updateBlacklistForSuccessfulTaskSet(
stageId: Int,
stageAttemptId: Int,
failuresByExec: HashMap[String, ExecutorFailuresInTaskSet]): Unit = {
val now = clock.getTimeMillis()
failuresByExec.foreach { case (exec, failuresInTaskSet) =>
val appFailuresOnExecutor =
executorIdToFailureList.getOrElseUpdate(exec, new ExecutorFailureList)
appFailuresOnExecutor.addFailures(stageId, stageAttemptId, failuresInTaskSet)
appFailuresOnExecutor.dropFailuresWithTimeoutBefore(now)
val newTotal = appFailuresOnExecutor.numUniqueTaskFailures
val expiryTimeForNewBlacklists = now + BLACKLIST_TIMEOUT_MILLIS
if (newTotal >= MAX_FAILURES_PER_EXEC && !executorIdToBlacklistStatus.contains(exec)) {
logInfo(s"Blacklisting executor id: $exec because it has $newTotal" +
s" task failures in successful task sets")
val node = failuresInTaskSet.node
executorIdToBlacklistStatus.put(exec, BlacklistedExecutor(node, expiryTimeForNewBlacklists))
listenerBus.post(SparkListenerExecutorBlacklisted(now, exec, newTotal))
executorIdToFailureList.remove(exec)
updateNextExpiryTime()
killBlacklistedExecutor(exec)
val blacklistedExecsOnNode = nodeToBlacklistedExecs.getOrElseUpdate(node, HashSet[String]())
blacklistedExecsOnNode += exec
if (blacklistedExecsOnNode.size >= MAX_FAILED_EXEC_PER_NODE &&
!nodeIdToBlacklistExpiryTime.contains(node)) {
logInfo(s"Blacklisting node $node because it has ${blacklistedExecsOnNode.size} " +
s"executors blacklisted: ${blacklistedExecsOnNode}")
nodeIdToBlacklistExpiryTime.put(node, expiryTimeForNewBlacklists)
listenerBus.post(SparkListenerNodeBlacklisted(now, node, blacklistedExecsOnNode.size))
_nodeBlacklist.set(nodeIdToBlacklistExpiryTime.keySet.toSet)
killExecutorsOnBlacklistedNode(node)
}
}
}
}
什么时候进行黑名单的判断
一个 stage 提交的调用链:
TaskSchedulerImpl.submitTasks
->
CoarseGrainedSchedulerBackend.reviveOffers
->
CoarseGrainedSchedulerBackend.makeOffers
->
TaskSchedulerImpl.resourceOffers
->
TaskSchedulerImpl.resourceOfferSingleTaskSet
->
CoarseGrainedSchedulerBackend.launchTasks
appliaction 级别的黑名单在 TaskSchedulerImpl.resourceOffers
中完成判断,stage/task 级别的黑名单在 TaskSchedulerImpl.resourceOfferSingleTaskSet
中完成判断。
如果所有的节点都被加入了黑名单?
如果将task的重试次数设置的比较高,有可能会出现这个问题,这个时候。将会中断这个 stage 的执行
TaskSchedulerImpl.resourceOffers
if (!launchedAnyTask) {
taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
}
结语
简单的来说,对于一个 application ,提供了三种级别的黑名单可以用于 executor/node: task blacklist -> stage blacklist -> application blacklist
通过这些黑名单的设置可以避免由于 task 反复调度在有问题的 executor/node
(坏盘,磁盘满了,shuffle fetch 失败,环境错误等)上,进而导致整个 Application
运行失败的情况。
tip: BlacklistTracker.updateBlacklistForFetchFailure
在当前版本(2.3.0)存在BUG SPARK-24021,将在 2.3.1 进行修复。如果打开了 spark.blacklist.application.fetchFailure.enabled
配置将会受到影响。