Spark-Sql的参数调优:
官网: http://spark.apache.org/docs/latest/sql-programming-guide.html
spark.catalog.cacheTable("tableName")
sqlContext.cacheTable("tableName")
spark.catalog.uncacheTable("tableName")
sqlContext.uncacheTable("tableName")
或者缓存dataFrame
dataFrame.cache()
dataFrame.unpersist()
或者CACHE TABLE
//缓存全表
sqlContext.sql("CACHE TABLE activity")
//缓存过滤结果
sqlContext.sql("CACHE TABLE activity_cached as select * from activity where ...")
// lazy cache
sqlContext.sql("CACHE LAZY TABLE ...")
// 取消缓存
sqlContext.sql("UNCACHE TABLE activity")
可通过两种配置方式开启缓存数据功能:
使用SQLContext的setConf方法
执行SQL命令 SET key=value
- 参数配置
Property Name | Default | Meaning |
---|---|---|
spark.sql.files.maxPartitionBytes | 134217728 (128 MB) | The maximum number of bytes to pack into a single partition when reading files. |
spark.sql.files.openCostInBytes | 4194304 (4 MB) | The estimated cost to open a file, measured by the number of bytes could be scanned in the same time. This is used when putting multiple files into a partition. It is better to over estimated, then the partitions with small files will be faster than partitions with bigger files (which is scheduled first). |
spark.sql.broadcastTimeout | 300 | Timeout in seconds for the broadcast wait time in broadcast joins |
spark.sql.autoBroadcastJoinThreshold | 10485760 (10 MB) | Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan has been run. |
spark.sql.shuffle.partitions | 200 | Configures the number of partitions to use when shuffling data for joins or aggregations. |
用到的配置
— spark.sql.autoBroadcastJoinThreshold, broadcast表的最大值10M,当这是为-1时, broadcasting不可用,内存允许的情况下加大这个值
— spark.sql.shuffle.partitions 当join或者聚合产生shuffle操作时, partitions的数量, 这个值可以调大点, 我一般配置500, 切分更多的task, 有助于数据倾斜的减缓, 但是如果task越多, shuffle数据量也会增多
- Broadcast Hint for SQL Queries
参考: https://blog.csdn.net/dabokele/article/details/65963401
import org.apache.spark.sql.functions.broadcast
broadcast(spark.table("src")).join(spark.table("records"), "key").show()
对于broadcast join模式,会将小于spark.sql.autoBroadcastJoinThreshold值(默认为10M)的表广播到其他计算节点,不走shuffle过程,所以会更加高效。
官网的原话是这个样子:
The BROADCAST hint guides Spark to broadcast each specified table when joining them with another table or view. When Spark deciding the join methods, the broadcast hash join (i.e., BHJ) is preferred, even if the statistics is above the configuration spark.sql.autoBroadcastJoinThreshold. When both sides of a join are specified, Spark broadcasts the one having the lower statistics. Note Spark does not guarantee BHJ is always chosen, since not all cases (e.g. full outer join) support BHJ. When the broadcast nested loop join is selected, we still respect the hint.
注意: 确定broadcast hash join的决定性因素是hive的表统计信息一定要准确。并且,由于视图是没有表统计信息的,所以所有的视图在join时都不会被广播。所以至少要有一张hive表。
————————待完善————————