Spark RDD 编程指南中文版 (三)

接上一章 曾革:Spark RDD 编程指南中文版(二)继续翻译 Spark 官方的英文文档。

你可以点击这个链接查看所有已翻译的内容: 曾革:Spark 中文文档目录汇总

Transformations

下面的表格列出了 Spark 支持的一些常见的转换操作。

map(func):返回一个新的分布式数据集,它由对 source 中每个元素调用 func 函数后生成。

filter(func):返回一个新的分布式数据集,它由对 source 中每个元素调用 func 并返回值为 true 的值来生成。

flatMap(func):与 map 类似,但是每一个 input item 可以被映射成 0 个或多个 output item(所以 func 返回的是一个 Seq 而不是一个单独的 item)。

mapPartitions(func) :与 map 类似,但是会单独的在 RDD 的每一个 partition (block)上执行,所以在一个类型为 T 的 RDD 上运行的时候,func 的类型必须是 Iterator<T> => Iterator<U>。

mapPartitionsWithIndex(func) :与 mapPartitions 类似,但是必须提供一个带整型值参数的 func,这个整型值用来代表 partitions的索引,所以在一个类型为 T 的 RDD 上运行的时候,func 的类型必须是 (int,Iterator<T>) => (int,Iterator)。

sample(withReplacement, fraction, seed) :少量的数据样本,可以设置是否放回,采样的百分比,使用指定的随机数生成器。

union(otherDataset) :会返回一个新的 dataset,这个新的dataset是 source 的 elements 和参数里面 otherDataset 的 elements 的并集。

intersection(otherDataset) :会返回一个新的 dataset,这个新的 dataset source 的 elements 和参数里面 otherDataset 的 elements 的交集。

distinct([numTasks])) :会返回一个新的 dataset,这个新的 dataset 是 原数据去重后的数据。

groupByKey([numTasks]) :当我们在一个 <K, V> pairs 的dataset 上调用这个 func 的时候,会返回一个 <K, V> pairs 的 dataset。

注意 1:如果你的 group 是为了在数据上根据 key 进行 aggregation 操作(比如求和和求平均值),那么你最好使用 reduceByKey 或者是 aggregation ,这样或许会有更好的性能。

注意 2 :默认情况下,output 的并行数取决于父 RDD,但是你可以通过可选参数numTasks 来指定不同数量的 tasks。

reduceByKey(func, [numTasks]) :当在 <K,V> pair 数据集上调用时会返回一个 <key, value>数据集,其中会对每一个 key 所对应的 value 调用 func 函数进行聚合操作,它的类型必须是 (V,V) => V。像 groupByKey,你可以通过第二个可选参数来配置 reduce 的 tasks 数量。

aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) :当在 <K,V> pair 数据集上调用时会返回一个 <key, value>数据集,其中会对每一个 key 所对应的 value 调用给定的 combine 函数并且还带有一个 neutral “zero” 值来进行聚合操作,像 groupByKey,你可以通过第二个可选参数来配置 reduce 的 tasks 数量。

sortByKey([ascending], [numTasks]) :当在 <K,V> pair 数据集上调用时会返回一个 <key, value>数据集,其中 K 实现了 Ordered ,返回的数据集会根据 keys 的大小进行升序或者是降序排序,升序或降序取决于你设置的 ascending 参数是 true 还是 false。

join(otherDataset, [numTasks]) :当我们在类型是 (K,V)和(K,W)的数据集上进行调用的时候,会返回一个类型是(K,(V,W))pairs 的数据集,他拥有每个 key 所对应的元素对。外连接支持 leftOuterJoin,rightOuterJoin 和 fullOuterJoin 。

cogroup(otherDataset, [numTasks]) :当我们在类型是 (K,V)和(K,W)的数据集上进行调用的时候,会返回 (K,(Interator<V>,Interator<V>))。这个操作也可以叫做 groupWith。

cartesian(otherDataset):当我们在一个 T 和 U 类型的 dataset 上调用时,会返回一个 (T, U) pairs 类型的 dataset(所有元素的 pairs,即笛卡尔积)。

pipe(command, [envVars]) :Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.

coalesce(numPartitions) :降低 RDD 中分区的数量至 numPartitions 个。这通常在对一个大的数据集进行过滤后使用会有很好的效果。

repartition(numPartitions) :Reshuffle RDD 里面的数据以创建更多的分区或者是更少的分区,用来保持数据量的平衡,这通常会通过网络传输来对所有的数据进行 shuffle。

repartitionAndSortWithinPartitions(partitioner) :根据给定的 partitioner对 RDD 进行重新分区,并在每个结果分区中,按照 key 值对记录排序。这比每一个分区中先调 repartition 然后再 sorting 效率更高,因为它可以将排序过程推送到执行 shuffle 的机器上进行。

Actions

下面的表格列出了 Spark 支持的一些常见的行动操作。

reduce(func) :对数据集中的每一个元素调用 func 函数进行 aggregate (这个函数需要传两个参数并且只会返回一个值)。这个函数应该是可交换(commutative )和可关联的(associative),这样才能保证它可以被正确的进行并行计算。

collect() :以数组的形式返回 driver program 中的数据元素,这个操作通常会在执行 filter 或者是其他操作并只返回一个足够小的数据的时候有用。

count() :返回数据集里的元素的总数。

first():返回数据集的第一个元素(类似于 take(1))。

take(n):返回数据集的前 n 个元素。

takeSample(withReplacement, num, [seed]) :返回一个随机抽样的数据,数量是 num 个,参数 withReplacement 指定是否有放回的抽取,参数 seed 指定生成随机数的种子。

takeOrdered(n, [ordering]) :返回 RDD 按自然顺序(natural order)或自定义比较器(custom comparator)排序后的前 n 个元素。

saveAsTextFile(path) : 将 dataset 中的元素以文本文件(或文本文件的集合)的形式写入本地文件系统、HDFS 或其它 Hadoop 支持的文件系统中的给定目录中。Spark 将对每个元素调用 toString 方法,将数据元素转换为文本文件中的一行记录。

saveAsSequenceFile(path) :将 dataset 中的元素以 Hadoop SequenceFile 的形式写入到本地文件系统、HDFS 或其它 Hadoop 支持的文件系统指定的路径中。该操作可以在实现了 Hadoop 的 Writable 接口的键值对(key-value pairs)的 RDD 上使用。在 Scala 中,它还适用于能通过隐式转换为 Writable 的类型的数据(Spark 包括了基本类型的转换,例如 Int, Double, String 等等)。

saveAsObjectFile(path) :使用 Java 序列化(serialization)以 simple format 的格式编写数据集的元素,然后使用 SparkContext.objectFile() 进行加载。

countByKey() : 仅适用于(K,V)类型的 RDD 。返回具有对每个 key 进行了计数的 (K , Int)pairs 的 hashmap。

foreach(func) :对 dataset 中每个元素调用函数 func 。这通常会产生一些side effects,例如更新一个 累加器或与外部存储系统(external storage systems)进行交互。Note:修改除 foreach()之外的累加器以外的变量(variables)可能会导致undefined behavior。详细介绍请阅读 Understanding closures 部分。

该 Spark RDD API 还提供了一些 actions(操作)的异步版本,例如针对 foreach 的 foreachAsync,它们会立即给调用者返回一个 FutureAction ,而不是在完成 action 时阻塞。 这可以用于管理或等待 action 的异步执行。

Shuffle operations

一些确定的操作会触发 Spark 的 shuffle 机制 。shuffle 是重新分发数据的一种机制,使得数据能跨机器进行分组,这种跨 executor 和 machine 的数据复制使得 shuffle 成为一种复制且代价沉重的操作。

(未完待续————————————-)

    原文作者:曾革
    原文地址: https://zhuanlan.zhihu.com/p/32732625
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞