Spark源码分析之ClosureCleaner

一、前言

研究过Spark源码的读者也许会发现,RDD中map、filter等方法对传入的函数都会调用sc.clean()进行清理,而这个方法调用了ClosureCleaner中的clean方法。相信有部分读者会对这个方法的作用有疑问,下面我们就对ClosureCleaner的clean方法的源码做深入分析。

二、闭包及其实现

在研究这个方法的源码之前,读者需要了解闭包的概念,同时也需要对Scala的闭包实现有一定了解。下面我们先来介绍下闭包的概念:

1、闭包

所谓闭包,就是能够捕获处于函数外部而又在被创建作用域之内变量的函数。在Scala中,在函数内引用外部函数或类的变量或方法,将形成一个闭包。下面我们通过一个例子来理解闭包:

def outScope = {
      var factor = 3  
      val innerScope = (i:Int) => i * factor 
      innerScope
}

在上例中,函数innerScope引用了外部方法outScope的变量factor,那么innerScope将形成一个所谓的闭包。在函数式语言中,函数是first class,也就是说函数可以像变量一样被传递。当我们传递innerScope函数的时候变量factor也将与innerScope作为一个整体被传递,这个整体就是所谓的闭包。如果闭包中引用了比较大的外部对象,那么闭包在序列化时将带来较大性能开销。闭包不是Scala特有的概念,像Python,JS也有闭包的概念,如果想对闭包有更深入理解,请参考知乎话题:闭包(计算机科学)是什么?

2、Scala闭包的实现

  • Scala函数的实现
    先来看一段示例代码:
class SomethingNotSerializable {
      def someValue = 1
      def scope(name: String)(body: => Unit) = body
      def someMethod(): Unit = scope("one") {
          def x = someValue
          val y = 2
          def z = 1
          scope("two") { println(x + y + z +1) }
      }
}

根据闭包的定义,我们知道类SomethingNotSerializable中有两个闭包,分别是Scope(“one”) 和Scope(“two”)。使用scalac编译这段代码,获得编译后的class文件:

《Spark源码分析之ClosureCleaner》 编译后class文件

可以看到类SomethingNotSerializable编译生成3个class文件,而示例代码只定义了一个类,为什么会生成3个class文件?事实上Scala编译器会为每个闭包(函数)生成一个可序列化类,图中两个名称中包含anonfun的class文件分别是函数Scope(“one”)和Scope(“two”)编译后生成的class文件。

  • Scala闭包的实现
    我们将从JVM字节码层面研究Scala闭包的实现。使用命令javap -v SomethingNotSerializable$$anonfun$someMethod$1$$anonfun$apply$mcv$sp$1.class获取闭包Scope(“two”)生成类的class类文件结构。类文件结构非常复杂,包括常量池、字段表、方法表和属性表等一系列复杂的结构体。为了简单起见,我们主要分析下图中的方法,它包含了函数Scope(“two”)主要逻辑:
    《Spark源码分析之ClosureCleaner》 字节码指令
    下面我们来分析它怎么实现对外部变量y的引用及外部方法x的调用(即闭包的实现):

a、对外部变量y的引用。第10条指令为aload_0,功能是加载局部变量表第0个槽位的变量(this)到栈顶。第11条字节码指令为getfield,功能是获取栈顶指定类的实例域,此处含义即为获取本类y域。本类为什么会有y域呢?因为Scala编译器会在闭包生成类中将引入y域,并通过构造方法赋值(不可变的基本类型传递值,可变基本类型和引用类型传递引用。这里可能大家会疑惑基本类型怎么能传递引用,其实这里Scala会将可变基本类型封装成引用类型,比如Int类型会封装为IntRef)。故我们可以得出结论:在Scala闭包实现中,Scala编译器把闭包外部所有被闭包引用的域,设置为生成类的域
b、对外部方法x的引用。第4条指令使用getfield加载本类域outer。这个outer域的类型为SomethingNotSerializable《Spark源码分析之ClosureCleaner》1,即Scala编译器为函数Scope(“one”)生成的类。接下来第5行字节码调用$outer的x方法,即Scope(“one”)的x方法。故我们可以得出结论:在Scala闭包实现中,如果闭包中含有对外部方法调用,那么Scala编译器将把外部对象(无论是闭包合成类还是客户端定义类)的引用设置为闭包生成类的一个域,然后调用其对应方法

三、ClosureCleaner的作用

在Spark应用开发中,RDD的处理函数将被封装到Task中,并在被序列化后分发至集群各节点并行处理。假设我们向RDD的foreach方法中传入Scope(“two”)函数,而Scope(“two”)的合成类的《Spark源码分析之ClosureCleaner》outer域,那么这个$outer域同样需要被序列化,这可能会导致更多未被引用域被序列化。更极端的是,如果这些类中包含未被引用且不可序列化的域,那么将导致不必要的SparkException异常(Task not serializable)。而ClosureCleaner类的作用就是递归清理外围类中无用域,降低序列化的开销,防止不必要的不可序列化异常

四、ClosureCleaner详解

有了上面的分析,阅读ClosureCleaner的源码将会轻松很多。我们主要分析clean方法,它包含了ClosureCleaner类的主要逻辑。具体解析请参考注释:

/**
   * Helper method to clean the given closure in place.
   *
   * The mechanism is to traverse the hierarchy of enclosing closures and null out any
   * references along the way that are not actually used by the starting closure, but are
   * nevertheless included in the compiled anonymous classes. Note that it is unsafe to
   * simply mutate the enclosing closures in place, as other code paths may depend on them.
   * Instead, we clone each enclosing closure and set the parent pointers accordingly.
   *
   * By default, closures are cleaned transitively. This means we detect whether enclosing
   * objects are actually referenced by the starting one, either directly or transitively,
   * and, if not, sever these closures from the hierarchy. In other words, in addition to
   * nulling out unused field references, we also null out any parent pointers that refer
   * to enclosing objects not actually needed by the starting closure. We determine
   * transitivity by tracing through the tree of all methods ultimately invoked by the
   * inner closure and record all the fields referenced in the process.
   *
   * For instance, transitive cleaning is necessary in the following scenario:
   *
   *   class SomethingNotSerializable {
   *     def someValue = 1
   *     def scope(name: String)(body: => Unit) = body
   *     def someMethod(): Unit = scope("one") {
   *       def x = someValue
   *       def y = 2
   *       scope("two") { println(y + 1) }
   *     }
   *   }
   *
   * In this example, scope "two" is not serializable because it references scope "one", which
   * references SomethingNotSerializable. Note that, however, the body of scope "two" does not
   * actually depend on SomethingNotSerializable. This means we can safely null out the parent
   * pointer of a cloned scope "one" and set it the parent of scope "two", such that scope "two"
   * no longer references SomethingNotSerializable transitively.
   *
   * @param func the starting closure to clean
   * @param checkSerializable whether to verify that the closure is serializable after cleaning
   * @param cleanTransitively whether to clean enclosing closures transitively 是否需要递归清理
   * @param accessedFields a map from a class to a set of its fields that are accessed by
   *                       the starting closure
   */
  private def clean(
      func: AnyRef,
      checkSerializable: Boolean,
      cleanTransitively: Boolean,
      accessedFields: Map[Class[_], Set[String]]): Unit = {

    //func必须是闭包,也就是类名中包含"$anonfun$"
    if (!isClosure(func.getClass)) {
      logWarning("Expected a closure; got " + func.getClass.getName)
      return
    }

    // TODO: clean all inner closures first. This requires us to find the inner objects.
    // TODO: cache outerClasses / innerClasses / accessedFields

    if (func == null) {
      return
    }

    logDebug(s"+++ Cleaning closure $func (${func.getClass.getName}) +++")

    // A list of classes that represents closures enclosed in the given one
    // 递归获取func内部的所有闭包合成类的class
    val innerClasses = getInnerClosureClasses(func)

    // A list of enclosing objects and their respective classes, from innermost to outermost
    // An outer object at a given index is of type outer class at the same index
    // 递归获取所有闭包及最外部对象的class实例和对象实例。判断逻辑是func中包含$outer域,
    // 且不为null。这里要说明的是,Scala会为每个函数合成对象,每个函数都有一个$outer,
    // 但只有函数是闭包时,$outer才不为空。
    val (outerClasses, outerObjects) = getOuterClassesAndObjects(func)

    // For logging purposes only
    val declaredFields = func.getClass.getDeclaredFields
    val declaredMethods = func.getClass.getDeclaredMethods

    logDebug(" + declared fields: " + declaredFields.size)
    declaredFields.foreach { f => logDebug("     " + f) }
    logDebug(" + declared methods: " + declaredMethods.size)
    declaredMethods.foreach { m => logDebug("     " + m) }
    logDebug(" + inner classes: " + innerClasses.size)
    innerClasses.foreach { c => logDebug("     " + c.getName) }
    logDebug(" + outer classes: " + outerClasses.size)
    outerClasses.foreach { c => logDebug("     " + c.getName) }
    logDebug(" + outer objects: " + outerObjects.size)
    outerObjects.foreach { o => logDebug("     " + o) }

    // Fail fast if we detect return statements in closures
    // 此方法主要使用asm框架访问class对象,这是一个典型的访问者模式的实现。此
    // 处作用主要检查闭包中是否有return语句,这在Spark中是不允许的。
    getClassReader(func.getClass).accept(new ReturnStatementFinder(), 0)

    // If accessed fields is not populated yet, we assume that
    // the closure we are trying to clean is the starting one
    // accessedFields用来保存闭包中实际使用到的域
    if (accessedFields.isEmpty) {
      logDebug(s" + populating accessed fields because this is the starting closure")
      // Initialize accessed fields with the outer classes first
      // This step is needed to associate the fields to the correct classes later
      // func的每个外部对象都需要保存其被使用的域。闭包可以嵌套,
      // 如前面介绍的SomethingNotSerializable示例中,Scope("two")引用外部Scope("one")的方法,
      // 而此方法中又引用Scope("one")的外部SomethingNotSerializable对象的域,因此需要递归查找
      // 每个对象中实际被引用的域。
      for (cls <- outerClasses) {
        accessedFields(cls) = Set[String]()
      }
      // Populate accessed fields by visiting all fields and methods accessed by this and
      // all of its inner closures. If transitive cleaning is enabled, this may recursively
      // visits methods that belong to other classes in search of transitively referenced fields.
      for (cls <- func.getClass :: innerClasses) {
        getClassReader(cls).accept(new FieldAccessFinder(accessedFields, cleanTransitively), 0)
      }
    }

    logDebug(s" + fields accessed by starting closure: " + accessedFields.size)
    accessedFields.foreach { f => logDebug("     " + f) }

    // List of outer (class, object) pairs, ordered from outermost to innermost
    // Note that all outer objects but the outermost one (first one in this list) must be closures
    var outerPairs: List[(Class[_], AnyRef)] = (outerClasses zip outerObjects).reverse
    var parent: AnyRef = null
    if (outerPairs.size > 0) {
      // 如果func外部包含闭包或对象
      val (outermostClass, outermostObject) = outerPairs.head
      if (isClosure(outermostClass)) {
        // 最外部是否是闭包
        logDebug(s" + outermost object is a closure, so we clone it: ${outerPairs.head}")
      } else if (outermostClass.getName.startsWith("$line")) {
        // SPARK-14558: if the outermost object is a REPL line object, we should clone and clean it
        // as it may carray a lot of unnecessary information, e.g. hadoop conf, spark conf, etc.  
        // spark-shell repl传过来的func
        logDebug(s" + outermost object is a REPL line object, so we clone it: ${outerPairs.head}")
      } else {
        // The closure is ultimately nested inside a class; keep the object of that
        // class without cloning it since we don't want to clone the user's objects.
        // Note that we still need to keep around the outermost object itself because
        // we need it to clone its child closure later (see below).
        // 将最外部对象设置为parent
        logDebug(" + outermost object is not a closure or REPL line object, so do not clone it: " +
          outerPairs.head)
        parent = outermostObject // e.g. SparkContext
        outerPairs = outerPairs.tail
      }
    } else {
      logDebug(" + there are no enclosing objects!")
    }

    // Clone the closure objects themselves, nulling out any fields that are not
    // used in the closure we're working on or any of its inner closures.
    // 根据accessedFields中收集到的所有func外部对象中func实际引用到的域,设置func的clone对象,
    // 未引用的域设置为null,达到清理的目的。
    for ((cls, obj) <- outerPairs) {
      logDebug(s" + cloning the object $obj of class ${cls.getName}")
      // We null out these unused references by cloning each object and then filling in all
      // required fields from the original object. We need the parent here because the Java
      // language specification requires the first constructor parameter of any closure to be
      // its enclosing object.
      val clone = instantiateClass(cls, parent)
      for (fieldName <- accessedFields(cls)) {
        val field = cls.getDeclaredField(fieldName)
        field.setAccessible(true)
        val value = field.get(obj)
        field.set(clone, value)
      }
      // If transitive cleaning is enabled, we recursively clean any enclosing closure using
      // the already populated accessed fields map of the starting closure
      // 递归清理每个外部闭包。
      if (cleanTransitively && isClosure(clone.getClass)) {
        logDebug(s" + cleaning cloned closure $clone recursively (${cls.getName})")
        // No need to check serializable here for the outer closures because we're
        // only interested in the serializability of the starting closure
        clean(clone, checkSerializable = false, cleanTransitively, accessedFields)
      }
      parent = clone
    }

    // Update the parent pointer ($outer) of this closure
    if (parent != null) {
      // 如果parent 不为null,设置$outer为parent
      val field = func.getClass.getDeclaredField("$outer")
      field.setAccessible(true)
      // If the starting closure doesn't actually need our enclosing object, then just null it out
      // 如果func没有任何一个域被引用到,则直接把$outer引用设置为null,避免额外序列化开销
      if (accessedFields.contains(func.getClass) &&
        !accessedFields(func.getClass).contains("$outer")) {
        logDebug(s" + the starting closure doesn't actually need $parent, so we null it out")
        field.set(func, null)
      } else {
        // Update this closure's parent pointer to point to our enclosing object,
        // which could either be a cloned closure or the original user object
        field.set(func, parent)
      }
    }

    logDebug(s" +++ closure $func (${func.getClass.getName}) is now cleaned +++")
    // 校验func是否可被序列化,若不能被序列化则抛出异常快速失败。
    if (checkSerializable) {
      ensureSerializable(func)
    }
  }

五、参考资料

深入理解Java虚拟机
JAVA中ASM框架详解

    原文作者:鹿先森vv
    原文地址: https://www.jianshu.com/p/51f5a34e2785
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞