spark底层源码解析之作业提交:

作业的提交做的主要的事情是:通过提交的最后一个rdd的依赖关系来划分stage,在再将stage转换成task,由diver端发送给一个个的将task发送到Mster端,最后提交到到CoarseGrainedExecutorBackend里面让executor执行.接下来就从源码角度整个的分析一遍流程

下面的每一个开头都表示在那个类里面执行的方法,要是没有进行标注表示这个方法所在的类和上面的方法所在同一个类.同时代码里面也会给出很多的注释,记得看得仔细点哦

SparkContext:RDD内部隐式触发SparkContext的runJob()方法 将RDD,操作rdd的函数以及分区数传过来.

  def runJob[T, U: ClassTag](
      rdd: RDD[T],
      func: Iterator[T] => U,
      partitions: Seq[Int]): Array[U] = {
    val cleanedFunc = clean(func)
    runJob(rdd, (ctx: TaskContext, it: Iterator[T]) => cleanedFunc(it), partitions)
  }

DAGScheduler:接着调用DAGScheduler的runJob方法,
在这里会执行submitJob方法,这个方法会发生线程等待,直到返回作业执行的结果.

def runJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): Unit = {
    val start = System.nanoTime
    //在这里会发生线程等待,直到返回结果
    val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
    ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
    waiter.completionFuture.value.get match {
      case scala.util.Success(_) =>
        logInfo("Job %d finished: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
      case scala.util.Failure(exception) =>
        logInfo("Job %d failed: %s, took %f s".format
          (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
        // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
        val callerStackTrace = Thread.currentThread().getStackTrace.tail
        exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
        throw exception
    }
  }

的在submitJob方法,首先会检查分区数,接着调用DAGScheduler的一个内部类DAGSchedulerEventProcessLoop;接着在它里面的消息接收方法OnReceive方法对JobSubmitted进行模式匹配,再次回到DAGScheduler中的handleJobSubmitted方法

def submitJob[T, U](
      rdd: RDD[T],
      func: (TaskContext, Iterator[T]) => U,
      partitions: Seq[Int],
      callSite: CallSite,
      resultHandler: (Int, U) => Unit,
      properties: Properties): JobWaiter[U] = {
    /**
      *     Check to make sure we are not launching a task on a partition that does not exist.
      */

    val maxPartitions = rdd.partitions.length
    partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
      throw new IllegalArgumentException(
        "Attempting to access a non-existent partition: " + p + ". " +
          "Total number of partitions: " + maxPartitions)
    }

    val jobId = nextJobId.getAndIncrement()
    if (partitions.size == 0) {
      // Return immediately if the job is running 0 tasks
      return new JobWaiter[U](this, jobId, 0, resultHandler)
    }

    assert(partitions.size > 0)
    val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
    val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
    //执行
    eventProcessLoop.post(JobSubmitted(
      jobId, rdd, func2, partitions.toArray, callSite, waiter,
      SerializationUtils.clone(properties)))
    waiter
  }

handleJobSubmitted这个方法真的太重要,从这个方法开始我们就开始要进行最重要的一步,stage的划分了.stage的划分是以createResultStage这个方法为入口,这个方法的返回值是一个job的最后一个stage.接下来就让我进到这个方法里面来探究stage到底是怎么进行划分的吧.

  private[scheduler] def handleJobSubmitted(jobId: Int,
      finalRDD: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      callSite: CallSite,
      listener: JobListener,
      properties: Properties) {
    var finalStage: ResultStage = null
    try {
      // New stage creation may throw an exception if, for example, jobs are run on a
      // HadoopRDD whose underlying HDFS files have been deleted.

      /**
        * 这里其实是获取到最后一个stage   通过我们前面传过来的最后一个rdd来获取的
        */
      finalStage = createResultStage(finalRDD, func, partitions, jobId, callSite)
    } catch {
      case e: Exception =>
        logWarning("Creating new stage failed due to exception - job: " + jobId, e)
        listener.jobFailed(e)
        return
    }
    /**
      *  根据stage创建job,一个finalStage创建一个job,所以一个action算子对应一个job
      */
    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
    clearCacheLocs()
    /**
      *  打印日志
      */
    logInfo("Got job %s (%s) with %d output partitions".format(
      job.jobId, callSite.shortForm, partitions.length))
    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")
    logInfo("Parents of final stage: " + finalStage.parents)
    logInfo("Missing parents: " + getMissingParentStages(finalStage))

    //创建job的提交时间
    val jobSubmissionTime = clock.getTimeMillis()

    //赋值
    jobIdToActiveJob(jobId) = job

    //把这个job保存下来
    activeJobs += job
    finalStage.setActiveJob(job)
    //获取到整个划分的所有stage的集合
    val stageIds = jobIdToStageIds(jobId).toArray
    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
    listenerBus.post(
      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
    //提交最后一个stage
    submitStage(finalStage)
  }

首先我们得明确一点就是,我们的stage是作业不同的调度阶段,而它的划分依据是RDD是否发生的宽窄依赖,而RDD又是具有血缘性的,所以我们的stage要要能够简历起来血缘关系才能不断的进行回溯.所以我们这里要关注的是
getOrCreateParentStages(rdd, jobId)这个方法,看他是怎么创建stage的.

  private def createResultStage(
      rdd: RDD[_],
      func: (TaskContext, Iterator[_]) => _,
      partitions: Array[Int],
      jobId: Int,
      callSite: CallSite): ResultStage = {
    /**
      *   通过最后一个RDD里面获取它的依赖关系去划分stage  它的父stage  用一个list去保存
      *   这里用集合的原因是因为它的父rdd是发生join之类的操作,所有会划分出来两个stage,所以就用list去保存
       */
     val parents: List[Stage] = getOrCreateParentStages(rdd, jobId)
     val id = nextStageId.getAndIncrement()
    /**
      * 把 List集合的stage封装起来 通过最后一个rdd去创建最后一个stage ,同时里保存了他的父stage
      *所以最后一个stage就是ResultStage,别的都是MapShuffleStage
      */
     val stage = new ResultStage(id, rdd, func, partitions, parents, jobId, callSite)
    /**
      * 把最后一个stage进行注册!
      */
    stageIdToStage(id) = stage
    updateJobIdStageIdMaps(jobId, stage)
    stage
  }

  /**
   * Get or create the list of parent stages for a given RDD.  The new Stages will be created with
   * the provided firstJobId.
   */
  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    /**
      * 这个方法返回的是当前rdd的父rdd的一个ShuffleDependency的hashSet集合
      * 为什么要用集合保存呢,这里就是当他的父rdd有多个的时候,比如join操作呀,那是不是得用集合嘛
      */
    getShuffleDependencies(rdd)
      .map
      { shuffleDep =>

        /**
          *  拿出里面的dependency去创建他的父Stage   这一步很关键哦
          */
      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

getOrCreateParentStages()这个方法是stage划分里面最关键的几个方法之一.在它里面首先我们通过传进来的RDD去调用
getShuffleDependencies(rdd)方法,这个方法是获取到当前RDD的父宽依赖列表.
话不多说,因为本人觉得这个方法蛮重要的,所以进去这个方法看看

  private def getOrCreateParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {
    /**
      * 这个方法返回的是当前rdd的父rdd的一个ShuffleDependency的hashSet集合
      * 为什么要用集合保存呢,这里就是当他的父rdd有多个的时候,比如join操作呀,那是不是得用集合嘛
      */
    getShuffleDependencies(rdd)
      .map
      { shuffleDep =>

        /**
          *  拿出里面的dependency去创建他的父Stage   这一步很关键哦
          */

      getOrCreateShuffleMapStage(shuffleDep, firstJobId)
    }.toList
  }

这里用到了一个很关键的数据结构:,通过不断的压栈出栈实现对RDD的迭代,去找到它的ShuffleDependency.具体的做法是:在rdd里面有一个方法是dependencies,通过它可以获得它的父rdd的依赖的一个Dependency对象(不知道宽窄),然后对这个依赖进行,模式匹配,要是shuffleDepen就保存下来,要是窄依赖就调用Dependency里面的一个属性获取当前依赖的rdd,同时入栈,继续进行遍历.最终返回的是一个发生shuffleDepen的list集合.所有大家发现没有这个Dependency就相当于一个指针一样,这个指针的概念我们后面要一直用哦.

  private[scheduler] def getShuffleDependencies(
      rdd: RDD[_]): HashSet[ShuffleDependency[_, _, _]]
  = {
    val parents = new HashSet[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit

        toVisit.dependencies.foreach {
          case shuffleDep: ShuffleDependency[_, _, _] =>
            parents += shuffleDep
          case dependency =>
            waitingForVisit.push(dependency.rdd)
        }
      }
    }
    parents
  }

从* getShuffleDependencies方法出去,来到 getOrCreateShuffleMapStage(shuffleDep, firstJobId)这个方法里面,这个方法也在划分stage里面太他妈重要了,它是通过我们上面得到的依赖关系去划分出stage的.(划分原则是要是有stage的话直接获取,没有stage就通过依赖关系去创建stage)所以我们再次进入这个方法里面,可以看到是通过之前的依赖关系里面的shuffleId属性去获取它所对应的stage,这里我们假设是第一次调用这个方法,我们的stage还没有创建,所以接下来去创建stage,所以进入case None 的getMissingAncestorShuffleDependencies(shuffleDep.rdd)*这个方法,看我对这个方法的注释就知道这个方法的重要性了吧.哈哈,所以就让我们进入到这个stage里面最重要的方法,怎么划分出所有的stage!!!

private def getOrCreateShuffleMapStage(
      shuffleDep: ShuffleDependency[_, _, _],
      firstJobId: Int): ShuffleMapStage = {

    /**
      * 通过shuffle的dependency获取到shuffleID去获取stage
      */
    shuffleIdToMapStage.get(shuffleDep.shuffleId) match {
      case Some(stage) =>
        //获取到了就返回就完事了
        stage

      case None =>
        // Create stages for all missing ancestor shuffle dependencies.
        /**
          * 我日你个妈嘞,原来这个方法里面才是获取当前RDD的所有父的Shuffle依赖......
          */
        getMissingAncestorShuffleDependencies(shuffleDep.rdd)
          .foreach { dep =>
          // Even though getMissingAncestorShuffleDependencies only returns shuffle dependencies
          // that were not already in shuffleIdToMapStage, it's possible that by the time we
          // get to a particular dependency in the foreach loop, it's been added to
          // shuffleIdToMapStage by the stage creation process for an earlier dependency. See
          // SPARK-13902 for more information.
          if (!shuffleIdToMapStage.contains(dep.shuffleId)) {
            /**
              * 这里就是通过它的父依赖去创建stage 再将stage进行注册
              * 里面通过递归调用获取到所有的血缘关系!!!!!!
              */
            createShuffleMapStage(dep, firstJobId)
          }
        }
        // Finally, create a stage for the given shuffle dependency.


        createShuffleMapStage(shuffleDep, firstJobId)
    }
  }

getMissingAncestorShuffleDependencies(shuffleDep.rdd),在这个方法里面还是运用到了这个数据结构,同时通过递归调用 getShuffleDependencies(toVisit)这个方法,最终这个方法返回的是整个shuffle依赖链,从这个方法出来,foreach它的shuffle依赖. createShuffleMapStage方法通过它的shuffle依赖创建stage,同时将stage注册.

private def getMissingAncestorShuffleDependencies(
      rdd: RDD[_]): Stack[ShuffleDependency[_, _, _]] = {
    val ancestors = new Stack[ShuffleDependency[_, _, _]]
    val visited = new HashSet[RDD[_]]
    // We are manually maintaining a stack here to prevent StackOverflowError
    // caused by recursively visiting
    val waitingForVisit = new Stack[RDD[_]]
    waitingForVisit.push(rdd)
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        /**
          * 又是这个方法哦,通过递归调用这个方法
          */
        getShuffleDependencies(toVisit).
          foreach { shuffleDep =>
          if (!shuffleIdToMapStage.contains(shuffleDep.shuffleId)) {
            ancestors.push(shuffleDep)
            /**
              * 这里就是通过压栈出栈的操作,获取到她的所有的依赖
              */
            waitingForVisit.push(shuffleDep.rdd)
          } // Otherwise, the dependency and its ancestors have already been registered.
        }
      }
    }
    /**
      * 返回的是一个所有依赖关系的栈
      */
    ancestors
  }

createShuffleMapStage在这个方法里面我们就创建了所有的shuffleMapStage了,同时完成了注册.记住!!!!我们这里创建的一定是shuffleMapstage.
最后回到getOrCreateShuffleMapStage方法里面,返回一个stage.别搞混了哦,虽然注册了所有的stage,但是这里只返回当前RDD对应的父stage

def createShuffleMapStage(shuffleDep: ShuffleDependency[_, _, _], jobId: Int): ShuffleMapStage = {
    /**
      * 这里的shuffleDep 就是他的一个父依赖
      */
    val rdd = shuffleDep.rdd
    /**
      *  一个分区执行一个task  所以分区数等于task数
      */
    val numTasks = rdd.partitions.length
    /**
      * !!!!!在这里就进行了递归调用
      */
    val parents = getOrCreateParentStages(rdd, jobId)
    val id = nextStageId.getAndIncrement()
    /**
      * 通过他的父依赖创建stage
      */
    val stage = new ShuffleMapStage(id, rdd, numTasks, parents, jobId, rdd.creationSite, shuffleDep)
    /**
      *  很关键, 这里进行了注册,把stage和他的shuffleid,还有一个自增的的stage内置id放在了一起
      */
    stageIdToStage(id) = stage
    shuffleIdToMapStage(shuffleDep.shuffleId) = stage
    updateJobIdStageIdMaps(jobId, stage)

    if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {
      // A previously run stage generated partitions for this shuffle, so for each output
      // that's still available, copy information about that output location to the new stage
      // (so we don't unnecessarily re-compute that data).
      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)
      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)
      (0 until locs.length).foreach { i =>
        if (locs(i) ne null) {
          // locs(i) will be null if missing
          stage.addOutputLoc(i, locs(i))
        }
      }
    } else {
      // Kind of ugly: need to register RDDs with the cache and map output tracker here
      // since we can't do it in the RDD constructor because # of partitions is unknown
      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")
      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length)
    }
    stage
  }

通过上面的代码分析我们的stage的划分就完成了

然后对方法进行回溯.回到createResultStage这个方法里面.通过前面获取的父stage列表,创建出最后ResultStage对象返回.最后在handleJobSubmitted里面调用submitStage()方法将ResultStage提交.

    原文作者:Yellow_0ce3
    原文地址: https://www.jianshu.com/p/82bc9d7bb6fa
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞