spark flatMap 使用

/**

  • Return a new RDD by first applying a function to all elements of this
  • RDD, and then flattening the results.
    */
    def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.flatMap(cleanF))
    }

对这个RDD内的所有元素调用函数然后 并打平list 返回一个新的RDD
需要注意的是 这个操作必须是集合类的RDD

scala> val list = sc.parallelize(List(1,2,3,4,5,6))
scala> list.flatMap(x =>x ).foreach(println(_))
<console>:27: error: type mismatch;
 found   : Int
 required: TraversableOnce[?]
       list.flatMap(x =>x ).foreach(println(_))
                        ^

有时候我们可能只想把List类型的RDD打平,而不像做map操作可以直接

scala> val list2 = sc.parallelize(List(List(1,2,3,4,5,6),List(2,3,4)))
list2: org.apache.spark.rdd.RDD[List[Int]] = ParallelCollectionRDD[1] at parallelize at <console>:24

scala> list2.flatMap(x=>x).foreach(println(_))
[Stage 0:>                                                          (0 + 0) / 8]1
2
3
4
5
6
2
3
4
                                 

注意flatMap 里面 x=>x 其实这个x并不是你的list中的每一个元素,而是你RDD的中的每个list。也就是List(1,2,3,4,5,6) 或 List(2,3,4)

scala> list2.flatMap(x=>{println(x); x}).foreach(item => println("haha"))
List(2, 3, 4)
List(1, 2, 3, 4, 5, 6)
haha
haha
haha
haha
haha
haha
haha
haha
haha

所以说如果需要做map操作的话需要这样

scala> list2.flatMap(x=> {for(a<-x ) yield {a +1}}).foreach(item => println(item))
2
3
4
5
6
7
3
4
5

    原文作者:pcqlegend
    原文地址: https://www.jianshu.com/p/2e4eacc3686c
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞