简介
你是否遇到过每次开发大数据应用都要重新搭建Java/Hadoop/Spark环境的情况?是否想过自学和测试Spark应用却没有功能完整稳定可用的集群环境?是否有过实验室提供了性能强大的物理机却要自己折腾Hadoop的搭建?或者是给了一个集群配置却需要自己安装客户端环境?如果有过这些问题,你可能需要一个开箱即用集群环境,OpenSpark是目前最好的选择之一。
OpenSpark是一个Hadoop/Spark集成环境,通过Docker镜像封装可以运行在Linux/MacOS/Windows,预装的多版本的Hadoop和Spark可以秒级启动,通过预置配置可以把任务提交到本地Yarn集群或者任意外部集群中,是一个功能完整并且repeatable的集群环境,项目地址 tobegit3hub/openspark 。
启动OpenSpark
我们知道Spark集群有多种启动方式,基于本地、Spark cluster、Yarn cluster以及实验版本的Kubernetes cluster。对于大部分自学或者Demo的环境中,本地只要下载pre-built hadoop-included的Spark包解压即可,但真实的场景以及大数据规模下还是需要能对接Hadoop/Yarn集群。因此在OpenSpark镜像中,我们把Java、Hadoop、Spark依赖都安装好,可按需启动Yarn集群和提交本地或分布式RDD应用,启动方法也很简单。
docker run -it -v /:/host tobegit3hub/openspark bash
容器启动时挂载宿主机目录可以方便地访问宿主机的jar包以及pyspark脚本,有了这个环境就可以在本地IDE开发代码而在OpenSpark环境中运行或者提交了,初始化sshd以及格式化hdfs目录启动集群的方式也很简单。
/scripts/prepare_sshd.sh
/scripts/prepare_hadoop.sh
使用PySpark
对于Spark初学者来说,PySpark是最容易入门和熟悉Spark API的方式。如果你下载了pre-built的Spark安装包和本地安装好可用的Java 7/8环境,只要运行pyspark命令就可以了,但这其实只是用了local[*]的资源而本地存储。如果使用OpenSpark环境,你可以把Hadoop集群的core-site.xml等配置直接拷贝到OpenSpark容器内,这样本地也可以启动基于yarn集群的pyspark接口了。
export PYSPARK_DRIVER_PYTHON=ipython
pyspark --master yarn
启动后就可以写任意的PySpark代码了,例如下面在内存中构造Dataframe的示例,更多可运行的Spark applications可参考 tobegit3hub/spark_examples 。
from pyspark.sql import SparkSession
def main():
spark = SparkSession.builder.appName("Dataframe demo").getOrCreate()
sc = spark.sparkContext
rows = [("name", "tobe"), ("age", "20"), ("gender", "male")]
df = spark.createDataFrame(rows, ["attribute", "value"])
df.show()
if __name__ == "__main__":
main()
提交Spark应用
本地调试PySpark脚本毕竟只适用于学习,实际开发我们还是需要把任务打成Jar包或者Python脚本提交到集群上,那么我们可以使用OpenSpark自带的Yarn集群进行开发测试,然后修改集群配置就可以无缝迁移到在线的Hadoop集群上。
首先,我们可以像前面的PySpark例子这样编写单个Python文件,或者是在Maven项目中提交Spark lib依赖,然后编写Scala或Java的应用,然后使用标准的mvn编译成jar包即可。
git clone https://github.com/tobegit3hub/spark_examples
cd ./spark_examples/spark_maven_project/
mvn clean package
如果用Scala编写Spark应用其实非常简单,为了避免依赖本地或者集群的文件,我们同样可以在memory中构造数据,然后编译成Jar包在本地测试或者提交集群。
import org.apache.spark.sql.SparkSession
object SparkExample {
def main(args: Array[String]): Unit = {
// Get SparkSession and SparkContext
val spark = SparkSession
.builder()
.appName("Spark Example")
// May be ignored when submitting to cluster
.master("local[*]")
.getOrCreate()
val sc = spark.sparkContext
// Use Dataframe
val myRange = spark.range(1000).toDF("number")
myRange.show()
println(myRange)
// Use RDD
val input = sc.parallelize(List(1, 2, 3, 4))
val result = input.map(x => x * 2)
result.collect().foreach(println)
}
}
如果IDE中已经配置好JAVA_HOME、HADOOP_HOME、SPARK_HOME等依赖,可以直接在IDE中运行测试,也可以进入OpenSpark的容器,直接提交Jar包。
spark-submit --class com.tobe.SparkExample --master yarn --deploy-mode client ./target/spark-maven-project-1.0-SNAPSHOT.jar
如果想把任务提交到已有的分布式Hadoop/Yarn集群,只需要找到容器内的Hadoop配置文件路径,复制粘贴集群中的所有XML文件即可。
cp /host/hadoop/*.xml $HADOOP_HOME/conf/hadoop/
spark-submit --class com.tobe.SparkExample --master yarn --deploy-mode client ./target/spark-maven-project-1.0-SNAPSHOT.jar
当然,如果你的应用里就要提交Spark任务提交的逻辑,而不适用spark-submit,也可以使用Java yarn-spark client,例如在OpenSpark环境中直接执行一个Java class即可。
mvn exec:java -Dexec.mainClass="com.tobe.sparkclient.SparkSubmitter"
Submitter代码如下,可修改Java源文件指定class和jar包路径。
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;
/**
* Java main class.
*
* mvn exec:java -Dexec.mainClass="com.tobe.sparkclient.SparkSubmitter"
*/
public class SparkSubmitter {
private static Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);
public static void main(String[] args) {
Configuration hadoopConfiguration = new Configuration();
SparkConf sparkConf = new SparkConf();
sparkConf.set("spark.submit.deployMode", "cluster");
List<String> submitArgs = new ArrayList<String>();
submitArgs.add("--jar");
submitArgs.add("/root/spark-maven-project-1.0-SNAPSHOT.jar");
submitArgs.add("--class");
submitArgs.add("com.tobe.SparkExample");
ClientArguments clientArguments = new ClientArguments(submitArgs.toArray(new String[submitArgs.size()]));
Client client = new Client(clientArguments, hadoopConfiguration, sparkConf);
logger.info("Submit App with args: " + Arrays.asList(submitArgs));
try {
ApplicationId applicationId = client.submitApplication();
Thread.sleep(60000);
} catch (Throwable t) {
logger.error("Spark Application failed: " + t.getMessage(), t);
throw new RuntimeException("Spark Application failed", t);
}
}
}
总结
从功能角度来看,OpenSpark只是提供一个封装好的Hadoop、Spark容器镜像,同时提供一些可复用的配置文件,但因为稳定可用的特点,也让开发者可是方便地开发测试Spark 1.6、Spark 2.3以及在不同的Yarn集群上的调度测试。
因此,有了OpenSpark,开发PySpark、Spark steaming以及任意的大数据应用都变得更加简单了。