我有两个数据集,每个数据集有两个元素.
以下是示例.
数据1 :(名称,动物)
('abc,def', 'monkey(1)')
('df,gh', 'zebra')
...
数据2 :(名称,水果)
('a,efg', 'apple')
('abc,def', 'banana(1)')
...
预期结果:(名称,动物,水果)
('abc,def', 'monkey(1)', 'banana(1)')
...
我想通过使用第一列“名称”来加入这两个数据集.我试过这几个小时,但我想不出来.谁能帮我?
val sparkConf = new SparkConf().setAppName("abc").setMaster("local[2]")
val sc = new SparkContext(sparkConf)
val text1 = sc.textFile(args(0))
val text2 = sc.textFile(args(1))
val joined = text1.join(text2)
上面的代码不起作用!
最佳答案 join是在对的RDD上定义的,即RDD [(K,V)]类型的RDD.
需要的第一步是将输入数据转换为正确的类型.
我们首先需要将String类型的原始数据转换为(Key,Value)对:
val parse:String => (String, String) = s => {
val regex = "^\\('([^']+)',[\\W]*'([^']+)'\\)$".r
s match {
case regex(k,v) => (k,v)
case _ => ("","")
}
}
(注意,我们不能使用简单的split(“,”)表达式,因为键包含逗号)
然后我们使用该函数来解析文本输入数据:
val s1 = Seq("('abc,def', 'monkey(1)')","('df,gh', 'zebra')")
val s2 = Seq("('a,efg', 'apple')","('abc,def', 'banana(1)')")
val rdd1 = sparkContext.parallelize(s1)
val rdd2 = sparkContext.parallelize(s2)
val kvRdd1 = rdd1.map(parse)
val kvRdd2 = rdd2.map(parse)
最后,我们使用join方法连接两个RDD
val joined = kvRdd1.join(kvRdd2)
//让我们查看结果
joined.collect
// res31: Array[(String, (String, String))] = Array((abc,def,(monkey(1),banana(1))))