本文旨在介绍 Spark 通过JDBC读取数据时常用的一些优化手段
关于数据库索引
无论使用哪种JDBC API,spark拉取数据最终都是以select语句
来执行的,所以在自定义分区条件或者指定的long型column时,都需要结合表的索引来综合考虑,才能以更高性能并发读取数据库数据。
API的使用可以参考文档:Spark JDBC系列–取数的四种方式
离散型的分区字段
当使用spark拉取table_example表的数据时,使用的分区字段,并不是连续或均匀分布的。这时如果简单的按预期的numPartitions
做均分,则会造成数据倾斜,读取性能也会受到影响。
ID离散型例举
背景
一般情况下,表的ID字段,都会设置成自增,即使 step!=1
,也是均匀分布的的。但是当数据积累到一定程度,需要进行分库分表时,多个实例中ID的唯一性就需要借助分库分表中间件,使用如snowflake之类的全局唯一编号,来生成全局唯一ID了,此时必定会出现一定程度的ID离散。
入参
min_id:1,max_id:1000000,数据集中在:1~500,10000~20000,100000~400000 。。。即存在多段不均匀分布
普通处理方式
sqlContext.read.jdbc(url,tableName, "id", 1, 1000000,400,prop)
此方式的分区where查询条件,会存在很多的无用查询(返回了空结果),划分的task为400,但实际有效的可能只有200个,且数据还可能存在一定程度的倾斜,对后续的计算产生影响。
自定义处理方式
def getPredicates = {
//1.获取表total数据。
//2.按numPartitions均分,获得offset,可以确保每个分片的数据一致
//3.获取每个分片内的最大最小ID,组装成条件数组
。。。实现细节省略
}
sqlContext.read.jdbc(url,table, getPredicates,connectionProperties)
通过自由组装方式,可以达到精确控制,但是实现成本较高。
ID取模方式
sqlContext.read.jdbc(url,tableName, "id%200", 1, 1000000,400,prop)
根据numPartitions
确定合理的模值,可以尽量做到数据的连续,且写法简单,但是由于在ID字段上使用了函数计算,所以索引将失效,此时需要配合其他包含索引的where条件加以辅助,才能使查询性能最大化。
原理:
API中的columnName
其实只会作为where条件进行简单的拼接,所以数据库中支持的语法,都可以使用。tableName
的原理也一样,仅会作为from 后的内容进行拼接,所以也可以写一个子句传入tableName
中,但依然要在保证性能的前提下。
结语
不仅仅是取模操作
,数据库语法支持的任何函数,都可以在API中传入使用,关键在于性能是否达到预期。