Spark HiveThriftServer2 Startup Flow: A Source-Code Walkthrough

Background

I have only been working with Spark SQL for a short while. The write-ups I found elsewhere left the overall Spark HiveThriftServer2 flow rather muddled, so I felt it was worth tracing the execution of a SQL statement myself, starting from the Beeline connection to HiveThriftServer2.

Since our company's Spark cluster mainly exposes Spark SQL to upper layers, and I had had little contact with HiveServer2 before, I was curious how jobs running on Spark on YARN interact with HiveServer2, and how the real physical plan (the tasks) ends up running on Spark.

This article gives a fairly simple walkthrough of how Spark's HiveThriftServer2 overrides the CLIService / HiveSession / OperationManager layers so that a request entering through Beeline becomes an operation on Spark SQL.

The subsequent SQL parsing, logical plan, physical plan, and execution on Spark on YARN will be traced end to end in a later article.

I. Spark HiveThriftServer2 Startup Flow

1. Starting from Beeline

I did not read the Beeline code carefully; the goal was just to find the entry point where Beeline hands off to HiveServer2. In practice, Beeline talks to HiveServer2 over JDBC.
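For context, the connection Beeline sets up underneath is an ordinary JDBC connection to HiveServer2/HiveThriftServer2. A minimal sketch of the same thing done by hand (the URL, user, and query are placeholders for whatever your deployment uses):

import java.sql.DriverManager

object Hive2JdbcSketch {
  def main(args: Array[String]): Unit = {
    // the same JDBC driver that Beeline itself registers
    Class.forName("org.apache.hive.jdbc.HiveDriver")
    // placeholder URL: HiveThriftServer2 listens on port 10000 by default
    val conn = DriverManager.getConnection("jdbc:hive2://localhost:10000/default", "user", "")
    val stmt = conn.createStatement()          // a HiveStatement under the hood
    val rs = stmt.executeQuery("SELECT 1")
    while (rs.next()) println(rs.getInt(1))
    rs.close(); stmt.close(); conn.close()
  }
}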

BeeLine.main() -> mainWithInputRedirection() -> begin() -> execute() -> dispatch()

boolean dispatch(String line) {
  ...
  if (isBeeLine) {
    if (line.startsWith(COMMAND_PREFIX)) {
      // handle SQLLine command in beeline which starts with ! and does not end with ;
      return execCommandWithPrefix(line);
    } else {
      return commands.sql(line, getOpts().getEntireLineAsCommand());
    }
  } else {
    return commands.sql(line, getOpts().getEntireLineAsCommand());
  }
}

Commands.sql() -> execute() -> executeInternal()

private boolean executeInternal(String sql, boolean call) {

 ...
 
 try {
      Statement stmnt = null;
      boolean hasResults;
      Thread logThread = null;

      try {
        long start = System.currentTimeMillis();

        if (call) {
          stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
          hasResults = ((CallableStatement) stmnt).execute();
        } else {
          // create the Statement (for a Hive connection this is a HiveStatement, which implements java.sql.Statement)
          stmnt = beeLine.createStatement();
          
          if (beeLine.getOpts().isSilent()) {
            // execute the SQL
            hasResults = stmnt.execute(sql);
          } else {
            logThread = new Thread(createLogRunnable(stmnt));
            logThread.setDaemon(true);
            logThread.start();
            hasResults = stmnt.execute(sql);
            logThread.interrupt();
            logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
          }
        }
      } finally {
        ...
      }
}

The method in HiveStatement that actually handles the SQL:

  public boolean execute(String sql) throws SQLException {
    // this is where the SQL is actually sent to the server
    runAsyncOnServer(sql);
    
    waitForOperationToComplete();
    
    // at this point the query has completed; build the result set for output
    // The query should be completed by now
    if (!stmtHandle.isHasResultSet()) {
      return false;
    }
    resultSet =  new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
        .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
        .setScrollable(isScrollableResultset)
        .build();
    return true;
  }
  
  // talk to the server asynchronously (this actually lands in the CLI service layer)
  private void runAsyncOnServer(String sql) throws SQLException {
    checkConnection("execute");

    closeClientOperation();
    initFlags();

    TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
    /**
     * Run asynchronously whenever possible
     * Currently only a SQLOperation can be run asynchronously,
     * in a background operation thread
     * Compilation is synchronous and execution is asynchronous
     */
    execReq.setRunAsync(true);
    execReq.setConfOverlay(sessConf);
    execReq.setQueryTimeout(queryTimeout);
    try {
      // client is of type TCLIService.Iface; the statement is executed over Thrift RPC
        
      TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
      Utils.verifySuccessWithInfo(execResp.getStatus());
      stmtHandle = execResp.getOperationHandle();
      isExecuteStatementFailed = false;
    } catch (SQLException eS) {
      isExecuteStatementFailed = true;
      throw eS;
    } catch (Exception ex) {
      isExecuteStatementFailed = true;
      throw new SQLException(ex.toString(), "08S01", ex);
    }
  }
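The comment block above describes the pattern that makes this call asynchronous: the server submits the real work to a background thread pool and returns an operation handle immediately, and the client's waitForOperationToComplete() then polls that handle. A minimal sketch of the idea, with illustrative names rather than the actual Hive classes:

import java.util.concurrent.{Executors, Future}

object AsyncOperationSketch {
  private val backgroundPool = Executors.newFixedThreadPool(4)

  // stand-in for Hive's OperationHandle: just enough to poll for completion
  final case class OperationHandle(future: Future[_]) {
    def isFinished: Boolean = future.isDone
  }

  // stand-in for executeStatementAsync(): submit the work and return right away
  def executeStatementAsync(sql: String): OperationHandle =
    OperationHandle(backgroundPool.submit(new Runnable {
      override def run(): Unit = println(s"running in the background: $sql")
    }))

  def main(args: Array[String]): Unit = {
    val handle = executeStatementAsync("SELECT 1")
    while (!handle.isFinished) Thread.sleep(50)   // roughly what waitForOperationToComplete() does
    println("operation finished")
    backgroundPool.shutdown()
  }
}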
  
On the server side, the TCLIService.Iface that the client stub talks to is implemented by ThriftCLIService:

public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
  ...
}

2. Arriving at ThriftCLIService

Let's first look at what the ExecuteStatement() method invoked by Beeline does: it unpacks the request from the Thrift protocol, delegates to cliService, and wraps the result back into a Thrift response. The real execution still lies further down.

@Override
  public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
    TExecuteStatementResp resp = new TExecuteStatementResp();
    try {
      SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
      String statement = req.getStatement();
      Map<String, String> confOverlay = req.getConfOverlay();
      Boolean runAsync = req.isRunAsync();
      long queryTimeout = req.getQueryTimeout();
      
      // cliService here is the CLIService instance; the real execution is still further down
      // (getting close to the truth...)
      OperationHandle operationHandle =
          runAsync ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay,
              queryTimeout) : cliService.executeStatement(sessionHandle, statement, confOverlay,
              queryTimeout);
      resp.setOperationHandle(operationHandle.toTOperationHandle());
      resp.setStatus(OK_STATUS);
    } catch (Exception e) {
      // Note: it's rather important that this (and other methods) catch Exception, not Throwable;
      // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used
      // to also catch all errors; and now it allows OOMs only to propagate.
      LOG.warn("Error executing statement: ", e);
      resp.setStatus(HiveSQLException.toTStatus(e));
    }
    return resp;
  }

3. What does CLIService do?

This is the key to understanding how Spark hooks into the Hive CLI service layer. Looking at the classes used here, haven't you seen similar variables under spark/sql/hive-thriftserver?

The truth is actually simple: all Spark's HiveThriftServer2 does is override the SessionManager / OperationManager / CLIService implementations used by the method below, so that when this method is invoked, execution lands on Spark SQL.

Don't believe it? Read on.

  /**
   * Execute statement on the server with a timeout. This is a blocking call.
   */
  @Override
  public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
      Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
    OperationHandle opHandle =
        sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay,
            queryTimeout);
    LOG.debug(sessionHandle + ": executeStatement()");
    return opHandle;
  }
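To make the claim concrete, here is a minimal sketch of the trick with purely illustrative class names (not the actual Hive/Spark types): the base service's executeStatement() path is left untouched, and the subclass only swaps the manager it delegates to, so the same call chain lands on a different engine:

class BaseOperationManager {
  def execute(sql: String): String = s"Hive MapReduce would run: $sql"
}

class SparkLikeOperationManager extends BaseOperationManager {
  override def execute(sql: String): String = s"Spark SQL runs: $sql"
}

class BaseService {
  // the field the subclass will swap out
  protected var operationManager: BaseOperationManager = new BaseOperationManager
  // this code path never changes
  def executeStatement(sql: String): String = operationManager.execute(sql)
}

class SparkLikeService extends BaseService {
  def init(): Unit = { operationManager = new SparkLikeOperationManager }   // the only change
}

object OverrideWiringSketch {
  def main(args: Array[String]): Unit = {
    val service = new SparkLikeService
    service.init()
    println(service.executeStatement("SELECT 1"))   // prints "Spark SQL runs: SELECT 1"
  }
}

This is exactly the shape of what SparkSQLCLIService, SparkSQLSessionManager, and SparkSQLOperationManager do in the sections below.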

4. Back to SparkSQLCLIService

SparkSQLCLIService extends CLIService and overrides the init() method, which sets sessionManager to an instance of SparkSQLSessionManager.

Also interesting here is the ReflectedCompositeService trait, which uses reflection to set fields declared on a specified ancestor class. (I won't analyze it in detail; other articles explain the mechanism, and a small sketch follows after the code below.)

private[hive] class SparkSQLCLIService(
    hiveServer: HiveServer2)
  extends CLIService(hiveServer)
  with ReflectedCompositeService {

  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    this.sessionManager = new SparkSQLSessionManager(hiveServer)
    addService(sessionManager)
    this.serviceUGI = MultiSparkSQLEnv.globalUgi
    initCompositeService(hiveConf)
  }
  
  ...
}

private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
  def initCompositeService(hiveConf: HiveConf) {
    // Emulating `CompositeService.init(hiveConf)`
    val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
    serviceList.asScala.foreach(_.init(hiveConf))

    // Emulating `AbstractService.init(hiveConf)`
    invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
    setAncestorField(this, 3, "hiveConf", hiveConf)
    invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
    getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
  }

}
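The trick ReflectedCompositeService relies on is plain Java reflection: climb a fixed number of superclasses above the runtime class and set a field declared there, even though the field is private in bytecode. A minimal sketch of the idea with illustrative names (these are not the actual Spark helpers):

object AncestorFieldSketch {
  class Base { var label: String = "base"; override def toString: String = label }
  class Child extends Base

  // roughly what setAncestorField does: walk `level` superclasses up and set the field there
  def setAncestorField(obj: AnyRef, level: Int, name: String, value: AnyRef): Unit = {
    var ancestor: Class[_] = obj.getClass
    for (_ <- 0 until level) ancestor = ancestor.getSuperclass
    val field = ancestor.getDeclaredField(name)
    field.setAccessible(true)       // the backing field is private in bytecode
    field.set(obj, value)
  }

  def main(args: Array[String]): Unit = {
    val c = new Child
    setAncestorField(c, 1, "label", "patched")
    println(c)                      // prints "patched"
  }
}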

5. What does SparkSQLSessionManager do?

SparkSQLSessionManager is the most convoluted link in this chain to follow: by overriding init() and openSession(), it arranges for the call below to end up in SparkSQLOperationManager.

sessionManager.getSession(sessionHandle).executeStatement()

The concrete implementation of SparkSQLSessionManager:

private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
  extends SessionManager(hiveServer)
  with ReflectedCompositeService {

  // !!!
  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()

  // Override init(): among other things it sets up backgroundOperationPool, the thread pool that runs operations
  override def init(hiveConf: HiveConf) {
    setSuperField(this, "hiveConf", hiveConf)

    // Create operation log root directory, if operation logging is enabled
    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
      invoke(classOf[SessionManager], this, "initOperationLogRootDir")
    }

    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
    getAncestorField[Log](this, 3, "LOG").info(
      s"HiveServer2: Async execution pool size $backgroundPoolSize")
      
    // Set the parent class's operationManager field to sparkSqlOperationManager; operations
    // created by it run on Spark SQL rather than as Hive's MapReduce jobs
    setSuperField(this, "operationManager", sparkSqlOperationManager)
    addService(sparkSqlOperationManager)

    initCompositeService(hiveConf)
  }

  override def openSession(
      protocol: TProtocolVersion,
      username: String,
      passwd: String,
      ipAddress: String,
      sessionConf: java.util.Map[String, String],
      withImpersonation: Boolean,
      delegationToken: String): SessionHandle = {
    // The key point: first call the parent class's openSession() (shown below), which
    // actually creates and registers the HiveSession via createSession()
    val sessionHandle =
      super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
          delegationToken)
    val session = super.getSession(sessionHandle)
    HiveThriftServer2.listener.onSessionCreated(
      session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
    val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
      sqlContext
    } else {
      sqlContext.newSession()
    }
    ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
    if (sessionConf != null && sessionConf.containsKey("use:database")) {
      ctx.sql(s"use ${sessionConf.get("use:database")}")
    }
    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
    sessionHandle
  }
 }

In Hive's SessionManager, openSession() actually delegates to createSession(), which creates and returns a HiveSessionImpl / HiveSessionImplWithUGI / HiveSessionProxy:

public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
    String password, String ipAddress, Map<String, String> sessionConf, boolean withImpersonation,
    String delegationToken)
    throws HiveSQLException {

    HiveSession session;
    // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
    // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
    
    
    if (withImpersonation) {
      HiveSessionImplwithUGI hiveSessionUgi;
      if (sessionImplWithUGIclassName == null) {
        hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password,
            hiveConf, ipAddress, delegationToken);
      } else {
        try {
          Class<?> clazz = Class.forName(sessionImplWithUGIclassName);
          
          Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, String.class,
            String.class, HiveConf.class, String.class, String.class);
          hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle,
              protocol, username, password, hiveConf, ipAddress, delegationToken);
        } catch (Exception e) {
          throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName);
        }
      }
      session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
      hiveSessionUgi.setProxySession(session);
    } else {
      if (sessionImplclassName == null) {
        session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf,
          ipAddress);
      } else {
        try {
          Class<?> clazz = Class.forName(sessionImplclassName);
          Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class,
            String.class, String.class, HiveConf.class, String.class);

          // the session is actually implemented by Hive's HiveSessionImpl /
          // HiveSessionImplWithUGI / HiveSessionProxy classes
          session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password,
            hiveConf, ipAddress);
        } catch (Exception e) {
          throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e);
        }
      }
    }
    // Here the session is handed the (overridden) operationManager, i.e. sparkSqlOperationManager,
    // which HiveSessionImpl will use later
    session.setSessionManager(this);
    session.setOperationManager(operationManager);
    try {
      session.open(sessionConf);
    } catch (Exception e) {
      LOG.warn("Failed to open session", e);
      try {
        session.close();
      } catch (Throwable t) {
        LOG.warn("Error closing session", t);
      }
      session = null;
      throw new HiveSQLException("Failed to open new session: " + e.getMessage(), e);
    }
    if (isOperationLogEnabled) {
      session.setOperationLogSessionDir(operationLogRootDir);
    }
    try {
      executeSessionHooks(session);
    } catch (Exception e) {
      LOG.warn("Failed to execute session hooks", e);
      try {
        session.close();
      } catch (Throwable t) {
        LOG.warn("Error closing session", t);
      }
      session = null;
      throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
    }
    handleToSession.put(session.getSessionHandle(), session);
    LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount());
    return session;
  }
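The HiveSessionProxy mentioned in the comments above is a JDK dynamic proxy: every call on the session interface is routed through an InvocationHandler, and HiveSessionProxy additionally wraps the invocation in UserGroupInformation#doAs for impersonation. A minimal sketch of just the proxy part, with illustrative names and without the UGI plumbing:

import java.lang.reflect.{InvocationHandler, Method, Proxy}

object SessionProxySketch {
  trait Session { def executeStatement(sql: String): String }

  class SessionImpl extends Session {
    override def executeStatement(sql: String): String = s"executed: $sql"
  }

  // every method call on the returned proxy is routed through invoke();
  // HiveSessionProxy would wrap the m.invoke(...) call in ugi.doAs(...) here
  def proxyFor(target: Session): Session =
    Proxy.newProxyInstance(
      target.getClass.getClassLoader,
      Array[Class[_]](classOf[Session]),
      new InvocationHandler {
        override def invoke(p: AnyRef, m: Method, args: Array[AnyRef]): AnyRef =
          m.invoke(target, args: _*)
      }).asInstanceOf[Session]

  def main(args: Array[String]): Unit =
    println(proxyFor(new SessionImpl).executeStatement("SELECT 1"))
}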

Since getSession() returns a HiveSessionImpl / HiveSessionImplWithUGI / HiveSessionProxy, each of these naturally has to implement executeStatement(). Inside these classes, the real work is done by executeStatementInternal(), which is quite simple:

private OperationHandle executeStatementInternal(String statement,
      Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
    acquire(true, true);

    ExecuteStatementOperation operation = null;
    OperationHandle opHandle = null;
    try {
    
        // 看到了没? 由于之前我们重置了operationManager变量,此处实际上调用的是sparkSqlOperationManager类,总算解开谜底
      operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
          confOverlay, runAsync, queryTimeout);
      opHandle = operation.getHandle();
      operation.run();
      addOpHandle(opHandle);
      return opHandle;
    } catch (HiveSQLException e) {
      // Refering to SQLOperation.java, there is no chance that a HiveSQLException throws and the
      // async background operation submits to thread pool successfully at the same time. So, Cleanup
      // opHandle directly when got HiveSQLException
      if (opHandle != null) {
        getOperationManager().closeOperation(opHandle);
      }
      throw e;
    } finally {
      if (operation == null || operation.getBackgroundHandle() == null) {
        release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.)
      } else {
        releaseBeforeOpLock(true); // Release, but keep the lock (if present).
      }
    }
  }  

Operation is an abstract class; classes extending it must implement runInternal(), which run() drives. In the call above, the real execution happens in operation.run().
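A minimal sketch of that pattern (illustrative names, not the actual Hive classes): run() owns the state bookkeeping and delegates the real work to runInternal(), which each concrete operation, such as SparkExecuteStatementOperation, overrides:

object OperationSketch {
  abstract class Operation {
    protected var state: String = "INITIALIZED"
    final def run(): Unit = {
      state = "RUNNING"
      runInternal()                 // subclasses plug the real execution in here
      state = "FINISHED"
    }
    protected def runInternal(): Unit
  }

  class SparkLikeExecuteStatementOperation(statement: String) extends Operation {
    override protected def runInternal(): Unit =
      println(s"would call sparkSession.sql(...) for: $statement")
  }

  def main(args: Array[String]): Unit =
    new SparkLikeExecuteStatementOperation("SELECT 1").run()
}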

6. The SparkSQLOperationManager class

This class overrides newExecuteStatementOperation(), and what it returns is already a SparkExecuteStatementOperation object.

override def newExecuteStatementOperation(
      parentSession: HiveSession,
      statement: String,
      confOverlay: JMap[String, String],
      async: Boolean): ExecuteStatementOperation = synchronized {

    val sessionHandle = parentSession.getSessionHandle
    val sparkSession = sessionToSparkSession.get(sessionHandle)
    var client = sessionToClient.get(sessionHandle)
    val formatted = statement.toLowerCase.split("\\s+").mkString(" ")
   ...
   
    val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
    val runInBackground = async && sessionState.hiveThriftServerAsync
    val operation = new SparkExecuteStatementOperation(
      parentSession,
      statement,
      client,
      confOverlay,
      runInBackground)(sparkSession, sessionToActivePool)
    handleToOperation.put(operation.getHandle, operation)
    logDebug(s"Created Operation for $statement with session=$parentSession, " +
      s"runInBackground=$runInBackground")
    operation
  }

7. The real fate of a SQL statement

The SparkExecuteStatementOperation created above has its runInternal() call execute(), and what execute() really calls is result = sparkSession.sql(statement). After all this preparation, the real work is the same sparkSession.sql() call we would write in our own demo (a sketch of that demo follows after the code below). It feels like quite a detour.

private def execute(): Unit = {
    statementId = UUID.randomUUID().toString
    logInfo(s"Running query '$statement' with $statementId")
    setState(OperationState.RUNNING)
    // Always use the latest class loader provided by executionHive's state.
    val executionHiveClassLoader = sparkSession.sharedState.jarClassLoader
    Thread.currentThread().setContextClassLoader(executionHiveClassLoader)

    HiveThriftServer2.listener.onStatementStart(
      statementId,
      parentSession.getSessionHandle.getSessionId.toString,
      statement,
      statementId,
      parentSession.getUsername)
    sparkSession.sparkContext.setJobGroup(statementId, statement)
    val pool = sessionToActivePool.get(parentSession.getSessionHandle)
    if (pool != null) {
      sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
    }
    try {
      // the real fate of the SQL statement
      result = sparkSession.sql(statement)
      logDebug(result.queryExecution.toString())
      result.queryExecution.logical match {
        case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
          sessionToActivePool.put(parentSession.getSessionHandle, value)
          logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
        case _ =>
      }
      HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
      iter = {
        val useIncrementalCollect =
          sparkSession.conf.get("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
        if (useIncrementalCollect) {
          result.toLocalIterator.asScala
        } else {
          result.collect().iterator
        }
      }
      val (itra, itrb) = iter.duplicate
      iterHeader = itra
      iter = itrb
      dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
    } catch {
      case e: HiveSQLException =>
        if (getStatus().getState() == OperationState.CANCELED) {
          return
        } else {
          setState(OperationState.ERROR)
          throw e
        }
      // Actually do need to catch Throwable as some failures don't inherit from Exception and
      // HiveServer will silently swallow them.
      case e: Throwable =>
        val currentState = getStatus().getState()
        logError(s"Error executing query, currentState $currentState, ", e)
        setState(OperationState.ERROR)
        HiveThriftServer2.listener.onStatementError(
          statementId, e.getMessage, SparkUtils.exceptionString(e))
        throw new HiveSQLException(e.toString)
    }
    setState(OperationState.FINISHED)
    HiveThriftServer2.listener.onStatementFinish(statementId)
  }
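For comparison, the plain "demo" mentioned above, which reaches the same entry point directly, would look roughly like this (the app name and master are placeholders):

import org.apache.spark.sql.SparkSession

object PlainSqlDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("plain-sql-demo")       // placeholder settings
      .master("local[*]")
      .enableHiveSupport()
      .getOrCreate()

    // the same call the thrift server ends up making: result = sparkSession.sql(statement)
    val result = spark.sql("SELECT 1 AS one")
    result.show()

    spark.stop()
  }
}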

II. Summary

This article does not dissect every method and variable; it just walks through how a single SQL statement flows through the Spark HiveThriftServer2 stack and how it is roughly handled.

What struck me most is how Spark borrows Hive's shell: by overriding a few classes in Java/Scala, it extends the original Hive functionality onto Spark in a simple, convenient, and rather elegant way.

At the same time, a JDBC SQL query passes through many layers of abstraction: Commands / HiveStatement / ThriftCLIService / CLIService / SessionManager / HiveSessionImpl / OperationManager / Operation, and so on. The layering feels elaborate, but it is exactly what makes the extension work described above possible, and it is well worth learning from in future work.

Of course, this article only covers the very beginning of a SQL statement's execution. Next I will continue downward and trace how a SQL statement is turned into the tasks that run on executors, walking through Spark's internals along the way.

Reference

    Original author: 分裂四人组
    Original article: https://www.jianshu.com/p/679de255735a