1. An RDD is the basic abstraction: operating on an RDD feels like operating on a local collection, which lowers the programming complexity.
RDD operators fall into two categories: Transformations (lazy) and Actions (which trigger job execution).
An RDD does not hold the data to be computed; it only records how it was derived (which method was called and which function was passed in).
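A quick way to see both points in the spark-shell (a minimal sketch; the file path is a placeholder): nothing runs until an Action is called, and toDebugString prints the recorded lineage.
val lines = sc.textFile("hdfs://namenode:9000/data/words.txt")   // placeholder path
val pairs = lines.flatMap(_.split(" ")).map((_, 1))              // Transformations: lazy, nothing is computed yet
println(pairs.toDebugString)                                     // prints the recorded chain of transformations
pairs.collect()                                                  // Action: this is what actually triggers a job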
What are the ways to create an RDD?
1. From an external storage system
2. By parallelizing a Driver-side Scala collection (mainly for experiments and tests)
3. By calling a Transformation on an existing RDD, which produces a new RDD
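A minimal sketch of the three ways (the HDFS path is a placeholder):
val fromHdfs  = sc.textFile("hdfs://namenode:9000/data/input")   // 1. from an external storage system
val fromColl  = sc.parallelize(List(1, 2, 3, 4, 5))              // 2. parallelize a Driver-side Scala collection
val fromTrans = fromColl.map(_ * 10)                             // 3. a Transformation on an existing RDD returns a new RDD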
Characteristics of an RDD Transformation:
1. lazy
2. it produces a new RDD
What determines the number of partitions of an RDD?
1. If the RDD is created by parallelizing a Driver-side Scala collection and no partition count is specified, the number of partitions equals the number of cores allocated to the app.
2. If the RDD is created by reading data from HDFS and the minimum number of partitions is set to 1, the number of partitions equals the number of input splits; if it is not set, textFile passes a default of 2, and the number of partitions will then be greater than or equal to the number of input splits.
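A minimal sketch for checking partition counts in the spark-shell (the HDFS path is a placeholder):
val a = sc.parallelize(1 to 100)                               // no partition count given
a.partitions.length                                            //   == sc.defaultParallelism (the cores allocated to the app)
val b = sc.textFile("hdfs://namenode:9000/data/input", 1)      // minPartitions = 1
b.partitions.length                                            //   == the number of input splits
val c = sc.textFile("hdfs://namenode:9000/data/input")         // minPartitions defaults to min(defaultParallelism, 2)
c.partitions.length                                            //   >= the number of input splits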
——————————————-
The map method of an RDD, when run on an Executor, processes the data one record at a time.
mapPartitionsWithIndex works on one partition at a time (a partition holds no data itself; it only records which data to read, and the Task that is eventually generated reads many records), and it also exposes the partition index.
Purpose: while fetching the data of a partition you also get the partition index, so you know which partition (i.e. which Task) each record belongs to.
// this function takes the data of a partition and tags each element with the partition index
val func = (index: Int, it: Iterator[Int]) => {
it.map(e => s"part: $index, ele: $e")
}
val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9))
val rdd2 = rdd.mapPartitionsWithIndex(func)
rdd2.collect
def func2(index: Int, iter: Iterator[String]): Iterator[String] = {
  iter.map(x => "[partId:" + index + ", val:" + x + "]")
}
val rdd2=sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2.mapPartitionsWithIndex(func2).collect
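Before the aggregate examples below, a note on its semantics plus a small helper sketch (glom is a standard RDD method; the partition contents shown in the comment assume the default slicing of parallelize):
// aggregate(zeroValue)(seqOp, combOp):
//   seqOp  folds the elements of each partition, starting from zeroValue
//   combOp merges the per-partition results, again starting from zeroValue
//   the order in which the partition results reach the Driver is not guaranteed
val nums = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 2)
nums.glom().collect()                      // shows each partition's contents, e.g. Array(Array(1,2,3,4), Array(5,6,7,8,9))
nums.aggregate(0)(math.max(_, _), _ + _)   // max per partition (4 and 9), then 0 + 4 + 9 = 13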
// 1 partition
scala> rdd.aggregate(0)(_+_,_+_)
res5: Int = 45
// 2 partitions
scala> val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9),2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:24
scala> rdd.aggregate(0)(_+_,_+_)
res2: Int = 45
scala> rdd.aggregate(0)(math.max(_,_),_+_)
res1: Int = 13
// 3 partitions
scala> val rdd=sc.parallelize(List(1,2,3,4,5,6,7,8,9),3)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd.aggregate(0)(math.max(_,_),_+_)
res5: Int = 18
// 3 partitions, with the initial value changed to 5
scala> rdd.aggregate(5)(math.max(_,_),_+_)
res6: Int = 25
scala> 5+6+9+5
res7: Int = 25
// string concatenation
scala> val rdd2=sc.parallelize(List("a","b","c","d","e","f"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> rdd2.aggregate("")(_+_,_+_)
res9: String = abcdef    // not necessarily abcdef; it could just as well be defabc, depending on which task finishes first
scala> rdd2.aggregate("|")(_+_,_+_)
res14: String = ||abc|def
// the result is 24 or 42 (note this is string concatenation: "4"+"2" or "2"+"4")
scala> val rdd3=sc.parallelize(List("12","23","345","4567"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[3] at parallelize at <console>:24
scala> rdd3.aggregate("")((x,y)=> math.max(x.length,y.length).toString,(x,y)=>x+y)
res22: String = 24
// note that the initial value also takes part in the computation: the result is 10 or 01
scala> val rdd4=sc.parallelize(List("12","23","345",""),2)
rdd4: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:24
scala> rdd4.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res25: String = 10
// the result is 11
scala> val rdd5=sc.parallelize(List("12","23","","345"),2)
rdd5: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[9] at parallelize at <console>:24
scala> rdd5.aggregate("")((x,y)=> math.min(x.length,y.length).toString,(x,y)=>x+y)
res35: String = 11
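The three length-based results above can be traced per partition (assuming the default positional slicing of parallelize, which glom can confirm; each partition's result is the value left after the last seqOp step):
// rdd3, partitions ("12","23") and ("345","4567"):
//   p0: max(0,2)=2 -> "2"; max(1,2)=2 -> "2"        p1: max(0,3)=3 -> "3"; max(1,4)=4 -> "4"
//   combOp concatenates "2" and "4" in whichever order the tasks finish: "24" or "42"
// rdd4, partitions ("12","23") and ("345",""):
//   p0: min(0,2)=0 -> "0"; min(1,2)=1 -> "1"        p1: min(0,3)=0 -> "0"; min(1,0)=0 -> "0"
//   result: "10" or "01"
// rdd5, partitions ("12","23") and ("","345"):
//   p0: "1" as above                                p1: min(0,0)=0 -> "0"; min(1,3)=1 -> "1"
//   result: "11"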
//reduceByKey
scala> val pairRDD=sc.parallelize(List(("cat",2),("cat",5),("mouse",4),("cat",12),("dog",12),("mouse",2)),2)
pairRDD: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[15] at parallelize at <console>:24
scala> pairRDD.reduceByKey(_+_).collect
res45: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
scala> pairRDD.aggregateByKey(0)(_+_,_+_).collect
res46: Array[(String, Int)] = Array((dog,12), (cat,19), (mouse,6))
// the initial value is applied once per key in each partition; dog appears in only one partition, so 100 is added to it only once
scala> pairRDD.aggregateByKey(100)(_+_,_+_).collect
res47: Array[(String, Int)] = Array((dog,112), (cat,219), (mouse,206))
// write the result out to inspect how it is partitioned
scala> pairRDD.aggregateByKey(100)(_+_,_+_).saveAsTextFile("/home/ecx17ys/abccc")
[ecx17ys@sharc-node002 abccc]$ cat part-00000
(dog,112)
(cat,219)
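An alternative to writing files out, reusing mapPartitionsWithIndex from above to see which key ended up in which partition (a small sketch):
pairRDD.aggregateByKey(100)(_ + _, _ + _)
  .mapPartitionsWithIndex((index, it) => it.map(kv => s"part: $index, kv: $kv"))
  .collect()
// the keys are placed by the default HashPartitioner, which is why dog and cat end up together in part-00000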
***************************collect**************************************
scala> val rdd =sc.parallelize(List(("a",1),("b",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> rdd.map
map mapPartitions mapPartitionsWithIndex mapValues
scala> rdd.mapValues(_*100).collect
res0: Array[(String, Int)] = Array((a,100), (b,200))
scala> rdd.mapValues(_*100).collectAsMap
res4: scala.collection.Map[String,Int] = Map(b -> 200, a -> 100)
countByKey
countByValue
filterByRange    rdd.filterByRange("b", "d")    // keeps keys in the inclusive range ["b", "d"]
flatMapValues    rdd.flatMapValues(_.split(" "))
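A small sketch exercising the four operators above (the sample data is made up):
val kv = sc.parallelize(List(("a", 1), ("b", 2), ("b", 3), ("c", 4)))
kv.countByKey()                        // Map(a -> 1, b -> 2, c -> 1): occurrences of each key
kv.countByValue()                      // occurrences of each whole (key, value) pair
kv.filterByRange("b", "c").collect()   // keeps the keys in the inclusive range ["b", "c"]
val kvs = sc.parallelize(List(("line1", "hello world"), ("line2", "hi spark")))
kvs.flatMapValues(_.split(" ")).collect()   // (line1,hello), (line1,world), (line2,hi), (line2,spark)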
aggregateByKey is a Transformation
reduceByKey is a Transformation
filter is a Transformation
flatMap is a Transformation
map is a Transformation
mapPartitions is a Transformation (one iterator per partition)
mapPartitionsWithIndex is a Transformation
collect is an Action
aggregate is an Action
saveAsTextFile is an Action
foreach is an Action (one element at a time)
foreachPartition is an Action (one partition at a time)
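A minimal sketch of the last two Actions (in local mode the println output shows up in the same console; on a cluster it goes to the Executor logs):
val nums = sc.parallelize(1 to 9, 3)
nums.foreach(e => println(e))                  // the function receives one element at a time
nums.foreachPartition(it =>                    // the function receives one whole partition as an Iterator
  println(it.mkString(", "))
)
// foreachPartition is the usual choice when a per-partition resource is needed (e.g. one DB connection per partition)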
——————————————-
Homework: find the most popular teacher
1. Find the top 3 most popular teachers among all teachers
2. Find the top 3 most popular teachers within each subject (implement it in at least two or three ways; a sketch of one possible approach follows below)
Homework: reimplement in Spark all the cases you previously implemented with MapReduce
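A minimal sketch of one possible approach to part 2, assuming the click log has already been parsed into (subject, teacher) pairs (the sample data below is made up for illustration):
val clicks = sc.parallelize(List(
  ("bigdata", "laozhang"), ("bigdata", "laoduan"), ("bigdata", "laoduan"),
  ("javaee", "xiaoxu"), ("javaee", "laoyang"), ("javaee", "laoyang")
))
val counted = clicks.map(st => (st, 1)).reduceByKey(_ + _)          // ((subject, teacher), clickCount)
val top3PerSubject = counted
  .map { case ((subject, teacher), n) => (subject, (teacher, n)) }
  .groupByKey()                                                     // gather all teachers of one subject together
  .mapValues(_.toList.sortBy(-_._2).take(3))                        // sort by clicks, descending, keep the top 3
top3PerSubject.collect()
// other ways: filter by subject and use sortBy + take per subject, or use a custom partitioner
// so that each subject lands in its own partition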
——————————————–
* – A list of partitions (a series of partitions; each has an index, so they are ordered)
* – A function for computing each split (a function is applied to each split to process its data)
* – A list of dependencies on other RDDs (RDDs depend on other RDDs)
* – Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
(optional: only key-value RDDs, i.e. RDD[(K, V)], have one; a key-value RDD carries a partitioner, hash-partitioned by default)
* – Optionally, a list of preferred locations to compute each split on (e.g. block locations for
* an HDFS file)
(optional: when the data is read from HDFS, the preferred locations of each split are obtained by requesting the block metadata from the NameNode)
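The five properties can all be observed through the RDD API (a small sketch; the path is a placeholder):
val lines  = sc.textFile("hdfs://namenode:9000/data/words.txt")
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.partitions.length                        // 1. a list of numbered, ordered partitions
                                                // 2. each split has the flatMap/map/reduce functions applied to it
counts.dependencies                             // 3. the dependencies on the parent RDDs
counts.partitioner                              // 4. Some(HashPartitioner) because counts is a key-value RDD
lines.preferredLocations(lines.partitions(0))   // 5. preferred locations, e.g. the HDFS block locations of split 0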