Spark系列——作业原理详解

前言

本篇文章主要是从作业提交到最后获取到作业结果,从源码的角度,但是不涉及源码进行的分析.其目的是读完本篇文章,你将对作业的基本流程有个清晰的认识。

1.任务提交过程

  • 首先,我们知道,一个action算子是触发一个job生成的地方,当遇见action算子,会执行sparkcontext的runjob方法,最后会交给dagSchedule的submitjob,这里会创建一个jobwaiter对象,并发送一个JobSubmitted消息进行作业任务的执行,同时 waiter.awaitResult()会等待作业执行结果的返回:成功或者失败。到这里,我们对于作业应该有个基本的认识了,那么接下来我们再来深入一点,这个作业submit之后发生了什么呢?

2.划 分 调 度 阶 段

  • spark是资源调度是粗粒度的,我们这里不讨论资源申请,当我们提交一个任务之后(此时资源应该都是在集群中申请好了),Spark首先会对我们的作业任务划分调度阶段,而这个调度阶段的划分是由 DAGScheduler 负责的,其调度是基于stage的,那么下面我们看看stage是怎么划分的。
  • 一个application中的rdd集合相互依赖形成了一个依赖树,DAGScheduler 通过其 getParentStages 方法会从最后一个finalrdd开始,判断依赖树中是否有shuffle,如果没有,就生成一个stage,如果有,调用getAncestorShuffleDepend,使用广度优先遍历整个依赖树,当遇到shuffle dependencies的时候,就会通过newOrUsedShuffleStag生成一个个stage,并划分为两个调度阶段,这样一个job也就被划分成了一个或者多个stage了。
  • 到这里我们的作业已经被划分成了一个个stage了,接下来就看看stage是怎么被提交的吧。。。

3.提 交 调 度 阶 段

  • 前面我们提到了JobSubmitted消息,那么这个消息实际上会触发 DAGScheduler 的 handleJobSubmitted 方法,首先该方法会在生成 finalStage 的同时建立起所有调度阶段的依赖关系(至于怎么建立的,我们后面慢慢深入),然后通过 fmalStage 生成一个作业实例ActiveJob,然后在submitStage(finalStage)开始提交作业。
  • 在作业提交调度阶段开始时,在 submitStage 方法中调用 getMissingParentStages 方法获取finalStage 父调度阶段,如果不存在父调度阶段,则使用 submitMissingTasks(stage) 方法提交执行;如果存在父调度阶段,则把该调度阶段存放到 waitingStages 列表中,同时递归调用 submitStage,直到找到没有父stage的stage调用 submitMissingTasks(stage)作业一次调度的入口,这样一次调度任务就发送到Excutor开始执行了。
  • 当Excutor的task执行完成时发通知消息CompleteEvent,会调用到DAGschedule的handleTaskCompletion更新状态,并且判断该task所属的stage是否所有任务都已经完成,如果完成,则扫描等待运行调度阶段列表,检查它们的父调度阶段是否存在未完成,如果不存在则表明该调度阶段准备就绪,生成实例并提交运行。(至于其中失败重试的机制不做讨论)
  • 到此,stage提交的基本情况我们已经了解,但是对于一个了解spark的人来说,我们熟悉的task还没有出现,接下来,我们就来看看stage的task的执行流程吧。

4.提 交 任 务

  • 前面我们说到提交 stage 的方法 submitStage ,该方法内部会调用到 DAGScheduler 的 submitMissingTasks 方法对每个stage 的 task 进行提交,其task生成规则如下:首先根据每个 stage 最后一个rdd的 Partition 个数拆分对应个数的 task ,这些 task 组成一个任务集 taskset 提交到 TaskScheduler 进行处理。对于 ResultStage (作业中最后的stage)生 成 ResultTask , 对 于 ShuffleMapStage 生成 ShuffleMapTask 。
  • 当 TaskScheduler 收到发送过来的任务集时,在 submitTasks 方法中(在 TaskSchedulerlmpl类中进行实现)构建一个 TaskSetManager 的实例,用于管理这个任务集的生命周期,并通过schedulableBuilder的addTaskSetManager放入系统的调度池中。然后调用 SchedulerBackend的 reviveOffers ,向 DriverEndPoint 终端点发送ReviveOffers消息,调用SchedulerBackend的makeOffers 方法,首先会获取集群中可用的 Executor ,并通过TaskSchedulerlmpl的resourceOffers 按照就近原则对进行资源的分配,并划分 PROCESS _ LOCAL、 NODE LOCAL、 NO PREF 、 RACK_LOCAL和 ANY 五个等级。然后进行launchtask 操作,把分配好资源的 task 一个个发送到 Worker 节点上的 CoarseGrainedExecutorBackend ,然后通过其内部的Executor 来执行任务。
  • 至此,我们的task算是正式提交到excutor准备执行了。

5.执 行 任 务

  • 当 CoarseGrainedExecutorBackend(excutor的守护进程) 接收到 LaunchTask 消息时,会调用 Executor 的 launchTask方法进行处理。在 Executor 的 launchTask 方法中,初始化一个 TaskRunner 来封装任务,它用于管理任务运行时的细节,再把 TaskRumier 对象放入到 ThreadPool (线程池)中去执行。在 TaskRunner 的 run 方法里,首先会对发送过来的 Task 本身以及它所依赖的 Jar 等文件的反序列,然后对反序列化的任务调用 Task 的 runTask 方法。由于 Task 本身是一个抽象类,具体的 runTask 方法是由它的两个子类 ShuffleMapTask 和 RedultTask 来实现的。
  • 对 于 ShuffleMapTask 而言,它的计算结果会写到 BlockManager 之中,最终返回给DAGScheduler 的是一个 MapStatus 对象。该对象中管理了 ShuffleMapTask 的运算结果存储到BlockManager 里的相关存储信息,而不是计算结果本身,这些存储信息将会成为下一阶段的任务需要获得的输入数据时的依据。
  • 对于 ResultTask 的 runTask 方法而言,它最终返回的是 func 函数的计算结果,这里猜测应该是rdd action算子的结果了。
  • 至此,task计算结束,下面我们看看计算的结果是怎么处理的。

6.获 取 执 行 结 果

  • 首先对于 Executor 的计算结果,会根据结果的大小有不同的策略。
    (1) 生成结果大小大于1GB结果直接丢弃,该配置项可以通过 spark . driver.maxResultSize进行设置。
    (2) 生成结果大小在[128 MB -200 KB,1 GB] : 如果生成的结果大于等于(128 MB -200 KB )时,会把该结果以taskld 为编号存入到 BlockManager 中,然后把该编号通过 Netty 发送给 Driver终端点,该阈值是 Netty 框架传输的最大值 spark . akka . frameSize (默认为128 MB )和 Netty 的预留空间 reservedSizeBytes (200 KB ) 差值。
    (3) 生成结果大小在(0 , 128 MB -200 KB):通过 Netty 直接发送到 Driver 终端点。
  • 任务执行完毕后, ExecutorBackend 会将任务的执行结果发送给 DriverEndPoint 终端点。该终端点会转给 TaskSchedulerlmpl 的 statusUpdate 方法进行处理,并在该方法中获取结果 result ,对于不同的任务状态有不同的处理。
    (1) 如果类型是 TaskState . FINISHED ,那么调用 TaskResultGetter 的 enqueueSuccessfulTask方法进行处理。 enqueueSuccessfulTask 方法的逻辑比较简单,如果是 IndirectTaskResult ,那么需要通过 sparkEnv . blockManager . getRemoteBytes ( blockld )来获取结果: ; 如果是DirectTaskResult ,那么结果就无需远程获取了。
    ( 2 ) 如果类型是 TaskState.FAILED 或者 TaskState.KILLED 或者 TaskState.LOST ,调用TaskResultGetter 的 enqueueFailedTask 进行处理。对 于 TaskState.LOST,还需要将其所在的Executor 标记为 failed ,并且根据更新后的 Executor 重新调度。
  • 然后将获取的结果通过TaskSchedulerlmpl 的 handleSuccessfulTask进行处理,最后发送一个completionevnet消息最终调用DAGScheduler 的 handleTaskCompletion 方法。
    (1) 对于shufflemaptask任务的结果 其实质是一个MapStatus,将其 注册到MapOutputTrackerMaster 中,下游的 stage 需要数据由其MapOutputTrackerWorker向MapOutputTrackerMaster 查找,到此也就完成ShuffleMapTask 的处理。
    (2) 如果任务是 ResultTask , 判断该作业是否完成,如果完成,则标记该作业已经完成,清除作业依赖的资源并发送消息给系统监听总线告知作业执行完毕。

总结

当我们提交一个job,首先会被 DAGScheduler 通过宽窄依赖解析成一个个 stage,然后按顺序以 taskset 的形式提交 stage 给 TaskScheduler ,TaskScheduler 将 taskset 构建成 TaskSetManager 对象管理,并按照调度系统给定的策略向 Executor 提交任务,Executor 将接受的到 task 以 taskrunner 的方式执行计算出结果,并储存到 BlockManager ,然后向 TaskScheduler 返回一个记录了结果信息的MapStatus对象,并注册到 driver 端的 MapOutputTrackerMaster,然后进行下一轮的 stage 调度 (如果是ResultTask执行结果,那么数据是我们算子决定了他最后会落地在哪的)

    原文作者:code_solve
    原文地址: https://www.jianshu.com/p/77edbf43f903
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞