Tuning Spark reads from Oracle

Partitioning with Spark's built-in lower/upper bounds comes out uneven, so the slowest partition dominates the total transfer time (the bucket effect).
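For reference, the built-in approach splits the [lowerBound, upperBound] range of a numeric partition column into equal strides, so a skewed key distribution directly turns into skewed partitions. A minimal sketch, assuming a numeric key column (here called ID, an assumption) and illustrative bounds:

// Spark's built-in JDBC partitioning: the [lowerBound, upperBound] range of
// partitionColumn is cut into numPartitions equal strides; a non-uniform key
// distribution therefore produces partitions of very different sizes.
// NOTE: the column name ID and the bounds are assumptions for illustration.
val skewed = spark.read.format("jdbc").options(Map(
  "url"             -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
  "dbtable"         -> "EPM_XJ.C_CONS",
  "driver"          -> "oracle.jdbc.driver.OracleDriver",
  "partitionColumn" -> "ID",
  "lowerBound"      -> "1",
  "upperBound"      -> "8137034",
  "numPartitions"   -> "32"
)).load()

With the built-in bounds, the per-partition record counts (the newline-separated string a in the spark-shell session below) were badly skewed: one partition got 5,186,719 of the 8,137,034 rows while many got 0.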


scala> a.split("\\n").map(x=>x.toInt)
res25: Array[Int] = Array(123447, 154643, 30561, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 216305, 114099, 254177, 5186719, 46387, 116380, 197942, 224119, 254281, 254261, 131158, 145298, 0, 174433, 187171, 58068, 77121, 45497, 144967)

scala> a.split("\\n").map(x=>x.toInt).sum
res26: Int = 8137034

scala> a.split("\\n").map(x=>x.toInt).max
res27: Int = 5186719

scala> 8137034/32
res58: Int = 254282

Combining Oracle rownum-based pagination to avoid skew in the data transfer (the 254,282-row page interval is the total row count divided by 32, computed above):


def query(index: Int, interval: Int): String = {
  // inner query materializes rownum as rn so the outer query can filter on it
  val basic = "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b "
  // page boundaries: rows ((index - 1) * interval + 1) .. (index * interval)
  val condition = " where  b.rn between " + ((index - 1) * interval + 1) + " AND " + index * interval
  // the whole expression is aliased so it can be passed as the dbtable option
  "( select * from " + basic + condition + " ) c"
}
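For example, the first page with the 254,282-row interval expands to the following string, which is what the jobs below pass as the dbtable option:

// query(1, 254282) evaluates to (whitespace as produced by the literals above):
//   ( select * from ( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b  where  b.rn between 1 AND 254282 ) c
// i.e. rows 1..254282 of EPM_XJ.C_CONS, wrapped and aliased so Spark can treat it as a table.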

import org.apache.spark.sql.DataFrame

// reducer used below to union the per-page DataFrames into one
def unionTableReducer: (DataFrame, DataFrame) => DataFrame = (x: DataFrame, y: DataFrame) => x.union(y)

The following version is essentially serial and does not make full use of the cluster, but paired with a JDBC connection pool and pagination it should still be reasonably effective:

val jdbcDF = (1 to 33).map { index =>
  // read one page and write it to its own directory; pages are processed
  // one after another, so this variant is mostly serial
  val page = spark.read.format("jdbc").options(Map(
    "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable" -> query(index, 254282),
    "driver"  -> "oracle.jdbc.driver.OracleDriver"
  )).load()
  page.write.parquet("C_CONS_hahaha/" + index)
}

So far the fastest variant is a single map-reduce style job: 8 million rows across 32 partitions take only about 1.9 min:

val jdbcDF = (1 to 33)
  .map { index =>
    // each page becomes its own JDBC DataFrame
    spark.read.format("jdbc").options(Map(
      "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
      "dbtable" -> query(index, 254282),
      "driver"  -> "oracle.jdbc.driver.OracleDriver"
    )).load()
  }
  .reduce(unionTableReducer)        // union all pages into one DataFrame
  .write.parquet("C_CONS_hahaha")   // the reads run in parallel when the write job executes
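For what it's worth, the same page-per-partition read can also be expressed in a single call with the predicates overload of DataFrameReader.jdbc, where each predicate becomes one partition. This is only a sketch of an equivalent formulation, not what was run above:

import java.util.Properties

val props = new Properties()
props.setProperty("user", "username")
props.setProperty("password", "password")
props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")

// one BETWEEN predicate per page; each array element becomes one partition
val predicates = (1 to 33).map { index =>
  val lo = (index - 1) * 254282 + 1
  val hi = index * 254282
  s"rn between $lo and $hi"
}.toArray

val paged = spark.read.jdbc(
  "jdbc:oracle:thin:@//192.168.0.89:1521/epps",
  "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b",
  predicates,
  props)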

Basic spark-standalone configuration:

--num-executors 3  --executor-cores 4  --executor-memory 5G --driver-cores 3  --driver-memory  4G  --conf spark.default.parallelism=32
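The runs below vary the number of page partitions and the JDBC fetchsize (rows fetched per round trip from Oracle). The exact call is not shown in the original, but fetchsize is presumably passed as the standard Spark JDBC option, i.e. the options map used above gains one entry, for example:

// same per-page read as above, with the Oracle fetch size raised from the driver default;
// index is the page number from the surrounding map, as in the snippets above
spark.read.format("jdbc").options(Map(
  "url"       -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
  "dbtable"   -> query(index, 254282),
  "driver"    -> "oracle.jdbc.driver.OracleDriver",
  "fetchsize" -> "200"
)).load()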

Experiment results:

C_CONS, 615 MB: ~1.9 min, 32 partitions, fetchsize not specified
C_CONS, 615 MB: ~1.1 min, 16 partitions, fetchsize 100
C_CONS, 615 MB: ~1.1 min, 16 partitions, fetchsize 150
C_CONS, 615 MB: ~1 min, 16 partitions, fetchsize 200
C_CONS, 615 MB: ~1.1 min, 16 partitions, fetchsize 400
C_CONS, 615 MB: ~53 s, 8 partitions, fetchsize 400
C_CONS, 615 MB: ~56 s, 8 partitions, fetchsize 200
C_CONS, 615 MB: ~48 s, 9 partitions, fetchsize 600
C_CONS, 615 MB: ~48 s, 9 partitions, fetchsize 200
C_CONS, 615 MB: ~1.3 min, 3 partitions, fetchsize 400 (num_worker_machine 3)
C_CONS, 615 MB: ~43 s, 12 partitions, fetchsize 100 (num_total_cores 12 = 3 * 4)
C_CONS, 615 MB: ~41 s, 12 partitions, fetchsize 200 (num_total_cores 12 = 3 * 4)
C_CONS, 615 MB: ~41 s, 12 partitions, fetchsize 800 (num_total_cores 12 = 3 * 4)
C_CONS, 615 MB: ~41 s, 12 partitions, fetchsize 1600 (num_total_cores 12 = 3 * 4)

Rough conclusions:

For a single table, the number of transfer partitions is best kept close to the number of available cores (see the sketch after these conclusions).

In other words, 1 TB of data could be pulled to HDFS in about a day: at 615 MB / 41 s, roughly 15 MB/s, 1 TB (about 1,048,576 MB) takes 1,048,576 / 15 / 3600, about 19 hours.

Add more machines (cores) and then try again!
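Following the first conclusion, the page count and interval can be derived from the core count instead of being hard-coded. A small sketch, assuming spark.default.parallelism reflects the total cores and that the row count is known up front (here the 8,137,034 from above; in practice a SELECT COUNT(*)); the output path is hypothetical:

// aim for roughly one page per available core
val numPages  = spark.sparkContext.defaultParallelism
val totalRows = 8137034L                                  // assumption: obtained beforehand, e.g. via a count query
val interval  = math.ceil(totalRows.toDouble / numPages).toInt

(1 to numPages)
  .map(index => spark.read.format("jdbc").options(Map(
    "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable" -> query(index, interval),
    "driver"  -> "oracle.jdbc.driver.OracleDriver")).load())
  .reduce(unionTableReducer)
  .write.parquet("C_CONS_by_cores")                       // hypothetical output path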

Append Test_1:

After changing the configuration as follows:

--num-executors 6  --executor-cores 2  --executor-memory 3G --driver-cores 3  --driver-memory  4G

run the following (the interval 678,086 is presumably 8,137,034 / 12, so 13 pages cover the whole table):

val jdbcDF = (1 to 13)
  .map { index =>
    spark.read.format("jdbc").options(Map(
      "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
      "dbtable" -> query(index, 678086),
      "driver"  -> "oracle.jdbc.driver.OracleDriver"
    )).load()
  }
  .reduce(unionTableReducer)
  .write.parquet("C_CONS_hahaha_13")

Result:

C_CONS, 615 MB: ~48 s, 9 partitions, fetchsize 200
Original author: winyang
Original article: https://www.jianshu.com/p/e8f47ed9c1eb