Build
CarbonData 1.2 already supports Hive and Presto, so the Carbon ecosystem is now basically complete.
Check out branch-1.2 with git, then build with the following script:
#!/bin/bash
mvn -DskipTests clean package
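Note: although the documentation says JDK 1.7 and 1.8 are both supported, testing showed that the code calls Map[String, String].getOrDefault(), which only exists since Java 8, so the current branch can only be built with JDK 1.8. This can of course be patched by hand and submitted to the community as a PR.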
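Because the branch only builds with JDK 1.8, it can help to check the JDK before invoking Maven. The following is a minimal sketch: `is_jdk8` and `current_java_version` are hypothetical helper names, and the parsing assumes the usual `java version "1.8.0_xxx"` output format.

```bash
#!/bin/bash
# Hypothetical helper: succeeds only if the given version string is a JDK 1.8 release.
is_jdk8() {
  case "$1" in
    1.8|1.8.*) return 0 ;;
    *) return 1 ;;
  esac
}

# Extract the quoted version from `java -version`,
# which prints a line such as: java version "1.8.0_144"
current_java_version() {
  java -version 2>&1 | awk -F '"' '/version/ {print $2; exit}'
}

# Usage (left as a comment so that sourcing this file does not start a build):
#   is_jdk8 "$(current_java_version)" && mvn -DskipTests clean package
```

The version check is kept separate from the `java` call so that it can be exercised with plain strings.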
Test
Deploy CarbonData by following the configuration instructions at https://carbondata.apache.org/installation-guide.html:
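- CarbonData only supports the Spark 2.1.0 patch release. Integrating with Spark 2.1.3 fails with a missing CatalystConf class, because that class was refactored after Spark 2.1.0, so you must depend on Spark 2.1.0 exactly;
- As the documentation describes, build the archive with tar zcvf carbondata.tar.gz and reference it in spark.yarn.dist.archives and spark.executor.extraClassPath; otherwise the CarbonData classes will not be found (note that carbondata_xxx.jar inside carbonlib must not be a symlink!);
- Configure Spark: the compiled carbondata_xxx.jar must be added to Spark's dependencies;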
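- Configure CarbonData: copy the reference template carbon.properties.template from the CarbonData source to SPARK_HOME/conf as carbon.properties and adjust the main paths (CarbonData has many tuning parameters; they can be adjusted gradually later);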
- Finally, verify the integration from spark-shell; the test SQL is given later in this section.
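The symlink pitfall in the packaging step can be avoided at archive time. The sketch below assumes the layout used in this post (a carbonlib directory under the Spark home). `find_symlinked_jars` and `pack_carbonlib` are hypothetical helper names; the `-h` option tells tar to archive the file a symlink points to rather than the link itself.

```bash
#!/bin/bash
# List jars in the given directory that are symlinks (hypothetical helper).
find_symlinked_jars() {
  find "$1" -name '*.jar' -type l
}

# Build carbondata.tar.gz from <spark_home>/carbonlib and move it into carbonlib.
# -h dereferences symlinks, so the archive contains real jar files.
pack_carbonlib() {
  local spark_home="$1"
  (
    cd "$spark_home" &&
      tar -zchf carbondata.tar.gz carbonlib &&
      mv carbondata.tar.gz carbonlib/
  )
}

# Usage (paths are the ones from this post):
#   find_symlinked_jars /home/hadoop/work/spark/carbonlib
#   pack_carbonlib /home/hadoop/work/spark
```

The archive is written outside carbonlib first and moved in afterwards, so tar never tries to read its own output file.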
Here is a fairly complete spark-defaults.conf. Note that some paths are hard-coded and need to be changed:
## Driver/AM Settings ##
spark.yarn.am.waitTime 100s
spark.yarn.am.cores 1
spark.yarn.am.memory 4g
spark.yarn.am.memoryOverhead 2048
spark.yarn.am.extraJavaOptions -XX:PermSize=1024m -XX:MaxPermSize=2048m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution
spark.driver.maxResultSize 1g
spark.driver.memory 4g
## Executor Settings ##
spark.executor.instances 0
spark.executor.cores 4
spark.executor.memory 4g
spark.yarn.executor.memoryOverhead 2048
spark.executor.extraJavaOptions -XX:PermSize=1024m -XX:MaxPermSize=1024m -verbose:gc -XX:+PrintGCDetails -XX:+PrintGCDateStamps -XX:+PrintTenuringDistribution -Dcarbon.properties.filepath=carbon.properties
## Dynamic Allocation Settings ##
spark.shuffle.service.enabled true
spark.dynamicAllocation.enabled true
spark.dynamicAllocation.initialExecutors 0
spark.dynamicAllocation.minExecutors 0
spark.dynamicAllocation.maxExecutors 10
spark.dynamicAllocation.executorIdleTimeout 60s
#spark.dynamicAllocation.cachedExecutorIdleTimeout 10s
## SQL Configurations ##
spark.sql.autoBroadcastJoinThreshold 104857600
spark.sql.warehouse.dir /user/warehouse
#spark.sql.warehouse.dir /user/hadoop/warehouse
# spark.sql.hive.convertCTAS true
# spark.sql.sources.default parquet
spark.sql.shuffle.partitions 100
spark.driver.extraJavaOptions -Dcarbon.properties.filepath=/home/hadoop/work/spark/conf/carbon.properties
spark.driver.extraClassPath /home/hadoop/work/spark/carbonlib/*
spark.executor.extraClassPath carbondata.tar.gz/carbonlib/*
spark.yarn.dist.files /home/hadoop/work/spark/conf/carbon.properties
spark.yarn.dist.archives /home/hadoop/work/spark/carbonlib/carbondata.tar.gz
Configure carbon.properties:
#Mandatory. Carbon Store path
carbon.storelocation=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/CarbonStore
#Base directory for Data files
carbon.ddl.base.hdfs.url=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data
#Path where the bad records are stored
carbon.badRecords.location=hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/badrecords
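For the test SQL, note that getOrCreateCarbonSession() takes two HDFS paths that must already exist and be accessible with the right permissions: the first is storePath, the default location for data brought in by load/overwrite; the second is metaStorePath (which did not seem to have any effect). For example: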
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.CarbonSession._
val carbon = SparkSession.builder().config(sc.getConf).getOrCreateCarbonSession("hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data", "hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/carbon.metastore")
carbon.sql("CREATE TABLE IF NOT EXISTS test_table2(id string, name string, city string, age Int) STORED BY 'carbondata'");
carbon.sql("drop table if exists test_table");
carbon.sql("show tables").show()
carbon.sql("LOAD DATA INPATH '/tmp/carbon/sample.csv' INTO TABLE test_table2")
carbon.sql("SELECT * FROM test_table2").show()
carbon.sql("SELECT city, avg(age), sum(age) FROM test_table2 GROUP BY city").show()
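Since both paths passed to getOrCreateCarbonSession() must exist before the session is created, they can be prepared ahead of time. The sketch below only prints the `hdfs dfs -mkdir -p` commands so they can be reviewed first. `print_mkdir_cmds` is a hypothetical helper, and whether additional ownership or permission changes are needed depends on the cluster.

```bash
#!/bin/bash
# Print one `hdfs dfs -mkdir -p` command per path given as an argument.
# Nothing is executed against HDFS here.
print_mkdir_cmds() {
  local path
  for path in "$@"; do
    printf 'hdfs dfs -mkdir -p %s\n' "$path"
  done
}

# The two paths used in the spark-shell example above.
print_mkdir_cmds \
  "hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/data" \
  "hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/carbon.metastore"
```

Once the printed commands look right, the output can be piped to `bash` to run them.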
CarbonData ThriftServer
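The approach above is only suitable for testing; a real production environment still needs to be built on ThriftServer. To set up ThriftServer, use this startup script: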
#!/bin/bash
export SPARK_HOME=/home/hadoop/work/spark
nohup ./bin/spark-submit \
--conf spark.sql.hive.thriftServer.singleSession=true \
--class org.apache.carbondata.spark.thriftserver.CarbonThriftServer \
$SPARK_HOME/carbonlib/carbondata_2.11-1.2.1-SNAPSHOT-shade-hadoop2.2.0.jar \
hdfs://hzadg-mammut-platform2.server.163.org:8020/Carbon/CarbonStore >logs/carbondata-thrift-server.log 2>&1 &
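Because the script starts the server in the background with nohup, it returns before the server is ready to accept connections. A minimal sketch for waiting on the listening port follows. `wait_for_port` is a hypothetical helper that relies on bash's /dev/tcp feature, and the port 10010 in the usage comment is taken from the beeline URL used later in this post, not from the startup script itself.

```bash
#!/bin/bash
# Wait until host:port accepts TCP connections, retrying once per second.
# Returns 0 as soon as a connection succeeds, 1 after the given number of attempts.
wait_for_port() {
  local host="$1" port="$2" attempts="${3:-30}" i
  for ((i = 0; i < attempts; i++)); do
    # Opening /dev/tcp/<host>/<port> makes bash attempt a TCP connection;
    # the subshell closes the descriptor again when it exits.
    if (exec 3<>"/dev/tcp/$host/$port") 2>/dev/null; then
      return 0
    fi
    sleep 1
  done
  return 1
}

# Usage (left as a comment so that sourcing this file does not block):
#   wait_for_port hzadg-mammut-platform1.server.163.org 10010 60 \
#     && echo "ThriftServer is accepting connections"
```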
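TPC-DS Test
The project does not provide a reliable test data set. I happened to have TPC-DS test data from earlier work, so I adapted its CREATE TABLE statements to CarbonData syntax, as follows: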
use tpcds;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=2000;
set hive.exec.max.dynamic.partitions=2000;
drop table if exists cb_store_sales;
create table cb_store_sales
(
ss_sold_time_sk bigint,
ss_item_sk bigint,
ss_customer_sk bigint,
ss_cdemo_sk bigint,
ss_hdemo_sk bigint,
ss_addr_sk bigint,
ss_store_sk bigint,
ss_promo_sk bigint,
ss_ticket_number bigint,
ss_quantity int,
ss_wholesale_cost decimal(7,2),
ss_list_price decimal(7,2),
ss_sales_price decimal(7,2),
ss_ext_discount_amt decimal(7,2),
ss_ext_sales_price decimal(7,2),
ss_ext_wholesale_cost decimal(7,2),
ss_ext_list_price decimal(7,2),
ss_ext_tax decimal(7,2),
ss_coupon_amt decimal(7,2),
ss_net_paid decimal(7,2),
ss_net_paid_inc_tax decimal(7,2),
ss_net_profit decimal(7,2)
) partitioned by (ss_sold_date_sk bigint)
stored by 'carbondata'
tblproperties('partition_type'='Hash','num_partitions'='31');
insert overwrite table cb_store_sales partition(ss_sold_date_sk) select ss_sold_time_sk,ss_item_sk,ss_customer_sk,ss_cdemo_sk,ss_hdemo_sk,ss_addr_sk,ss_store_sk,ss_promo_sk,ss_ticket_number,ss_quantity,ss_wholesale_cost,ss_list_price,ss_sales_price,ss_ext_discount_amt,ss_ext_sales_price,ss_ext_wholesale_cost,ss_ext_list_price,ss_ext_tax,ss_coupon_amt,ss_net_paid,ss_net_paid_inc_tax,ss_net_profit,ss_sold_date_sk from et_store_sales distribute by ss_sold_date_sk;
Run it against the port exposed by the ThriftServer above, using the following script:
export SPARK_BEELINE_HOME=/home/hadoop/work/spark
${SPARK_BEELINE_HOME}/bin/beeline -u "jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010/tpcds;hive.server2.proxy.user=hadoop" -f "$bin/create-table-sql/create-load-carbondata-partition-fact.sql"
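Notes:
- In Spark SQL, partitioned by does not require a partition type or a partition count, but CarbonData SQL requires both. The official documentation is also riddled with errors; even num_partitions appears to be wrong there;
- The overwrite is slow, taking 30+ minutes. This is probably down to CarbonData's design: it has to build a global dictionary index and write it to HDFS;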
Once it finishes, the related data can be found in HDFS; for example, the metadata looks like this:
hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0
Found 64 items
-rw-r----- 3 hadoop hdfs 21284 2017-10-31 20:25 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/0_batchno0-0-1509450294012.carbonindex
-rw-r----- 3 hadoop hdfs 10170 2017-10-31 20:31 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/10_batchno0-0-1509450294012.carbonindex
-rw-r----- 3 hadoop hdfs 10170 2017-10-31 20:31 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/11_batchno0-0-1509450294012.carbonindex
-rw-r----- 3 hadoop hdfs 10170 2017-10-31 20:17 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact/Part0/Segment_0/12_batchno0-0-1509450294012.carbonindex
...
hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales
Found 2 items
drwxr-x--- - hadoop hdfs 0 2017-10-31 19:44 /Carbon/CarbonStore/tpcds/cb_store_sales/Fact
drwxr-x--- - hadoop hdfs 0 2017-10-31 20:32 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata
hadoop@hzadg-mammut-platform1:~/work/spark/conf$ hdfs dfs -ls /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata
Found 2 items
-rw-r----- 3 hadoop hdfs 2796 2017-10-31 19:44 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata/schema
-rw-r----- 3 hadoop hdfs 268 2017-10-31 20:32 /Carbon/CarbonStore/tpcds/cb_store_sales/Metadata/tablestatus
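Verifying TPC-DS
CarbonData still has a lot of work left to do. For example, cb_store_sales was created through beeline, but querying it through beeline with hive.server2.proxy.user=hadoop set fails as shown below. The cause is that hive.server2.proxy.user=hadoop did not take effect; tables created earlier can still be accessed without problems: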
hadoop@hzadg-mammut-platform1:~/work/spark$ ./bin/beeline -u jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010;hive.server2.proxy.user=hadoop
0: jdbc:hive2://hzadg-mammut-platform1.server> select * from cb_store_sales limit 10;
Error: org.apache.hadoop.security.AccessControlException: Permission denied: user=anonymous, access=EXECUTE, inode="/Carbon/CarbonStore/modifiedTime.mdt":hadoop:hdfs:drwxr-x---
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.check(FSPermissionChecker.java:319)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkTraverse(FSPermissionChecker.java:259)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:205)
at org.apache.hadoop.hdfs.server.namenode.FSPermissionChecker.checkPermission(FSPermissionChecker.java:190)
at org.apache.hadoop.hdfs.server.namenode.FSDirectory.checkPermission(FSDirectory.java:1722)
at org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.getFileInfo(FSDirStatAndListingOp.java:108)
at org.apache.hadoop.hdfs.server.namenode.FSNamesystem.getFileInfo(FSNamesystem.java:3863)
at org.apache.hadoop.hdfs.server.namenode.NameNodeRpcServer.getFileInfo(NameNodeRpcServer.java:1012)
at org.apache.hadoop.hdfs.protocolPB.ClientNamenodeProtocolServerSideTranslatorPB.getFileInfo(ClientNamenodeProtocolServerSideTranslatorPB.java:843)
at org.apache.hadoop.hdfs.protocol.proto.ClientNamenodeProtocolProtos$ClientNamenodeProtocol$2.callBlockingMethod(ClientNamenodeProtocolProtos.java)
at org.apache.hadoop.ipc.ProtobufRpcEngine$Server$ProtoBufRpcInvoker.call(ProtobufRpcEngine.java:616)
at org.apache.hadoop.ipc.RPC$Server.call(RPC.java:982)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2049)
at org.apache.hadoop.ipc.Server$Handler$1.run(Server.java:2045)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:415)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1698)
at org.apache.hadoop.ipc.Server$Handler.run(Server.java:2043) (state=,code=0)
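One possible explanation, not verified in this post, is shell quoting rather than CarbonData: in the failing command the JDBC URL is unquoted, and an unquoted `;` ends the command in the shell, so beeline would only receive the URL up to `:10010`. The table-creation command earlier quoted the URL. The sketch below illustrates the quoting behaviour with `show_args`, a hypothetical stand-in for beeline that prints each argument it receives.

```bash
#!/bin/bash
# Hypothetical stand-in for beeline: prints each received argument on its own line.
show_args() { printf '%s\n' "$@"; }

url='jdbc:hive2://hzadg-mammut-platform1.server.163.org:10010;hive.server2.proxy.user=hadoop'

# Quoted: the whole URL, including ;hive.server2.proxy.user=hadoop,
# arrives as a single argument.
show_args -u "$url"

# Unquoted on the command line, as in
#   ./bin/beeline -u jdbc:hive2://host:10010;hive.server2.proxy.user=hadoop
# the shell stops the beeline command at ';' and treats
# hive.server2.proxy.user=hadoop as a second, separate command.
```

If this is the cause, quoting the URL in the beeline invocation should let the proxy user setting reach the server.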
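Queries do work when run through carbon-spark-sql under CARBON_HOME/bin. I tested one TPC-DS data set in both CarbonData and Parquet formats; for simple operations (count, count(distinct), sum(), limit 10), CarbonData was considerably faster than Parquet in every case.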
Next steps
- Performance comparison over the full set of TPC-DS SQL queries;
- Investigation of how CarbonData's optimizations work;
References: