3.Catalyst中的数据结构

SparkSQL内部实现的基础框架叫做Catalyst。Catalyst涉及了几个基础性概念,包括:InternalRow体系、TreeNode体系和Expression体系。

InternalRow

InternalRow的含义很直观,表示一行数据。物理算子树节点产生和转换的RDD的类型就是RDD[InternalRow]。InternalRow是一个抽象类,包括numFields,update方法和各列数据的get/set方法。
具体的逻辑由InternalRow的不同子类实现,目前包括BaseGenericInternalRow、UnsafeRow和JoinedRow三个子类。

  • BaseGenericInternalRow 同样是抽象类,实现了InternalRow定义的所有get方法,实现是通过调用类中定义的genericGet虚函数进行;
  • JoinedRow 用于实现Join操作,将两个InternalRow放在一起,形成新的InternalRow。
  • UnsafeRow 一种Spark自定义的对象存储格式,不采用Java对象存储,避免了JVM中的垃圾回收;UnsafeRow对行数据进行了编码,使得存储更加高效;
    BaseGenericInternalRow的衍生出3个子类,GenericInternalRow、SpecificInternalRow和MutableUnsafeRow。而GernericInternalRow和SpecificInternalRow的区别在于构造参数的类型,其中GernericInternalRow的构造参数类型是Array[Any], 一旦创建就不允许通过set操作修改; 而SpecificInternalRow的构造参数类型是Array[MutableValue],允许通过set操作修改。
    GenericInternalRow
/**
 * An internal row implementation that uses an array of objects as the underlying storage.
 * Note that, while the array is not copied, and thus could technically be mutated after creation,
 * this is not allowed.
 */
class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow {
  /** No-arg constructor for serialization. */
  protected def this() = this(null)

  def this(size: Int) = this(new Array[Any](size))

  override protected def genericGet(ordinal: Int) = values(ordinal)

  override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values.clone()

  override def numFields: Int = values.length

  override def setNullAt(i: Int): Unit = { values(i) = null}

  override def update(i: Int, value: Any): Unit = { values(i) = value }

SpecificInternalRow

/**
 * A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
 * based on the dataTypes of each column.  The intent is to decrease garbage when modifying the
 * values of primitive columns.
 */
final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {

  def this(dataTypes: Seq[DataType]) =
    this(
      dataTypes.map {
        case BooleanType => new MutableBoolean
        case ByteType => new MutableByte
        case ShortType => new MutableShort
        // We use INT for DATE internally
        case IntegerType | DateType => new MutableInt
        // We use Long for Timestamp internally
        case LongType | TimestampType => new MutableLong
        case FloatType => new MutableFloat
        case DoubleType => new MutableDouble
        case _ => new MutableAny
      }.toArray)

  def this() = this(Seq.empty)

  def this(schema: StructType) = this(schema.fields.map(_.dataType))

  override def numFields: Int = values.length
  

UnsafeRow

/**
 * An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
* ...
 */
public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable 

TreeNode

TreeNode是Catalyst中和执行计划相关的所有AST,包括表达式Expression、逻辑执行计划LogicalPlan,物理执行计划SparkPlan的基类。

TreeNode继承Scala中的Product类,其目的是可通过Product类中的方法(productArity、productElement、productIterator)来操纵TreeNode实现类的参数,这些实现类一般都是case class。

TreeNode是抽象类,用于被其他抽象或实现类继承,如Expression、LogicalPlan。TreeNode类声明如下:

abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product

并且定义了this的别名:

self: BaseType =>

这里约定TreeNode的泛型参数类型BaseType应是TreeNode[BasType]的子类型,并且继承TreeNode的实现类的类型就是传入的BaseType。比如Aggregate是LogicalPlan的实现类,LogicalPlan继承QueryPlan[LogicalPlan],又有QueryPlan[PlanType <: QueryPlan[PlanType]]继承TreeNode[PlanType],因此Aggregate的self类型就是Aggregate。

TreeNode中的成员和方法

TreeNode可以细分成三种类型的Node:

  • UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作
  • BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作
  • LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand

TreeNode中定义的方法都是对AST的基本操作方法。

在Catalyst中,对应的是TreeNode。TreeNode是Spark SQL中所有树结构的基类,定义了树遍历的接口和一系列通用的集合操作,例如:

  • collectLeaves : 获取当前TreeNode的所有叶子节点
  • collectFirst : 先序遍历所有节点并返回第一个满足条件的节点
  • withNewChildren : 将当前节点的子节点替换为新的子节点
  • transformDown : 用先序遍历顺序将规则作用于所有节点
  • transformUp : 用后序遍历方式将规则作用于所有节点
  • transformChildren : 递归将规则作用到所有子节点
    TreeNode详细方法说明

Expression

    原文作者:丹之
    原文地址: https://www.jianshu.com/p/09af4c44198f
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞