Running Spark on YARN
Support for running on YARN was added to Spark in version 0.6.0 and has been improved in subsequent releases.
Launching Spark on YARN
Make sure that HADOOP_CONF_DIR or YARN_CONF_DIR points to the directory that contains the configuration files for the Hadoop cluster. Spark uses these configurations to write data to HDFS and to connect to the YARN ResourceManager. The configuration in this directory is distributed to the YARN cluster, so that every container used by the application works with the same configuration. If the configuration references Java system properties or environment variables that are not managed by YARN, they should also be set in the Spark application's configuration.
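As a minimal sketch, the variable can simply be exported before calling spark-submit; the path /etc/hadoop/conf is only an assumption about a typical installation, so use whatever directory holds your cluster's *-site.xml files:
$ export HADOOP_CONF_DIR=/etc/hadoop/conf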
There are two deploy modes for launching Spark applications on YARN. In cluster mode, the Spark driver runs inside an application master process that is managed by YARN on the cluster, and the client can exit after initiating the application. In client mode, the driver runs in the client process, and the application master is only used to request resources from YARN.
Unlike the Spark standalone and Mesos modes, where the master's address is specified with the --master parameter, in YARN mode the ResourceManager's address is picked up from the Hadoop configuration. Therefore, the --master parameter is simply yarn.
To launch a Spark application in cluster mode:
$ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
For example:
$ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
--master yarn \
--deploy-mode cluster \
--driver-memory 4g \
--executor-memory 2g \
--executor-cores 1 \
--queue thequeue \
lib/spark-examples*.jar \
10
The example above starts a YARN client program which launches the default Application Master. SparkPi then runs as a child thread of the Application Master. The client periodically polls the Application Master for status updates and displays them in the console. The client exits once your application has finished running.
Launching a Spark application in client mode works the same way, except that cluster is replaced with client. For example, to run spark-shell in client mode:
$ ./bin/spark-shell --master yarn --deploy-mode client
Adding Other JARs
In cluster mode, the driver runs on a different machine than the client, so SparkContext.addJar will not work out of the box with files that are local to the client. To make such files available to SparkContext.addJar, include them with the --jars option in the launch command.
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--jars my-other-jar.jar,my-other-other-jar.jar \
my-main-jar.jar \
app_arg1 app_arg2
Preparations
Running Spark on YARN requires a binary distribution of Spark that is built with YARN support. Binary distributions can be downloaded from the project website, or you can build one yourself.
Configuration
Most of the configurations for Spark on YARN are the same as for the other deployment modes.
Debugging Your Application
In YARN terminology, executors and application masters run inside "containers". YARN provides two ways of handling container logs after an application has completed. If log aggregation is turned on (via the yarn.log-aggregation-enable config), container logs are copied to HDFS and deleted on the local machines. These logs can be viewed from anywhere in the cluster with the yarn logs command:
yarn logs -applicationId <app ID>
This command prints the contents of all log files from all containers of the given application. You can also view the container log files directly in HDFS using the HDFS shell or API; the directory they are stored in can be found in your YARN configuration (yarn.nodemanager.remote-app-log-dir and yarn.nodemanager.remote-app-log-dir-suffix). The logs are also available on the Spark Web UI under the "Executors" tab. For that, you need to have both the Spark history server and the MapReduce history server running and to configure yarn.log.server.url in yarn-site.xml properly; the log URL on the Spark history server UI will then redirect you to the MapReduce history server, which shows the aggregated logs.
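As a hedged illustration of browsing the aggregated logs directly, assuming the common defaults of /tmp/logs for yarn.nodemanager.remote-app-log-dir and logs for the suffix (check your own yarn-site.xml), the files can be listed with the HDFS shell:
$ hadoop fs -ls /tmp/logs/<user>/logs/<app ID>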
When log aggregation is turned off, logs are kept locally on each machine under YARN_APP_LOGS_DIR, which is usually configured to /tmp/logs or $HADOOP_HOME/logs/userlogs depending on the Hadoop version and installation. Viewing the logs for a container requires going to the host that contains them and looking in this directory; subdirectories organize the log files by application ID and container ID. These logs are also available on the Spark Web UI under the "Executors" tab and do not require the MapReduce history server, because nothing needs to be read from HDFS.
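For example, on the node that ran a given container, and assuming the $HADOOP_HOME/logs/userlogs layout mentioned above, its stdout and stderr files can be found with:
$ ls $HADOOP_HOME/logs/userlogs/<app ID>/<container ID>/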
To review the per-container launch environment, increase yarn.nodemanager.delete.debug-delay-sec to a large value (e.g. 36000), and then access the application cache through yarn.nodemanager.local-dirs on the nodes on which containers were launched. This directory contains the launch script, the JARs, and all the environment variables used for launching each container, which makes it especially useful for debugging classpath problems. (Note that enabling this requires admin privileges on the cluster settings and a restart of all node managers, so it is not applicable to hosted clusters.)
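A sketch of the yarn-site.xml change described above; the value is just the example figure from the text, and applying it requires admin access plus a NodeManager restart:
<property>
  <name>yarn.nodemanager.delete.debug-delay-sec</name>
  <value>36000</value>
</property>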
To use a custom log4j configuration for the application master or the executors, choose one of the following options (an example of the first option appears after the note below):
- Upload a custom log4j.properties with spark-submit, by adding it to the --files list of files to be uploaded with the application.
- Add -Dlog4j.configuration=<location of configuration file> to spark.driver.extraJavaOptions for the driver. Note that if you use this option, the file needs to exist on all the nodes.
- Update the $SPARK_CONF_DIR/log4j.properties file, and it will be uploaded automatically along with the other configurations. Note that the options described above take higher precedence than this one when several of them are specified.
Note that for the first option, both executors and the application master will share the same log4j configuration, which may cause issues when they run on the same node (e.g. trying to write to the same log file).
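The following sketch combines the earlier submission example with the first option; my.main.Class, my-main-jar.jar, and the local path to log4j.properties are placeholders, not names defined by Spark:
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--files /path/to/log4j.properties \
my-main-jar.jar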
If you need a reference to the proper location for log files in YARN so that YARN can display and aggregate them correctly, use spark.yarn.app.container.log.dir in your log4j.properties. For example: log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log. For streaming applications, configuring a RollingFileAppender and pointing the file location at YARN's log directory avoids disk overflow caused by large log files, and the logs can still be accessed with YARN's log utilities.
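A hedged log4j.properties fragment along those lines, reusing the file_appender name from the example above; the size and backup settings are only illustrative, not Spark defaults:
log4j.rootCategory=INFO, file_appender
log4j.appender.file_appender=org.apache.log4j.RollingFileAppender
log4j.appender.file_appender.File=${spark.yarn.app.container.log.dir}/spark.log
log4j.appender.file_appender.MaxFileSize=50MB
log4j.appender.file_appender.MaxBackupIndex=10
log4j.appender.file_appender.layout=org.apache.log4j.PatternLayout
log4j.appender.file_appender.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n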
Important Notes
- Whether core requests are honored in scheduling decisions depends on which scheduler is in use and how it is configured.
- In cluster mode, the Spark executors and the Spark driver use the local directories configured for YARN (Hadoop YARN config yarn.nodemanager.local-dirs); if the user specifies spark.local.dir, it is ignored. In client mode, the Spark executors use the local directories configured for YARN, while the Spark driver uses the directories defined in spark.local.dir. This is because in client mode the Spark driver does not run on the YARN cluster, only the executors do.
- The --files and --archives options support specifying file aliases with #, just like Hadoop. For example, you can specify --files localtest.txt#appSees.txt; this uploads the local file localtest.txt to HDFS, but it is linked to by the name appSees.txt, and your application should refer to it as appSees.txt when running on YARN (see the example after this list).
- The --jars option lets SparkContext.addJar work if you are using it with local files and running in cluster mode. It is not needed if you are using it with HDFS, HTTP, HTTPS, or FTP files.
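As an illustration of the # aliasing described above (the class and jar names are placeholders carried over from the earlier examples), the submitted application would then open appSees.txt from its working directory:
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--files localtest.txt#appSees.txt \
my-main-jar.jar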
The following Spark properties can be tuned as needed for running on YARN; a usage sketch follows the table:
Property Name | Default | Meaning |
---|---|---|
spark.yarn.am.memory | 512m | Amount of memory to use for the YARN Application Master in client mode, in the same format as JVM memory strings (e.g. 512m , 2g ). In cluster mode, use spark.driver.memory instead. Use lower-case suffixes, e.g. k, m, g, t, and p, for kibi-, mebi-, gibi-, tebi-, and pebibytes, respectively. |
spark.driver.cores | 1 | Number of cores used by the driver in YARN cluster mode. Since the driver is run in the same JVM as the YARN Application Master in cluster mode, this also controls the cores used by the YARN Application Master. In client mode, use spark.yarn.am.cores to control the number of cores used by the YARN Application Master instead. |
spark.yarn.am.cores | 1 | Number of cores to use for the YARN Application Master in client mode. In cluster mode, use spark.driver.cores instead. |
spark.yarn.am.waitTime | 100s | In cluster mode, time for the YARN Application Master to wait for the SparkContext to be initialized. In client mode, time for the YARN Application Master to wait for the driver to connect to it. |
spark.yarn.submit.file.replication | The default HDFS replication (usually 3 ) | HDFS replication level for the files uploaded into HDFS for the application. These include things like the Spark jar, the app jar, and any distributed cache files/archives. |
spark.yarn.preserve.staging.files | false | Set to true to preserve the staged files (Spark jar, app jar, distributed cache files) at the end of the job rather than delete them. |
spark.yarn.scheduler.heartbeat.interval-ms | 3000 | The interval in ms in which the Spark application master heartbeats into the YARN ResourceManager. The value is capped at half the value of YARN’s configuration for the expiry interval, i.e. yarn.am.liveness-monitor.expiry-interval-ms . |
spark.yarn.scheduler.initial-allocation.interval | 200ms | The initial interval in which the Spark application master eagerly heartbeats to the YARN ResourceManager when there are pending container allocation requests. It should be no larger than spark.yarn.scheduler.heartbeat.interval-ms . The allocation interval will be doubled on successive eager heartbeats if pending containers still exist, until spark.yarn.scheduler.heartbeat.interval-ms is reached. |
spark.yarn.max.executor.failures | numExecutors * 2, with minimum of 3 | The maximum number of executor failures before failing the application. |
spark.yarn.historyServer.address | (none) | The address of the Spark history server, e.g. host.com:18080 . The address should not contain a scheme (http:// ). Defaults to not being set since the history server is an optional service. This address is given to the YARN ResourceManager when the Spark application finishes to link the application from the ResourceManager UI to the Spark history server UI. For this property, YARN properties can be used as variables, and these are substituted by Spark at runtime. For example, if the Spark history server runs on the same node as the YARN ResourceManager, it can be set to ${hadoopconf-yarn.resourcemanager.hostname}:18080 . |
spark.yarn.dist.archives | (none) | Comma separated list of archives to be extracted into the working directory of each executor. |
spark.yarn.dist.files | (none) | Comma-separated list of files to be placed in the working directory of each executor. |
spark.executor.instances | 2 | The number of executors. Note that this property is incompatible with spark.dynamicAllocation.enabled . If both spark.dynamicAllocation.enabled and spark.executor.instances are specified, dynamic allocation is turned off and the specified number of spark.executor.instances is used. |
spark.yarn.executor.memoryOverhead | executorMemory * 0.10, with minimum of 384 | The amount of off-heap memory (in megabytes) to be allocated per executor. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the executor size (typically 6-10%). |
spark.yarn.driver.memoryOverhead | driverMemory * 0.10, with minimum of 384 | The amount of off-heap memory (in megabytes) to be allocated per driver in cluster mode. This is memory that accounts for things like VM overheads, interned strings, other native overheads, etc. This tends to grow with the container size (typically 6-10%). |
spark.yarn.am.memoryOverhead | AM memory * 0.10, with minimum of 384 | Same as spark.yarn.driver.memoryOverhead , but for the YARN Application Master in client mode. |
spark.yarn.am.port | (random) | Port for the YARN Application Master to listen on. In YARN client mode, this is used to communicate between the Spark driver running on a gateway and the YARN Application Master running on YARN. In YARN cluster mode, this is used for the dynamic executor feature, where it handles the kill from the scheduler backend. |
spark.yarn.queue | default | The name of the YARN queue to which the application is submitted. |
spark.yarn.jar | (none) | The location of the Spark jar file, in case overriding the default location is desired. By default, Spark on YARN will use a Spark jar installed locally, but the Spark jar can also be in a world-readable location on HDFS. This allows YARN to cache it on nodes so that it doesn’t need to be distributed each time an application runs. To point to a jar on HDFS, for example, set this configuration to hdfs:///some/path . |
spark.yarn.access.namenodes | (none) | A comma-separated list of secure HDFS namenodes your Spark application is going to access. For example, spark.yarn.access.namenodes=hdfs://nn1.com:8032,hdfs://nn2.com:8032 . The Spark application must have access to the namenodes listed and Kerberos must be properly configured to be able to access them (either in the same realm or in a trusted realm). Spark acquires security tokens for each of the namenodes so that the Spark application can access those remote HDFS clusters. |
spark.yarn.appMasterEnv.[EnvironmentVariableName] | (none) | Add the environment variable specified by EnvironmentVariableName to the Application Master process launched on YARN. The user can specify multiple of these to set multiple environment variables. In cluster mode this controls the environment of the Spark driver, and in client mode it only controls the environment of the executor launcher. |
spark.yarn.containerLauncherMaxThreads | 25 | The maximum number of threads to use in the YARN Application Master for launching executor containers. |
spark.yarn.am.extraJavaOptions | (none) | A string of extra JVM options to pass to the YARN Application Master in client mode. In cluster mode, use spark.driver.extraJavaOptions instead. |
spark.yarn.am.extraLibraryPath | (none) | Set a special library path to use when launching the YARN Application Master in client mode. |
spark.yarn.maxAppAttempts | yarn.resourcemanager.am.max-attempts in YARN | The maximum number of attempts that will be made to submit the application. It should be no larger than the global number of max attempts in the YARN configuration. |
spark.yarn.am.attemptFailuresValidityInterval | (none) | Defines the validity interval for AM failure tracking. If the AM has been running for at least the defined interval, the AM failure count will be reset. This feature is not enabled if not configured, and only supported in Hadoop 2.6+. |
spark.yarn.submit.waitAppCompletion | true | In YARN cluster mode, controls whether the client waits to exit until the application completes. If set to true , the client process will stay alive reporting the application’s status. Otherwise, the client process will exit after submission. |
spark.yarn.am.nodeLabelExpression | (none) | A YARN node label expression that restricts the set of nodes AM will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored. |
spark.yarn.executor.nodeLabelExpression | (none) | A YARN node label expression that restricts the set of nodes executors will be scheduled on. Only versions of YARN greater than or equal to 2.6 support node label expressions, so when running against earlier versions, this property will be ignored. |
spark.yarn.tags | (none) | Comma-separated list of strings to pass through as YARN application tags appearing in YARN ApplicationReports, which can be used for filtering when querying YARN apps. |
spark.yarn.keytab | (none) | The full path to the file that contains the keytab for the principal specified above. This keytab will be copied to the node running the YARN Application Master via the Secure Distributed Cache, for renewing the login tickets and the delegation tokens periodically. (Works also with the “local” master) |
spark.yarn.principal | (none) | Principal to be used to login to KDC, while running on secure HDFS. (Works also with the “local” master) |
spark.yarn.config.gatewayPath | (none) | A path that is valid on the gateway host (the host where a Spark application is started) but may differ for paths for the same resource in other nodes in the cluster. Coupled with spark.yarn.config.replacementPath , this is used to support clusters with heterogeneous configurations, so that Spark can correctly launch remote processes. The replacement path normally will contain a reference to some environment variable exported by YARN (and, thus, visible to Spark containers). For example, if the gateway node has Hadoop libraries installed on /disk1/hadoop, and the location of the Hadoop install is exported by YARN as the HADOOP_HOME environment variable, setting this value to /disk1/hadoop and the replacement path to $HADOOP_HOME will make sure that paths used to launch remote processes properly reference the local YARN configuration. |
spark.yarn.config.replacementPath | (none) | See spark.yarn.config.gatewayPath . |
spark.yarn.security.tokens.${service}.enabled | true | Controls whether to retrieve delegation tokens for non-HDFS services when security is enabled. By default, delegation tokens for all supported services are retrieved when those services are configured, but it's possible to disable that behavior if it somehow conflicts with the application being run. Currently supported services are: hive, hbase. |
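As a hedged sketch of how a few of the properties above are typically supplied (the class and jar names are placeholders from the earlier examples, and the values shown are only illustrative), they can be passed to spark-submit with --conf:
$ ./bin/spark-submit --class my.main.Class \
--master yarn \
--deploy-mode cluster \
--conf spark.yarn.queue=thequeue \
--conf spark.yarn.executor.memoryOverhead=512 \
my-main-jar.jar
Alternatively, they can be set once for all applications in conf/spark-defaults.conf:
spark.yarn.queue                      thequeue
spark.yarn.executor.memoryOverhead    512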