sparkSQL统计HBase表,写入mysql

val hbaseConf = org.apache.hadoop.hbase.HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum","s1sl11,s1ma11,s1sl22")
hbaseConf.set("hbase.zookeeper.property.clientPort", "2181")
hbaseConf.set(org.apache.hadoop.hbase.mapreduce.TableInputFormat.INPUT_TABLE,"VCREDIT_SCORE_HBF")
val hbaseRDD = sc.newAPIHadoopRDD(hbaseConf,
classOf[org.apache.hadoop.hbase.mapreduce.TableInputFormat],
classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
classOf[org.apache.hadoop.hbase.client.Result])
val hbfrdd = hbaseRDD.map(r=>{
import org.apache.hadoop.hbase.util.Bytes
val job_id:String=Bytes.toString(r._2.getValue(Bytes.toBytes("cf"),Bytes.toBytes("job_id")))
val month:String=Bytes.toString(r._2.getValue(Bytes.toBytes("cf"),Bytes.toBytes("score_time")))
val score:String=Bytes.toString(r._2.getValue(Bytes.toBytes("cf"),Bytes.toBytes("score_total")))
(job_id,month,score.split("\\.")(0))
})
import sqlContext.implicits.rddToDataFrameHolder
val hbfrddDF=hbfrdd.toDF("job_id","month","score")
hbfrddDF.registerTempTable("hbf")
val sqlcommand=" select job_id,month, score,quantity,'1700030110' as model_id,concat_ws('-',job_id,month) as job_name from  (select job_id,month, score,count(1) as quantity from hbf group by job_id,month, score) as u"
val prop = new java.util.Properties
prop.setProperty("user","root")
prop.setProperty("password","root")
prop.setProperty("driver","com.mysql.jdbc.Driver")
prop.setProperty("url","jdbc:mysql://49.4.65.195:3306/mysql")
sqlContext.sql(sqlcommand).write.mode(org.apache.spark.sql.SaveMode.Append).jdbc(prop.getProperty("url"),"model_statistics_score",prop)
    原文作者:王社英
    原文地址: https://www.jianshu.com/p/3ce91c923cfc
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞