Hive迁移Saprk SQL的坑和改进办法

Qcon 全球软件开发者大会2016北京站
演讲主题:Spark在360的大规模实践与经验分享
李远策

360-Spark集群概况

《Hive迁移Saprk SQL的坑和改进办法》 360-Spark集群概况

360-Spark应用

MLLib
• 算法:LDA、LR、FP-Growth、ALS、KMeans、随机森林等。
• 业务:新闻主题分类、新闻推荐、APP推荐、恶意代码识别、恶意域名检测等。
GraphX
• 算法:PageRank、Louvain、LPA、连通子图等。
• 业务:搜索PageValue、网站安全监测等。
SparkSQL
• 采用HiveContext替换公司90%以上的Hive作业,每天例行1.5W+作业。
• 每个Hive SQL平均3轮MR作业,平均性能提升2~5倍。

SparkSQL替换Hive

Hive迁移到SparkSQL的“正确打开方式”:
1、编译Spark加上-Phive -Phive-thriftserver参数
2、部署Spark(Yarn)集群
3、配置SparkSQL共用Hive的元数据库
4、用spark-hive(spark-sql)工具替换原有的hive命令
5、-e/–f 或者thriftserver提交作业。

SparkSQL部署方案

《Hive迁移Saprk SQL的坑和改进办法》 SparkSQL部署方案

Hive迁移SparkSQL – 坑 & 改进

SQL兼容 (Insert overwrite [local] directory的支持)

例如:insert overwrite directory ‘/tmp/testdir’ select *from T1; Hive中支持,SparkSQL暂时不支持。

因为SparkSQL-HiveContext的SQL解析调用了Hive的ParseDriver. parse完成,所以语法解析上不存在问题。

解决方案:
1、解析AST中的TOK_DIR和TOK_LOCAL_DIR将其转化成新定义的逻辑计划WriteToDirectory
2、将逻辑计划WriteToDirectory转换成新定义的物理计划WriteToDirectory。
3、在物理计划WriteToDirectory执行方法中复用InsertIntoHiveTable中的saveAsHiveFile逻辑将结果写到HDFS中。
4、如果是local directory则将结果再拉回到本地

SQL兼容 (SQL二义性问题)

例如:

    select C.id from (
           select A.id from testb as A
           join
          (select id from testb ) B
    on A.id=B.id) C;

C.id is A.id or B.id ?

transformation bugs (行尾部空列导致的数组越界)

例如:
001\tABC\t002\t [001, ABC, 002]
003\tEFG\t\t [003, EFG]

    new GenericInternalRow(
        prevLine.split(ioschema.outputRowFormatMap(
                              “TOK_TABLEROWFORMATFIELD”))
                     .map(CatalystTypeConverters.convertToCatalyst))

transformation bugs (Script的标准错误缓冲区打满导致transform流程卡住)

《Hive迁移Saprk SQL的坑和改进办法》 transformation

输入小文件合并的改进 (增加支持自定义inputFormat类)

默认采用建表时指定的InpurFormat,如果是默认的TextInputFormat,当小文件比较多是可能会导致RDD的partition数太多,导致性能下降。

解决办法:
通过参数允许用户指定InputFormat,在TableReader中反射生成对应的InputFormat对象并传入到HadoopRDD的构造函数中。

使用方法:

set spark.sql.hive.inputformat=org.apache.hadoop.mapred.lib.CombineTextInputFormat;

输出小文件合并的改进 (增加自动合并结果文件)

当spark.sql.shuffle.partitions设置的比较大且结果数据集比较小时,会产生大量的小文件(文件数等同spark.sql.shuffle.partitions)。
解决办法:
在最后的执行计划中加入一个repartition transformation。通过参数控制最终的partitions数且不影响shuffle partition的数量。
使用方法:
set spark.sql.result.partitions=10;

• 支持yarn-cluster模式,减小client的负载默认的yarn-client模式下Scheduler会运行在client上,加重client机器的负载。
解决办法:
让sparkSQL工具支持yarn-cluster模式。
1)在Yarn集群上部署SparkSQL依赖的hive metastore jar包。
2)开通Yarn nodemanager节点访问Hive metastore数据库的权限。
3)解决“\”“转义问题。如 spark-hive –e “select * fromuser where name = \”张三””;在yarn-cluster模式中会触发两次command执行从而导致“\”被转义两次。

邮箱:
liyuance@gmail.com
liyuance@360.cn
急招大数据运维和运维开发人员,谢谢

    原文作者:Albert陈凯
    原文地址: https://www.jianshu.com/p/00328171b8a6
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞