Spark Tutorial for Beginners (5): Creating Resilient Distributed Datasets (RDDs) and Transformation Operations

This article is entirely hand-written and original. Please do not copy-paste it; if you repost it, please credit the source. Thank you!

What is a Resilient Distributed Dataset (RDD)?

  • Concept: An RDD (Resilient Distributed Dataset) is, simply put, a collection of elements in Spark: arrays, collections, text files and so on can all become RDDs. Compared with an ordinary dataset, its defining feature is that it is distributed: every RDD is split into multiple partitions, and those partitions live on different nodes of the cluster.
  • Advantages: once you understand the advantages of distributed computing, the advantages of RDDs follow naturally. Spark automatically distributes an RDD's data across the cluster and executes operations on it in parallel. A very large dataset (say, at the PB scale) cannot be processed on a single physical machine (unless it is a supercomputer, which is extremely expensive), so spreading the work across many machines is both much faster and far cheaper.
    (If this is still unclear, don't worry; several examples are demonstrated below. Typing the code yourself is the best way to get a feel for it.)
  • All operations on data in Spark fall into one of three categories: RDD creation, RDD transformation, and RDD action (a minimal sketch follows this list).
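To see how the three categories fit together, here is a minimal sketch in the spark-shell (the variable names are made up for illustration; sc is the SparkContext the shell provides). A transformation such as map only describes a new RDD; the work actually runs when an action such as collect() is called.

scala> val nums = sc.parallelize(1 to 4)      // create: build an RDD from a local collection
scala> val doubled = nums.map(x => x * 2)     // transformation: describes a new RDD, nothing runs yet
scala> doubled.collect()                      // action: triggers the computation and returns the result
// expected result: Array(2, 4, 6, 8)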

Creating RDDs

There are two ways to create an RDD:

  • (1) Reference a dataset in an external storage system (such as a shared file system, HDFS, or HBase) using the textFile() method.

    //read a dataset from the local file system
    scala> val rdd =sc.textFile("file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md")
    rdd: org.apache.spark.rdd.RDD[String] = file:///usr/local/spark-2.2.0-bin-hadoop2.6.0-cdh5.11.1/README.md MapPartitionsRDD[1] at textFile at <console>:24
    //read a dataset from HDFS
    scala> val rdd1 =sc.textFile("hdfs://master:8020/user/test.rdd")
    rdd1: org.apache.spark.rdd.RDD[String] = hdfs://master:8020/user/test.rdd MapPartitionsRDD[5] at textFile at <console>:24
    
    //print the elements. Note: collect() first gathers all elements onto a single node and may cause an out-of-memory error; if you only need to print a few elements, use take() instead
      scala> rdd1.collect().foreach(println)
      hello
      spark
      love
      you
    
      scala> rdd1.take(2)
      res5: Array[String] = Array(hello, spark)
    
  • (2) Parallelize an existing collection in the driver program (such as a List or Array) using the parallelize() method (a partition-count sketch follows the examples below).

      //convert a range to an RDD
      scala>val rdd1 = sc.parallelize(1 to 5)
      rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:24
    
      scala>rdd1.collect().foreach(println)
      1
      2
      3
      4
      5
    
      //convert a List to an RDD
      scala> val list = List(1,"a",'b')
      list: List[Any] = List(1, a, b)
    
      scala> sc.parallelize(list)
      res12: org.apache.spark.rdd.RDD[Any] = ParallelCollectionRDD[10] at parallelize at <console>:27
    
      scala> res12.collect().foreach(println)
      1
      a
      b
    
      //convert an Array to an RDD
      scala> val array = Array("a","b")
      array: Array[String] = Array(a, b)
    
      scala> sc.parallelize(array)
      res8: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:27
    
      scala> res8.collect().foreach(println)
      a
      b
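
    As noted above, every RDD is split into partitions across the cluster; parallelize() also accepts an optional number of partitions as a second argument. A minimal sketch (the partition count 2 is just an illustrative value):

      //ask for 2 partitions explicitly
      scala> val rdd2 = sc.parallelize(1 to 5, 2)

      scala> rdd2.getNumPartitions
      // expected result: 2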
    

RDD Transformations

A transformation produces a new RDD from an existing RDD. For example, filter selects the elements of an existing RDD that satisfy a condition and creates a new RDD from them.
As always, concepts only go so far; seeing the actual operations is the most intuitive way to learn!

Operations on a single RDD

  • map(func) applies the given function to every element of the RDD and produces a new RDD.
scala> val rdd = sc.parallelize(1 to 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.map(x=>(x+2))
res14: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[12] at map at <console>:27

scala> res14.collect()
res15: Array[Int] = Array(3, 4, 5, 6, 7)

scala> rdd.map(x=>(x,1))
res18: org.apache.spark.rdd.RDD[(Int, Int)] = MapPartitionsRDD[14] at map at <console>:27

scala> res18.collect()
res19: Array[(Int, Int)] = Array((1,1), (2,1), (3,1), (4,1), (5,1))
  • flatMap(func) is similar to map, but it "flattens" the result: each input item can be mapped to zero or more output items. It is commonly used for counting words. Comparing it with map side by side makes the difference easier to understand:
scala> val rdd = sc.textFile("hdfs://master/user/spark.hello")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/spark.hello MapPartitionsRDD[16] at textFile at <console>:24

scala> rdd.map(line => line.split(" "))
res20: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[17] at map at <console>:27

scala> res20.collect()
res22: Array[Array[String]] = Array(Array(hello, spark, it, is, perfect), Array(i, want, to, learn))

scala> rdd.flatMap(line => line.split(" "))
res0: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[2] at flatMap at <console>:27

scala> res0.collect()
res1: Array[String] = Array(hello, spark, it, is, perfect, i, want, to, learn)

map returns Array[Array[String]] while flatMap returns Array[String]; flatMap effectively flattens the result of map one level.

  • reduceByKey(func, [numTask]) merges the values of each key according to the given function, similar to the Reduce step in MapReduce. For example, if the map phase produces (spark,1) (spark,1) and the function is (x,y)=>(x+y), i.e. the values are added, then 1+1 gives (spark,2) after reduceByKey. (A sketch of the optional numTask argument follows the word-count example below.)
[root@master hadoop-2.6.0-cdh5.11.1]# hadoop fs -cat /user/wordcount.test
spark i love you
spark i want learn you

A word-count example:

scala> val rdd = sc.textFile("hdfs://master/user/wordcount.test")
rdd: org.apache.spark.rdd.RDD[String] = hdfs://master/user/wordcount.test MapPartitionsRDD[4] at textFile at <console>:24

scala> val wordmap = rdd.flatMap(line=>line.split(" ")).map(x=>(x,1))
wordmap: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[6] at map at <console>:26

scala> wordmap.collect()
res2: Array[(String, Int)] = Array((spark,1), (i,1), (love,1), (you,1), (spark,1), (i,1), (want,1), (learn,1), (you,1))

scala> val wordreduce = wordmap.reduceByKey((x,y)=>(x+y))
wordreduce: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[7] at reduceByKey at <console>:28

scala> wordreduce.collect()
res3: Array[(String, Int)] = Array((learn,1), (spark,2), (you,2), (love,1), (i,2), (want,1))
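
The optional numTask argument mentioned above sets the number of reduce tasks, i.e. the number of partitions of the result. A minimal sketch reusing the wordmap RDD (the value 4 is just an illustrative choice):

scala> val wordreduce2 = wordmap.reduceByKey((x,y)=>(x+y), 4)

scala> wordreduce2.getNumPartitions
// expected result: 4
// wordreduce2.collect() returns the same word counts as above, just spread over 4 partitions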
  • filter(func) is easy to understand: it simply filters the elements.
scala> val rdd = sc.parallelize(Array(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd.filter( x => (x==2)).collect()
res8: Array[Int] = Array(2)

scala> val rdd = sc.parallelize(List("sp","sa","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[11] at parallelize at <console>:24

scala> rdd.filter(x=>x.contains('s')).collect()
res10: Array[String] = Array(sp, sa)
  • groupByKey([numTasks]) will be familiar to anyone who knows SQL's GROUP BY: it groups the values by key. numTasks is optional and sets the number of tasks (a per-key counting sketch follows the example below).
scala> val rdd = sc.parallelize(List((1,"sp"),(1,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[14] at parallelize at <console>:24

scala> rdd.groupByKey().collect()
res12: Array[(Int, Iterable[String])] = Array((1,CompactBuffer(sp, sa)), (2,CompactBuffer(vv)))
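
Continuing the SQL analogy, a per-key count can be sketched by mapping over the grouped values with mapValues, another standard pair-RDD transformation:

scala> rdd.groupByKey().mapValues(values => values.size).collect()
// expected result (order may vary): Array((1,2), (2,1))

For plain counting, reduceByKey is usually preferred, because it combines values on each node before the shuffle instead of shipping every value for a key across the network.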
  • distinct([numTasks]) removes duplicate elements. Note, however, that it is expensive: all of the data has to be shuffled across the network.
scala> val rdd = sc.parallelize(List("sp","sp","vv"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[17] at parallelize at <console>:24

scala> rdd.distinct().collect()
res13: Array[String] = Array(sp, vv)
  • sortByKey([ascending], [numTasks]) sorts by key. ascending defaults to true, i.e. ascending order, and can be set to false; you can also define a custom sort (a sortBy sketch follows the example below).
scala> val rdd = sc.parallelize(List((1,"sp"),(3,"sa"),(2,"vv")))
rdd: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[21] at parallelize at <console>:24

scala> rdd.sortByKey(false).collect()
res14: Array[(Int, String)] = Array((3,sa), (2,vv), (1,sp))
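
For the custom sorting mentioned above, sortBy() lets you supply your own key-extraction function; for example, sorting the same rdd by its String value instead of the Int key (a minimal sketch):

scala> rdd.sortBy(pair => pair._2).collect()
// expected result: Array((3,sa), (1,sp), (2,vv))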
  • There are many, many more functions; see the Spark API docs (pick the version that matches yours; this article uses 2.2.0).
    The official docs are actually very detailed, but readers without a Scala background may find them hard to follow, so here is one example explained; the others are much the same. Take foldByKey:
    [API screenshot of foldByKey from the official Spark 2.2.0 docs]
    zeroValue is a given initial value, and V is a generic type parameter (it can be Int or anything else), but it must match the V of the (K,V) pairs in the RDD.
    In the example below, zeroValue is 3 (an Int) and the V in the RDD's List of (K,V) pairs is also Int; the values are then combined according to the rule defined by func, and the result is an RDD[(K,V)].
    3 (the initial value) + 1 + 2 = 6, giving (1,6); 3 + 5 + 6 = 14, giving (2,14). Tip: ((x,y) => x+y) can be shortened to (_+_); Scala really is an elegant language!
scala> val rdd = sc.parallelize(List((1,1),(1,2),(2,6),(2,5)))
rdd: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[23] at parallelize at <console>:24

scala> rdd.foldByKey(3)(_+_).collect()
res15: Array[(Int, Int)] = Array((1,6), (2,14))

Operations on multiple RDDs

  • Union, intersection, and difference
scala> val list1 = sc.parallelize(List("spark","spark","hello"))
list1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[25] at parallelize at <console>:24

scala> val list2 = sc.parallelize(List("spark","love","you"))
list2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[26] at parallelize at <console>:24

//union: does not remove duplicates (see the distinct sketch after these examples)
scala> list1.union(list2).collect()
res16: Array[String] = Array(spark, spark, hello, spark, love, you)

//intersection: removes duplicates
scala> list1.intersection(list2).collect()
res17: Array[String] = Array(spark)

//difference (elements present in the first dataset but not in the second): does not remove duplicates
scala> list1.subtract(list2).collect()
res18: Array[String] = Array(hello)
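
Since union() keeps duplicates, a deduplicated union can be sketched by chaining the distinct() transformation introduced earlier:

scala> list1.union(list2).distinct().collect()
// expected result (order may vary): Array(spark, hello, love, you)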
  • Join
scala> val list1 = sc.parallelize(List((1,"spark"),(2,"spark"),(3,"hello")))
list1: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[38] at parallelize at <console>:24

scala> val list2 = sc.parallelize(List((1,"spark"),(3,"you"),(4,"good")))
list2: org.apache.spark.rdd.RDD[(Int, String)] = ParallelCollectionRDD[39] at parallelize at <console>:24
//inner join
scala> list1.join(list2).collect()
res20: Array[(Int, (String, String))] = Array((1,(spark,spark)), (3,(hello,you)))
    
// left outer join: every record from the left RDD is kept; keys missing on the right become None
scala> list1.leftOuterJoin(list2).collect()
res21: Array[(Int, (String, Option[String]))] = Array((1,(spark,Some(spark))), (3,(hello,Some(you))), (2,(spark,None)))

// right outer join: every record from the right RDD is kept; keys missing on the left become None
scala> list1.rightOuterJoin(list2).collect()
res22: Array[(Int, (Option[String], String))] = Array((4,(None,good)), (1,(Some(spark),spark)), (3,(Some(hello),you)))
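
Besides the three joins above, pair RDDs also provide fullOuterJoin, which keeps every key from both sides and wraps missing values in None. A minimal sketch on the same list1 and list2:

scala> list1.fullOuterJoin(list2).collect()
// expected result (order may vary):
// Array((4,(None,Some(good))), (1,(Some(spark),Some(spark))), (3,(Some(hello),Some(you))), (2,(Some(spark),None)))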

One last reminder: there are far too many functions to cover here, and this article only lists the common ones. The best approach is to go straight to the official API docs!
Spark official documentation
Spark API

    Original author: Seven_Ki
    Original article: https://www.jianshu.com/p/f3dd1bb19586