Spark 中的 --files 参数与 ConfigFactory 工厂方法

Spark 中的 –files 参数与 ConfigFactory 工厂方法

scala 对象

以前有个大数据项目做小程序统计,读取 HDFS 上的 Parquet 文件,统计完毕后,将结果写入到 MySQL 数据库。首先想到的是将 MySQL 的配置写在代码里面:

val jdbcUrl  = "jdbc:mysql://127.0.0.1:6606/test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false"
val user     = "root"
val password = "averyloooooongword"
val driver   = "com.mysql.jdbc.Driver"

properties 文件

如果是测试,生产环境各有一套,那上面的代码就要分别复制俩份,不便于维护!后来知道了可以把配置放在 resources 目录下, 针对本地,测试和生产环境,分别创建不同的 properties 文件:

conf.properties  
conf_product.properties 
env.properties  
local.properties

例如其中的 conf.properties 内容如下:

#  测试环境配置

## 数据库配置
jdbc.url=jdbc:mysql://10.0.0.11:3306/ald_xinen_test?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false
jdbc.user=aldwx
jdbc.pwd=123456
jdbc.driver=com.mysql.jdbc.Driver

# parquet 文件目录
tongji.parquet=hdfs://10.0.0.212:9000/ald_log_parquet

然后在代码里面读取 resource 文件中的配置:

    /**
     * 根据 key 获取 properties 文件中的 value
     * @param key properties 文件中等号左边的键
     * @return 返回 properties 文件中等号右边的值
     */
    public static String getProperty(String key) {
        Properties properties = new Properties();
        InputStream in = ConfigurationUtil.class.getClassLoader().getResourceAsStream(getEnvProperty("env.conf"));
        try {
            properties.load(in);
            in.close();
        } catch (IOException e) {
            e.printStackTrace();
        }
        return (String) properties.get(key);
    }

这样解决了多个环境中配置不同的问题,只需要复制多个 properties 文件,根据需要修改就行。但是这种方法不是最优的,因为配置不是结构化的,而是通过注释分割了不同的配置。

conf 文件

resources 目录下的文件如下:

application.conf              
application.production.conf      
application.local.conf             
log4j.properties              
metrics.properties

ConfigFactory 工厂方法默认会读取 resources 目录下面名为 application.conf 的文件:

# Spark 相关配置
spark {
  master                   = "local[2]"
  streaming.batch.duration = 5001  // Would normally be `ms` in config but Spark just wants the Long
  eventLog.enabled         = true
  ui.enabled               = true
  ui.port                  = 4040
  metrics.conf             = metrics.properties
  checkpoint.path          = "/tmp/checkpoint/telematics-local"
  stopper.port             = 12345
  spark.cleaner.ttl        = 3600
  spark.cleaner.referenceTracking.cleanCheckpoints = true
}

# Kafka 相关配置
kafka {

  metadata.broker.list = "localhost:9092"
  zookeeper.connect    = "localhost:2181"

  topic.dtcdata {
    name = "dc-diagnostic-report"
    partition.num = 1      
    replication.factor = 1  
  }

  group.id             = "group-rds"
  timeOut              = "3000"
  bufferSize           = "100"
  clientId             = "telematics"
  key.serializer.class = "kafka.serializer.StringEncoder"
  serializer.class     = "com.wm.dtc.pipeline.kafka.SourceDataSerializer"
//  serializer.class     = "kafka.serializer.DefaultEncoder"
}

# MySQL 配置
mysql {
  dataSource.maxLifetime              = 800000
  dataSource.idleTimeout              = 600000
  dataSource.maximumPoolSize          = 10
  dataSource.cachePrepStmts           = true
  dataSource.prepStmtCacheSize        = 250
  dataSource.prepStmtCacheSqlLimit    = 204800
  dataSource.useServerPrepStmts       = true
  dataSource.useLocalSessionState     = true
  dataSource.rewriteBatchedStatements = true
  dataSource.cacheResultSetMetadata   = true
  dataSource.cacheServerConfiguration = true
  dataSource.elideSetAutoCommits      = true
  dataSource.maintainTimeStats        = false

  jdbcUrl="jdbc:mysql://127.0.0.1:6606/wmdtc?useUnicode=true&characterEncoding=utf-8&autoReconnect=true&failOverReadOnly=false&useSSL=false"
  jdbcDriver="com.mysql.jdbc.Driver"
  dataSource.user="root"
  dataSource.password="123456"
}

为了验证,我创建了一个 Object 对象:

package allinone
import com.typesafe.config.ConfigFactory
import scopt.OptionParser

object SparkFilesArgs extends App  {
  val config = ConfigFactory.load()
  val sparkConf = config.getConfig("spark")
  val sparkMaster = sparkConf.getString("master")
  val sparkDuration = sparkConf.getLong("streaming.batch.duration")
  println(sparkMaster, sparkDuration)
}

如果我直接运行就会打印:

(local[2],5001)

确实是 application.conf 文件中 Spark 的配置。

但是生产环境我们打算使用另外一个配置文件 application.production.conf:

spark {
  master = "yarn"
  streaming.batch.duration = 5002
  eventLog.enabled=true
  ui.enabled = true
  ui.port = 4040
  metrics.conf = metrics.properties
  checkpoint.path = "/tmp/telematics"
  stopper.port = 12345
  spark.cleaner.ttl = 3600
  spark.cleaner.referenceTracking.cleanCheckpoints = true

  trajectory.path = "hdfs://CRRCNameservice/road_matching/output/road_match_result"
  city.path = "hdfs://CRRCNameservice/user/root/telematics/data/city.csv"
}

##cassandra相关配置
cassandra {
  keyspace = wmdtc
  cardata.name = can_signal
  trip.name = trip
  latest.name = latest
  latest.interval = 15000

  connection.host = "WMBigdata2,WMBigdata3,WMBigdata4,WMBigdata5,WMBigdata6"
  write.consistency_level = LOCAL_ONE
  read.consistency_level = LOCAL_ONE
  concurrent.writes = 24
  batch.size.bytes = 65536
  batch.grouping.buffer.size = 1000
  connection.keep_alive_ms = 300000
  auth.username = cihon
  auth.password = cihon
}

kafka {
  metadata.broker.list = "WMBigdata2:9092,WMBigdata3:9092,WMBigdata4:9092,WMBigdata5:9092,WMBigdata6:9092"
  zookeeper.connect = "WMBigdata2:2181,WMBigdata3:2181,WMBigdata4:2181"

  topic.obddata {
    name = "wmdtc"
  }

  group.id = "can_signal"
  timeOut = "3000"
  bufferSize = "100"
  clientId = "telematics"

  key.serializer.class = "kafka.serializer.StringEncoder"
  serializer.class = "com.wm.telematics.pipeline.kafka.SourceDataSerializer"

}

akka {
  loglevel = INFO
  stdout-loglevel = WARNING
  loggers = ["akka.event.slf4j.Slf4jLogger"]
}

##geoService接口URL
webservice {
  url = "http://101.201.108.155:8088/map/roadmessage"
}

##geoService相关配置
geoservice {
  timeout = 3
  useRealData = false
}

既然 ConfigFactory 方法默认读取 application.conf 文件,但是

val config = ConfigFactory.load()

相当于:

val config = ConfigFactory.load("application.conf")

但是 load 方法也接受参数:resourceBasename:

val config = ConfigFactory.load("application.production") // 加载生产环境的配置

这样在代码里面通过加载不同的配置文件实现本地、测试、生产环境的切换和部署,但是在代码里面读取配置还是不够优美!所以我们有 Spark 的 --files 命令行选项。顾名思义,显而易见,也正如官网所描述的那样, --files 参数后面的值是逗号分割的文本文件, 里面有一个 .conf 文件, load 方法会加载 --files 选项传递过来的配置文件:

#!/bin/sh

CONF_DIR=/root/telematics/resources
APP_CONF=application.production.conf
EXECUTOR_JMX_PORT=23339
DRIVER_JMX_PORT=2340

spark-submit \
  --name WM_telematics \
  --class allinone.SparkFilesArgs \
  --master local[*] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 1g \
  --executor-cores 3 \
  --num-executors 3 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.executor.memoryOverhead=4096 \
  --conf spark.driver.memoryOverhead=2048 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --conf spark.network.timeout=1800s \
  --conf spark.scheduler.executorTaskBlacklistTime=30000 \
  --conf spark.core.connection.ack.wait.timeout=300s \
  --files $CONF_DIR/$APP_CONF,$CONF_DIR/log4j.properties,$CONF_DIR/metrics.properties \
  /Users/ohmycloud/work/cihon/sxw/all-in-one/target/allinone-1.0-SNAPSHOT.jar

它打印:

(local[*],5002)

因为我在命令行选项中指定了 master 为 local[*], 配置文件为 application.production.conf

resource not found on classpath: application.conf

本地 localhost

jar 包里面我把 application.conf 给删除了,用 --files 传参数给 spark-submit 的方式,但是报:在 classpath 下找不到 application.conf 这个文件了。

cat spark-submit.sh:

#!/bin/sh

CONF_DIR=/Users/ohmycloud/work/cihon/gac/sources
APP_CONF=application.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark-submit \
  --class $1 \
  --master local[2] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files $CONF_DIR/$APP_CONF \
  /Users/ohmycloud/demo/Spark/WriteParquet2Kafka/target/socket-structured-streaming-1.0-SNAPSHOT.jar

原因是 application.conf 文件所在的路径 /Users/ohmycloud/work/cihon/gac/sources 不在 classpath 里面!

使用

 --driver-class-path /Users/ohmycloud/work/cihon/gac/sources 

而非

 --driver-class-path /Users/ohmycloud/work/cihon/gac/sources/application.conf 

来添加 class path。

#!/bin/sh

CONF_DIR=/Users/ohmycloud/work/cihon/gac/sources
APP_CONF=application.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark-submit \
  --class $1 \
  --master local[2] \
  --deploy-mode client \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.yarn.executor.memoryOverhead=1024 \
  --conf spark.yarn.driver.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --driver-class-path /Users/ohmycloud/work/cihon/gac/sources \
  --files $CONF_DIR/$APP_CONF \
  /Users/ohmycloud/demo/Spark/WriteParquet2Kafka/target/socket-structured-streaming-1.0-SNAPSHOT.jar

yarn 模式

yarn 模式下,不需要添加 driver-class-path 了:

#!/bin/sh

CONF_DIR=/root/resources
APP_CONF=application.test.conf
EXECUTOR_JMX_PORT=23333
DRIVER_JMX_PORT=2334

spark2-submit \
  --class $1 \
  --master yarn \
  --deploy-mode cluster \
  --driver-memory 2g \
  --driver-cores 2 \
  --executor-memory 2g \
  --executor-cores 2 \
  --num-executors 4 \
  --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0 \
  --conf "spark.executor.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$EXECUTOR_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf "spark.driver.extraJavaOptions=-Dconfig.resource=$APP_CONF -Dcom.sun.management.jmxremote.port=$DRIVER_JMX_PORT -Dcom.sun.management.jmxremote.ssl=false -Dcom.sun.management.jmxremote.authenticate=false -Djava.rmi.server.hostname=`hostname`" \
  --conf spark.executor.memoryOverhead=1024 \
  --conf spark.driver.memoryOverhead=1024 \
  --conf spark.yarn.maxAppAttempts=2 \
  --conf spark.yarn.submit.waitAppCompletion=false \
  --files $CONF_DIR/$APP_CONF,$CONF_DIR/log4j.properties,$CONF_DIR/metrics.properties \
  target/socket-structured-streaming-1.0-SNAPSHOT.jar

但是实际上, 后来发现有时候不行,所以最好还是加上 driver-class-path!

Attention

attention

我在一个离线程序中给配置文件起了一个不带 application 的名字后,程序就报【找不到某个键了】,该成 application.conf 之后就可以了。

References

Using typesafe config with Spark on Yarn
Externalize properties – typesafe config
Spark Context and Spark Configuration
How to specify custom conf file for Spark Standalone’s master?
Scala Load Configuration With PureConfig
Example: Running a Spark application with optional parameters

    原文作者:焉知非鱼
    原文地址: https://www.jianshu.com/p/525642a286e7
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞