由一条SQL分析SparkSQL执行过程(三)

对于下面一段SQL

SELECT a.uid,b.name,SUM(clk_pv) AS clk_pv 
FROM log  a
JOIN user b ON  a.uid = b.uid
WHERE a.fr = 'android'
GROUP BY a.uid,b.name

在由一条SQL分析SparkSQL执行过程(二)中,我们分析到Spark如何封装SessionState,使得用户只需要通过SparkSQL入口SparkSession就能很容易的做分布式运算。
其大致的思路是:

  1. SparkSession是一个伴生对象。在SparkSession对象中包含一个builder,用户通过Builder配置自己的SparkSession
  2. SparkSession类初始化需传入参数SessionState。Builder在创建SparkSession实例时会通过读取配置,反射加载的方式创建SessionState。
  3. SessionState的初始化过程,是解析器,分析器,CataLog,优化器,Planner,查询执行的创建过程
  4. 后续的SQL操作实际上是调用SessionState的几个组件做对应操作。

在由一条SQL分析SparkSQL执行过程(一)中,我们分析了SQL执行的过程实际是对解析器解析的AST(抽象语法树)遍历使用规则,将一棵抽象语法树转化为另外一棵树的过程。

那么Spark如何实现的呢?

1. 解析器

《由一条SQL分析SparkSQL执行过程(三)》 image.png

对于上图,
首先从类的功能和结构上,我们根据代码追根溯源,

  1. SparkSession底层初始化SessionState。
  2. SessionState通过SessionState的Builder构造。
  3. SessionStateBuidler是BaseSessionStateBuilder的子类,只对其中一个方法重写
  4. 在BaseSessionStateBuilder中完成解析器,分析器等组件的构造
    则还是上部分我们的结论,但是在解析器是如何构造的呢?

在BaseSessionStateBuilder中,我们看到

  /**
   * Session extensions defined in the [[SparkSession]].
   */
  protected def extensions: SparkSessionExtensions = session.extensions

  protected lazy val sqlParser: ParserInterface = {
    extensions.buildParser(session, new SparkSqlParser(conf))
  }
  1. 用户在创建SparkSession时,通过SparkSession的Builder的withExtensions可以指明extensions。
  2. extensions 是一个SparkSessionExtensions类型
    在Spark源码中,有这样一个例子说明如何构造外部的扩展
   SparkSession.builder()
   .master("...")
   .conf("...", true)
   .withExtensions { extensions =>
       extensions.injectResolutionRule { session =>
         ...
       }
      extensions.injectParser { (session, parser) =>
        ...
       }
     }
     .getOrCreate()
  }}}

在SparkSessionExtensions中构造解析器的方式如下:
1). 声明一个函数类型(Type),将SparkSession和当前解析接口转化为另一个解析接口
2). 构造一个空的可变的类型Buffer
3). 将外部的解析规则委托给底层解析器,并返回ParserBuilder类型

//声明一个函数类型(Type)
type ParserBuilder = (SparkSession, ParserInterface) => ParserInterface
//构造一个可变的类型Buffer
 private[this] val parserBuilders = mutable.Buffer.empty[ParserBuilder]
//合并外部的解析规则和自有的规则,
 private[sql] def buildParser(
      session: SparkSession,
      initial: ParserInterface): ParserInterface = {
    parserBuilders.foldLeft(initial) { (parser, builder) =>
      builder(session, parser)
    }
  }
def injectParser(builder: ParserBuilder): Unit = {
    parserBuilders += builder
  }

这段逻辑写的是这么绕。。。只能贴一下注释,以及注释的谷歌翻译

Inject a custom parser into the [[SparkSession]]. Note that the builder is passed a session and an initial parser. The latter allows for a user to create a partial parser and to delegate to the underlying parser for completeness. If a user injects more parsers, then the parsers are stacked on top of each other.
将自定义分析器注入到[[SparkSession]]中。请注意,构建器会传递一个会话和一个初始解析器。后者允许用户创建一个部分解析器并为了完整性委托给底层解析器。如果用户注入更多的解析器,那么解析器会堆叠在一起。

  1. 如果用户不添加外部的解析器,相当于new SparkSqlParser()
class SparkSqlParser(conf: SQLConf) extends AbstractSqlParser {
  val astBuilder = new SparkSqlAstBuilder(conf)

  private val substitutor = new VariableSubstitution(conf)

  protected override def parse[T](command: String)(toResult: SqlBaseParser => T): T = {
    super.parse(substitutor.substitute(command))(toResult)
  }
}

实际是通过SparkSqlAstBuilder 构造一个astBuilder.并重写父类AbstractSqlParser 的parse方法。父类是一个ParseInferface类型。

  1. SparkSqlAstBuilder 是将ANTLR ParseTree 转化为LogicalPlan/Expression/TableIdentifier.

  2. SparkSqlAstBuilder 继承自AstBuilder包含常见的命令解析如create Database,show Database 等,这是因为AstBuilder继承自SqlBaseBaseVisitor。SqlBaseBaseVisitor实现了数据库常见的操作。

总结
SparkSQL是通过AntlrV4这个开源解析框架解析的http://www.antlr.org/.
在使用的时候,做了几层抽象和封装。

  • 构造这模式抽象AstBuilder,将AntlrV4的SQL语法实现细节封装
  • 封装SparksqlParser ,并使用构造这模式,封装SparkSqlAstbuilder继承AstBuilder,那么AntlrV4的特性就都能被SparkSqlParser使用。
  • 为了兼容用户自定义的解析器,解析器定定一个为ParseInferface接口类型。用户可以通过SparkSession注入自己的解析器,Spark底层会将用户的解析器和SPark的解析器做合并和统一,并保证完整性。

回到最开始的SQL:

sessionState.sqlParser.parsePlan(sqlText)

实际调用的是SparkSqlParser父类AbstractSqlParser(继承ParserInterface)的方法:

  override def parsePlan(sqlText: String): LogicalPlan = parse(sqlText) { parser =>
    astBuilder.visitSingleStatement(parser.singleStatement()) match {
      case plan: LogicalPlan => plan
      case _ =>
        val position = Origin(None, None)
        throw new ParseException(Option(sqlText), "Unsupported SQL statement", position, position)
    }
  }

实际使用的是Ast的visitSingleStatement方法解析单条SQL语句。

2. 分析器

《由一条SQL分析SparkSQL执行过程(三)》 image.png

语法规则千千万,怎么有效的组织起来呢?观察Spark的代码,我们发现就会像这样的问题:

  1. 规则检查是对解析器解析的逻辑计划做检查(包括CataLog检查)
  2. 逻辑计划是一颗树,按照递归的思维,每个节点都是一个逻辑计划,需进心规则检查
  3. 待验证的规则是可以穷举的,比如关系,子查询,CTE查询
  4. 规则检测的过程,就是对指定的规则列表,递归遍历逻辑计划验证是否满足规则条件。
    所以就有下面的抽象:
    一批规则—->规则<— 具体规则
    Batch——–>Rule<———-ResolveRelations(例子)

规则检查的过程
规则执行器RuleExecutor:待检查的一批规则以及规则检查运行的策略
Strangy(Once,fixedPoint)

一个分析器,就是一个RuleExecutor,包括一批规则以及执行规则的策略。

一份不完整的规则列表

《由一条SQL分析SparkSQL执行过程(三)》 image.png

分析器
分析器是按照指定的规则对逻辑计划做检查。Spark的做法是将规则封装成Batch,将规则的检查封装成规则执行器,一个规则执行器包括待检查的逻辑计划和检查的规则Batch。一旦调用Analysis方法,就会执行规则执行器的execute方法根据Batch规则对逻辑计划做检查。

3. 优化器

优化器和分析器的思路是一样的,只不过,这时候规则是按照预先设定的优化规则对解析和分析过的逻辑计划做进一步变换。比如子查询公用等。
一份不完整的优化规则列表

《由一条SQL分析SparkSQL执行过程(三)》 image.png

4. Planner

《由一条SQL分析SparkSQL执行过程(三)》 image.png

Planner 是一个转化器,将优化后的逻辑计划转换为物理计划。
因此,我们有以下的结论

  1. 首先定一个物理计划SparkPlan.物理几乎是从逻辑计划转化而来,所以物理计划也是一棵树。所以SparkPlan是目标的物理执行计划。
  2. 再次需有一个转化器,就是SparkPlaner,负责将逻辑计划转化为物理计划
  3. 转化的思路是:对逻辑计划树递归,通过Scala模式匹配的方式找到物理执行计划,再返回最好的物理计划(目前是返回第一个)。
  4. 转化的过程叫做Strategy。

Spark 在SparkPlanner中定义的一组策略

  def strategies: Seq[Strategy] =
    experimentalMethods.extraStrategies ++
      extraPlanningStrategies ++ (
      FileSourceStrategy ::
      DataSourceStrategy(conf) ::
      SpecialLimits ::
      Aggregation ::
      JoinSelection ::
      InMemoryScans ::
      BasicOperators :: Nil)
  1. SparkPlanner 本身是SparkStrategies的子类。SparkStrategies类定义了所有的逻辑计划到物理计划的策略。比如聚合操作,投影,连接等。
  2. 一个SparkStrategy 又是GenericStrategy[SparkPlan] 的子类。也就是说一个strategy有两个动作,planLater和apply
    planLater可以用来执行,apply将逻辑计划转成物理计划。
case class PlanLater(plan: LogicalPlan) extends LeafExecNode {

  override def output: Seq[Attribute] = plan.output

  protected override def doExecute(): RDD[InternalRow] = {
    throw new UnsupportedOperationException()
  }
}

举一个Limit的例子:

  1. 将逻辑计划转化 物理计划
 object SpecialLimits extends Strategy {
    override def apply(plan: LogicalPlan): Seq[SparkPlan] = plan match {
      case logical.ReturnAnswer(rootPlan) => rootPlan match {
        case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
          execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
        case logical.Limit(
            IntegerLiteral(limit),
            logical.Project(projectList, logical.Sort(order, true, child))) =>
          execution.TakeOrderedAndProjectExec(
            limit, order, projectList, planLater(child)) :: Nil
        case logical.Limit(IntegerLiteral(limit), child) =>
          execution.CollectLimitExec(limit, planLater(child)) :: Nil
        case other => planLater(other) :: Nil
      }
      case logical.Limit(IntegerLiteral(limit), logical.Sort(order, true, child)) =>
        execution.TakeOrderedAndProjectExec(limit, order, child.output, planLater(child)) :: Nil
      case logical.Limit(
          IntegerLiteral(limit), logical.Project(projectList, logical.Sort(order, true, child))) =>
        execution.TakeOrderedAndProjectExec(
          limit, order, projectList, planLater(child)) :: Nil
      case _ => Nil
    }
  }

  1. 匹配到对应的物理计划类,转化为执行RDD运算doExecute
case class TakeOrderedAndProjectExec(
    limit: Int,
    sortOrder: Seq[SortOrder],
    projectList: Seq[NamedExpression],
    child: SparkPlan) extends UnaryExecNode {

  override def output: Seq[Attribute] = {
    projectList.map(_.toAttribute)
  }

  override def executeCollect(): Array[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val data = child.execute().map(_.copy()).takeOrdered(limit)(ord)
    if (projectList != child.output) {
      val proj = UnsafeProjection.create(projectList, child.output)
      data.map(r => proj(r).copy())
    } else {
      data
    }
  }

  private val serializer: Serializer = new UnsafeRowSerializer(child.output.size)

  protected override def doExecute(): RDD[InternalRow] = {
    val ord = new LazilyGeneratedOrdering(sortOrder, child.output)
    val localTopK: RDD[InternalRow] = {
      child.execute().map(_.copy()).mapPartitions { iter =>
        org.apache.spark.util.collection.Utils.takeOrdered(iter, limit)(ord)
      }
    }
    val shuffled = new ShuffledRowRDD(
      ShuffleExchange.prepareShuffleDependency(
        localTopK, child.output, SinglePartition, serializer))
    shuffled.mapPartitions { iter =>
      val topK = org.apache.spark.util.collection.Utils.takeOrdered(iter.map(_.copy()), limit)(ord)
      if (projectList != child.output) {
        val proj = UnsafeProjection.create(projectList, child.output)
        topK.map(r => proj(r))
      } else {
        topK
      }
    }
  }

一旦查询执行调用executePlan.execute方法时,就会触发执行对应物理计划的doExecute,执行RDD运算。

    原文作者:阿海与蜗牛
    原文地址: https://www.jianshu.com/p/534d6868d3d6
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞