版本: 2.3.3
Hive on Spark为Hive提供了Apache Spark作为执行引擎。
set hive.execution.engine=spark;
Hive 1.1+以上版本提供Hive on Spark 。它在“ spark ”和“spark2”分支中仍处于发展阶段,并且定期合并到Hive的“主”分支中。
参见 HIVE-7292 及其子任务和相关问题。
版本兼容性
Hive on Spark仅用特定版本的Spark进行测试,因此给定版本的Hive只能保证与Spark的特定版本兼容。Spark的其他版本可能与给定版本的Hive一起使用,但不能保证。以下是Hive版本及其相应兼容Spark版本的列表。
Hive版本 | Spark版本 |
---|---|
master | 2.3.0 |
2.3.x | 2.0.0 |
2.2.x | 1.6.0 |
2.1.x | 1.6.0 |
2.0.x | 1.5.0 |
1.2.x | 1.3.1 |
1.1.x | 1.2.0 |
Spark 安装
按照说明安装Spark:
YARN模式:http : //spark.apache.org/docs/latest/running-on-yarn.html
独立模式:https: //spark.apache.org/docs/latest/spark-standalone.html
Hive on Spark 默认支持 Spark on YARN模式。
对于安装执行以下任务:
- 安装Spark(或者下载预先构建的Spark,或者从源代码构建程序集)。
- 安装/构建兼容版本。Hive root
pom.xml
的<spark.version>定义了它构建/测试的Spark版本。 - 安装/构建兼容的发行版。Spark的每个版本都有几个发行版,与不同版本的Hadoop相对应。
- 一旦安装了Spark,找到并记录<spark-assembly – *.jar>位置。
- 请注意,您必须拥有不包含Hive jar 的Spark版本 。这意味着不是用Hive配置文件构建的。如果您将使用Parquet tables,建议启用“parquet-provided” profile 。否则,Parquet依赖性可能会发生冲突。要从安装中删除Hive jar,只需使用Spark的以下命令:
- 安装/构建兼容版本。Hive root
在Spark 2.0.0之前:
./make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,[hadoop-2.4,parquet-provided](https://cwiki.apache.org/confluence/display/Hive/hadoop-2.4,parquet-provided)"
从Spark 2.0.0起:
./dev/make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.7,parquet-provided"
由Spark 2.3.0起:
./dev/make-distribution.sh --name "hadoop2-without-hive" --tgz "-Pyarn,hadoop-provided,hadoop-2.7,parquet-provided,orc-provided"
- 启动Spark群集
- 记下<Spark Master URL>。这可以在Spark master WebUI中找到。
配置YARN
公平调度程序是必需的,而不是容量调度程序。这在YARN集群中公平地分配了相同份额的资源。
yarn.resourcemanager.scheduler.class = org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler
配置Hive
要将Spark依赖添加到Hive,请执行以下操作:
- 在Hive 2.2.0之前,将spark-assembly jar链接到
HIVE_HOME/lib
。 - 由于Hive 2.2.0,Hive on Spark 使用Spark 2.0.0及以上版本运行,它没有assembly jar。
- 要以YARN模式(yarn-client or yarn-cluster)运行,请将以下jars 链接到
HIVE_HOME/lib
。- scala-library
- spark-core
- spark-network-common
- 要以LOCAL模式运行(仅用于调试),除上述以外,还要连接下列jars 到
HIVE_HOME/lib
。- chill-java chill jackson-module-paranamer jackson-module-scala jersey-container-servlet-core
- jersey-server json4s-ast kryo-shaded minlog scala-xml spark-launcher
- spark-network-shuffle spark-unsafe xbean-asm5-shaded
- 要以YARN模式(yarn-client or yarn-cluster)运行,请将以下jars 链接到
- 在Hive 2.2.0之前,将spark-assembly jar链接到
配置Hive执行引擎以使用Spark:
set hive.execution.engine=spark;
有关配置Hive和远程Spark驱动程序的其他属性,请参阅Hive配置属性的Spark部分。
- 为Hive配置Spark应用程序配置。请参阅:http : //spark.apache.org/docs/latest/configuration.html。这可以通过将这些属性的文件“spark-defaults.conf”添加到Hive类路径中,或者通过将它们设置为Hive配置(
hive-site.xml
)来完成。例如:
set spark.master=<Spark Master URL>
set spark.eventLog.enabled=true;
set spark.eventLog.dir=<Spark event log folder (must exist)>
set spark.executor.memory=512m;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;
配置属性细节
* `spark.executor.memory`: 每个执行程序进程使用的内存量。
* `spark.executor.cores`:每个执行者的核心数量。
* `spark.yarn.executor.memoryOverhead`:在yarn上运行Spark时,为每个执行程序分配的堆内存量(以兆字节为单位)。这些内存用于,VM overheads ,interned strings,其他本地overheads等等。除了执行程序的内存外,启动执行程序的容器需要一些额外的系统进程内存,就是这种开销。
* `spark.executor.instances`:分配给每个应用程序的执行程序的数量。
* `spark.driver.memory`:分配给远程Spark上下文(RSC)的内存量。我们推荐4GB。
* `spark.yarn.driver.memoryOverhead`:我们推荐400(MB)。
允许Yarn缓存节点上必需的Spark依赖性JAR,以便每次应用程序运行时不需要分发它。
- 在Hive 2.2.0之前,将spark-assembly jar上传到hdfs文件(例如:hdfs://xxxx:8020/spark-assembly.jar
)并在hive-site.xml中添加以下内容
- 在Hive 2.2.0之前,将spark-assembly jar上传到hdfs文件(例如:hdfs://xxxx:8020/spark-assembly.jar
<property>
<name>spark.yarn.jar</name>
<value>hdfs://xxxx:8020/spark-assembly.jar</value>
</property>
- Hive 2.2.0,将$SPARK_HOME/jars中的所有jar上传到hdfs文件夹(例如:hdfs:///xxxx:8020/spark-jars
),并在hive-site.xml中添加以下内容
<property>
<name>spark.yarn.jars</name>
<value>hdfs://xxxx:8020/spark-jars/*</value>
</property>
配置Spark
设置执行程序的内存大小要比简单地设置为尽可能大要复杂。有几件事情需要考虑:
更多的执行器内存意味着它可以为更多的查询启用mapjoin优化。
另一方面,更多的执行者内存从GC的角度来看变得很笨拙。
一些实验表明,HDFS客户端不能很好地处理并发写,因此如果执行者核心太多,它可能会面临竞争状态。
以下设置需要针对群集进行调整,这些设置也可能适用于在Spark之外的Hive上提交Spark作业:
属性 | 建议 |
---|---|
spark.executor.cores | 在5-7之间,请参阅调优细节部分 |
spark.executor.memory | yarn.nodemanager.resource.memory-mb *(spark.executor.cores / yarn.nodemanager.resource.cpu-vcores) |
spark.yarn.executor.memoryOverhead | spark.executor.memory的15-20% |
spark.executor.instances | 取决于spark.executor.memory + spark.yarn.executor.memoryOverhead,请参阅调整详细信息部分。 |
调整细节
在Spark on YARN 时, 我们通常建议将spark.executor.cores设置 为5,6或7,具体取决于可以被节点整除。例如,如果 yarn.nodemanager.resource.cpu-vcores是19,那么6是更好的选择(所有执行者只能拥有相同数量的内核,如果我们选择5,那么每个执行者只能获得3个内核;如果我们选择了7,那么只有2个执行者被使用,5个核心将被浪费)。如果是20,那么5是更好的选择(因为这样你会得到4个执行者,并且没有核心被浪费)。
对于spark.executor.memory
,我们推荐计算 yarn.nodemanager.resource.memory-mb *(spark.executor.cores / yarn.nodemanager.resource.cpu-vcores), 然后拆分spark.executor.memory和。根据我们的实验,我们建议设置 为总内存的15-20%左右。 spark.yarn.executor.memoryOverhead``spark.yarn.executor.memoryOverhead
在决定每个执行程序接收多少内存之后,您需要决定将多少执行程序分配给查询。在GA版本中,将支持Spark动态执行程序分配。但是,对于这个测试版,只能使用静态资源分配。根据每个节点的物理内存的配置 spark.executor.memory
和spark.yarn.executor.memoryOverhead
,你需要选择实例和组的数量spark.executor.instances
。
现在是一个真实世界的例子 假设每个节点具有12个虚拟内核的10个节点具有64GB的内存,例如, yarn.nodemanager.resource.cpu-vcores=12
。一个节点将被用作主节点,因此集群将有9个从节点。我们将配置spark.executor.cores
为6.鉴于64GB的内存yarn.nodemanager.resource.memory-mb
将为50GB。我们将确定每个执行程序的内存量如下:50GB *(6/12)= 25GB。我们将分配20%spark.yarn.executor.memoryOverhead
,或5120和80%spark.executor.memory
,或20GB。
在这个9节点集群上,每个主机都有两个执行者。因此,我们可以配置spark.executor.instances
2到18之间的某个值。值为18将利用整个群集。
常见问题(绿色已解决,将从此列表中删除)
https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started
推荐配置
有关这些设置的详细信息,请参阅HIVE-9153。
mapreduce.input.fileinputformat.split.maxsize=750000000
hive.vectorized.execution.enabled=true
hive.cbo.enable=true
hive.optimize.reducededuplication.min.reducer=4
hive.optimize.reducededuplication=true
hive.orc.splits.include.file.footer=false
hive.merge.mapfiles=true
hive.merge.sparkfiles=false
hive.merge.smallfiles.avgsize=16000000
hive.merge.size.per.task=256000000
hive.merge.orcfile.stripe.level=true
hive.auto.convert.join=true
hive.auto.convert.join.noconditionaltask=true
hive.auto.convert.join.noconditionaltask.size=894435328
hive.optimize.bucketmapjoin.sortedmerge=false
hive.map.aggr.hash.percentmemory=0.5
hive.map.aggr=true
hive.optimize.sort.dynamic.partition=false
hive.stats.autogather=true
hive.stats.fetch.column.stats=true
hive.vectorized.execution.reduce.enabled=false
hive.vectorized.groupby.checkinterval=4096
hive.vectorized.groupby.flush.percent=0.1
hive.compute.query.using.stats=true
hive.limit.pushdown.memory.usage=0.4
hive.optimize.index.filter=true
hive.exec.reducers.bytes.per.reducer=67108864
hive.smbjoin.cache.rows=10000
hive.exec.orc.default.stripe.size=67108864
hive.fetch.task.conversion=more
hive.fetch.task.conversion.threshold=1073741824
hive.fetch.task.aggr=false
mapreduce.input.fileinputformat.list-status.num-threads=5
spark.kryo.referenceTracking=false
spark.kryo.classesToRegister=org.apache.hadoop.hive.ql.io.HiveKey,org.apache.hadoop.io.BytesWritable,org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch
有关其他属性,请参阅配置页面的Spark部分。