Spark 2.3.0 Test Notes (1): Shuffling Until Your Stomach Hurts

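1 Preface

Recently I have been considering moving our production baseline from Spark 2.1.x up to the Spark 2.3 series, and maintaining a relatively stable production build on top of it, to meet users' growing demand for new features and better performance.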

2 Test Data

benchmark  scale  fileformat  partitioned  link
TPCDS      1T     parquet     true         https://github.com/yaooqinn/tpcds-for-spark

3 Test Subjects

3.1 Control group

baseline  modified  commit   link
2.1.2     true      9ef23ae  https://github.com/yaooqinn/spark/tree/v2.1.2-based

3.2 Experimental group

baseline  modified  commit   link
2.3.0     false     992447f  https://github.com/yaooqinn/spark/tree/v2.3.0-based

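4 Configuration

4.1 Hardware

Machine type  CPU          Memory  Disk       Count  Services
VM            4 × 1 × 1    32g     500g × 1   5      NN(1) SNN(1) RM(1) HMS(1) SHS(1)
Physical      48 × 2 × 12  256g    7.3T × 12  3      DN(3) NM(3)
Physical      48 × 2 × 12  256g    1.1T × 1   4      DN(4) NM(4)
Physical      32 × 2 × 8   128g    3.6T × 12  1      DN(1) NM(1)
Physical      40 × 2 × 10  256g    500g × 1   1      NM(1)
Physical      48 × 2 × 12  32g     1.1T × 1   1      DN(1)

Note: the heterogeneous machine configuration is meant to mimic a more realistic environment. Shuffling on the single-disk physical machines also reproduces the I/O bottlenecks that can occur in production. None of this affects the overall comparison between the two groups.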

4.2 metrics

[Figure: hdfs metrics]
[Figure: yarn metrics]

4.3 SparkConf

## Basic Settings ##
spark.master                                          yarn
spark.submit.deployMode                               client
spark.serializer                                      org.apache.spark.serializer.KryoSerializer
spark.kryoserializer.buffer.max                       256m
spark.local.dir                                       ./local

## Hadoop Settings ##
spark.hadoop.fs.hdfs.impl.disable.cache               true
spark.hadoop.fs.file.impl.disable.cache               true

## Driver/AM Settings ##
spark.yarn.am.cores                                   2
spark.yarn.am.memory                                  2g
spark.yarn.am.memoryOverhead                          512
spark.yarn.am.extraJavaOptions                        -XX:PermSize=1024m -XX:MaxPermSize=2048m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
spark.driver.maxResultSize                            2g
spark.driver.memory                                   30g
spark.driver.extraJavaOptions                         -XX:PermSize=1024m -XX:MaxPermSize=1024m

## Executor Settings ##
spark.executor.instances                              40
spark.executor.cores                                  4
spark.executor.memory                                 20g
spark.executor.memoryOverhead                         4096
spark.executor.extraJavaOptions                       -XX:PermSize=1024m -XX:MaxPermSize=1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution

## Security Settings ##
spark.yarn.keytab                                     ?
spark.yarn.principal                                  ?

## Dynamic Allocation Settings ##
spark.shuffle.service.enabled                         true
spark.dynamicAllocation.enabled                       false
spark.dynamicAllocation.initialExecutors              1
spark.dynamicAllocation.minExecutors                  1
spark.dynamicAllocation.maxExecutors                  50

## SQL Configurations ##
spark.sql.autoBroadcastJoinThreshold                  204857600
spark.sql.warehouse.dir                               /user/spark/warehouse
# spark.sql.hive.convertCTAS                            true
# spark.sql.sources.default                             parquet
spark.sql.shuffle.partitions                          1024
# spark.sql.hive.convertMetastoreParquet                false
spark.sql.crossJoin.enabled                           true

## History Server Client Settings ##
# spark.eventLog.enabled                                true
spark.eventLog.compress                               true
spark.eventLog.dir                                    hdfs:///spark2-history/
spark.yarn.historyServer.address                      ?:18081
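
As a rough sanity check on the settings above, the following sketch works out what the job requests from YARN when dynamic allocation is off. It assumes that `spark.executor.memoryOverhead` is interpreted in MiB and that each core runs one task at a time (the default `spark.task.cpus` of 1). The variable names are illustrative only.

```python
# Values copied from the SparkConf listing above.
executor_instances = 40
executor_cores = 4
executor_memory_gib = 20
executor_overhead_mib = 4096      # assumed to be MiB
shuffle_partitions = 1024

# Assumption: one task per core (spark.task.cpus left at its default of 1).
task_slots = executor_instances * executor_cores

# Memory YARN has to grant per executor container: heap plus overhead.
container_gib = executor_memory_gib + executor_overhead_mib / 1024
total_executor_gib = executor_instances * container_gib

# Number of scheduling "waves" needed to run one 1024-partition shuffle stage.
waves = shuffle_partitions / task_slots

print(task_slots, container_gib, total_executor_gib, waves)
```

With these numbers the job asks for 160 task slots and 960 GiB of executor memory, so a 1024-partition stage needs 6.4 waves of tasks when the work is evenly spread.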

5 Test Results

5.1 Experimental data

[Figure: test results]

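  1. Column 1: SQL statement
  2. Column 2: control group result
  3. Column 3: experimental group result
  4. Column 4: (experimental group - control group) / control group
  5. Green: column 4 < 0, performance improved
  6. Pink: 0 <= column 4 < 1, performance degraded
  7. Dark red: column 4 > 1, performance degraded severely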
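
The legend above can be written down as a small function. This is only a sketch of the column 4 formula and the colour thresholds. The timings in the loop are hypothetical, not values from the test run, and treating a value of exactly 1 as dark red is an assumption, since the legend leaves that case open.

```python
def relative_change(control, experiment):
    """Column 4: (experimental - control) / control."""
    return (experiment - control) / control


def color(change):
    """Map a column 4 value to the colour used in the result table."""
    if change < 0:
        return "green"      # experimental group is faster
    if change < 1:
        return "pink"       # slower, but less than twice the control time
    # The legend says "> 1"; exactly 1 is treated as dark red here (assumption).
    return "dark red"


# Hypothetical timings, not taken from the actual test run.
for control, experiment in [(100.0, 80.0), (100.0, 150.0), (100.0, 350.0)]:
    change = relative_change(control, experiment)
    print(control, experiment, round(change, 2), color(change))
```

A value of 1 in column 4 means the experimental group took twice as long as the control group, so the dark red rows are the ones that at least doubled.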

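5.2 Qualitative results

  1. Disk I/O alerts (usage, load, and so on). Single-disk physical machines raise such alerts when they are overloaded by a large shuffle. The control group triggered them only on a few queries such as query14; the experimental group triggered so many overnight that sleep was impossible.
  2. query72 in the experimental group was killed manually: it had been shuffling for a whole day without producing a result. Because of data skew, with spark.sql.shuffle.partitions=1024 only about 10 tasks were actually computing, each processing terabyte-scale data and practically hanging, while all the other tasks finished within milliseconds with nothing to do. The control group completed the same query in a little over 400 seconds.
  3. Performance dropped across a wide range of queries. There are many possible causes, but one thing is certain: with this much shuffle, joins that used to be Broadcast Joins must have become SortMerge Joins.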
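
The skew described in item 2 is what hash partitioning produces when the shuffle key has only a few distinct values: however many partitions are configured, rows can land in at most as many partitions as there are distinct keys. The sketch below illustrates this with a stand-in hash function (Python's `zlib.crc32`, not the Murmur3-based hash Spark uses) and a hypothetical key column with 10 distinct values. It is not a reproduction of query72.

```python
import zlib
from collections import Counter

NUM_PARTITIONS = 1024  # same value as spark.sql.shuffle.partitions


def partition_of(key, num_partitions=NUM_PARTITIONS):
    """Assign a key to a partition: hash(key) mod num_partitions.

    zlib.crc32 is a stand-in; Spark uses a different hash function.
    """
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


def rows_per_partition(keys):
    """Count how many rows each partition receives."""
    return Counter(partition_of(k) for k in keys)


# Hypothetical data: 100,000 rows, but only 10 distinct key values.
skewed_keys = [i % 10 for i in range(100_000)]
counts = rows_per_partition(skewed_keys)

busy = len(counts)
idle = NUM_PARTITIONS - busy
print(f"partitions with data: {busy}, empty partitions: {idle}")
```

With only 10 distinct keys, at most 10 of the 1024 partitions receive any rows, which matches the shape of the symptom in item 2: a handful of very long tasks while the rest finish almost instantly. Whether query72 actually skews on such a low-cardinality key is an assumption that would need to be checked against the task metrics.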

6 Analysis

We use query72 for the analysis.

6.1 Control group execution plan

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[total_cnt#3L DESC NULLS LAST,i_item_desc#62 ASC NULLS FIRST,w_warehouse_name#46 ASC NULLS FIRST,d_week_seq#98L ASC NULLS FIRST], output=[i_item_desc#62,w_warehouse_name#46,d_week_seq#98L,no_promo#1L,promo#2L,total_cnt#3L])
+- *HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), count(1)])
   +- Exchange hashpartitioning(i_item_desc#62, w_warehouse_name#46, d_week_seq#98L, 1024)
      +- *HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[partial_sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_count(1)])
         +- *Project [w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
            +- SortMergeJoin [cs_item_sk#21L, cs_order_number#23L], [cr_item_sk#199L, cr_order_number#214L], LeftOuter
               :- *Sort [cs_item_sk#21L ASC NULLS FIRST, cs_order_number#23L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cs_item_sk#21L, cs_order_number#23L, 1024)
               :     +- *Project [cs_item_sk#21L, cs_order_number#23L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
               :        +- *BroadcastHashJoin [cs_promo_sk#22L], [p_promo_sk#178L], LeftOuter, BuildRight
               :           :- *Project [cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L]
               :           :  +- *BroadcastHashJoin [cs_ship_date_sk#8L], [d_date_sk#150L], Inner, BuildRight, (cast(d_date#152 as double) > (cast(d_date#96 as double) + 5.0))
               :           :     :- *Project [cs_ship_date_sk#8L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :     :  +- *BroadcastHashJoin [d_week_seq#98L, inv_date_sk#40L], [d_week_seq#126L, d_date_sk#122L], Inner, BuildRight
               :           :     :     :- *Project [cs_ship_date_sk#8L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :     :     :  +- *BroadcastHashJoin [cs_sold_date_sk#6L], [d_date_sk#94L], Inner, BuildRight
               :           :     :     :     :- *Project [cs_ship_date_sk#8L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62]
               :           :     :     :     :  +- *BroadcastHashJoin [cs_bill_hdemo_sk#11L], [hd_demo_sk#89L], Inner, BuildRight
               :           :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62]
               :           :     :     :     :     :  +- *BroadcastHashJoin [cs_bill_cdemo_sk#10L], [cd_demo_sk#80L], Inner, BuildRight
               :           :     :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46, i_item_desc#62]
               :           :     :     :     :     :     :  +- *BroadcastHashJoin [cs_item_sk#21L], [i_item_sk#58L], Inner, BuildRight
               :           :     :     :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_date_sk#40L, w_warehouse_name#46]
               :           :     :     :     :     :     :     :  +- *BroadcastHashJoin [inv_warehouse_sk#42L], [w_warehouse_sk#44L], Inner, BuildRight
               :           :     :     :     :     :     :     :     :- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_sold_date_sk#6L, inv_warehouse_sk#42L, inv_date_sk#40L]
               :           :     :     :     :     :     :     :     :  +- *SortMergeJoin [cs_item_sk#21L], [inv_item_sk#41L], Inner, (inv_quantity_on_hand#43L < cs_quantity#24L)
               :           :     :     :     :     :     :     :     :     :- *Sort [cs_item_sk#21L ASC NULLS FIRST], false, 0
               :           :     :     :     :     :     :     :     :     :  +- Exchange hashpartitioning(cs_item_sk#21L, 1024)
               :           :     :     :     :     :     :     :     :     :     +- *Project [cs_ship_date_sk#8L, cs_bill_cdemo_sk#10L, cs_bill_hdemo_sk#11L, cs_item_sk#21L, cs_promo_sk#22L, cs_order_number#23L, cs_quantity#24L, cs_sold_date_sk#6L]
               :           :     :     :     :     :     :     :     :     :        +- *Filter ((((isnotnull(cs_quantity#24L) && isnotnull(cs_item_sk#21L)) && isnotnull(cs_bill_cdemo_sk#10L)) && isnotnull(cs_bill_hdemo_sk#11L)) && isnotnull(cs_ship_date_sk#8L))
               :           :     :     :     :     :     :     :     :     :           +- *FileScan parquet tpcds_1t_spark230.catalog_sales[cs_ship_date_sk#8L,cs_bill_cdemo_sk#10L,cs_bill_hdemo_sk#11L,cs_item_sk#21L,cs_promo_sk#22L,cs_order_number#23L,cs_quantity#24L,cs_sold_date_sk#6L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 1836, PartitionFilters: [isnotnull(cs_sold_date_sk#6L)], PushedFilters: [IsNotNull(cs_quantity), IsNotNull(cs_item_sk), IsNotNull(cs_bill_cdemo_sk), IsNotNull(cs_bill_hd..., ReadSchema: struct<cs_ship_date_sk:bigint,cs_bill_cdemo_sk:bigint,cs_bill_hdemo_sk:bigint,cs_item_sk:bigint,c...
               :           :     :     :     :     :     :     :     :     +- *Sort [inv_item_sk#41L ASC NULLS FIRST], false, 0
               :           :     :     :     :     :     :     :     :        +- Exchange hashpartitioning(inv_item_sk#41L, 1024)
               :           :     :     :     :     :     :     :     :           +- *Project [inv_item_sk#41L, inv_warehouse_sk#42L, inv_quantity_on_hand#43L, inv_date_sk#40L]
               :           :     :     :     :     :     :     :     :              +- *Filter ((isnotnull(inv_quantity_on_hand#43L) && isnotnull(inv_item_sk#41L)) && isnotnull(inv_warehouse_sk#42L))
               :           :     :     :     :     :     :     :     :                 +- *FileScan parquet tpcds_1t_spark230.inventory[inv_item_sk#41L,inv_warehouse_sk#42L,inv_quantity_on_hand#43L,inv_date_sk#40L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 261, PartitionFilters: [isnotnull(inv_date_sk#40L)], PushedFilters: [IsNotNull(inv_quantity_on_hand), IsNotNull(inv_item_sk), IsNotNull(inv_warehouse_sk)], ReadSchema: struct<inv_item_sk:bigint,inv_warehouse_sk:bigint,inv_quantity_on_hand:bigint>
               :           :     :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :     :     :     :        +- *Project [w_warehouse_sk#44L, w_warehouse_name#46]
               :           :     :     :     :     :     :     :           +- *Filter isnotnull(w_warehouse_sk#44L)
               :           :     :     :     :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.warehouse[w_warehouse_sk#44L,w_warehouse_name#46] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(w_warehouse_sk)], ReadSchema: struct<w_warehouse_sk:bigint,w_warehouse_name:string>
               :           :     :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :     :     :        +- *Project [i_item_sk#58L, i_item_desc#62]
               :           :     :     :     :     :     :           +- *Filter isnotnull(i_item_sk#58L)
               :           :     :     :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.item[i_item_sk#58L,i_item_desc#62] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_item_desc:string>
               :           :     :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :     :        +- *Project [cd_demo_sk#80L]
               :           :     :     :     :     :           +- *Filter ((isnotnull(cd_marital_status#82) && (cd_marital_status#82 = M)) && isnotnull(cd_demo_sk#80L))
               :           :     :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.customer_demographics[cd_demo_sk#80L,cd_marital_status#82] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_marital_status), EqualTo(cd_marital_status,M), IsNotNull(cd_demo_sk)], ReadSchema: struct<cd_demo_sk:bigint,cd_marital_status:string>
               :           :     :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :     :        +- *Project [hd_demo_sk#89L]
               :           :     :     :     :           +- *Filter ((isnotnull(hd_buy_potential#91) && (hd_buy_potential#91 = 1001-5000)) && isnotnull(hd_demo_sk#89L))
               :           :     :     :     :              +- *FileScan parquet tpcds_1t_spark230.household_demographics[hd_demo_sk#89L,hd_buy_potential#91] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(hd_buy_potential), EqualTo(hd_buy_potential,1001-5000), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:bigint,hd_buy_potential:string>
               :           :     :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :     :     :        +- *Project [d_date_sk#94L, d_date#96, d_week_seq#98L]
               :           :     :     :           +- *Filter ((((isnotnull(d_year#100L) && (d_year#100L = 2001)) && isnotnull(d_date_sk#94L)) && isnotnull(d_week_seq#98L)) && isnotnull(d_date#96))
               :           :     :     :              +- *FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#94L,d_date#96,d_week_seq#98L,d_year#100L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk), IsNotNull(d_week_seq), IsNotNull(..., ReadSchema: struct<d_date_sk:bigint,d_date:string,d_week_seq:bigint,d_year:bigint>
               :           :     :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[1, bigint, true], input[0, bigint, true]))
               :           :     :        +- *Project [d_date_sk#122L, d_week_seq#126L]
               :           :     :           +- *Filter (isnotnull(d_date_sk#122L) && isnotnull(d_week_seq#126L))
               :           :     :              +- *FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#122L,d_week_seq#126L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_date_sk), IsNotNull(d_week_seq)], ReadSchema: struct<d_date_sk:bigint,d_week_seq:bigint>
               :           :     +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :           :        +- *Project [d_date_sk#150L, d_date#152]
               :           :           +- *Filter (isnotnull(d_date#152) && isnotnull(d_date_sk#150L))
               :           :              +- *FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#150L,d_date#152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_date), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_date:string>
               :           +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
               :              +- *FileScan parquet tpcds_1t_spark230.promotion[p_promo_sk#178L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<p_promo_sk:bigint>
               +- *Sort [cr_item_sk#199L ASC NULLS FIRST, cr_order_number#214L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(cr_item_sk#199L, cr_order_number#214L, 1024)
                     +- *Project [cr_item_sk#199L, cr_order_number#214L]
                        +- *FileScan parquet tpcds_1t_spark230.catalog_returns[cr_item_sk#199L,cr_order_number#214L,cr_returned_date_sk#197L] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark23..., PartitionCount: 2104, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cr_item_sk:bigint,cr_order_number:bigint>
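
A plan this long is easier to compare by counting operators than by reading it line by line. Counted by hand, the control group plan above contains 8 BroadcastHashJoin nodes and 2 SortMergeJoin nodes. The helper below is a minimal sketch for doing the same count on any plan text; the function name, the operator list and the abbreviated `SAMPLE` excerpt are illustrative choices, not part of Spark.

```python
import re
from collections import Counter

OPERATORS = (
    "BroadcastHashJoin",
    "SortMergeJoin",
    "Exchange hashpartitioning",
    "BroadcastExchange",
)


def count_operators(plan_text):
    """Count plan nodes whose operator name is one of OPERATORS."""
    counts = Counter()
    for line in plan_text.splitlines():
        # Strip the tree-drawing prefix (":", "+-", ":-", "*", "*(n)").
        node = re.sub(r"^[\s:+\-*]*(\(\d+\)\s*)?", "", line)
        for op in OPERATORS:
            if node.startswith(op):
                counts[op] += 1
    return counts


# Abbreviated excerpt of the control group plan, for illustration only.
SAMPLE = """\
+- SortMergeJoin [cs_item_sk#21L, cs_order_number#23L], [cr_item_sk#199L, cr_order_number#214L], LeftOuter
   :- *Sort [cs_item_sk#21L ASC NULLS FIRST, cs_order_number#23L ASC NULLS FIRST], false, 0
   :  +- Exchange hashpartitioning(cs_item_sk#21L, cs_order_number#23L, 1024)
   :     +- *BroadcastHashJoin [cs_promo_sk#22L], [p_promo_sk#178L], LeftOuter, BuildRight
   :        +- BroadcastExchange HashedRelationBroadcastMode(List(input[0, bigint, true]))
   +- *Sort [cr_item_sk#199L ASC NULLS FIRST, cr_order_number#214L ASC NULLS FIRST], false, 0
      +- Exchange hashpartitioning(cr_item_sk#199L, cr_order_number#214L, 1024)
"""

sample_counts = count_operators(SAMPLE)
for op in OPERATORS:
    print(op, sample_counts[op])
```

Running the same function over the full text of both plans gives a quick view of how many broadcast joins were replaced by sort-merge joins and how many extra shuffles (Exchange hashpartitioning) that introduced.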

6.2 Experimental group execution plan

== Physical Plan ==
TakeOrderedAndProject(limit=100, orderBy=[total_cnt#2L DESC NULLS LAST,i_item_desc#62 ASC NULLS FIRST,w_warehouse_name#46 ASC NULLS FIRST,d_week_seq#98L ASC NULLS FIRST], output=[i_item_desc#62,w_warehouse_name#46,d_week_seq#98L,no_promo#0L,promo#1L,total_cnt#2L])
+- *(42) HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), count(1)])
   +- Exchange hashpartitioning(i_item_desc#62, w_warehouse_name#46, d_week_seq#98L, 1024)
      +- *(41) HashAggregate(keys=[i_item_desc#62, w_warehouse_name#46, d_week_seq#98L], functions=[partial_sum(cast(CASE WHEN isnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_sum(cast(CASE WHEN isnotnull(p_promo_sk#178L) THEN 1 ELSE 0 END as bigint)), partial_count(1)])
         +- *(41) Project [w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
            +- SortMergeJoin [cs_item_sk#20L, cs_order_number#22L], [cr_item_sk#198L, cr_order_number#213L], LeftOuter
               :- *(38) Sort [cs_item_sk#20L ASC NULLS FIRST, cs_order_number#22L ASC NULLS FIRST], false, 0
               :  +- Exchange hashpartitioning(cs_item_sk#20L, cs_order_number#22L, 1024)
               :     +- *(37) Project [cs_item_sk#20L, cs_order_number#22L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L, p_promo_sk#178L]
               :        +- SortMergeJoin [cs_promo_sk#21L], [p_promo_sk#178L], LeftOuter
               :           :- *(34) Sort [cs_promo_sk#21L ASC NULLS FIRST], false, 0
               :           :  +- Exchange hashpartitioning(cs_promo_sk#21L, 1024)
               :           :     +- *(33) Project [cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, w_warehouse_name#46, i_item_desc#62, d_week_seq#98L]
               :           :        +- *(33) SortMergeJoin [cs_ship_date_sk#7L], [d_date_sk#150L], Inner, (cast(d_date#152 as double) > (cast(d_date#96 as double) + 5.0))
               :           :           :- *(30) Sort [cs_ship_date_sk#7L ASC NULLS FIRST], false, 0
               :           :           :  +- Exchange hashpartitioning(cs_ship_date_sk#7L, 1024)
               :           :           :     +- *(29) Project [cs_ship_date_sk#7L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :           :        +- *(29) SortMergeJoin [d_week_seq#98L, inv_date_sk#43L], [d_week_seq#126L, d_date_sk#122L], Inner
               :           :           :           :- *(26) Sort [d_week_seq#98L ASC NULLS FIRST, inv_date_sk#43L ASC NULLS FIRST], false, 0
               :           :           :           :  +- Exchange hashpartitioning(d_week_seq#98L, inv_date_sk#43L, 1024)
               :           :           :           :     +- *(25) Project [cs_ship_date_sk#7L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62, d_date#96, d_week_seq#98L]
               :           :           :           :        +- *(25) SortMergeJoin [cs_sold_date_sk#39L], [d_date_sk#94L], Inner
               :           :           :           :           :- *(22) Sort [cs_sold_date_sk#39L ASC NULLS FIRST], false, 0
               :           :           :           :           :  +- Exchange hashpartitioning(cs_sold_date_sk#39L, 1024)
               :           :           :           :           :     +- *(21) Project [cs_ship_date_sk#7L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62]
               :           :           :           :           :        +- *(21) SortMergeJoin [cs_bill_hdemo_sk#10L], [hd_demo_sk#89L], Inner
               :           :           :           :           :           :- *(18) Sort [cs_bill_hdemo_sk#10L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :  +- Exchange hashpartitioning(cs_bill_hdemo_sk#10L, 1024)
               :           :           :           :           :           :     +- *(17) Project [cs_ship_date_sk#7L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62]
               :           :           :           :           :           :        +- *(17) SortMergeJoin [cs_bill_cdemo_sk#9L], [cd_demo_sk#80L], Inner
               :           :           :           :           :           :           :- *(14) Sort [cs_bill_cdemo_sk#9L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :  +- Exchange hashpartitioning(cs_bill_cdemo_sk#9L, 1024)
               :           :           :           :           :           :           :     +- *(13) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46, i_item_desc#62]
               :           :           :           :           :           :           :        +- *(13) SortMergeJoin [cs_item_sk#20L], [i_item_sk#58L], Inner
               :           :           :           :           :           :           :           :- *(10) Sort [cs_item_sk#20L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :  +- Exchange hashpartitioning(cs_item_sk#20L, 1024)
               :           :           :           :           :           :           :           :     +- *(9) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_date_sk#43L, w_warehouse_name#46]
               :           :           :           :           :           :           :           :        +- *(9) SortMergeJoin [inv_warehouse_sk#41L], [w_warehouse_sk#44L], Inner
               :           :           :           :           :           :           :           :           :- *(6) Sort [inv_warehouse_sk#41L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :           :  +- Exchange hashpartitioning(inv_warehouse_sk#41L, 1024)
               :           :           :           :           :           :           :           :           :     +- *(5) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_sold_date_sk#39L, inv_warehouse_sk#41L, inv_date_sk#43L]
               :           :           :           :           :           :           :           :           :        +- *(5) SortMergeJoin [cs_item_sk#20L], [inv_item_sk#40L], Inner, (inv_quantity_on_hand#42L < cs_quantity#23L)
               :           :           :           :           :           :           :           :           :           :- *(2) Sort [cs_item_sk#20L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :           :           :  +- Exchange hashpartitioning(cs_item_sk#20L, 1024)
               :           :           :           :           :           :           :           :           :           :     +- *(1) Project [cs_ship_date_sk#7L, cs_bill_cdemo_sk#9L, cs_bill_hdemo_sk#10L, cs_item_sk#20L, cs_promo_sk#21L, cs_order_number#22L, cs_quantity#23L, cs_sold_date_sk#39L]
               :           :           :           :           :           :           :           :           :           :        +- *(1) Filter ((((isnotnull(cs_item_sk#20L) && isnotnull(cs_quantity#23L)) && isnotnull(cs_bill_cdemo_sk#9L)) && isnotnull(cs_bill_hdemo_sk#10L)) && isnotnull(cs_ship_date_sk#7L))
               :           :           :           :           :           :           :           :           :           :           +- *(1) FileScan parquet tpcds_1t_spark230.catalog_sales[cs_ship_date_sk#7L,cs_bill_cdemo_sk#9L,cs_bill_hdemo_sk#10L,cs_item_sk#20L,cs_promo_sk#21L,cs_order_number#22L,cs_quantity#23L,cs_sold_date_sk#39L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 1836, PartitionFilters: [isnotnull(cs_sold_date_sk#39L)], PushedFilters: [IsNotNull(cs_item_sk), IsNotNull(cs_quantity), IsNotNull(cs_bill_cdemo_sk), IsNotNull(cs_bill_hd..., ReadSchema: struct<cs_ship_date_sk:bigint,cs_bill_cdemo_sk:bigint,cs_bill_hdemo_sk:bigint,cs_item_sk:bigint,c...
               :           :           :           :           :           :           :           :           :           +- *(4) Sort [inv_item_sk#40L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :           :              +- Exchange hashpartitioning(inv_item_sk#40L, 1024)
               :           :           :           :           :           :           :           :           :                 +- *(3) Project [inv_item_sk#40L, inv_warehouse_sk#41L, inv_quantity_on_hand#42L, inv_date_sk#43L]
               :           :           :           :           :           :           :           :           :                    +- *(3) Filter ((isnotnull(inv_item_sk#40L) && isnotnull(inv_quantity_on_hand#42L)) && isnotnull(inv_warehouse_sk#41L))
               :           :           :           :           :           :           :           :           :                       +- *(3) FileScan parquet tpcds_1t_spark230.inventory[inv_item_sk#40L,inv_warehouse_sk#41L,inv_quantity_on_hand#42L,inv_date_sk#43L] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_..., PartitionCount: 261, PartitionFilters: [isnotnull(inv_date_sk#43L)], PushedFilters: [IsNotNull(inv_item_sk), IsNotNull(inv_quantity_on_hand), IsNotNull(inv_warehouse_sk)], ReadSchema: struct<inv_item_sk:bigint,inv_warehouse_sk:bigint,inv_quantity_on_hand:bigint>
               :           :           :           :           :           :           :           :           +- *(8) Sort [w_warehouse_sk#44L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :           :              +- Exchange hashpartitioning(w_warehouse_sk#44L, 1024)
               :           :           :           :           :           :           :           :                 +- *(7) Project [w_warehouse_sk#44L, w_warehouse_name#46]
               :           :           :           :           :           :           :           :                    +- *(7) Filter isnotnull(w_warehouse_sk#44L)
               :           :           :           :           :           :           :           :                       +- *(7) FileScan parquet tpcds_1t_spark230.warehouse[w_warehouse_sk#44L,w_warehouse_name#46] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(w_warehouse_sk)], ReadSchema: struct<w_warehouse_sk:bigint,w_warehouse_name:string>
               :           :           :           :           :           :           :           +- *(12) Sort [i_item_sk#58L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :           :              +- Exchange hashpartitioning(i_item_sk#58L, 1024)
               :           :           :           :           :           :           :                 +- *(11) Project [i_item_sk#58L, i_item_desc#62]
               :           :           :           :           :           :           :                    +- *(11) Filter isnotnull(i_item_sk#58L)
               :           :           :           :           :           :           :                       +- *(11) FileScan parquet tpcds_1t_spark230.item[i_item_sk#58L,i_item_desc#62] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(i_item_sk)], ReadSchema: struct<i_item_sk:bigint,i_item_desc:string>
               :           :           :           :           :           :           +- *(16) Sort [cd_demo_sk#80L ASC NULLS FIRST], false, 0
               :           :           :           :           :           :              +- Exchange hashpartitioning(cd_demo_sk#80L, 1024)
               :           :           :           :           :           :                 +- *(15) Project [cd_demo_sk#80L]
               :           :           :           :           :           :                    +- *(15) Filter ((isnotnull(cd_marital_status#82) && (cd_marital_status#82 = M)) && isnotnull(cd_demo_sk#80L))
               :           :           :           :           :           :                       +- *(15) FileScan parquet tpcds_1t_spark230.customer_demographics[cd_demo_sk#80L,cd_marital_status#82] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(cd_marital_status), EqualTo(cd_marital_status,M), IsNotNull(cd_demo_sk)], ReadSchema: struct<cd_demo_sk:bigint,cd_marital_status:string>
               :           :           :           :           :           +- *(20) Sort [hd_demo_sk#89L ASC NULLS FIRST], false, 0
               :           :           :           :           :              +- Exchange hashpartitioning(hd_demo_sk#89L, 1024)
               :           :           :           :           :                 +- *(19) Project [hd_demo_sk#89L]
               :           :           :           :           :                    +- *(19) Filter ((isnotnull(hd_buy_potential#91) && (hd_buy_potential#91 = 1001-5000)) && isnotnull(hd_demo_sk#89L))
               :           :           :           :           :                       +- *(19) FileScan parquet tpcds_1t_spark230.household_demographics[hd_demo_sk#89L,hd_buy_potential#91] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(hd_buy_potential), EqualTo(hd_buy_potential,1001-5000), IsNotNull(hd_demo_sk)], ReadSchema: struct<hd_demo_sk:bigint,hd_buy_potential:string>
               :           :           :           :           +- *(24) Sort [d_date_sk#94L ASC NULLS FIRST], false, 0
               :           :           :           :              +- Exchange hashpartitioning(d_date_sk#94L, 1024)
               :           :           :           :                 +- *(23) Project [d_date_sk#94L, d_date#96, d_week_seq#98L]
               :           :           :           :                    +- *(23) Filter ((((isnotnull(d_year#100L) && (d_year#100L = 2001)) && isnotnull(d_date_sk#94L)) && isnotnull(d_week_seq#98L)) && isnotnull(d_date#96))
               :           :           :           :                       +- *(23) FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#94L,d_date#96,d_week_seq#98L,d_year#100L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_year), EqualTo(d_year,2001), IsNotNull(d_date_sk), IsNotNull(d_week_seq), IsNotNull(..., ReadSchema: struct<d_date_sk:bigint,d_date:string,d_week_seq:bigint,d_year:bigint>
               :           :           :           +- *(28) Sort [d_week_seq#126L ASC NULLS FIRST, d_date_sk#122L ASC NULLS FIRST], false, 0
               :           :           :              +- Exchange hashpartitioning(d_week_seq#126L, d_date_sk#122L, 1024)
               :           :           :                 +- *(27) Project [d_date_sk#122L, d_week_seq#126L]
               :           :           :                    +- *(27) Filter (isnotnull(d_week_seq#126L) && isnotnull(d_date_sk#122L))
               :           :           :                       +- *(27) FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#122L,d_week_seq#126L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_week_seq), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_week_seq:bigint>
               :           :           +- *(32) Sort [d_date_sk#150L ASC NULLS FIRST], false, 0
               :           :              +- Exchange hashpartitioning(d_date_sk#150L, 1024)
               :           :                 +- *(31) Project [d_date_sk#150L, d_date#152]
               :           :                    +- *(31) Filter (isnotnull(d_date#152) && isnotnull(d_date_sk#150L))
               :           :                       +- *(31) FileScan parquet tpcds_1t_spark230.date_dim[d_date_sk#150L,d_date#152] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [IsNotNull(d_date), IsNotNull(d_date_sk)], ReadSchema: struct<d_date_sk:bigint,d_date:string>
               :           +- *(36) Sort [p_promo_sk#178L ASC NULLS FIRST], false, 0
               :              +- Exchange hashpartitioning(p_promo_sk#178L, 1024)
               :                 +- *(35) FileScan parquet tpcds_1t_spark230.promotion[p_promo_sk#178L] Batched: true, Format: Parquet, Location: InMemoryFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark2..., PartitionFilters: [], PushedFilters: [], ReadSchema: struct<p_promo_sk:bigint>
               +- *(40) Sort [cr_item_sk#198L ASC NULLS FIRST, cr_order_number#213L ASC NULLS FIRST], false, 0
                  +- Exchange hashpartitioning(cr_item_sk#198L, cr_order_number#213L, 1024)
                     +- *(39) Project [cr_item_sk#198L, cr_order_number#213L]
                        +- *(39) FileScan parquet tpcds_1t_spark230.catalog_returns[cr_item_sk#198L,cr_order_number#213L,cr_returned_date_sk#224L] Batched: true, Format: Parquet, Location: CatalogFileIndex[hdfs://hzadg-sparkdev1.server.163.org:8020/user/spark/warehouse/tpcds_1t_spark23..., PartitionCount: 2104, PartitionFilters: [], PushedFilters: [], ReadSchema: struct<cr_item_sk:bigint,cr_order_number:bigint>

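6.3 Comparing the execution plans

In the experimental group every join is executed as a SortMergeJoin, whereas in the reference group many small tables are broadcast. The data is identical and so is the parameter list, yet the execution plans are completely different. This is a user-visible change, so either it is documented somewhere, or it is a bug, or the community has been negligent.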

6.4 Pinning down the behavior change

spark-1.6.3-bin-hadoop2.6
spark-2.0.2-bin-hadoop2.7
spark-2.1.0-bin-hadoop2.7
spark-2.1.1-bin-hadoop2.7
spark-2.1.2-bin-hadoop2.7
spark-2.2.0-bin-hadoop2.7
spark-2.2.1-bin-hadoop2.7
spark-2.3.0-bin-hadoop2.7
spark-2.3.1-SNAPSHOT-bin-20180511
spark-2.4.0-SNAPSHOT-bin-20180511

I checked each of the Spark builds above and found that the behavior changes from spark-2.2.0-bin-hadoop2.7 onward, so I consulted the corresponding migration notes:
http://spark.apache.org/docs/latest/sql-programming-guide.html#upgrading-from-spark-sql-21-to-22

Spark 2.1.1 introduced a new configuration key: spark.sql.hive.caseSensitiveInferenceMode. It had a default setting of NEVER_INFER, which kept behavior identical to 2.1.0. However, Spark 2.2.0 changes this setting’s default value to INFER_AND_SAVE to restore compatibility with reading Hive metastore tables whose underlying file schema have mixed-case column names. With the INFER_AND_SAVE configuration value, on first access Spark will perform schema inference on any Hive metastore table for which it has not already saved an inferred schema. Note that schema inference can be a very time consuming operation for tables with thousands of partitions. If compatibility with mixed-case column names is not a concern, you can safely set spark.sql.hive.caseSensitiveInferenceMode to NEVER_INFER to avoid the initial overhead of schema inference. Note that with the new default INFER_AND_SAVE setting, the results of the schema inference are saved as a metastore key for future use. Therefore, the initial schema inference occurs only at a table’s first access.

Since Spark 2.2.1 and 2.3.0, the schema is always inferred at runtime when the data source tables have the columns that exist in both partition schema and data schema. The inferred schema does not have the partitioned columns. When reading the table, Spark respects the partition values of these overlapping columns instead of the values stored in the data source files. In 2.2.0 and 2.1.x release, the inferred schema is partitioned but the data of the table is invisible to users (i.e., the result set is empty).

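The notes are not long, and they deal with schema inference, which is almost certainly unrelated to this problem. So is it really a bug?

6.5 Debugging

First, the way Spark decides whether the build side of a join can be broadcast is simple: it just compares sizes. In the code this is represented by Statistics, and all Statistics are computed bottom-up starting from the LeafNode. Without CBO it is simpler still: a Filter cannot tell how much data it will remove, so it takes its child's value; a Join cannot tell whether the result will grow or shrink, so it takes the product of the two sides. Working back from the logical plan, the relevant code is easy to find: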

class DetermineTableStats(session: SparkSession) extends Rule[LogicalPlan] {
  override def apply(plan: LogicalPlan): LogicalPlan = plan transformUp {
    case relation: HiveTableRelation
        if DDLUtils.isHiveTable(relation.tableMeta) && relation.tableMeta.stats.isEmpty =>
      val table = relation.tableMeta
      val sizeInBytes = if (session.sessionState.conf.fallBackToHdfsForStatsEnabled) {
        try {
          val hadoopConf = session.sessionState.newHadoopConf()
          val tablePath = new Path(table.location)
          val fs: FileSystem = tablePath.getFileSystem(hadoopConf)
          fs.getContentSummary(tablePath).getLength
        } catch {
          case e: IOException =>
            logWarning("Failed to get table size from hdfs.", e)
            session.sessionState.conf.defaultSizeInBytes
        }
      } else {
        session.sessionState.conf.defaultSizeInBytes
      }

      val withStats = table.copy(stats = Some(CatalogStatistics(sizeInBytes = BigInt(sizeInBytes))))
      relation.copy(tableMeta = withStats)
  }
}
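  1. This leads to the parameter fallBackToHdfsForStatsEnabled (spark.sql.statistics.fallBackToHdfs). Its default value is the same in every version tested, and it was not overridden in these tests.
  2. In 2.2.0 and later, setting spark.sql.statistics.fallBackToHdfs=true allows small tables to be broadcast again.
  3. Further debugging confirms that the Statistics are generated correctly and are not lost when the various Relations are converted, so this is not a bug.
  4. The behavior changed, it was not announced, and it is not a bug, so it comes down to negligence on the community's part.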
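
The sizing rules described in this section can be sketched as a toy model. This is not Spark's actual code: the `BroadcastSketch` object, the `Plan`, `Table`, `Filter`, and `Join` types, and the 64 KB table size are all hypothetical, and `hdfsSize` merely stands in for what `getContentSummary` would return. The two constants assume the Spark 2.x defaults of spark.sql.autoBroadcastJoinThreshold (10 MB) and spark.sql.defaultSizeInBytes (Long.MaxValue).

```scala
// Toy model of size-only statistics without CBO.
// Every name here is hypothetical; this is an illustration, not Spark's API.
object BroadcastSketch {
  // Assumed Spark 2.x defaults for the two relevant settings.
  val autoBroadcastJoinThreshold: BigInt = BigInt(10L * 1024 * 1024)
  val defaultSizeInBytes: BigInt = BigInt(Long.MaxValue)

  sealed trait Plan
  // A leaf node: catalog statistics may be missing; hdfsSize stands in
  // for the length that fs.getContentSummary(tablePath) would report.
  final case class Table(name: String, catalogSize: Option[BigInt], hdfsSize: BigInt) extends Plan
  final case class Filter(child: Plan) extends Plan
  final case class Join(left: Plan, right: Plan) extends Plan

  // Statistics are computed bottom-up from the leaves.
  def sizeInBytes(plan: Plan, fallBackToHdfs: Boolean): BigInt = plan match {
    case Table(_, Some(size), _) => size
    case Table(_, None, hdfs)    => if (fallBackToHdfs) hdfs else defaultSizeInBytes
    case Filter(child)           => sizeInBytes(child, fallBackToHdfs) // selectivity unknown
    case Join(l, r)              => sizeInBytes(l, fallBackToHdfs) * sizeInBytes(r, fallBackToHdfs)
  }

  // A side may be broadcast only if its estimated size fits under the threshold.
  def canBroadcast(plan: Plan, fallBackToHdfs: Boolean): Boolean =
    sizeInBytes(plan, fallBackToHdfs) <= autoBroadcastJoinThreshold

  def main(args: Array[String]): Unit = {
    // Hypothetical small dimension table with no catalog statistics.
    val dim = Table("small_dim", catalogSize = None, hdfsSize = BigInt(64 * 1024))
    println(canBroadcast(Filter(dim), fallBackToHdfs = false))
    println(canBroadcast(Filter(dim), fallBackToHdfs = true))
  }
}
```

In this model a table without catalog statistics is assumed to be Long.MaxValue bytes when the fallback is off, so even a tiny table can never pass the threshold, which is consistent with the all-SortMergeJoin plan above. With the fallback on, the real on-disk size is used and the small table qualifies for broadcast.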

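P.S. Going by the literal meaning of this parameter, it would be fair to say that the behavior in versions before 2.2.0 was the bug; two wrongs just happened to make a right.

7 Summary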

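  1. Most, and possibly all, of the performance regression can be avoided by setting spark.sql.statistics.fallBackToHdfs=true; the next benchmark round will tell. If new problems turn up, I will report them in a follow-up.
  2. The Spark community's relationship with Hive has long been ambivalent, and support for Hive tables has shifted back and forth accordingly, so lapses like this one are to be expected. The Spark-on-Hive code is a mess. I have changed a fair amount of it myself and found that nearly all committers have a relatively weak grasp of this area, perhaps because it is destined to be abandoned someday. I won't even go into how bad the spark thrift server module is; if you are interested, take a look at Kyuubi, which will certainly give you a better experience.
  3. Putting Spark 2.3.0 into production is essentially ruled out. Besides this problem, which a single parameter may well fix, there is a fatal one:
    https://issues-test.apache.org/jira/browse/SPARK-23852
    Better to wait for 2.3.1. The community is already discussing it, and the release will probably be scheduled once the blocker above is resolved.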
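
As a sketch of the workaround in item 1, the setting could be appended to the same spark-defaults.conf-style file shown in section 4.3. The section comment line is my own label and not part of the original configuration.

```
## Statistics Settings ##
spark.sql.statistics.fallBackToHdfs                   true
```

The same key can also be passed per job on the command line as `--conf spark.sql.statistics.fallBackToHdfs=true`.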

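8 Afterword

Spring sleep, unaware of dawn,
phones going off on every side.
Alerts rang through the night;
how many disks have blown?
Shuffle till the stomach aches,
then hang oneself from the southeast branch.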

    Original author: Kent_Yao
    Original URL: https://www.jianshu.com/p/c59f1e04e15f