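Spark Standalone Mode
- Installing a Spark Standalone cluster
- Starting a cluster manually
- Cluster launch scripts
- Connecting an application to the cluster
- Launching Spark applications
- Resource scheduling and allocation
- Monitoring and logging
- Running alongside Hadoop
- Configuring ports for network security
- High availability
- Standby masters with ZooKeeper
- Single-node recovery with the local file system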
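1. Installing a Spark Standalone Cluster
To install a Spark Standalone cluster, you only need to deploy a compiled version of Spark on each node. You can download a prebuilt release from the official website, or build Spark yourself to suit your needs.
Once started, the master prints a spark://HOST:PORT URL. You can use this URL to connect workers to the master, or pass it as the "master" argument to SparkContext. You can also find this URL on the master's web UI, which is at http://localhost:8080 by default. It is better to use http://<master-ip>:8080, so that any machine on the same LAN as the master can reach it.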
./sbin/ <master-spark-URL>
Once a worker has started, the master's web UI lists the workers registered with it, along with details such as their number of CPU cores and amount of memory.
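The two addresses mentioned above follow a fixed pattern, so they can be derived from the master's host name. The following is a minimal sketch, not part of Spark itself: the function names and the address 192.168.1.10 are made up for illustration, and the default ports are the ones listed in the options table.

```shell
#!/bin/sh
# Build the URL that workers and applications use to reach the master.
# 7077 is the master's default service port.
master_url() {
  host=$1
  port=${2:-7077}
  printf 'spark://%s:%s\n' "$host" "$port"
}

# Build the address of the master's web UI (default port 8080).
master_webui_url() {
  host=$1
  port=${2:-8080}
  printf 'http://%s:%s\n' "$host" "$port"
}

# 192.168.1.10 is a hypothetical master address.
master_url 192.168.1.10        # prints spark://192.168.1.10:7077
master_webui_url 192.168.1.10  # prints http://192.168.1.10:8080
```

Passing a second argument overrides the port, for example `master_url node1 7078`.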
Argument | Meaning |
-h HOST, --host HOST | Hostname to listen on |
-i HOST, --ip HOST | Hostname to listen on (deprecated, use -h or --host) |
-p PORT, --port PORT | Port for service to listen on (default: 7077 for master, random for worker) |
--webui-port PORT | Port for web UI (default: 8080 for master, 8081 for worker) |
-c CORES, --cores CORES | Total CPU cores to allow Spark applications to use on the machine (default: all available); only on worker |
-m MEM, --memory MEM | Total amount of memory to allow Spark applications to use on the machine, in a format like 1000M or 2G (default: your machine’s total RAM minus 1 GB); only on worker |
-d DIR, --work-dir DIR | Directory to use for scratch space and job output logs (default: SPARK_HOME/work); only on worker |
--properties-file FILE | Path to a custom Spark properties file to load (default: conf/spark-defaults.conf) |
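To make the memory column of the table concrete, here is a small sketch of the arithmetic behind it. It is a simplification written for this article, not Spark's own parser: the function names are hypothetical, and only the M and G suffixes shown in the table are handled.

```shell
#!/bin/sh
# Convert a memory setting such as 1000M or 2G to megabytes.
# Simplified: only the M and G suffixes from the table are handled.
mem_to_mb() {
  case $1 in
    *[Gg]) echo $(( ${1%[Gg]} * 1024 )) ;;
    *[Mm]) echo "${1%[Mm]}" ;;
    *) echo "unsupported memory value: $1" >&2; return 1 ;;
  esac
}

# Default worker memory as described in the table:
# the machine's total RAM minus 1 GB. Argument: total RAM in MB.
default_worker_mem_mb() {
  total_mb=$1
  echo $(( total_mb - 1024 ))
}

mem_to_mb 2G                 # prints 2048
default_worker_mem_mb 16384  # prints 15360
```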
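- sbin/ - Starts a master on the machine the script is run on
- sbin/ - Starts a slave on every machine listed in the conf/slaves file
- sbin/ - Starts a slave on the machine the script is run on
- sbin/ - Starts both a master and the slaves, as described above
- sbin/ - Stops the master on the machine the script is run on
- sbin/ - Stops the slaves on all machines listed in the conf/slaves file
- sbin/ - Stops both the master and the slaves, as described above
Note that these scripts must be run on the machine where you want the Spark master to run, not on your local machine.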
Environment Variable | Meaning |
SPARK_MASTER_IP | Bind the master to a specific IP address, for example a public one. |
SPARK_MASTER_PORT | Start the master on a different port (default: 7077). |
SPARK_MASTER_WEBUI_PORT | Port for the master web UI (default: 8080). |
SPARK_MASTER_OPTS | Configuration properties that apply only to the master in the form “-Dx=y” (default: none). See below for a list of possible options. |
SPARK_LOCAL_DIRS | Directory to use for “scratch” space in Spark, including map output files and RDDs that get stored on disk. This should be on a fast, local disk in your system. It can also be a comma-separated list of multiple directories on different disks. |
SPARK_WORKER_CORES | Total number of cores to allow Spark applications to use on the machine (default: all available cores). |
SPARK_WORKER_MEMORY | Total amount of memory to allow Spark applications to use on the machine, e.g. 1000m, 2g (default: total memory minus 1 GB); note that each application’s individual memory is configured using its spark.executor.memory property. |
SPARK_WORKER_PORT | Start the Spark worker on a specific port (default: random). |
SPARK_WORKER_WEBUI_PORT | Port for the worker web UI (default: 8081). |
SPARK_WORKER_INSTANCES | Number of worker instances to run on each machine (default: 1). You can make this more than 1 if you have very large machines and would like multiple Spark worker processes. If you do set this, make sure to also set SPARK_WORKER_CORES explicitly to limit the cores per worker, or else each worker will try to use all the cores. |
SPARK_WORKER_DIR | Directory to run applications in, which will include both logs and scratch space (default: SPARK_HOME/work). |
SPARK_WORKER_OPTS | Configuration properties that apply only to the worker in the form “-Dx=y” (default: none). See below for a list of possible options. |
SPARK_DAEMON_MEMORY | Memory to allocate to the Spark master and worker daemons themselves (default: 1g). |
SPARK_DAEMON_JAVA_OPTS | JVM options for the Spark master and worker daemons themselves in the form “-Dx=y” (default: none). |
SPARK_PUBLIC_DNS | The public DNS name of the Spark master and workers (default: none). |
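These variables are conventionally set in conf/spark-env.sh. The sketch below generates such a file in a temporary directory and shows one way to split a machine's cores evenly when SPARK_WORKER_INSTANCES is greater than 1. The helper names and every value (16 cores, 2 instances, 8g, /tmp/spark-work) are assumptions chosen for illustration.

```shell
#!/bin/sh
# Cores each worker may use when several workers share one machine.
cores_per_worker() {
  total_cores=$1
  instances=$2
  echo $(( total_cores / instances ))
}

# Write an illustrative spark-env.sh into the directory given as $1.
# $2: total cores on the machine, $3: worker instances per machine.
write_spark_env() {
  conf_dir=$1
  total_cores=$2
  instances=$3
  mkdir -p "$conf_dir"
  cat > "$conf_dir/spark-env.sh" <<EOF
# Hypothetical values; adjust them to your own machines.
export SPARK_MASTER_PORT=7077
export SPARK_MASTER_WEBUI_PORT=8080
export SPARK_WORKER_INSTANCES=$instances
export SPARK_WORKER_CORES=$(cores_per_worker "$total_cores" "$instances")
export SPARK_WORKER_MEMORY=8g
export SPARK_WORKER_DIR=/tmp/spark-work
EOF
}

demo_dir=$(mktemp -d)
write_spark_env "$demo_dir" 16 2
cat "$demo_dir/spark-env.sh"
```

With 16 cores and 2 instances, the generated file sets SPARK_WORKER_CORES to 8, so the two workers together do not oversubscribe the machine.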
Property Name | Default | Meaning |
spark.deploy.retainedApplications | 200 | The maximum number of completed applications to display. Older applications will be dropped from the UI to maintain this limit. |
spark.deploy.retainedDrivers | 200 | The maximum number of completed drivers to display. Older drivers will be dropped from the UI to maintain this limit. |
spark.deploy.spreadOut | true | Whether the standalone cluster manager should spread applications out across nodes or try to consolidate them onto as few nodes as possible. Spreading out is usually better for data locality in HDFS, but consolidating is more efficient for compute-intensive workloads. |
spark.deploy.defaultCores | (infinite) | Default number of cores to give to applications in Spark’s standalone mode if they don’t set spark.cores.max. If not set, applications always get all available cores unless they configure spark.cores.max themselves. Set this lower on a shared cluster to prevent users from grabbing the whole cluster by default. |
spark.worker.timeout | 60 | Number of seconds after which the standalone deploy master considers a worker lost if it receives no heartbeats. |
Property Name | Default | Meaning |
spark.worker.cleanup.enabled | false | Enable periodic cleanup of worker / application directories. Note that this only affects standalone mode, as YARN works differently. Only the directories of stopped applications are cleaned up. |
spark.worker.cleanup.interval | 1800 (30 minutes) | Controls the interval, in seconds, at which the worker cleans up old application work dirs on the local machine. |
spark.worker.cleanup.appDataTtl | 7 * 24 * 3600 (7 days) | The number of seconds to retain application work directories on each worker. This is a Time To Live and should depend on the amount of available disk space you have. Application logs and jars are downloaded to each application work dir. Over time, the work dirs can quickly fill up disk space, especially if you run jobs very frequently. |
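As the SPARK_WORKER_OPTS entry notes, worker properties are passed in the form -Dx=y. A minimal sketch of assembling such a value for the cleanup properties follows. The helper functions are hypothetical; the interval and retention period reuse the defaults from the table.

```shell
#!/bin/sh
# Convert a retention period in days to seconds.
days_to_seconds() {
  echo $(( $1 * 24 * 3600 ))
}

# Build a SPARK_WORKER_OPTS value that enables periodic cleanup.
# $1: cleanup interval in seconds, $2: retention period in days.
worker_cleanup_opts() {
  interval=$1
  ttl=$(days_to_seconds "$2")
  printf '%s %s %s\n' \
    "-Dspark.worker.cleanup.enabled=true" \
    "-Dspark.worker.cleanup.interval=$interval" \
    "-Dspark.worker.cleanup.appDataTtl=$ttl"
}

# 1800 seconds and 7 days are the defaults listed in the table.
export SPARK_WORKER_OPTS="$(worker_cleanup_opts 1800 7)"
echo "$SPARK_WORKER_OPTS"
```

Seven days comes to 604800 seconds, which is the value that ends up in spark.worker.cleanup.appDataTtl.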
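To run an application on the Spark cluster, pass the master's spark://IP:PORT URL to the SparkContext constructor. To run an interactive Spark shell against the cluster, use:
./bin/spark-shell --master spark://IP:PORT
You can also pass the option --total-executor-cores <numCores> to control the number of cores that the Spark shell uses on the cluster.
If your application is launched through spark-submit, the application jar is automatically distributed to all worker nodes. Any additional jars that your application depends on should be specified with the --jars option, separated by commas (for example, --jars jar1,jar2).
In addition, standalone cluster mode can restart your application automatically if it exits with a non-zero exit code. To use this feature, pass the --supervise flag to spark-submit when launching your application. An application that keeps failing can be killed with: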
./bin/spark-class org.apache.spark.deploy.Client kill <master url> <driver ID>
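The submission options described above can be combined into a single command. The sketch below only prints the command instead of running it, so it can be tried without a cluster. The function names, the class com.example.MyApp, and the jar file names are hypothetical; the flags themselves (--class, --master, --deploy-mode, --supervise, --jars) are standard spark-submit options.

```shell
#!/bin/sh
# Join the given jar paths with commas, as --jars expects.
join_jars() {
  joined=""
  for jar in "$@"; do
    joined="${joined:+$joined,}$jar"
  done
  echo "$joined"
}

# Print (not run) a spark-submit command for standalone cluster mode.
# $1: master URL, $2: main class, $3: application jar, rest: extra jars.
submit_command() {
  master=$1
  main_class=$2
  app_jar=$3
  shift 3
  jars_opt=""
  if [ "$#" -gt 0 ]; then
    jars_opt=" --jars $(join_jars "$@")"
  fi
  echo "./bin/spark-submit --class $main_class --master $master --deploy-mode cluster --supervise$jars_opt $app_jar"
}

# All names below are placeholders.
submit_command spark://IP:PORT com.example.MyApp my-app.jar dep1.jar dep2.jar
```

To actually submit, replace the placeholders and run the printed command from the Spark installation directory.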
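Standalone cluster mode currently supports only FIFO scheduling across applications. To let several users share the cluster, you can cap the resources that each application may use. By default, an application acquires all the cores in the cluster, which only makes sense when a single application runs at a time. You can limit the number of cores with the spark.cores.max property, as shown here: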
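import org.apache.spark.{SparkConf, SparkContext}
// Cap this application at 10 cores across the whole cluster.
val conf = new SparkConf()
  .set("spark.cores.max", "10")
val sc = new SparkContext(conf)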
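In addition, you can configure spark.deploy.defaultCores on the cluster master to change the default for applications that do not set spark.cores.max, as shown here:
export SPARK_MASTER_OPTS="-Dspark.deploy.defaultCores=<value>"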
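The effect of a core cap under FIFO scheduling can be illustrated with a small simulation. This is a deliberately simplified model written for this article, not the master's actual scheduler: applications are served in arrival order, and each one receives the smaller of its cap and the cores still free.

```shell
#!/bin/sh
# Simplified FIFO model: each application, in arrival order, is granted
# min(its core cap, cores still free).
# $1: total cluster cores, rest: the core cap of each application.
fifo_allocate() {
  free=$1
  shift
  result=""
  for cap in "$@"; do
    if [ "$cap" -lt "$free" ]; then
      grant=$cap
    else
      grant=$free
    fi
    free=$(( free - grant ))
    result="${result:+$result }$grant"
  done
  echo "$result"
}

fifo_allocate 16 10 10 10   # prints 10 6 0
fifo_allocate 16 16 4       # prints 16 0
```

In the second call the first application takes every core, so the one behind it gets nothing until cores are released. This is the situation that spark.cores.max and spark.deploy.defaultCores are meant to avoid on a shared cluster.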
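Spark Standalone mode provides a web interface for monitoring the cluster. The master and each worker have their own web UI. By default, the master's web UI is available on port 8080. The port can be changed in the configuration file or with a command-line option.
You can run Spark alongside an existing Hadoop cluster simply by launching it as a separate service on the same machines. To access Hadoop data from Spark, use an hdfs:// URL (typically hdfs://<namenode>:9000/path). Alternatively, you can set up a separate cluster for Spark and still access HDFS over the network, although this may be slower than local disk access.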
System property | Meaning |
spark.deploy.recoveryMode | Set to ZOOKEEPER to enable standby Master recovery mode (default: NONE). |
spark.deploy.zookeeper.url | The ZooKeeper cluster url (e.g.,, |
spark.deploy.zookeeper.dir | The directory in ZooKeeper to store recovery state (default: /spark). |
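The system properties in this table are passed to the master daemons as -D options, typically through SPARK_DAEMON_JAVA_OPTS. The following sketch assembles such a value. The function name and the ZooKeeper hosts zk1 and zk2 are placeholders, not values from the original text; /spark is the default directory listed in the table.

```shell
#!/bin/sh
# Build SPARK_DAEMON_JAVA_OPTS for ZooKeeper-based master recovery.
# $1: ZooKeeper connection string, $2: optional ZooKeeper directory.
zookeeper_recovery_opts() {
  zk_url=$1
  zk_dir=${2:-/spark}
  printf '%s %s %s\n' \
    "-Dspark.deploy.recoveryMode=ZOOKEEPER" \
    "-Dspark.deploy.zookeeper.url=$zk_url" \
    "-Dspark.deploy.zookeeper.dir=$zk_dir"
}

# zk1 and zk2 are hypothetical ZooKeeper hosts.
export SPARK_DAEMON_JAVA_OPTS="$(zookeeper_recovery_opts zk1:2181,zk2:2181)"
echo "$SPARK_DAEMON_JAVA_OPTS"
```

Every master that takes part in leader election would need the same settings, so that they all read and write the same recovery state.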
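To schedule new applications or add workers to the cluster, they need to know the IP address of the current leader. This only requires passing a list of masters where you would otherwise pass a single one. For example, if you start your application with spark://host1:port1,host2:port2 and host1 goes down, the cluster keeps working because a new leader, host2, has already been found.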
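The multi-master URL is simply the individual host:port pairs joined with commas behind a single spark:// prefix. A minimal sketch of building it, with a hypothetical function name and the placeholder hosts from the example above:

```shell
#!/bin/sh
# Build spark://host1:port1,host2:port2 from a list of host:port pairs.
multi_master_url() {
  list=""
  for master in "$@"; do
    list="${list:+$list,}$master"
  done
  echo "spark://$list"
}

# host1 and host2 are placeholder master hosts.
multi_master_url host1:7077 host2:7077   # prints spark://host1:7077,host2:7077
```

The resulting string can be used wherever a single master URL is accepted, for example as the value of --master.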
System property | Meaning |
spark.deploy.recoveryMode | Set to FILESYSTEM to enable single-node recovery mode (default: NONE). |
spark.deploy.recoveryDirectory | The directory in which Spark will store recovery state, accessible from the Master’s perspective. |
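- This solution can be used together with a process monitor such as monit.
- Although filesystem recovery looks appealing, it does not work very well in practice. In particular, killing a master with stop-master.sh does not clear its recovery state, so whenever you start a new master it enters recovery mode. This can increase startup time.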