SPARK[RDD之转换函数]

前面讲到了RDD的分区、RDD的创建,这节将讲解RDD的转换,RDD的转换就是从父RDD生成一个新的RDD,新的RDD分区可能和父RDD一致也可能不再一致。
常用的转换函数:

map map是对每个元素进行转换,生成新的元素
flatMap flatMap是将每个元素生成多个元素,一般不会重分区
distinct distinct是去重函数,返回不重复的集合,很大可能会发生重分区
coalesce[,koə’lɛs] 合并分区,如果父RDD的分区数小于指定的分区数,需要设定shuffle为true,使用hash分区函数
repartition重分区,repartition是coalesce的shuffle为true实现
glom[ɡlɑm]将分区类的元素t转换成Array[T]
randomSplit将一个RDD拆分成多个RDD,显然会重分区,权重因子数之和和原来的分区数一致,权重大的分到的元素就会多一些。返回的RDD的元素是Array类型的。
union 合并RDD,和前面的合并分区不一样,元素不去重
intersection 合并RDDf返回两个RDD的交集,并去重
subtract 返回前一个减去后一个RDD的余集,不去重
zip,将RDD1的元素作为K,RDD2的元素作为V,合并成一个新的RDD,返回Array[K,V],分区数或元素个数不匹配的话,都会抛出异常
zipPartitions,要求分区数相同,可以是多个RDD,元素个数不做要求,抹去多余的,返回Array[K,K,K,V]
zipWithIndex 将RDD自身进行生成Array[item,index]
zipWithIndexUniqueId 将RDD自身进行生成Array[item,index]

区别:zipWithIndex需要启动作业维护index在不同分区中顺序,而zipWithIndexUniqueId不需要,zipWithIndexUniqueId中index的算法是前一个元素的idenx+分区总数,如RDD有3个分区,第三分区的第一个元素为2,第二个元素就为2+3 =5,第一个分区的第一个元素index为0,第二个元素为0+3 =3,这样index就不会重复。

对于[K,V]类型元素的转换函数

combineByKey,对相同key的[K,V]元素进行合并,有三个合并函数,需要自己实现
foldByKey(init)(fucntion(V,V)),对相同key的[K,V]元素进行合并,设定初始值,以及连续连个值的操作函数
reduceByKey(function(V,V)),类似foldByKey,没有初始值
reduceByKeyLocally(function(V,V)),同reduceByKey,只是返回不是RDD而是一个Map[K,V]
groupByKey,对[K,V]类型的元素返回[K,(v1,v2,v3)]…
join,内连接,对于两个RDD的[K,V]类型的元素,返回[K,(v1,v2)],只返回有公有key的组合
fullOuterJoin,全连接,返回所有[k,(v,v1..)]
leftOuterJoin左外连接,返回第一个RDD的所有key的合并组合
rightOuterJoin右外连接,返回第二个RDD的所有key的合并组合

本地缓存函数

cache,是persist存储在内存的实例
persist,可以指定存储等级,内存,磁盘,内存+磁盘

游标函数

first RDD中的第一个元素
count RDD元素的个数
reduce(function(item,item))
collect 将RDD转换成Array,如Aarray((“A”,1),(“B”,2))
take(index) 取出index前的所有元素,不排序
top(index)取出index前的所有元素,排序(按照item降序)
takeOrderd(index),取出index前的所有元素,排序(按照item升序)
aggregate聚合,aggregate(init)(function(x,y))
fold,fold(init)(function(x,y))
lookup(k),对于[K,V]类型的元素,返回key对应的所有V值
countByKey(k)统计[K,V]种每个K的数量
foreach
sortBy(function(item)

存储函数

saveAsTextFile –>local/hdfs
saveAsNewHadoopDataset –>HBASE

map:

val rdd = sc.parallelize(1 to 5)
val map = rdd.map(x =>x*4)
map.collect
map.partitions.size
map.preferredLocations(map.dependecies(0))

flatMap:

val flatMap = rdd.flatMap(x =>(1 to x))
flatMap.collect 
flatMap.partitions.size
flatMap.preferredLocations(map.dependecies(0))

distinct:

val distinct = flatMap.distinct
distinct.collect 
distinct.partitions.size
distinct.preferredLocations(map.dependecies(0))

union:

val rdd1 =sc.parallelize(1 to 5)
val rdd2 =sc.parallelize(6 to 10)
val rdd3 =rdd1.union(rdd2)

reduce:

val rdd = sc.makeRDD(Array(("A",2),("B",1),("C",4)))
val reduce = rdd.reduce((item1,item2) ={ //2个参数
      (item1._1 + item2._1,item1._2 +item2._2)  //参数元组形式计算,并返回
})
reduce.print
("BCA",7)
    原文作者:北风第一支
    原文地址: https://www.jianshu.com/p/14ac2e7e869a
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞