Spark的Transformation的lazy策略

Spark分为两种API:Transformations和Actions。

Transformations的常用操作有:map,filter,flatMap,union,sortByKey,reduceByKey等。
更多的解释请参考:Spark Transformations
Actions的常用操作有:reduce,collect,count,countByKey,foreach,saveAsTextFile等。
更多的解释请参考:Spark Actions

官方文档中解释:

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

Transformations是转换操作,Actions是执行操作。
关于Transformations还有另外的解释:

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file).

所有的Transformations操作都使用了lazy,它们不会计算结果,只是记录dataset的转换操作。

Transformations代码样例

探索一下源码,看看Transformations是如何记录转换操作的。
先看一下的map为例,查看map的代码(RDD.scala):

def map[U: ClassTag](f: T => U): RDD[U] = withScope {
  val cleanF = sc.clean(f)
  new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF))
}

代码很简单,clean一下f,然后创建一个MapPatitionsRDD。
再看一下filter,查看filter的代码(RDD.scala):

  def filter(f: T => Boolean): RDD[T] = withScope {
    val cleanF = sc.clean(f)
    new MapPartitionsRDD[T, T](
      this,
      (context, pid, iter) => iter.filter(cleanF),
      preservesPartitioning = true)
  }

两个逻辑大致是一样的,值得注意的是:两个方法中有下面一个操作:
val cleanF = sc.clean(f)
为什么要执行这个操作呢。看一下clean方法的解释(SparkContext.scala):

  /**
   * Clean a closure to make it ready to serialized and send to tasks
   * (removes unreferenced variables in $outer's, updates REPL variables)
   * If <tt>checkSerializable</tt> is set, <tt>clean</tt> will also proactively
   * check to see if <tt>f</tt> is serializable and throw a <tt>SparkException</tt>
   * if not.
   *
   * @param f the closure to clean
   * @param checkSerializable whether or not to immediately check <tt>f</tt> for serializability
   * @throws SparkException if <tt>checkSerializable</tt> is set but <tt>f</tt> is not
   *   serializable
   */
  private[spark] def clean[F <: AnyRef](f: F, checkSerializable: Boolean = true): F = {
    ClosureCleaner.clean(f, checkSerializable)
    f
  }

其中比较重要的两句:

Clean a closure to make it ready to serialized and send to tasks
(removes unreferenced variables in $outer’s, updates REPL variables)

解释这句话要有集群组件说起了,请看图:

《Spark的Transformation的lazy策略》 cluster-overview.png

具体参考Cluster Mode Overview
在集群上,运行一个spark程序需要三个组件,Driver,ClusterManager,WorkNode。
Driver负责提交任务和处理结果。ClusterManager负责分配任务。WorkNode负责执行具体的任务。
所有的任务代码都在Driver节点,那么任务要想执行,就必须把代码传到WorkNode节点,所以需要先将代码序列化后再传到相应节点。
那么Clean a closure to make it ready to serialized and send to tasks就很好理解了。

再回到map和filter的代码,两个代码都创建了一个MapPartitionsRDD,看一下MapPartitionsRDD的代码:
主要注意它的类声明和compute方法

private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag](
    var prev: RDD[T],
    f: (TaskContext, Int, Iterator[T]) => Iterator[U],  // (TaskContext, partition index, iterator)
    preservesPartitioning: Boolean = false)
  extends RDD[U](prev) {

MapPartitionsRDD继承了RDD,就像是一个链表,每次Transformations都会在这个链表上加上一个节点
在看看compute方法

override def compute(split: Partition, context: TaskContext): Iterator[U] =
  f(context, split.index, firstParent[T].iterator(split, context))

compute方法就是调用f操作,转换出一个新的Iterator[U]。
猜想,Action操作最终会调用compute方法完成数据的转换。

接下来在写Action的内容。

    原文作者:lsnl8480
    原文地址: https://www.jianshu.com/p/b60dfb856312
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞