Partitioning with Spark's built-in lowerBound/upperBound splits the column range evenly, not the data, so the partitions come out very uneven and the transfer is dragged down by the largest one (bucket effect). The per-partition row counts below show it (here a holds the per-partition row counts as a newline-separated string); a sketch of how such counts can be produced follows the REPL output:
scala> a.split("\\n").map(x=>x.toInt)
res25: Array[Int] = Array(123447, 154643, 30561, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 216305, 114099, 254177, 5186719, 46387, 116380, 197942, 224119, 254281, 254261, 131158, 145298, 0, 174433, 187171, 58068, 77121, 45497, 144967)
scala> a.split("\\n").map(x=>x.toInt).sum
res26: Int = 8137034
scala> a.split("\\n").map(x=>x.toInt).max
res27: Int = 5186719
scala> 8137034/32
res58: Int = 254282
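For reference, a minimal sketch of how the bound-based read and the per-partition counts above can be produced; the partition column CONS_ID and the bounds are assumptions, only the connection values mirror the snippets below:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Spark's built-in JDBC partitioning splits [lowerBound, upperBound] on
// partitionColumn into numPartitions equal strides; if the column values are
// not uniformly distributed, the partitions end up heavily skewed.
val skewed = spark.read.format("jdbc")
  .options(Map(
    "url"             -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable"         -> "EPM_XJ.C_CONS",
    "driver"          -> "oracle.jdbc.driver.OracleDriver",
    "partitionColumn" -> "CONS_ID",   // assumed numeric key column
    "lowerBound"      -> "1",         // assumed bounds
    "upperBound"      -> "10000000",
    "numPartitions"   -> "32"))
  .load()

// Count rows per partition -- the numbers held in a above come from a check like this.
skewed.rdd
  .mapPartitionsWithIndex((i, rows) => Iterator((i, rows.size)))
  .collect()
  .foreach { case (i, n) => println(s"partition $i: $n rows") }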
Combining Oracle rownum pagination to prevent skew in the data transfer:
// Build the "dbtable" subquery for page `index`, each page holding `interval` rows.
def query(index: Int, interval: Int): String = {
  val basic = "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b "
  val condition = " where b.rn between " + ((index - 1) * interval + 1) + " AND " + index * interval
  "( select * from " + basic + condition + " ) c"
}
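As a quick check, for page 1 with the 254282-row interval computed above, query returns the nested rownum subquery that gets passed as dbtable:

scala> query(1, 254282)
res0: String = ( select * from ( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b  where b.rn between 1 AND 254282 ) c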
import org.apache.spark.sql.DataFrame

// Reducer used to union the per-page DataFrames back into one
// (since Spark 2.0 this could also be written inline as .reduce(_ union _)).
def unionTableReducer: (DataFrame, DataFrame) => DataFrame = (x: DataFrame, y: DataFrame) => x.union(y)
The following version is essentially serial and does not make full use of the cluster's processing power, but combined with a JDBC connection pool and pagination it should still pack a decent punch (a parallelized variant is sketched after the snippet):
val jdbcDF = (1 to 33).map(index => {
  val hehe = spark.read.format("jdbc")
    .options(Map(
      "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
      "dbtable" -> query(index, 254282),
      "driver"  -> "oracle.jdbc.driver.OracleDriver"))
    .load()
  hehe.write.parquet("C_CONS_hahaha/" + index)
})
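A minimal sketch of one way to overlap those per-page jobs on the driver side using Scala Futures; the thread-pool size of 8 is an assumption, and each concurrent job still goes through the same SparkSession, so Spark simply interleaves them across the executors:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration.Duration

// Submit the per-page read/write jobs concurrently instead of one after another.
implicit val ec: ExecutionContext =
  ExecutionContext.fromExecutorService(Executors.newFixedThreadPool(8)) // pool size is an assumption

val jobs = (1 to 33).map { index =>
  Future {
    spark.read.format("jdbc")
      .options(Map(
        "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
        "dbtable" -> query(index, 254282),
        "driver"  -> "oracle.jdbc.driver.OracleDriver"))
      .load()
      .write.parquet("C_CONS_hahaha/" + index)
  }
}

Await.result(Future.sequence(jobs), Duration.Inf)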
So far, a single map-reduce style call is the fastest: 32 partitions take only about 1.9 min for roughly 8 million rows:
val jdbcDF = (1 to 33).map(index => {
  spark.read.format("jdbc")
    .options(Map(
      "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
      "dbtable" -> query(index, 254282),
      "driver"  -> "oracle.jdbc.driver.OracleDriver"))
    .load()
}).reduce(unionTableReducer).write.parquet("C_CONS_hahaha")
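An alternative sketch that gets the same balanced split in a single read is the predicates overload of spark.read.jdbc, where every predicate string becomes one partition; the values mirror the snippet above and the output path is hypothetical:

import java.util.Properties

val props = new Properties()
props.setProperty("driver", "oracle.jdbc.driver.OracleDriver")

// One WHERE predicate per partition: Spark issues one query per predicate,
// so the rownum pages map 1:1 onto partitions.
val predicates = (1 to 33).map { index =>
  val lo = (index - 1) * 254282 + 1
  val hi = index * 254282
  s"rn between $lo and $hi"
}.toArray

spark.read
  .jdbc(
    "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "( select a.*,rownum as rn from EPM_XJ.C_CONS a ) b",  // inline view exposing rn
    predicates,
    props)
  .write.parquet("C_CONS_hahaha_predicates")               // hypothetical output path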
Basic spark-standalone configuration:
--num-executors 3 --executor-cores 4 --executor-memory 5G --driver-cores 3 --driver-memory 4G --conf spark.default.parallelism=32
Experiment results (a sketch of how fetchsize is set follows the table):
C_CONS 615 MB data, ~1.9 min, 32 partitions, fetchsize not specified
C_CONS 615 MB data, ~1.1 min, 16 partitions, fetchsize: 100
C_CONS 615 MB data, ~1.1 min, 16 partitions, fetchsize: 150
C_CONS 615 MB data, ~1 min, 16 partitions, fetchsize: 200
C_CONS 615 MB data, ~1.1 min, 16 partitions, fetchsize: 400
C_CONS 615 MB data, ~53 s, 8 partitions, fetchsize: 400
C_CONS 615 MB data, ~56 s, 8 partitions, fetchsize: 200
C_CONS 615 MB data, ~48 s, 9 partitions, fetchsize: 600
C_CONS 615 MB data, ~48 s, 9 partitions, fetchsize: 200
C_CONS 615 MB data, ~1.3 min, 3 partitions, fetchsize: 400 (num_worker_machine 3)
C_CONS 615 MB data, ~43 s, 12 partitions, fetchsize: 100 (num_total_cores 12 = 3*4)
C_CONS 615 MB data, ~41 s, 12 partitions, fetchsize: 200 (num_total_cores 12 = 3*4)
C_CONS 615 MB data, ~41 s, 12 partitions, fetchsize: 800 (num_total_cores 12 = 3*4)
C_CONS 615 MB data, ~41 s, 12 partitions, fetchsize: 1600 (num_total_cores 12 = 3*4)
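fetchsize above is the standard Spark JDBC option controlling how many rows the Oracle driver pulls per round trip; a sketch of where it plugs into the reads above, using 200 as one of the tested values:

val page = spark.read.format("jdbc")
  .options(Map(
    "url"       -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
    "dbtable"   -> query(1, 254282),
    "driver"    -> "oracle.jdbc.driver.OracleDriver",
    "fetchsize" -> "200"))  // rows fetched per round trip to Oracle
  .load()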
Basic takeaways:
For a single table, keeping the number of transfer partitions close to the number of cores works best.
That means 1 TB of data could be pulled out and transferred to HDFS within a day! (at 615 MB / 41 s ≈ 15 MB/s, 1000*1025 MB ≈ 1 TB takes about 19 hours; the arithmetic is spelled out below)
Add more machines (cores) and try again!
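A quick check of that extrapolation, keeping the note's 1000*1025 MB approximation of 1 TB:

// Observed throughput: 615 MB in 41 s.
val mbPerSec  = 615.0 / 41.0                 // ≈ 15 MB/s
val oneTbInMb = 1000.0 * 1025.0              // the note's approximation of 1 TB in MB
val hours     = oneTbInMb / mbPerSec / 3600  // ≈ 19 hours
println(f"$hours%.1f hours")                 // prints 19.0 hours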
Append Test_1:
After changing the configuration to the following:
--num-executors 6 --executor-cores 2 --executor-memory 3G --driver-cores 3 --driver-memory 4G
Run the following:
val jdbcDF = (1 to 13).map(index => {
  spark.read.format("jdbc")
    .options(Map(
      "url"     -> "jdbc:oracle:thin:username/password@//192.168.0.89:1521/epps",
      "dbtable" -> query(index, 678086),
      "driver"  -> "oracle.jdbc.driver.OracleDriver"))
    .load()
}).reduce(unionTableReducer).write.parquet("C_CONS_hahaha_13")
Result:
C_CONS 615 MB data, ~48 s, 9 partitions, fetchsize: 200