Spark RDD数据关联过滤案例

前言

最近在专注Spark开发,记录下自己的工作和学习路程,希望能跟大家互相交流成长
QQ:86608625 微信:guofei1990123

背景

detail.txt为用户注册信息全表,filter.txt为需要过滤掉的用户的手机号码

要求detail.txt关联filter.txt并去除detail.txt中包含filter.txt中的用户

原始数据


detail.txt数据

张三 13007080388  man 山西 NULL 5  2015-11-05 21:22:41

李四 13056677799  woman 四川 地推注册邀请 5  2015-10-11 11:32:19

王五 13084470421  man 四川 地推注册邀请 6  2015-08-08 08:11:14

filter.txt数据

13007080388

13056677799

实现思路

两份数据通过手机号主键关联并过滤掉不符合规则数据

具体如下:

1.分别加载两份数据

2.解析数据并拼接成key为手机号的元组

3.用户信息leftOuterJoin过滤表

4.过滤掉不符合规则数据并输出

代码实现

packagespark

importorg.apache.spark.{SparkConf, SparkContext}

/**

* Created by郭飞on 2016/5/30.

*/

objectJoinFilterOne {

defmain(args: Array[String]) {

//创建SparkConf()并设置App名称及Master地址

valconf =newSparkConf().setAppName("JoinFilter").setMaster("local")

//创建SparkContext,该对象是提交spark App的入口

valsc =newSparkContext(conf)

//加载源数据

valsourceAll = sc.textFile("C://work//data//test//detail.txt")

valsourceFilter = sc.textFile("C://work//data//test//filter.txt")

//转换RDD

//(13007080388,(张三,13007080388,man,山西,NULL,5,2015-11-05 21:22:41)),

// (13056677799,(李四,13056677799,woman,四川,地推注册邀请,5,2015-10-11 11:32:19)),

// (13084470421,(王五,13084470421,man,四川,地推注册邀请,6,2015-08-08 08:11:14))

valrddAll = sourceAll.map(x=>{

valline = x.split("\t")

valname = line(0)

valphone = line(1)

valsex = line(2)

valaddr = line(3)

valdataType = line(4)

valdataCode = line(5)

valdataDate = line(6)

(phone,(name,phone,sex,addr,dataType,dataCode,dataDate))

})

//(13007080388,delete), (13056677799,delete)

valrddFilter = sourceFilter.map(x=>{

valline = x.split("\t")

valphone = line(0)

(phone,"delete")

})

//(13056677799,张三,13056677799,woman,四川,地推注册邀请,5,2015-10-11 11:32:19,Some(delete)),

// (13084470421,李四,13084470421,man,四川,地推注册邀请,6,2015-08-08 08:11:14,None),

// (13007080388,王五,13007080388,man,山西,NULL,5,2015-11-05 21:22:41,Some(delete))

rddAll.leftOuterJoin(rddFilter).map(x=>{

(x._1,x._2._1._1,x._2._1._2,x._2._1._3,x._2._1._4,x._2._1._5,x._2._1._6,x._2._1._7,x._2._2)

}).filter(_._9==None).saveAsTextFile("C://work//data//result.txt")

//println(result.toBuffer)

//过滤后

//(13084470421,王宇,13084470421,man,四川,地推注册邀请,6,2015-08-08 08:11:14,None)

//停止sc,结束该任务

sc.stop()

}

}
    原文作者:MichaelFly
    原文地址: https://www.jianshu.com/p/f77cb1c93793
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞