OpenSpark:一款好用的开箱即用Spark环境

简介

你是否遇到过每次开发大数据应用都要重新搭建Java/Hadoop/Spark环境的情况?是否想过自学和测试Spark应用却没有功能完整稳定可用的集群环境?是否有过实验室提供了性能强大的物理机却要自己折腾Hadoop的搭建?或者是给了一个集群配置却需要自己安装客户端环境?如果有过这些问题,你可能需要一个开箱即用集群环境,OpenSpark是目前最好的选择之一。

OpenSpark是一个Hadoop/Spark集成环境,通过Docker镜像封装可以运行在Linux/MacOS/Windows,预装的多版本的Hadoop和Spark可以秒级启动,通过预置配置可以把任务提交到本地Yarn集群或者任意外部集群中,是一个功能完整并且repeatable的集群环境,项目地址 tobegit3hub/openspark

启动OpenSpark

我们知道Spark集群有多种启动方式,基于本地、Spark cluster、Yarn cluster以及实验版本的Kubernetes cluster。对于大部分自学或者Demo的环境中,本地只要下载pre-built hadoop-included的Spark包解压即可,但真实的场景以及大数据规模下还是需要能对接Hadoop/Yarn集群。因此在OpenSpark镜像中,我们把Java、Hadoop、Spark依赖都安装好,可按需启动Yarn集群和提交本地或分布式RDD应用,启动方法也很简单。

docker run -it -v /:/host tobegit3hub/openspark bash

容器启动时挂载宿主机目录可以方便地访问宿主机的jar包以及pyspark脚本,有了这个环境就可以在本地IDE开发代码而在OpenSpark环境中运行或者提交了,初始化sshd以及格式化hdfs目录启动集群的方式也很简单。

/scripts/prepare_sshd.sh
/scripts/prepare_hadoop.sh

使用PySpark

对于Spark初学者来说,PySpark是最容易入门和熟悉Spark API的方式。如果你下载了pre-built的Spark安装包和本地安装好可用的Java 7/8环境,只要运行pyspark命令就可以了,但这其实只是用了local[*]的资源而本地存储。如果使用OpenSpark环境,你可以把Hadoop集群的core-site.xml等配置直接拷贝到OpenSpark容器内,这样本地也可以启动基于yarn集群的pyspark接口了。

export PYSPARK_DRIVER_PYTHON=ipython
pyspark --master yarn

启动后就可以写任意的PySpark代码了,例如下面在内存中构造Dataframe的示例,更多可运行的Spark applications可参考 tobegit3hub/spark_examples

from pyspark.sql import SparkSession

def main():
  spark = SparkSession.builder.appName("Dataframe demo").getOrCreate()
  sc = spark.sparkContext

  rows = [("name", "tobe"), ("age", "20"), ("gender", "male")]

  df = spark.createDataFrame(rows, ["attribute", "value"])
  df.show()

if __name__ == "__main__":
  main()

提交Spark应用

本地调试PySpark脚本毕竟只适用于学习,实际开发我们还是需要把任务打成Jar包或者Python脚本提交到集群上,那么我们可以使用OpenSpark自带的Yarn集群进行开发测试,然后修改集群配置就可以无缝迁移到在线的Hadoop集群上。

首先,我们可以像前面的PySpark例子这样编写单个Python文件,或者是在Maven项目中提交Spark lib依赖,然后编写Scala或Java的应用,然后使用标准的mvn编译成jar包即可。

git clone https://github.com/tobegit3hub/spark_examples

cd ./spark_examples/spark_maven_project/

mvn clean package

如果用Scala编写Spark应用其实非常简单,为了避免依赖本地或者集群的文件,我们同样可以在memory中构造数据,然后编译成Jar包在本地测试或者提交集群。

import org.apache.spark.sql.SparkSession

object SparkExample {

  def main(args: Array[String]): Unit = {

    // Get SparkSession and SparkContext
    val spark = SparkSession
      .builder()
      .appName("Spark Example")
      // May be ignored when submitting to cluster
      .master("local[*]")
      .getOrCreate()
    val sc = spark.sparkContext

    // Use Dataframe
    val myRange = spark.range(1000).toDF("number")
    myRange.show()
    println(myRange)

    // Use RDD
    val input = sc.parallelize(List(1, 2, 3, 4))
    val result = input.map(x => x * 2)
    result.collect().foreach(println)

  }

}

如果IDE中已经配置好JAVA_HOME、HADOOP_HOME、SPARK_HOME等依赖,可以直接在IDE中运行测试,也可以进入OpenSpark的容器,直接提交Jar包。

spark-submit --class com.tobe.SparkExample --master yarn --deploy-mode client ./target/spark-maven-project-1.0-SNAPSHOT.jar

如果想把任务提交到已有的分布式Hadoop/Yarn集群,只需要找到容器内的Hadoop配置文件路径,复制粘贴集群中的所有XML文件即可。

cp /host/hadoop/*.xml $HADOOP_HOME/conf/hadoop/

spark-submit --class com.tobe.SparkExample --master yarn --deploy-mode client ./target/spark-maven-project-1.0-SNAPSHOT.jar

当然,如果你的应用里就要提交Spark任务提交的逻辑,而不适用spark-submit,也可以使用Java yarn-spark client,例如在OpenSpark环境中直接执行一个Java class即可。

mvn exec:java -Dexec.mainClass="com.tobe.sparkclient.SparkSubmitter"

Submitter代码如下,可修改Java源文件指定class和jar包路径。

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.spark.SparkConf;
import org.apache.spark.deploy.yarn.Client;
import org.apache.spark.deploy.yarn.ClientArguments;

/**
 * Java main class.
 *
 * mvn exec:java -Dexec.mainClass="com.tobe.sparkclient.SparkSubmitter"
 */
public class SparkSubmitter {
    private static Logger logger = LoggerFactory.getLogger(SparkSubmitter.class);

    public static void main(String[] args) {

        Configuration hadoopConfiguration = new Configuration();
        SparkConf sparkConf = new SparkConf();
        sparkConf.set("spark.submit.deployMode", "cluster");

        List<String> submitArgs = new ArrayList<String>();
        submitArgs.add("--jar");
        submitArgs.add("/root/spark-maven-project-1.0-SNAPSHOT.jar");
        submitArgs.add("--class");
        submitArgs.add("com.tobe.SparkExample");

        ClientArguments clientArguments = new ClientArguments(submitArgs.toArray(new String[submitArgs.size()]));
        Client client = new Client(clientArguments, hadoopConfiguration, sparkConf);
        logger.info("Submit App with args: " + Arrays.asList(submitArgs));

        try {
            ApplicationId applicationId = client.submitApplication();
            Thread.sleep(60000);
        } catch (Throwable t) {
            logger.error("Spark Application failed: " + t.getMessage(), t);
            throw new RuntimeException("Spark Application failed", t);
        }
    }
}

总结

从功能角度来看,OpenSpark只是提供一个封装好的Hadoop、Spark容器镜像,同时提供一些可复用的配置文件,但因为稳定可用的特点,也让开发者可是方便地开发测试Spark 1.6、Spark 2.3以及在不同的Yarn集群上的调度测试。

因此,有了OpenSpark,开发PySpark、Spark steaming以及任意的大数据应用都变得更加简单了。

    原文作者:tobe
    原文地址: https://zhuanlan.zhihu.com/p/52632846
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞