Spark1.3.1源码分析 Spark-Master、Worker启动流程

Master 和 Worker关系图

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

总结

  • master:通过读取配置,创建actorSystem,反射调用master,master启动后,执行生命周期方法,preStartreceiveWithLogging,定时val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000清理失去心跳的Worker
  • worker:通过读取配置,加载worker所在服务器的cpu cores,memory大小等信息,创建actorSystem,反射调用worker,worker启动后执行生命周期方法preStartreceiveWithLogging,向master注册信息,最重要的信息worker的cpu cores和memory资源大小,定时向master报心跳val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4,防止被master清理
  • 所以master会保存worker各个节点的资源信息,与保持心跳,作为后续执行job资源分配,调度的基础

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

Spark中start-all.sh脚本

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

Master

1.查看master启动脚本start-master.sh

start-master.sh脚本中可以看到master启动的时候,启动的是org.apache.spark.deploy.master.Master类,所以要看源码,从这个类查看,在从Master伴生对象main方法入手

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

2. 源码分析

main方法主要做了以下三件事

  • 读取配置
  • 创建ActorSystem
  • 通过ActorSystem启动Master服务
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    流程1.加载配置文件 2.启动master
    val args = new MasterArguments(argStrings, conf)这句代码的功能就是加载配置文件,但是里面有可以借鉴Utils工具类的代码
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    关键点在val (actorSystem, _, _, _) = startSystemAndActor(args.host, args.port, args.webUiPort, conf),主要作用,调用创建了ActorSystem
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    startService函数作为Utils.startServiceOnPort(port, startService, conf, name)的参数,
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    Utils.startServiceOnPort(port, startService, conf, name)中只是计算出master启动的端口
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    所以关键还是要看startService方法,该方法又调用doCreateActorSystem
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    所以第一个红框的作用就是读取配置,包括端口信息,创建ActorSystem,第二个红框,通过反射启动Master
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    启动Master,Master会走Actor的生命周期方法preStart启动,receiveWithLogging,接收信息
    preStart方法中,启动webUi等操作,最重要的是这句代码,代码,启动一个定时器,定时发送给自己一个case objec CheckForWorkerTimeOut,间隔是val WORKER_TIMEOUT = conf.getLong("spark.worker.timeout", 60) * 1000
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    Master中最最重要的方法,receiveWithLogging,master启动后,通过该方法接收message做相应的处理,首先查看preStart中,查看定时发CheckForWorkerTimeOut给自己的receive调用的方法,查看源码,
    总结:Master启动后,定时发送CheckForWorkerTimeOut,给自己,在receiveWithLogging,调用timeOutDeadWorkers,定时清理超过心跳时间的Worker,从val workers = new HashSet[WorkerInfo]移除
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

Worker

1.查看worker启动脚本start-slave.sh

start-slaves.sh启动start-slave.sh,启动org.apache.spark.deploy.worker.Worker

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

2.源码分析

Worker启动跟Master启动几乎一模一样,

  • 读取配置,获取cpu cores和`memeory
  • 创建ActorSystem
  • 反射创建Worker,Worker启动,调用生命周期方法
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png
    《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

所以直接看Worker的preStartreceiveWithLogging
preStart方法中,会创建工作目录WorkDir,启动WorkWebUi,最最重要的是,向master注册,registerWithMaster查看方法,调用tryRegisterAllMasters,获取master uri 比如master:7070,获取master的actor,然后向master发送异步无返回值message,将自己的信息封装到case class RegisterWorker,包括自己的id,ip,port, cpu cores,内存大小信息等,所以此时需要到master的receiveWithLogging查看接收到的RegisterWorker做出什么样的操作

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

master接收到worker的信息后,将RegisterWorker 的信息封装成一个WorkerInfo(拥有worker的信息,id,ip,port, cpu cores,内存大小信息等),再将workerinfo的信息添加到persistenceEngine持久化起来,然后向worker发送RegisteredWorker,告诉worker注册成功,接着调用调度方法schedule(),这个方法大概是这样的,master可能拥有许多client提交的任务,当资源不足的时候,任务会排队,所以当有新的资源,就是worker加入的时候,如果此时有任务排队,又有资源加入master会调度任务分配资源,就是这个schedule()方法。woker收到注册成功的信息RegisteredWorker,所以此时需要去worker的receiveWithLogging中查看

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

worker接收到master的信息后,启动定时器,定时
val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4向自己发送心跳
SendHeartbeat,此时需要在worker的
receiveWithLogging方法中查看SendHeartbeat,查看代码,又发送
heartBeat给master

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

master收到心跳后,判断是否存在workerId,如果存在则更新workerInfo的心跳时间,如果不存在,发送信息
ReconnectWorker,让worker重新向注册。

《Spark1.3.1源码分析 Spark-Master、Worker启动流程》 image.png

    原文作者:LancerLin_LX
    原文地址: https://www.jianshu.com/p/3686201fbf46
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞