spark三类算子小总结

其实很早之前就想对spark做一下自己的阐述,一直也无奈于不能系统的进行以下自己的解释,现在还是想粗略的说一下我自己对spark的一些认识。

spark相对于mapreduce来说,计算模型可以提供更强大的功能,他使用的是迭代模型,我们在处理完一个阶段以后,可以继续往下处理很多个阶段,而不只是像mapreduce一样只有两个阶段。

spark大致分为这三种算子:

 1、Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Value型的数据。

          在这里,我会将对map、flatMap、glom、union、cartesian(笛卡尔操作)、groupBy、filter、distinct(去重)、subtract这9种算子进行描述。

 2、Key-Value数据类型的Transformation算子,这种变换不触发提交作业,针对处理的数据项是Key-Value型的数据。

            而对于Key-Value的算子,就简单的解释一下mapValues、combineByKey、reduceByKey、partitionBy、cogroup、join、leftOutJoin、rightOutJoin这几类进行我的解释。

    3、Action算子,这类算子会触发SparkContext提交作业。

            针对action算子,foreach、collect、collectAsMap、reduceByKeyLocally、lookup、count、top、

reduce、fold、aggregate。大致就是这几项了。

一、Value数据类型的Transformation算子

1)map

        val a = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)

        //rdd有5个元素,将他们分成3个partition

        val b = a.map(_.length)//导入数据使用parallelize方式

        val c = a.zip(b)

        c .collect

        res0: Array[(String, Int)] = Array((dog,3), (salmon,6), (salmon,6), (rat,3), (elephant,8))

        Map是把操作映射到每个values里面去。

       上述示意图为:

《spark三类算子小总结》

2)flatMap

        val a = sc.parallelize(1 to 10, 5)  

        //rdd有10个元素,将1到10分成5个partition

        a.flatMap(1 to _).collect

        res47: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5,6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3,           4,5, 6, 7, 8, 9, 10)

        //每个元素输入项都可以被映射到0个或多个的输出项,最终将结果”扁平化“后输出

        sc.parallelize(List(1, 2, 3), 2).flatMap(x => List(x, x, x)).collect

        res85: Array[Int] = Array(1, 1, 1, 2, 2, 2, 3, 3, 3)

        flatMap是把一个vlaue变成数组,再打断。

        该实例的操作示意图为:

《spark三类算子小总结》

3)subtract

          val a = sc.parallelize(1 to 9, 3)

          val b = sc.parallelize(1 to 3, 3)

          val c = a.subtract(b)

          c.collect

          res3: Array[Int] = Array(6, 9, 4,7, 5, 8)

          针对这个实例画出示意图:

《spark三类算子小总结》

4)glom

            val a = sc.parallelize(1 to 100, 3)

            a.glom.collect

            res8: Array[Array[Int]] = Array(Array(1, 2, 3,…, 33), Array(34,35,…, 65, 66), Array(67, …, 100))

        针对这个实例画出示意图:

《spark三类算子小总结》

5)union

         val a = sc.parallelize(1 to 3, 1)

         val b = sc.parallelize(5 to 7, 1)

         (a ++ b).collect

         res0: Array[Int] = Array(1, 2, 3, 5, 6, 7)

        针对这个实例画出示意图:

《spark三类算子小总结》

6)cartesian(笛卡尔操作)

          val x =sc.parallelize(List(1,2,3,4,5))

          val y =sc.parallelize(List(6,7,8,9,10))

          x.cartesian(y).collect

          res0: Array[(Int, Int)] =Array((1,6), (1,7), (1,8), (1,9),(1,10), (2,6), (2,7), (2,8), (2,9),(2,10), (3,6), (3,7), (3,8), (3,9),(3,10), (4,6), (5,6), (4,7), (5,7),

(4,8), (5,8), (4,9), (4,10), (5,9),(5,10))

        针对这个实例画出示意图:

《spark三类算子小总结》

7)groupBy(生成相应的key,相同的放在一起)

         val a = sc.parallelize(1 to 9, 3)

         a.groupBy(x => { if (x % 2 == 0) “even” else “odd” }).collect

         res42: Array[(String, Seq[Int])] =Array((even,ArrayBuffer(2, 4, 6,8)), (odd,ArrayBuffer(1, 3, 5, 7,9)))

        针对这个实例画出示意图:

《spark三类算子小总结》

8)filter

         val a = sc.parallelize(1 to 10, 3)

         val b = a.filter(_ % 2 == 0)

         b.collect

         res3: Array[Int] = Array(2, 4, 6, 8, 10)

        针对这个实例画出示意图:

《spark三类算子小总结》

9)distinct(去重)

        val c =sc.parallelize(List(“Gnu”, “Cat”,”Rat”, “Dog”, “Gnu”, “Rat”), 2)

        rdd有6个元素,将这六个元素分成2个partition

        c.distinct.collect//将重复出现的元素使用distinct函数去除再形成数组

        res6: Array[String] = Array(Dog,Gnu, Cat, Rat)

        针对这个实例画出示意图:

《spark三类算子小总结》

二、Key-Value数据类型的Transformation算子

 1)mapValues

        val a = sc.parallelize(List(“dog”, “tiger”, “lion”, “cat”, “panther”, “eagle”), 2)

       //RDD有6个元素,分别为dog,lion,cat…,将他们分成2个partition

       val b = a.map(x => (x.length, x))

       b.mapValues(“x” + _ + “x”).collect

       res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx), (3,xcatx), (7,xpantherx), (5,xeaglex))

针对这个实例画出示意图:

《spark三类算子小总结》

 2)combineByKey

          val a =sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”),3)

          val b = sc.parallelize(List(1,1,2,2,2,1,2,2,2), 3)

          val c = b.zip(a)

          val d = c.combineByKey(List(_), (x:List[String], y:String) => y :: x,(x:List[String], y:List[String]) => x ::: y)

          d.collect

          res16: Array[(Int, List[String])] = Array((1,List(cat, dog, turkey)), (2,List(gnu,rabbit, salmon, bee, bear, wolf)))

          该实例的操作示意图为:

《spark三类算子小总结》

 3)reduceByKey

          val a = sc.parallelize(List(“dog”, “tiger”, “dog”, “cat”, “dog”, “eagle”, “cat”), 2)

          //rdd有7个元素,将他们分成2个partition

          val b = a.map(x => (x.length, x))

          b.reduceByKey(_ + _).collect   //使用reduceByKey(_+_)的方式

          res87: Array[(Int, String)] = Array((3,dog), (1,tiger), (2,cat),(1,eagle))

          该实例的操作示意图为:

《spark三类算子小总结》

 4)partitionBy

对两个RDD中的KV元素,每个RDD中相同key中的元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同的key的元素进行合并

 5)cogroup

         对两个RDD中的KV元素,每个RDD相同key中元素分别聚合成一个集合。与reduceByKey不同的是针对两个RDD中相同key的元素进行合并

          val a = sc.parallelize(List(1, 2, 1, 3), 1)

          val b = a.map((_, “b”))

          val c = a.map((_, “c”))

          b.cogroup(c).collect

          res7: Array[(Int, (Iterable[String], Iterable[String]))] = Array((2,(ArrayBuffer(b),ArrayBuffer(c))),(3,(ArrayBuffer(b),ArrayBuffer(c))),(1,(ArrayBuffer(b, b),ArrayBuffer(c, c))))

          该实例的操作示意图为:

《spark三类算子小总结》

  6)join

          val a = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”),3)

          val d = c.keyBy(_.length)

          b.join(d).collect

          res0: Array[(Int, (String, String))] = Array((6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (6,(salmon,salmon)), (6,(salmon,rabbit)), (6,(salmon,turkey)), (3,(dog,dog)), (3,(dog,cat)), (3,(dog,gnu)), (3,(dog,bee)), (3,(rat,dog)), (3,(rat,cat)), (3,(rat,gnu)), (3,(rat,bee)))

          该实例的操作示意图为:

《spark三类算子小总结》

  7)leftOutJoin

        将LEFT左边的表名1中的所有记录全部保留,而将右边的表名2中的字段B与表名1.字段A相对应的记录显示出来

          val a = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”),3)

          val d = c.keyBy(_.length)

          b.leftOuterJoin(d).collect

          res1: Array[(Int, (String, Option[String]))] = Array((6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (6,(salmon,Some(salmon))), (6,(salmon,Some(rabbit))), (6,(salmon,Some(turkey))), (3,(dog,Some(dog))), (3,(dog,Some(cat))), (3,(dog,Some(gnu))), (3,(dog,Some(bee))), (3,(rat,Some(dog))), (3,(rat,Some(cat))), (3,(rat,Some(gnu))), (3,(rat,Some(bee))), (8,(elephant,None)))

          该实例的操作示意图为:

《spark三类算子小总结》

  8)rightOutJoin(右外连接)

          val a = sc.parallelize(List(“dog”, “salmon”, “salmon”, “rat”, “elephant”), 3)

          val b = a.keyBy(_.length)

          val c =sc.parallelize(List(“dog”,”cat”,”gnu”,”salmon”,”rabbit”,”turkey”,”wolf”,”bear”,”bee”),3)

          val d = c.keyBy(_.length)

          b.rightOuterJoin(d).collect

          res2: Array[(Int, (Option[String], String))] = Array((6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (6,(Some(salmon),salmon)), (6,(Some(salmon),rabbit)), (6,(Some(salmon),turkey)), (3,(Some(dog),dog)), (3,(Some(dog),cat)), (3,(Some(dog),gnu)), (3,(Some(dog),bee)), (3,(Some(rat),dog)), (3,(Some(rat),cat)), (3,(Some(rat),gnu)), (3,(Some(rat),bee)), (4,(None,wolf)), (4,(None,bear)))

          该实例的操作示意图为:

《spark三类算子小总结》

三、Action算子

  1)foreach

           val c = sc.parallelize(List(“cat”, “dog”, “tiger”, “lion”, “gnu”, “crocodile”, “ant”, “whale”, “dolphin”, “spider”), 3)//导入数据使用parallelize方式

           c.foreach(x => println(x + “s are yummy”))//得到一条数据就处理一条数据

           该实例的操作示意图为:

《spark三类算子小总结》

 2)fold

            val a = sc.parallelize(List(1,2,3), 3)

            a.fold(0)(_ + _)

            res59: Int = 6

           针对这个实例画出示意图:

《spark三类算子小总结》

 3)aggregate

            val z = sc.parallelize(List(1,2,3,4,5,6), 2)

            // lets first print out the contents of the RDD with partition labels

            def myfunc(index: Int, iter: Iterator[(Int)]) : Iterator[String] = {

                iter.toList.map(x => “[partID:” + index + “, val: ” + x + “]”).iterator

           }

            z.mapPartitionsWithIndex(myfunc).collect

           res28: Array[String] = Array([partID:0, val: 1], [partID:0, val: 2], [partID:0, val:3], [partID:1, val: 4], [partID:1, val: 5], [partID:1, val: 6])

           z.aggregate(0)(math.max(_, _), _ + _)

           res40: Int = 9

           针对这个实例画出示意图:

《spark三类算子小总结》

  4)collect

          val c = sc.parallelize(List(“Gnu”, “Cat”, “Rat”, “Dog”, “Gnu”, “Rat”), 2)

          c.collect  //通过collect算子将两个partition结合成一个

          res29: Array[String] = Array(Gnu, Cat, Rat, Dog, Gnu, Rat)

          针对这个实例画出示意图:

《spark三类算子小总结》

  5)collectAsMap

            RDD中同一个Key中存在多个Value,那么后面的Value将会把前面的Value覆盖,最终得到的结果就是Key唯一,而且对应一个Value

            val a = sc.parallelize(List(1, 2, 1, 3), 1)

            val b = a.zip(a)

            b.collectAsMap

            res1: scala.collection.Map[Int,Int] = Map(2 -> 2, 1 -> 1, 3 -> 3)

           针对这个实例画出示意图:

《spark三类算子小总结》

  6)reduceByKeyLocally

            该函数将RDD[K,V]中每个K对应的V值根据映射函数来运算,运算结果映射到一个Map[K,V]中,而不是RDD[K,V]。

            val a =sc.parallelize(List(“dog”,”cat”, “owl”, “gnu”, “ant”), 2)

            val b = a.map(x => (x.length,x))

            b.reduceByKey(_ + _).collect

            res86: Array[(Int, String)] =Array((3,dogcatowlgnuant))

           针对这个实例画出示意图:

《spark三类算子小总结》

  7)lookup

           val a =sc.parallelize(List(“dog”,”tiger”, “lion”, “cat”, “panther”, “eagle”), 2)

           val b = a.map(x => (x.length, x))

           b.lookup(5)

           res0: Seq[String] = WrappedArray(tiger, eagle)

           针对这个实例画出示意图:

《spark三类算子小总结》

  8)count

            val c = sc.parallelize(List(“Gnu”, “Cat”, “Rat”, “Dog”), 2)

            c.count

            res2: Long = 4

           针对这个实例画出示意图:

《spark三类算子小总结》

  9)top 

            val c = sc.parallelize(Array(6, 9, 4,7, 5, 8), 2)

            c.top(2)

            res28: Array[Int] = Array(9, 8)

           针对这个实例画出示意图:

《spark三类算子小总结》

  10)reduce

            val a = sc.parallelize(1 to 100, 3)

            a.reduce(_ + _)

            res41: Int = 5050

           针对这个实例画出示意图:

《spark三类算子小总结》

这篇文章也是写了一个礼拜才出来的成果,本身不是很熟悉,需要一个算子一个算子去理解,有些地方也没有很准确,但也是我理解能力范围之内了,网站上不太能查到有关于这方面的知识,所以绝大部分靠自学,理解不对的地方也请多多包涵啦。对于mapreduce和spark的理解也仅限于参考网上的知识加上自己的一些见解。

——————————————-参考:http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html——————————————-

————————————————————参考:https://www.cnblogs.com/MOBIN/p/5414490.html#12———————————————————–

    原文作者:笙歌如榆
    原文地址: https://www.jianshu.com/p/0d547a6b249d
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞