/**
- Return a new RDD by first applying a function to all elements of this
- RDD, and then flattening the results.
*/
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
val cleanF = sc.clean(f)
new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
}
对这个RDD内的所有元素调用函数然后 并打平list 返回一个新的RDD
需要注意的是 这个操作必须是集合类的RDD
scala> val list = sc.parallelize(List(1,2,3,4,5,6))
scala> list.flatMap(x =>x ).foreach(println(_))
<console>:27: error: type mismatch;
found : Int
required: TraversableOnce[?]
list.flatMap(x =>x ).foreach(println(_))
^
有时候我们可能只想把List类型的RDD打平,而不像做map操作可以直接
scala> val list2 = sc.parallelize(List(List(1,2,3,4,5,6),List(2,3,4)))
list2: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> list2.flatMap(x=>x).foreach(println(_))
[Stage 0:> (0 + 0) / 8]1
2
3
4
5
6
2
3
4
注意flatMap 里面 x=>x 其实这个x并不是你的list中的每一个元素,而是你RDD的中的每个list。也就是List(1,2,3,4,5,6) 或 List(2,3,4)
scala> list2.flatMap(x=>{println(x); x}).foreach(item => println("haha"))
List(2, 3, 4)
List(1, 2, 3, 4, 5, 6)
haha
haha
haha
haha
haha
haha
haha
haha
haha
所以说如果需要做map操作的话需要这样
scala> list2.flatMap(x=> {for(a<-x ) yield {a +1}}).foreach(item => println(item))
2
3
4
5
6
7
3
4
5