Spark Source Code Analysis (Part 2): Task Allocation (with Source Code)

What SparkContext Prepares

The main work done in SparkContext (based on Spark 1.3.1 here) is the following (a minimal driver sketch follows the list):

1. Create the SparkEnv, which holds a very important object, the ActorSystem

2. Create the TaskScheduler; the concrete TaskScheduler is chosen according to the cluster the job is submitted to

3. Create the DAGScheduler

4. Call taskScheduler.start() to start the scheduler
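
For context, here is a minimal driver sketch that triggers exactly this initialization path the moment the SparkContext is constructed; the app name and master URL are placeholders, not taken from the original article:

import org.apache.spark.{SparkConf, SparkContext}

object MinimalDriver {
  def main(args: Array[String]): Unit = {
    // Constructing the SparkContext runs the four steps above:
    // create SparkEnv/ActorSystem, TaskScheduler, DAGScheduler, then taskScheduler.start()
    val conf = new SparkConf()
      .setAppName("minimal-driver")              // placeholder app name
      .setMaster("spark://master-host:7077")     // placeholder standalone master URL
    val sc = new SparkContext(conf)
    sc.parallelize(1 to 10).map(_ * 2).count()   // a trivial job, just to exercise the schedulers
    sc.stop()
  }
}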

Part of the source code is shown below:

class SparkContext(config: SparkConf) extends Logging with ExecutorAllocationClient {

  // TODO: this method creates an ActorSystem
  private[spark] def createSparkEnv(
      conf: SparkConf,
      isLocal: Boolean,
      listenerBus: LiveListenerBus): SparkEnv = {
    SparkEnv.createDriverEnv(conf, isLocal, listenerBus)
  }

  // Create and start the scheduler
  // TODO: create a TaskScheduler
  private[spark] var (schedulerBackend, taskScheduler) =
    SparkContext.createTaskScheduler(this, master)

  // TODO: create an Actor through the ActorSystem; it handles the heartbeat between the Executors and the DriverActor
  private val heartbeatReceiver = env.actorSystem.actorOf(
    Props(new HeartbeatReceiver(taskScheduler)), "HeartbeatReceiver")

  @volatile private[spark] var dagScheduler: DAGScheduler = _
  try {
    // TODO: create a DAGScheduler, which later splits the DAG into Stages
    dagScheduler = new DAGScheduler(this)
  } catch {
    case e: Exception => {
      try {
        stop()
      } finally {
        throw new SparkException("Error while constructing DAGScheduler", e)
      }
    }
  }

  // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
  // constructor
  // TODO: start the taskScheduler
  taskScheduler.start()
}

The Internal Flow of Creating the TaskScheduler in SparkContext

This method is called from the SparkContext constructor. Part of its source code is shown below:

// TODO: create the appropriate TaskScheduler according to the master URL specified when the job was submitted
  private def createTaskScheduler(
      sc: SparkContext,
      master: String): (SchedulerBackend, TaskScheduler) = {
    master match {
      // TODO: Spark standalone mode
      case SPARK_REGEX(sparkUrl) =>
        // TODO: create a TaskSchedulerImpl
        val scheduler = new TaskSchedulerImpl(sc)
        val masterUrls = sparkUrl.split(",").map("spark://" + _)
        // TODO: create a SparkDeploySchedulerBackend
        val backend = new SparkDeploySchedulerBackend(scheduler, sc, masterUrls)
        // TODO: call initialize to set up the scheduler
        scheduler.initialize(backend)
        (backend, scheduler)
      // ... the other deploy modes are omitted here ...
    }
  }

Here, the TaskScheduler object matching the submitted master URL is created, then a SparkDeploySchedulerBackend object is created, and finally initialize() is called to set up the scheduler. Next, let's see what happens inside this initialize() method:

def initialize(backend: SchedulerBackend) {
    this.backend = backend
    // temporarily set rootPool name to empty
    rootPool = new Pool("", schedulingMode, 0, 0)
    schedulableBuilder = {
      schedulingMode match {
         // the default scheduling mode is FIFO
        case SchedulingMode.FIFO =>
          new FIFOSchedulableBuilder(rootPool)
        case SchedulingMode.FAIR =>
          new FairSchedulableBuilder(rootPool, conf)
      }
    }
    schedulableBuilder.buildPools()
  }

As you can see, this method chooses the scheduling mode; note that Spark's default scheduling mode is FIFO.
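
If you want the FAIR scheduler instead, it can be selected through configuration. A minimal sketch, assuming a placeholder app name and master URL (spark.scheduler.mode is the configuration key TaskSchedulerImpl reads to pick the mode):

import org.apache.spark.{SparkConf, SparkContext}

object FairModeDemo {
  def main(args: Array[String]): Unit = {
    // Sketch: switch the in-application scheduling mode from the default FIFO to FAIR
    val conf = new SparkConf()
      .setAppName("fair-scheduling-demo")        // placeholder app name
      .setMaster("spark://master-host:7077")     // placeholder master URL
      .set("spark.scheduler.mode", "FAIR")       // default is FIFO
    val sc = new SparkContext(conf)
    // ... jobs submitted through sc now go through a FairSchedulableBuilder ...
    sc.stop()
  }
}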

Starting the TaskScheduler

After the TaskScheduler has been created, the SparkContext constructor starts the task scheduler with taskScheduler.start(). Let's look at what happens inside. Note that if you are not reading the standalone-cluster implementation, the implementation class to look at is different; here I'm reading the TaskSchedulerImpl class.

private[spark] class TaskSchedulerImpl(
    val sc: SparkContext,
    val maxTaskFailures: Int,
    isLocal: Boolean = false)
  extends TaskScheduler with Logging {

  override def start() {
    // TODO: first call SparkDeploySchedulerBackend's start method
    backend.start()

    if (!isLocal && conf.getBoolean("spark.speculation", false)) {
      logInfo("Starting speculative execution thread")
      import sc.env.actorSystem.dispatcher
      sc.env.actorSystem.scheduler.schedule(SPECULATION_INTERVAL milliseconds,
        SPECULATION_INTERVAL milliseconds) {
        Utils.tryOrExit { checkSpeculatableTasks() }
      }
    }
  }
}
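
As a side note, the speculation thread above is only scheduled when speculative execution is enabled, which is off by default. A hedged configuration sketch (spark.speculation is the key read by the code above; spark.speculation.interval is, to my recollection, where SPECULATION_INTERVAL comes from in Spark 1.x; the values are only illustrative):

import org.apache.spark.SparkConf

object SpeculationConf {
  // Sketch: settings to add to the SparkConf before the SparkContext is created
  def apply(): SparkConf = new SparkConf()
    .set("spark.speculation", "true")            // default is false
    .set("spark.speculation.interval", "100")    // check interval in milliseconds (Spark 1.x key)
}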

Back in TaskSchedulerImpl.start(), the SparkDeploySchedulerBackend's start() method is called first; let's keep tracing into its source:

override def start() {
    // TODO: first call the parent class's start method to create the DriverActor
    super.start()

    // The endpoint for executors to talk to us
    // TODO: prepare some parameters; they will later be packaged into an object and sent to the Master
    val driverUrl = AkkaUtils.address(
      AkkaUtils.protocol(actorSystem),
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

    // Start executors with a few necessary configs for registering with the scheduler
    val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    // TODO: important: this is the implementation class for the Executor that will be launched later
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    // TODO: package the parameters into an ApplicationDescription
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
    // TODO: create an AppClient and pass the ApplicationDescription in through its primary constructor
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    // TODO: then call AppClient's start method; inside start a ClientActor is created to communicate with the Master
    client.start()

    waitForRegistration()
  }

Inside this method, the parent class's start() method is called first. Let's follow it and see what Spark does there; the class to look at is CoarseGrainedSchedulerBackend and its start() method:

override def start() {
    val properties = new ArrayBuffer[(String, String)]
    for ((key, value) <- scheduler.sc.conf.getAll) {
      if (key.startsWith("spark.")) {
        properties += ((key, value))
      }
    }
    // TODO (prashant) send conf instead of properties
    // TODO: create the DriverActor through the ActorSystem
    driverActor = actorSystem.actorOf(
      Props(new DriverActor(properties)), name = CoarseGrainedSchedulerBackend.ACTOR_NAME)
  }

The code above creates a DriverActor object, which is used to communicate with the Executors. The ActorSystem here was created inside the SparkEnv of the SparkContext.
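
For readers less familiar with Akka, the actorOf(Props(...)) call used above is plain Akka API. A minimal, generic sketch (this is not Spark's DriverActor, just an illustration of creating and messaging a named actor; Spark 1.3.1 ships with an Akka 2.3-era API):

import akka.actor.{Actor, ActorSystem, Props}

object ActorSketch {
  class EchoActor extends Actor {
    def receive = {
      case msg => println(s"received: $msg")     // react to any message
    }
  }

  def main(args: Array[String]): Unit = {
    val system = ActorSystem("sketch")
    // actorOf(Props(...), name) is the same call used to create the DriverActor above
    val echo = system.actorOf(Props(new EchoActor), name = "EchoActor")
    echo ! "hello"                               // fire-and-forget send; the same '!' is used by the ClientActor later
    system.shutdown()                            // Akka 2.3-style shutdown
  }
}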

Next, Spark packages the parameters we configured, as follows:

// TODO: prepare some parameters; they will later be packaged into an object and sent to the Master
    val driverUrl = AkkaUtils.address(
      AkkaUtils.protocol(actorSystem),
      SparkEnv.driverActorSystemName,
      conf.get("spark.driver.host"),
      conf.get("spark.driver.port"),
      CoarseGrainedSchedulerBackend.ACTOR_NAME)
    val args = Seq(
      "--driver-url", driverUrl,
      "--executor-id", "{{EXECUTOR_ID}}",
      "--hostname", "{{HOSTNAME}}",
      "--cores", "{{CORES}}",
      "--app-id", "{{APP_ID}}",
      "--worker-url", "{{WORKER_URL}}")
    val extraJavaOpts = sc.conf.getOption("spark.executor.extraJavaOptions")
      .map(Utils.splitCommandString).getOrElse(Seq.empty)
    val classPathEntries = sc.conf.getOption("spark.executor.extraClassPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)
    val libraryPathEntries = sc.conf.getOption("spark.executor.extraLibraryPath")
      .map(_.split(java.io.File.pathSeparator).toSeq).getOrElse(Nil)

    // When testing, expose the parent class path to the child. This is processed by
    // compute-classpath.{cmd,sh} and makes all needed jars available to child processes
    // when the assembly is built with the "*-provided" profiles enabled.
    val testingClassPath =
      if (sys.props.contains("spark.testing")) {
        sys.props("java.class.path").split(java.io.File.pathSeparator).toSeq
      } else {
        Nil
      }

Then, a very important parameter is packaged; it is mainly used later when the Worker process launches the Executor process:

val sparkJavaOpts = Utils.sparkJavaOpts(conf, SparkConf.isExecutorStartupConf)
    val javaOpts = sparkJavaOpts ++ extraJavaOpts
    // TODO: important: this is the implementation class for the Executor that will be launched later
    val command = Command("org.apache.spark.executor.CoarseGrainedExecutorBackend",
      args, sc.executorEnvs, classPathEntries ++ testingClassPath, libraryPathEntries, javaOpts)
    val appUIAddress = sc.ui.map(_.appUIAddress).getOrElse("")
    // TODO: package the parameters into an ApplicationDescription
    val appDesc = new ApplicationDescription(sc.appName, maxCores, sc.executorMemory, command,
      appUIAddress, sc.eventLogDir, sc.eventLogCodec)
    // TODO: create an AppClient and pass the ApplicationDescription in through its primary constructor
    client = new AppClient(sc.env.actorSystem, masters, appDesc, this, conf)
    // TODO: then call AppClient's start method; inside start a ClientActor is created to communicate with the Master
    client.start()

Then an AppClient object is created to communicate with the Master; calling client.start() here triggers AppClient's lifecycle, during which the ClientActor is created.

Next, let's start from the preStart() method:

// TODO: the ClientActor's lifecycle method
    override def preStart() {
      context.system.eventStream.subscribe(self, classOf[RemotingLifecycleEvent])
      try {
        // TODO: the ClientActor registers with the Master
        registerWithMaster()
      } catch {
        case e: Exception =>
          logWarning("Failed to connect to master", e)
          markDisconnected()
          context.stop(self)
      }
    }

As you can see, the main job here is registering with the Master. Let's look inside the registration method:

def registerWithMaster() {
      // TODO: register with the Master
      tryRegisterAllMasters()
      import context.dispatcher
      var retries = 0
      registrationRetryTimer = Some {
        context.system.scheduler.schedule(REGISTRATION_TIMEOUT, REGISTRATION_TIMEOUT) {
          Utils.tryOrExit {
            retries += 1
            if (registered) {
              registrationRetryTimer.foreach(_.cancel())
            } else if (retries >= REGISTRATION_RETRIES) {
              markDead("All masters are unresponsive! Giving up.")
            } else {
              tryRegisterAllMasters()
            }
          }
        }
      }
    }


def tryRegisterAllMasters() {
      for (masterAkkaUrl <- masterAkkaUrls) {
        logInfo("Connecting to master " + masterAkkaUrl + "...")
        // TODO: loop over all Master addresses and establish a connection with each Master
        val actor = context.actorSelection(masterAkkaUrl)
        // TODO: get a reference to the Master, then send it the application-registration request; all the parameters are packaged in appDescription
        actor ! RegisterApplication(appDescription)
      }
    }

The main flow is (a simplified sketch of the messages involved follows the list):

1. Send the registration message RegisterApplication(appDescription) to every Master; RegisterApplication is a case class, and appDescription holds the packaged parameters analyzed above

2. If registration is retried more than a certain number of times, the client gives up and reports an error
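
For reference, a simplified sketch of the registration handshake. The real messages live in org.apache.spark.deploy.DeployMessages and carry Spark's own ApplicationDescription type; the fields below are trimmed down for illustration and are not the exact definitions:

// ClientActor -> Master: ask to register the application
case class RegisterApplication(appName: String, maxCores: Option[Int], memoryPerExecutor: Int)

// Master -> ClientActor: reply once the application has been registered
case class RegisteredApplication(appId: String, masterUrl: String)

// ClientActor side: masterActor ! RegisterApplication("my-app", Some(8), 1024)
// Master side:      sender ! RegisteredApplication("app-20180621-0001", masterUrl)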

Next, we need to look at how the Master handles the registration message. The following code is in the Master class:

// TODO: the application-registration message sent over by the ClientActor
    case RegisterApplication(description) => {
      if (state == RecoveryState.STANDBY) {
        // ignore, don't send response
      } else {
        logInfo("Registering app " + description.name)
        // TODO: first store the application's information in memory
        val app = createApplication(description, sender)
        registerApplication(app)
        logInfo("Registered app " + description.name + " with ID " + app.id)
        // TODO: save it with the persistence engine
        persistenceEngine.addApplication(app)
        // TODO: the Master sends a registration-success message back to the ClientActor
        sender ! RegisteredApplication(app.id, masterUrl)
        // TODO: important: the Master starts scheduling resources, i.e. deciding which Workers the application's Executors will be launched on
        schedule()
      }
    }

The main flow of this code is to store the submitted application's configuration information and persist it (mainly for high availability), then send a message back to the Driver indicating that registration succeeded, and finally call schedule(), which is a very important method.

There are two scheduling strategies here: spread the application out as much as possible, or pack it onto as few nodes as possible.

    // TODO: below are the two scheduling strategies: one spreads the app out, the other consolidates it
    // Right now this is a very simple FIFO scheduler. We keep trying to fit in the first app
    // in the queue, then the second app, etc.
    if (spreadOutApps) {
      // Try to spread out each app among all the nodes, until it has all its cores
      for (app <- waitingApps if app.coresLeft > 0) {
        val usableWorkers = workers.toArray.filter(_.state == WorkerState.ALIVE)
          .filter(canUse(app, _)).sortBy(_.coresFree).reverse
        val numUsable = usableWorkers.length
        val assigned = new Array[Int](numUsable) // Number of cores to give on each node
        var toAssign = math.min(app.coresLeft, usableWorkers.map(_.coresFree).sum)
        var pos = 0
        while (toAssign > 0) {
          if (usableWorkers(pos).coresFree - assigned(pos) > 0) {
            toAssign -= 1
            assigned(pos) += 1
          }
          pos = (pos + 1) % numUsable
        }
        // Now that we've decided how many cores to give on each node, let's actually give them
        for (pos <- 0 until numUsable) {
          if (assigned(pos) > 0) {
            val exec = app.addExecutor(usableWorkers(pos), assigned(pos))
            // TODO: the Master sends a message telling the Worker to launch an Executor
            launchExecutor(usableWorkers(pos), exec)
            app.state = ApplicationState.RUNNING
          }
        }
      }
    } else {
      // Pack each app into as few nodes as possible until we've assigned all its cores
      for (worker <- workers if worker.coresFree > 0 && worker.state == WorkerState.ALIVE) {
        for (app <- waitingApps if app.coresLeft > 0) {
          if (canUse(app, worker)) {
            val coresToUse = math.min(worker.coresFree, app.coresLeft)
            if (coresToUse > 0) {
              val exec = app.addExecutor(worker, coresToUse)
              // TODO: the Master sends a message telling the Worker to launch an Executor
              launchExecutor(worker, exec)
              app.state = ApplicationState.RUNNING
            }
          }
        }
      }
    }
  }

The logic of the spread-out strategy is (a standalone sketch of both strategies follows the second list):

1. First filter the Workers by whether they are alive and whether this application can use them, and sort them by free CPU cores in descending order

2. Create an array of cores to assign, whose size equals the number of usable Workers

3. Then assign cores to the Workers round-robin

The logic of the consolidate strategy is:

1. First filter the Workers by their free resources and by whether this application can still use them

2. Try to use up one Worker's free cores; if one Worker cannot satisfy the demand, move on to the next Worker

3. Then continue allocating for the remaining applications
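
To make the difference concrete, here is a standalone sketch of the two allocation loops above. It is not Spark code; the Workers' free cores and the application's demand are made-up numbers, and only the core arithmetic is kept:

object ScheduleSketch {
  def main(args: Array[String]): Unit = {
    val freeCores = Array(8, 6, 4)   // usable workers, already sorted by free cores descending
    val coresLeft = 10               // cores the application still needs

    // Strategy 1: spread out - hand out one core at a time, round-robin across all workers
    val assignedSpread = new Array[Int](freeCores.length)
    var toAssign = math.min(coresLeft, freeCores.sum)
    var pos = 0
    while (toAssign > 0) {
      if (freeCores(pos) - assignedSpread(pos) > 0) {
        assignedSpread(pos) += 1
        toAssign -= 1
      }
      pos = (pos + 1) % freeCores.length
    }
    println(s"spread out:  ${assignedSpread.mkString(", ")}")   // prints: 4, 3, 3

    // Strategy 2: consolidate - drain each worker before moving on to the next one
    val assignedPacked = new Array[Int](freeCores.length)
    var remaining = coresLeft
    for (i <- freeCores.indices if remaining > 0) {
      val take = math.min(freeCores(i), remaining)
      assignedPacked(i) = take
      remaining -= take
    }
    println(s"consolidate: ${assignedPacked.mkString(", ")}")   // prints: 8, 2, 0
  }
}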

Launching the Executor

Once the resources have been allocated, the next step is launching the Executors:

 def launchExecutor(worker: WorkerInfo, exec: ExecutorDesc) {
    logInfo("Launching executor " + exec.fullId + " on worker " + worker.id)
    // TODO: record the resources used on this Worker
    worker.addExecutor(exec)
    // TODO: the Master sends a message to the Worker, passing the parameters in via a case class, and tells it to launch an Executor
    worker.actor ! LaunchExecutor(masterUrl,
      exec.application.id, exec.id, exec.application.desc, exec.cores, exec.memory)
    // TODO: the Master sends a message to the ClientActor telling it that the Executor has been launched
    exec.application.driver ! ExecutorAdded(
      exec.id, worker.id, worker.hostPort, exec.cores, exec.memory)
  }

The main logic is as follows (a simplified sketch of the two messages follows the list):

1. Record the resources used on the Worker

2. Send the information packaged above to the Worker, and have the Worker launch the Executor

3. Finally, notify the Driver that the Executor has been launched.
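
For reference, a simplified sketch of the two messages sent in launchExecutor. The real case classes live in org.apache.spark.deploy.DeployMessages (LaunchExecutor also carries the full ApplicationDescription); the fields here are trimmed down for illustration:

// Master -> Worker: ask the Worker to fork an Executor process with these resources
case class LaunchExecutor(masterUrl: String, appId: String, execId: Int, cores: Int, memory: Int)

// Master -> ClientActor/Driver: notify that the Executor now exists on that Worker
case class ExecutorAdded(execId: Int, workerId: String, hostPort: String, cores: Int, memory: Int)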

    Original author: Oeljeklaus
    Original article: https://zhuanlan.zhihu.com/p/38623881
    This article is reposted from the web solely to share knowledge; if it infringes on any rights, please contact the blogger to have it removed.