SparkSQL内部实现的基础框架叫做Catalyst。Catalyst涉及了几个基础性概念,包括:InternalRow体系、TreeNode体系和Expression体系。
InternalRow
InternalRow的含义很直观,表示一行数据。物理算子树节点产生和转换的RDD的类型就是RDD[InternalRow]。InternalRow是一个抽象类,包括numFields,update方法和各列数据的get/set方法。
具体的逻辑由InternalRow的不同子类实现,目前包括BaseGenericInternalRow、UnsafeRow和JoinedRow三个子类。
- BaseGenericInternalRow 同样是抽象类,实现了InternalRow定义的所有get方法,实现是通过调用类中定义的genericGet虚函数进行;
- JoinedRow 用于实现Join操作,将两个InternalRow放在一起,形成新的InternalRow。
- UnsafeRow 一种Spark自定义的对象存储格式,不采用Java对象存储,避免了JVM中的垃圾回收;UnsafeRow对行数据进行了编码,使得存储更加高效;
BaseGenericInternalRow的衍生出3个子类,GenericInternalRow、SpecificInternalRow和MutableUnsafeRow。而GernericInternalRow和SpecificInternalRow的区别在于构造参数的类型,其中GernericInternalRow的构造参数类型是Array[Any], 一旦创建就不允许通过set操作修改; 而SpecificInternalRow的构造参数类型是Array[MutableValue],允许通过set操作修改。
GenericInternalRow
/**
* An internal row implementation that uses an array of objects as the underlying storage.
* Note that, while the array is not copied, and thus could technically be mutated after creation,
* this is not allowed.
*/
class GenericInternalRow(val values: Array[Any]) extends BaseGenericInternalRow {
/** No-arg constructor for serialization. */
protected def this() = this(null)
def this(size: Int) = this(new Array[Any](size))
override protected def genericGet(ordinal: Int) = values(ordinal)
override def toSeq(fieldTypes: Seq[DataType]): Seq[Any] = values.clone()
override def numFields: Int = values.length
override def setNullAt(i: Int): Unit = { values(i) = null}
override def update(i: Int, value: Any): Unit = { values(i) = value }
SpecificInternalRow
/**
* A row type that holds an array specialized container objects, of type [[MutableValue]], chosen
* based on the dataTypes of each column. The intent is to decrease garbage when modifying the
* values of primitive columns.
*/
final class SpecificInternalRow(val values: Array[MutableValue]) extends BaseGenericInternalRow {
def this(dataTypes: Seq[DataType]) =
this(
dataTypes.map {
case BooleanType => new MutableBoolean
case ByteType => new MutableByte
case ShortType => new MutableShort
// We use INT for DATE internally
case IntegerType | DateType => new MutableInt
// We use Long for Timestamp internally
case LongType | TimestampType => new MutableLong
case FloatType => new MutableFloat
case DoubleType => new MutableDouble
case _ => new MutableAny
}.toArray)
def this() = this(Seq.empty)
def this(schema: StructType) = this(schema.fields.map(_.dataType))
override def numFields: Int = values.length
UnsafeRow
/**
* An Unsafe implementation of Row which is backed by raw memory instead of Java objects.
* ...
*/
public final class UnsafeRow extends InternalRow implements Externalizable, KryoSerializable
TreeNode
TreeNode是Catalyst中和执行计划相关的所有AST,包括表达式Expression、逻辑执行计划LogicalPlan,物理执行计划SparkPlan的基类。
TreeNode继承Scala中的Product类,其目的是可通过Product类中的方法(productArity、productElement、productIterator)来操纵TreeNode实现类的参数,这些实现类一般都是case class。
TreeNode是抽象类,用于被其他抽象或实现类继承,如Expression、LogicalPlan。TreeNode类声明如下:
abstract class TreeNode[BaseType <: TreeNode[BaseType]] extends Product
并且定义了this的别名:
self: BaseType =>
这里约定TreeNode的泛型参数类型BaseType应是TreeNode[BasType]的子类型,并且继承TreeNode的实现类的类型就是传入的BaseType。比如Aggregate是LogicalPlan的实现类,LogicalPlan继承QueryPlan[LogicalPlan],又有QueryPlan[PlanType <: QueryPlan[PlanType]]继承TreeNode[PlanType],因此Aggregate的self类型就是Aggregate。
TreeNode中的成员和方法
TreeNode可以细分成三种类型的Node:
- UnaryNode 一元节点,即只有一个子节点。如Limit、Filter操作
- BinaryNode 二元节点,即有左右子节点的二叉节点。如Jion、Union操作
- LeafNode 叶子节点,没有子节点的节点。主要用户命令类操作,如SetCommand
TreeNode中定义的方法都是对AST的基本操作方法。
在Catalyst中,对应的是TreeNode。TreeNode是Spark SQL中所有树结构的基类,定义了树遍历的接口和一系列通用的集合操作,例如:
- collectLeaves : 获取当前TreeNode的所有叶子节点
- collectFirst : 先序遍历所有节点并返回第一个满足条件的节点
- withNewChildren : 将当前节点的子节点替换为新的子节点
- transformDown : 用先序遍历顺序将规则作用于所有节点
- transformUp : 用后序遍历方式将规则作用于所有节点
- transformChildren : 递归将规则作用到所有子节点
TreeNode详细方法说明