Hive on spark的架构与解析SQL的过程

一、 Hive on spark的基本架构/

1. Hive 的架构

《Hive on spark的架构与解析SQL的过程》 Hive架构

Hive的整体架构可以分成以下几大部分:

  1. 用户接口 支持CLI, JDBC和Web UI
  2. Driver Driver负责将用户指令翻译转换成为相应的MapReduce Job
  3. MetaStore 元数据存储仓库,像数据库和表的定义这些内容就属于元数据这个范畴,默认使用的是Derby存储引擎

2. Hive on spark的架构

Hive on Spark总体的设计思路是,尽可能重用Hive逻辑层面的功能;从生成物理计划开始,提供一整套针对Spark的实现。

用什么算?算什么?怎么算?

计算引擎:spark

通过hive.execution.engine来设置计算引擎,该参数可选的值为mr、tez和spark。

Hadoop:mr

计算对象:以Hive的表作为RDD

将Hive的表转化为RDD以便Spark处理。本质上,Hive的表和Spark的HadoopRDD都是HDFS上的一组文件,通过InputFormat和RecordReader读取其中的数据,因此这个转化是自然而然的。

Hadoop : DSM
RDD & DSM 的对比

计算逻辑:使用Hive原语

这里主要是指使用Hive的操作符对数据进行处理。将Hive的操作符包装为Function,然后应用到RDD上。这样,我们只需要依赖较少的几种RDD的转换,而主要的计算逻辑仍由Hive提供。

Spark :Transformation

Spark为RDD提供了一系列的转换(Transformation),其中有些转换也是面向SQL的,如groupByKey、join等。但如果使用这些转换(就如Shark所做的那样),就意味着我们要重新实现一些Hive已有的功能;而且当Hive增加新的功能时,我们需要相应地修改Hive on Spark模式。
由于使用了Hive的原语,因此我们需要显式地调用一些Transformation来实现Shuffle的功能。下表中列举了Hive on Spark使用的所有转换。

《Hive on spark的架构与解析SQL的过程》

repartitionAndSortWithinPartitions功能目的是提供一种MapReduce风格的Shuffle。虽然sortByKey也提供了排序的功能,但某些情况下我们并不需要全局有序,另外其使用的Range Partitioner对于某些Hive的查询并不适用。

元数据管理:HiveMetastoreCatalog

HiveMetastoreCatalog是Spark中对Hive Metastore访问的wrapper。HiveMetastoreCatalog通过调用相应的Hive Api可以获得数据库中的表及表的分区,也可以创建新的表和分区。

《Hive on spark的架构与解析SQL的过程》 Paste_Image.png

HiveMetastoreCatalog中会通过hive client来访问metastore中的元数据,使用了大量的Hive Api,对Hive Library依赖。

物理执行计划:Spark Task

通过SparkCompiler将Operator Tree转换为Task Tree,其中需要提交给Spark执行的任务即为SparkTask。

MapReduce:Map+Reduce的两阶段执行模式。
Spark:DAG执行模式。

DAG(Directed acyclic graph,有向无环图)

在Spark作业调度系统中,调度的前提是判断多个作业任务的依赖关系,这些作业任务之间可能存在因果的依赖关系,也就是说有些任务必须先获得执行,然后相关的依赖任务才能执行,但是任务之间显然不应出现任何直接或间接的循环依赖关系,所以本质上这种关系适合用DAG表示。

Spark之所以outperform Hadoop的关键有二:DAG scheduler和intermediate data in memory。Hadoop用的是AG而不是DAG。一个DAG可以包含多个AG。DAG除了可以提升scheduler效率之外,它同时是Spark Fault tolerance机制-Lineage 追溯的基础。

因此一个SparkTask包含了一个表示RDD转换的DAG,我们将这个DAG包装为SparkWork。执行SparkTask时,就根据SparkWork所表示的DAG计算出最终的RDD,然后通过RDD的foreachAsync来触发运算。
使用foreachAsync是因为我们使用了Hive原语,因此不需要RDD返回结果;此外foreachAsync异步提交任务便于我们对任务进行监控。

任务监控与统计信息收集:SparkListener & Spark API & Accumulator

Spark提供了SparkListener接口来监听任务执行期间的各种事件,因此我们可以实现一个Listener来监控任务执行进度以及收集任务级别的统计信息(目前任务级别的统计由SparkListener采集,任务进度则由Spark提供的专门的API来监控)。
另外Hive还提供了Operator级别的统计数据信息,比如读取的行数等。

MapReduce:Hadoop Counter
Spark:Accumulator

二、 Hive on spark的内部实现机制

参考链接:

Hive on MR:Hive SQL执行计划深度解析
Hive on spark:Hive on Spark解析
Spark SQL:sql的解析与执行
Hive on spark:hive on spark实现详解

1. Hive流程:

《Hive on spark的架构与解析SQL的过程》

  • 语法分析阶段,Hive利用Antlr将用户提交的SQL语句解析成一棵抽象语法树(Abstract Syntax Tree,AST)。
  • 生成逻辑计划包括通过Metastore获取相关的元数据,以及对AST进行语义分析。得到的逻辑计划为一棵由Hive操作符组成的树,Hive操作符即Hive对表数据的处理逻辑,比如对表进行扫描的TableScanOperator,对表做Group的GroupByOperator等。
  • 逻辑优化即对Operator Tree进行优化,与之后的物理优化的区别主要有两点:一是在操作符级别进行调整;二是这些优化不针对特定的计算引擎。比如谓词下推(Predicate Pushdown)就是一个逻辑优化:尽早的对底层数据进行过滤以减少后续需要处理的数据量,这对于不同的计算引擎都是有优化效果的。
  • 生成物理计划即针对不同的引擎,将Operator Tree划分为若干个Task,并按照依赖关系生成一棵Task的树(在生成物理计划之前,各计算引擎还可以针对自身需求,对Operator Tree再进行一轮逻辑优化)。比如,对于MapReduce,一个GROUP BY+ORDER BY的查询会被转化成两个MapReduce的Task,第一个进行Group,第二个进行排序。
  • 物理优化则是各计算引擎根据自身的特点,对Task Tree进行优化。比如对于MapReduce,Runtime Skew Join的优化就是在原始的Join Task之后加入一个Conditional Task来处理可能出现倾斜的数据。
  • 最后按照依赖关系,依次执行Task Tree中的各个Task,并将结果返回给用户。每个Task按照不同的实现,会把任务提交到不同的计算引擎上执行。

2. Hive on spark解析SQL的过程

SQL

《Hive on spark的架构与解析SQL的过程》

SQL语句在分析执行过程中会经历下图所示的几个步骤

《Hive on spark的架构与解析SQL的过程》

  1. 语法解析
  2. 操作绑定
  3. 优化执行策略
  4. 交付执行

语法解析

语法解析之后,会形成一棵语法树,如下图所示。树中的每个节点是执行的rule,整棵树称之为执行策略。

《Hive on spark的架构与解析SQL的过程》

策略优化

形成上述的执行策略树还只是第一步,因为这个执行策略可以进行优化,所谓的优化就是对树中节点进行合并或是进行顺序上的调整。

以大家熟悉的join操作为例,下图给出一个join优化的示例。A JOIN B等同于B JOIN A,但是顺序的调整可能给执行的性能带来极大的影响,下图就是调整前后的对比图。

《Hive on spark的架构与解析SQL的过程》

在Hash Join中,首先被访问的表称之为“内部构建表”,第二个表为“探针输入”。创建内部表时,会将数据移动到数据仓库指向的路径;创建外部表,仅记录数据所在的路径。

再举一例,一般来说尽可能的先实施聚合操作(Aggregate)然后再join

《Hive on spark的架构与解析SQL的过程》

这种优化自动完成,在调优时不需要考虑。

HQL

HiveContext是Spark提供的用户接口,HiveContext继承自SqlContext。
既然是继承自SqlContext,那么我们将普通sql与hiveql分析执行步骤做一个对比,可以得到下图。

《Hive on spark的架构与解析SQL的过程》

Entrypoint

hiveql是整个的入口点

  def hiveql(hqlQuery: String): SchemaRDD = {
    val result = new SchemaRDD(this, HiveQl.parseSql(hqlQuery))
    // We force query optimization to happen right away instead of letting it happen lazily like
    // when using the query DSL.  This is so DDL commands behave as expected.  This is only
    // generates the RDD lineage for DML queries, but does not perform any execution.
    result.queryExecution.toRdd
    result
  }

上述hiveql的定义与sql的定义几乎一模一样,唯一的不同是sql中使用parseSql的结果作为SchemaRDD的入参而hiveql中使用HiveQl.parseSql作为SchemaRdd的入参。

对比:

sql函数的定义如下

  def sql(sqlText: String): SchemaRDD = {
    val result = new SchemaRDD(this, parseSql(sqlText))

    result.queryExecution.toRdd
    result
  }

HiveQL, parser

parseSql的函数定义如代码所示,解析过程中将指令分成两大类:

  • nativecommand 非select语句,这类语句的特点是执行时间不会因为条件的不同而有很大的差异,基本上都能在较短的时间内完成

  • 非nativecommand 主要是select语句

def parseSql(sql: String): LogicalPlan = {
    try {
      if (sql.toLowerCase.startsWith("set")) {
        NativeCommand(sql)
      } else if (sql.toLowerCase.startsWith("add jar")) {
        AddJar(sql.drop(8))
      } else if (sql.toLowerCase.startsWith("add file")) {
        AddFile(sql.drop(9))
      } else if (sql.startsWith("dfs")) {
        DfsCommand(sql)
      } else if (sql.startsWith("source")) {
        SourceCommand(sql.split(" ").toSeq match { case Seq("source", filePath) => filePath })
      } else if (sql.startsWith("!")) {
        ShellCommand(sql.drop(1))
      } else {
        val tree = getAst(sql)

        if (nativeCommands contains tree.getText) {
          NativeCommand(sql)
        } else {
          nodeToPlan(tree) match {
            case NativePlaceholder => NativeCommand(sql)
            case other => other
          }
        }
      }
    } catch {
      case e: Exception => throw new ParseException(sql, e)
      case e: NotImplementedError => sys.error(
        s"""
          |Unsupported language features in query: $sql
          |${dumpTree(getAst(sql))}
        """.stripMargin)
    }
  } 

哪些指令是nativecommand呢,答案在HiveQl.scala中的nativeCommands变量。

对于非nativeCommand,最重要的解析函数就是nodeToPlan

Spark对HiveQL所做的优化主要体现在Query相关的操作,其它的依然使用Hive的原生执行引擎。

3. SQL到Spark作业的转换过程

native command的执行流程

由于native command是一些非耗时的操作,直接使用Hive中原有的exeucte engine来执行即可。这些command的执行示意图如下

《Hive on spark的架构与解析SQL的过程》

SparkTask的生成和执行

我们通过一个例子来看一下一个简单的两表JOIN查询如何被转换为SparkTask并被执行。下图左半部分展示了这个查询的Operator Tree,以及该Operator Tree如何被转化成SparkTask;右半部分展示了该SparkTask执行时如何得到最终的RDD并通过foreachAsync提交Spark任务。

《Hive on spark的架构与解析SQL的过程》

SparkCompiler遍历Operator Tree,将其划分为不同的MapWork和ReduceWork。

MapWork为根节点,总是由TableScanOperator(Hive中对表进行扫描的操作符)开始;后续的Work均为ReduceWork。ReduceSinkOperator(Hive中进行Shuffle输出的操作符)用来标记两个Work之间的界线,出现ReduceSinkOperator表示当前Work到下一个Work之间的数据需要进行Shuffle。因此,当我们发现ReduceSinkOperator时,就会创建一个新的ReduceWork并作为当前Work的子节点。包含了FileSinkOperator(Hive中将结果输出到文件的操作符)的Work为叶子节点。

与MapReduce最大的不同在于,我们并不要求ReduceWork一定是叶子节点,即ReduceWork之后可以链接更多的ReduceWork,并在同一个SparkTask中执行。

从该图可以看出,这个查询的Operator Tree被转化成了两个MapWork和一个ReduceWork。

执行SparkTask步骤:

  1. 根据MapWork来生成最底层的HadoopRDD,
  2. 将各个MapWork和ReduceWork包装成Function应用到RDD上。
  3. 在有依赖的Work之间,需要显式地调用Shuffle转换,具体选用哪种Shuffle则要根据查询的类型来确定。另外,由于这个例子涉及多表查询,因此在Shuffle之前还要对RDD进行Union。
  4. 经过这一系列转换后,得到最终的RDD,并通过foreachAsync提交到Spark集群上进行计算。

toRdd

在logicalPlan到physicalPlan的转换过程中,toRdd最关键的元素

override lazy val toRdd: RDD[Row] =
      analyzed match {
        case NativeCommand(cmd) =>
          val output = runSqlHive(cmd)

          if (output.size == 0) {
            emptyResult
          } else {
            val asRows = output.map(r => new GenericRow(r.split("\t").asInstanceOf[Array[Any]]))
            sparkContext.parallelize(asRows, 1)
          }
        case _ =>
          executedPlan.execute().map(_.copy())
      }
    原文作者:ZYJ2016
    原文地址: https://www.jianshu.com/p/6e7c8a4b5c37
    本文转自网络文章,转载此文章仅为分享知识,如有侵权,请联系博主进行删除。
点赞