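Background
I have only recently started working with SparkSQL. The material I found from others explains the Spark HiveThriftServer2 flow rather confusingly, so I decided to trace how a SQL statement is executed, starting from the point where Beeline connects to HiveThriftServer2.
Our company's Spark cluster mainly exposes SparkSQL as a service to upper-layer applications, and I had little prior experience with Hive2. I kept wondering how a job running on Spark on YARN interacts with Hive2, and how the final physical plan (the tasks) ends up running on Spark.
This post gives a fairly simple walkthrough of how Spark HiveThriftServer2 overrides HiveCli/HiveSession/OperationManager and related classes to get from the Beeline entry point to operations on Spark SQL.
The later stages (SQL parsing, the logical plan, the physical plan, and execution on Spark on YARN) will be traced end to end when I find the time.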
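I. Spark HiveThriftServer2 Startup Flow
1. Starting from Beeline
I did not study the Beeline code carefully; the goal was mainly to find the entry point where Beeline talks to HiveCli. Beeline actually connects to HiveCli through JDBC.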
beeline.main()->mainWithInputRedirection()->begin()->execute()->dispatch()
boolean dispatch(String line) {
  ...
  if (isBeeLine) {
    if (line.startsWith(COMMAND_PREFIX)) {
      // handle SQLLine command in beeline which starts with ! and does not end with ;
      return execCommandWithPrefix(line);
    } else {
      return commands.sql(line, getOpts().getEntireLineAsCommand());
    }
  } else {
    return commands.sql(line, getOpts().getEntireLineAsCommand());
  }
}
Commands.sql()->execute()->executeInternal()
private boolean executeInternal(String sql, boolean call) {
  ...
  try {
    Statement stmnt = null;
    boolean hasResults;
    Thread logThread = null;
    try {
      long start = System.currentTimeMillis();
      if (call) {
        stmnt = beeLine.getDatabaseConnection().getConnection().prepareCall(sql);
        hasResults = ((CallableStatement) stmnt).execute();
      } else {
        // Create the statement (HiveStatement implements java.sql.Statement)
        stmnt = beeLine.createStatement();
        if (beeLine.getOpts().isSilent()) {
          // Execute the SQL
          hasResults = stmnt.execute(sql);
        } else {
          logThread = new Thread(createLogRunnable(stmnt));
          logThread.setDaemon(true);
          logThread.start();
          hasResults = stmnt.execute(sql);
          logThread.interrupt();
          logThread.join(DEFAULT_QUERY_PROGRESS_THREAD_TIMEOUT);
        }
      }
    } finally {
      ...
    }
  }
The HiveStatement methods that actually handle the SQL:
public boolean execute(String sql) throws SQLException {
  // This is the call that actually submits the SQL
  runAsyncOnServer(sql);
  waitForOperationToComplete();
  // The query has finished at this point; build the result set for output
  // The query should be completed by now
  if (!stmtHandle.isHasResultSet()) {
    return false;
  }
  resultSet = new HiveQueryResultSet.Builder(this).setClient(client).setSessionHandle(sessHandle)
      .setStmtHandle(stmtHandle).setMaxRows(maxRows).setFetchSize(fetchSize)
      .setScrollable(isScrollableResultset)
      .build();
  return true;
}

// Talk to the server asynchronously (in practice, the CLI service layer)
private void runAsyncOnServer(String sql) throws SQLException {
  checkConnection("execute");
  closeClientOperation();
  initFlags();
  TExecuteStatementReq execReq = new TExecuteStatementReq(sessHandle, sql);
  /**
   * Run asynchronously whenever possible
   * Currently only a SQLOperation can be run asynchronously,
   * in a background operation thread
   * Compilation is synchronous and execution is asynchronous
   */
  execReq.setRunAsync(true);
  execReq.setConfOverlay(sessConf);
  execReq.setQueryTimeout(queryTimeout);
  try {
    // client is of type TCLIService.Iface; the statement is executed over RPC
    TExecuteStatementResp execResp = client.ExecuteStatement(execReq);
    Utils.verifySuccessWithInfo(execResp.getStatus());
    stmtHandle = execResp.getOperationHandle();
    isExecuteStatementFailed = false;
  } catch (SQLException eS) {
    isExecuteStatementFailed = true;
    throw eS;
  } catch (Exception ex) {
    isExecuteStatementFailed = true;
    throw new SQLException(ex.toString(), "08S01", ex);
  }
}
public abstract class ThriftCLIService extends AbstractService implements TCLIService.Iface, Runnable {
  ...
}
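The submit-then-wait shape of execute() can be illustrated without any Hive classes. The sketch below is a toy model under stated assumptions: ToyServer and ToyStatement are hypothetical names, the handle is a plain int, and the "query" is a no-op that finishes on a background thread. It only shows why the client needs a separate wait step after an asynchronous submit.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the server: returns a handle immediately
// and completes the work on a background thread.
class ToyServer {
    enum State { RUNNING, FINISHED }

    private final ExecutorService pool = Executors.newFixedThreadPool(2);
    private final Map<Integer, State> states = new ConcurrentHashMap<>();
    private final AtomicInteger nextHandle = new AtomicInteger();

    int executeStatementAsync(String sql) {
        final int handle = nextHandle.incrementAndGet();
        states.put(handle, State.RUNNING);
        // The "query" is a no-op here; a real server would compile and run sql.
        pool.submit(() -> { states.put(handle, State.FINISHED); });
        return handle;
    }

    State getOperationStatus(int handle) {
        return states.get(handle);
    }

    void shutdown() {
        pool.shutdown();
    }
}

// Hypothetical stand-in for the client-side statement.
class ToyStatement {
    private final ToyServer server;

    ToyStatement(ToyServer server) {
        this.server = server;
    }

    boolean execute(String sql) {
        // Plays the role of runAsyncOnServer(sql): submit and get a handle back at once.
        int handle = server.executeStatementAsync(sql);
        // Plays the role of waitForOperationToComplete(): poll until the server reports completion.
        while (server.getOperationStatus(handle) != ToyServer.State.FINISHED) {
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }
}
```

Creating a ToyServer, calling new ToyStatement(server).execute("select 1"), and then server.shutdown() exercises the whole path. The real client polls the operation status over Thrift rather than reading a shared map.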
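2. On to ThriftCLIService
Let's first look at what ExecuteStatement(), the function Beeline calls, actually does. It simply reads the request over the Thrift protocol and returns whatever cliService produces, again over Thrift. The real execution happens further down.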
@Override
public TExecuteStatementResp ExecuteStatement(TExecuteStatementReq req) throws TException {
  TExecuteStatementResp resp = new TExecuteStatementResp();
  try {
    SessionHandle sessionHandle = new SessionHandle(req.getSessionHandle());
    String statement = req.getStatement();
    Map<String, String> confOverlay = req.getConfOverlay();
    Boolean runAsync = req.isRunAsync();
    long queryTimeout = req.getQueryTimeout();
    // cliService here is a CLIService instance; the real work happens further down (getting close to the truth...)
    OperationHandle operationHandle =
        runAsync ? cliService.executeStatementAsync(sessionHandle, statement, confOverlay,
            queryTimeout) : cliService.executeStatement(sessionHandle, statement, confOverlay,
            queryTimeout);
    resp.setOperationHandle(operationHandle.toTOperationHandle());
    resp.setStatus(OK_STATUS);
  } catch (Exception e) {
    // Note: it's rather important that this (and other methods) catch Exception, not Throwable;
    // in combination with HiveSessionProxy.invoke code, perhaps unintentionally, it used
    // to also catch all errors; and now it allows OOMs only to propagate.
    LOG.warn("Error executing statement: ", e);
    resp.setStatus(HiveSQLException.toTStatus(e));
  }
  return resp;
}
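One detail worth noticing in ExecuteStatement() is that a failure is written into the response status instead of being thrown back to the caller. A minimal sketch of that pattern, using hypothetical types (ToyResponse, ToyThriftHandler) and a plain Function in place of cliService:

```java
import java.util.function.Function;

// Hypothetical response type: a status string plus an optional operation handle.
class ToyResponse {
    String status;
    String handle;
}

class ToyThriftHandler {
    // backend stands in for the cliService call; it may throw.
    static ToyResponse executeStatement(String sql, Function<String, String> backend) {
        ToyResponse resp = new ToyResponse();
        try {
            resp.handle = backend.apply(sql);
            resp.status = "OK";
        } catch (Exception e) {
            // The failure travels back inside the response rather than as an exception.
            resp.status = "ERROR: " + e.getMessage();
        }
        return resp;
    }
}
```

This is why the client side inspects the returned status (Utils.verifySuccessWithInfo in HiveStatement) before using the operation handle.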
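3. What does CLIService do?
This is the key to understanding how Spark interacts with HiveCli. Look at the classes and methods involved here: haven't you seen similarly named ones under sql/hive-thriftserver in the Spark source tree?
The truth is simple. All Spark HiveThriftServer2 does is override the SessionManager/OperationManager/CLIService used in the code below, so that when the method below is called, execution ends up in SparkSQL.
Don't believe it? Read on.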
/**
 * Execute statement on the server with a timeout. This is a blocking call.
 */
@Override
public OperationHandle executeStatement(SessionHandle sessionHandle, String statement,
    Map<String, String> confOverlay, long queryTimeout) throws HiveSQLException {
  OperationHandle opHandle =
      sessionManager.getSession(sessionHandle).executeStatement(statement, confOverlay,
          queryTimeout);
  LOG.debug(sessionHandle + ": executeStatement()");
  return opHandle;
}
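The substitution idea can be reduced to a few lines. The sketch below is a deliberately simplified model, not the Hive or Spark code: every class name is hypothetical, operations are represented by strings, and a session is opened on every call. It shows that the service layer stays unchanged while a subclass of the session manager swaps in a different operation manager.

```java
// All classes here are hypothetical stand-ins, not the Hive or Spark ones.
class DemoOperationManager {
    String newExecuteStatementOperation(String sql) {
        return "hive:" + sql;
    }
}

class DemoSparkOperationManager extends DemoOperationManager {
    @Override
    String newExecuteStatementOperation(String sql) {
        return "spark:" + sql;
    }
}

class DemoSession {
    private DemoOperationManager operationManager;

    void setOperationManager(DemoOperationManager operationManager) {
        this.operationManager = operationManager;
    }

    String executeStatement(String sql) {
        // The session does not know which manager it was given.
        return operationManager.newExecuteStatementOperation(sql);
    }
}

class DemoSessionManager {
    protected DemoOperationManager operationManager = new DemoOperationManager();

    DemoSession openSession() {
        DemoSession session = new DemoSession();
        session.setOperationManager(operationManager);
        return session;
    }
}

class DemoSparkSessionManager extends DemoSessionManager {
    DemoSparkSessionManager() {
        // The only change: replace the inherited manager.
        operationManager = new DemoSparkOperationManager();
    }
}

class DemoCliService {
    private final DemoSessionManager sessionManager;

    DemoCliService(DemoSessionManager sessionManager) {
        this.sessionManager = sessionManager;
    }

    String executeStatement(String sql) {
        return sessionManager.openSession().executeStatement(sql);
    }
}
```

Passing a DemoSparkSessionManager to DemoCliService routes the same executeStatement() call to the Spark-flavored manager. In the real code the field is private in the Hive parent class, which is why Spark resorts to reflection instead of a plain assignment.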
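4. Back to SparkSQLCLIService
SparkSQLCLIService extends CLIService and overrides init(). The init() method sets sessionManager to a SparkSQLSessionManager instance.
The interesting piece here is the ReflectedCompositeService trait, which uses reflection to set fields declared in specific ancestor classes. (I won't analyze it in detail; other articles explain how it works.)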
private[hive] class SparkSQLCLIService(
    hiveServer: HiveServer2)
  extends CLIService(hiveServer)
  with ReflectedCompositeService {
  override def init(hiveConf: HiveConf) {
    this.hiveConf = hiveConf
    this.sessionManager = new SparkSQLSessionManager(hiveServer)
    addService(sessionManager)
    this.serviceUGI = MultiSparkSQLEnv.globalUgi
    initCompositeService(hiveConf)
  }
  ...

private[thriftserver] trait ReflectedCompositeService { this: AbstractService =>
  def initCompositeService(hiveConf: HiveConf) {
    // Emulating `CompositeService.init(hiveConf)`
    val serviceList = getAncestorField[JList[Service]](this, 2, "serviceList")
    serviceList.asScala.foreach(_.init(hiveConf))
    // Emulating `AbstractService.init(hiveConf)`
    invoke(classOf[AbstractService], this, "ensureCurrentState", classOf[STATE] -> STATE.NOTINITED)
    setAncestorField(this, 3, "hiveConf", hiveConf)
    invoke(classOf[AbstractService], this, "changeState", classOf[STATE] -> STATE.INITED)
    getAncestorField[Log](this, 3, "LOG").info(s"Service: $getName is inited.")
  }
}
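The reflection trick behind setAncestorField can be sketched in plain Java. This is an illustration under assumptions, not Spark's ReflectionUtils: the class hierarchy (ReflBase, ReflMid, ReflLeaf) and the helper AncestorFields are hypothetical, and the level argument is taken to mean "how many superclasses to walk up from the object's own class".

```java
import java.lang.reflect.Field;

// Hypothetical hierarchy: the field is private in the grandparent,
// so a subclass cannot assign it directly.
class ReflBase {
    private String conf = "unset";

    String getConf() {
        return conf;
    }
}

class ReflMid extends ReflBase {}

class ReflLeaf extends ReflMid {}

class AncestorFields {
    // Walk `level` superclasses up from obj's class, then set the named declared field.
    static void setAncestorField(Object obj, int level, String name, Object value) {
        Class<?> ancestor = obj.getClass();
        for (int i = 0; i < level; i++) {
            ancestor = ancestor.getSuperclass();
        }
        try {
            Field field = ancestor.getDeclaredField(name);
            field.setAccessible(true);   // bypass the private modifier
            field.set(obj, value);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("cannot set " + name, e);
        }
    }
}
```

For a ReflLeaf instance, AncestorFields.setAncestorField(leaf, 2, "conf", "hiveConf") reaches the field declared two levels up. The cost of this approach is that it breaks silently if the parent library renames the field or changes the class hierarchy.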
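5. What does SparkSQLSessionManager do?
SparkSQLSessionManager is the most convoluted link in this chain. By overriding init() and openSession(), it makes the following call end up in SparkSQLOperationManager:
sessionManager.getSession(sessionHandle).executeStatement()
The implementation of the SparkSQLSessionManager class: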
private[hive] class SparkSQLSessionManager(hiveServer: HiveServer2, sqlContext: SQLContext)
  extends SessionManager(hiveServer)
  with ReflectedCompositeService {
  // !!!
  private lazy val sparkSqlOperationManager = new SparkSQLOperationManager()

  // Override init() to set up backgroundOperationPool, the thread pool that runs operations
  override def init(hiveConf: HiveConf) {
    setSuperField(this, "hiveConf", hiveConf)
    // Create operation log root directory, if operation logging is enabled
    if (hiveConf.getBoolVar(ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED)) {
      invoke(classOf[SessionManager], this, "initOperationLogRootDir")
    }
    val backgroundPoolSize = hiveConf.getIntVar(ConfVars.HIVE_SERVER2_ASYNC_EXEC_THREADS)
    setSuperField(this, "backgroundOperationPool", Executors.newFixedThreadPool(backgroundPoolSize))
    getAncestorField[Log](this, 3, "LOG").info(
      s"HiveServer2: Async execution pool size $backgroundPoolSize")
    // Point the parent's operationManager at sparkSqlOperationManager, which implements
    // the operations with SparkSQL instead of Hive's MapReduce
    setSuperField(this, "operationManager", sparkSqlOperationManager)
    addService(sparkSqlOperationManager)
    initCompositeService(hiveConf)
  }

  override def openSession(
      protocol: TProtocolVersion,
      username: String,
      passwd: String,
      ipAddress: String,
      sessionConf: java.util.Map[String, String],
      withImpersonation: Boolean,
      delegationToken: String): SessionHandle = {
    // Key point: call the parent's openSession() first (its logic is shown below); it creates
    // the HiveSession and hands it operationManager, which now is sparkSqlOperationManager
    val sessionHandle =
      super.openSession(protocol, username, passwd, ipAddress, sessionConf, withImpersonation,
          delegationToken)
    val session = super.getSession(sessionHandle)
    HiveThriftServer2.listener.onSessionCreated(
      session.getIpAddress, sessionHandle.getSessionId.toString, session.getUsername)
    val ctx = if (sqlContext.conf.hiveThriftServerSingleSession) {
      sqlContext
    } else {
      sqlContext.newSession()
    }
    ctx.setConf("spark.sql.hive.version", HiveUtils.hiveExecutionVersion)
    if (sessionConf != null && sessionConf.containsKey("use:database")) {
      ctx.sql(s"use ${sessionConf.get("use:database")}")
    }
    sparkSqlOperationManager.sessionToContexts.put(sessionHandle, ctx)
    sessionHandle
  }
}
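The last part of openSession() decides whether a session shares the server-wide context or gets its own, then records the choice in a map keyed by session handle. A rough model of that bookkeeping, assuming hypothetical ToyContext and ToyContextRegistry classes and string handles:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for SQLContext: only carries per-context settings.
class ToyContext {
    final Map<String, String> conf = new ConcurrentHashMap<>();

    ToyContext newSession() {
        return new ToyContext();
    }
}

class ToyContextRegistry {
    private final ToyContext root = new ToyContext();
    private final boolean singleSession;
    private final Map<String, ToyContext> sessionToContexts = new ConcurrentHashMap<>();

    ToyContextRegistry(boolean singleSession) {
        this.singleSession = singleSession;
    }

    // Called when a session opens: pick the context and remember it for later statements.
    ToyContext onSessionOpened(String sessionHandle) {
        ToyContext ctx = singleSession ? root : root.newSession();
        sessionToContexts.put(sessionHandle, ctx);
        return ctx;
    }

    // Called when a statement arrives: look up the context chosen at open time.
    ToyContext contextFor(String sessionHandle) {
        return sessionToContexts.get(sessionHandle);
    }
}
```

With singleSession set to false, two handles map to different ToyContext objects, so a setting changed in one session is invisible to the other. With it set to true, both handles resolve to the same object.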
openSession() in Hive's SessionManager is actually carried out by createSession(), which creates and returns a HiveSessionImpl/HiveSessionImplwithUGI/HiveSessionProxy.
public HiveSession createSession(SessionHandle sessionHandle, TProtocolVersion protocol, String username,
    String password, String ipAddress, Map<String, String> sessionConf, boolean withImpersonation,
    String delegationToken)
    throws HiveSQLException {
  HiveSession session;
  // If doAs is set to true for HiveServer2, we will create a proxy object for the session impl.
  // Within the proxy object, we wrap the method call in a UserGroupInformation#doAs
  if (withImpersonation) {
    HiveSessionImplwithUGI hiveSessionUgi;
    if (sessionImplWithUGIclassName == null) {
      hiveSessionUgi = new HiveSessionImplwithUGI(sessionHandle, protocol, username, password,
          hiveConf, ipAddress, delegationToken);
    } else {
      try {
        Class<?> clazz = Class.forName(sessionImplWithUGIclassName);
        Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class, String.class,
            String.class, HiveConf.class, String.class, String.class);
        hiveSessionUgi = (HiveSessionImplwithUGI) constructor.newInstance(sessionHandle,
            protocol, username, password, hiveConf, ipAddress, delegationToken);
      } catch (Exception e) {
        throw new HiveSQLException("Cannot initilize session class:" + sessionImplWithUGIclassName);
      }
    }
    session = HiveSessionProxy.getProxy(hiveSessionUgi, hiveSessionUgi.getSessionUgi());
    hiveSessionUgi.setProxySession(session);
  } else {
    if (sessionImplclassName == null) {
      session = new HiveSessionImpl(sessionHandle, protocol, username, password, hiveConf,
          ipAddress);
    } else {
      try {
        Class<?> clazz = Class.forName(sessionImplclassName);
        Constructor<?> constructor = clazz.getConstructor(SessionHandle.class, TProtocolVersion.class,
            String.class, String.class, HiveConf.class, String.class);
        // The session is actually an instance of Hive's HiveSessionImpl/HiveSessionImplwithUGI/HiveSessionProxy
        session = (HiveSession) constructor.newInstance(sessionHandle, protocol, username, password,
            hiveConf, ipAddress);
      } catch (Exception e) {
        throw new HiveSQLException("Cannot initilize session class:" + sessionImplclassName, e);
      }
    }
  }
  // This is where sparkSqlOperationManager is handed to the session; HiveSessionImpl uses it later
  session.setSessionManager(this);
  session.setOperationManager(operationManager);
  try {
    session.open(sessionConf);
  } catch (Exception e) {
    LOG.warn("Failed to open session", e);
    try {
      session.close();
    } catch (Throwable t) {
      LOG.warn("Error closing session", t);
    }
    session = null;
    throw new HiveSQLException("Failed to open new session: " + e.getMessage(), e);
  }
  if (isOperationLogEnabled) {
    session.setOperationLogSessionDir(operationLogRootDir);
  }
  try {
    executeSessionHooks(session);
  } catch (Exception e) {
    LOG.warn("Failed to execute session hooks", e);
    try {
      session.close();
    } catch (Throwable t) {
      LOG.warn("Error closing session", t);
    }
    session = null;
    throw new HiveSQLException("Failed to execute session hooks: " + e.getMessage(), e);
  }
  handleToSession.put(session.getSessionHandle(), session);
  LOG.info("Session opened, " + session.getSessionHandle() + ", current sessions:" + getOpenSessionCount());
  return session;
}
Since getSession() returns a HiveSessionImpl/HiveSessionImplwithUGI/HiveSessionProxy, each of these naturally has to implement executeStatement(). Inside these classes, the work is actually done by executeStatementInternal(), whose implementation is simple:
private OperationHandle executeStatementInternal(String statement,
    Map<String, String> confOverlay, boolean runAsync, long queryTimeout) throws HiveSQLException {
  acquire(true, true);
  ExecuteStatementOperation operation = null;
  OperationHandle opHandle = null;
  try {
    // See? Because operationManager was replaced earlier, this call actually goes to
    // sparkSqlOperationManager. Mystery solved.
    operation = getOperationManager().newExecuteStatementOperation(getSession(), statement,
        confOverlay, runAsync, queryTimeout);
    opHandle = operation.getHandle();
    operation.run();
    addOpHandle(opHandle);
    return opHandle;
  } catch (HiveSQLException e) {
    // Refering to SQLOperation.java, there is no chance that a HiveSQLException throws and the
    // async background operation submits to thread pool successfully at the same time. So, Cleanup
    // opHandle directly when got HiveSQLException
    if (opHandle != null) {
      getOperationManager().closeOperation(opHandle);
    }
    throw e;
  } finally {
    if (operation == null || operation.getBackgroundHandle() == null) {
      release(true, true); // Not async, or wasn't submitted for some reason (failure, etc.)
    } else {
      releaseBeforeOpLock(true); // Release, but keep the lock (if present).
    }
  }
}
Operation is an abstract class; subclasses must implement runInternal(), which run() invokes. In the code above, the actual execution is triggered by operation.run().
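The relationship between run() and the subclass-specific step is the classic template method pattern. The sketch below assumes, as in Hive's Operation class, that run() wraps a call to runInternal() with some fixed bookkeeping; the class names SketchOperation and SketchSparkOperation and the trace string are hypothetical.

```java
// Hypothetical base class: run() is fixed, runInternal() is the extension point.
abstract class SketchOperation {
    protected final StringBuilder trace = new StringBuilder();

    // Template method: the same bookkeeping surrounds every kind of operation.
    public final void run() {
        trace.append("before;");
        try {
            runInternal();
        } finally {
            trace.append("after;");
        }
    }

    protected abstract void runInternal();

    String trace() {
        return trace.toString();
    }
}

// Hypothetical subclass standing in for a Spark-backed statement operation.
class SketchSparkOperation extends SketchOperation {
    private final String statement;

    SketchSparkOperation(String statement) {
        this.statement = statement;
    }

    @Override
    protected void runInternal() {
        // A real implementation would hand the statement to the SQL engine here.
        trace.append("sql(").append(statement).append(");");
    }
}
```

The caller only ever invokes run(), so swapping in a different Operation subclass changes what is executed without touching the session code.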
6. The SparkSQLOperationManager class
This class overrides newExecuteStatementOperation(), which ends up returning a SparkExecuteStatementOperation object.
override def newExecuteStatementOperation(
    parentSession: HiveSession,
    statement: String,
    confOverlay: JMap[String, String],
    async: Boolean): ExecuteStatementOperation = synchronized {
  val sessionHandle = parentSession.getSessionHandle
  val sparkSession = sessionToSparkSession.get(sessionHandle)
  var client = sessionToClient.get(sessionHandle)
  // Collapse runs of whitespace into single spaces
  val formatted = statement.toLowerCase.split("\\s+").mkString(" ")
  ...
  val sessionState = sparkSession.sessionState.asInstanceOf[HiveSessionState]
  val runInBackground = async && sessionState.hiveThriftServerAsync
  val operation = new SparkExecuteStatementOperation(
    parentSession,
    statement,
    client,
    confOverlay,
    runInBackground)(sparkSession, sessionToActivePool)
  handleToOperation.put(operation.getHandle, operation)
  logDebug(s"Created Operation for $statement with session=$parentSession, " +
    s"runInBackground=$runInBackground")
  operation
}
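The formatted value is meant to be the statement in lower case with every run of whitespace collapsed to one space, which requires the regex \s+ (written "\\s+" inside a string literal). A pattern written with forward slashes matches a literal "//" followed by the letter s and never splits on whitespace. A small Java sketch of the normalization, with a hypothetical helper name and an added trim() that the Scala line does not have:

```java
class StatementFormat {
    // Lower-case the statement and collapse each run of whitespace into one space.
    // trim() is an addition of this sketch: it avoids an empty leading token.
    static String normalize(String statement) {
        return String.join(" ", statement.toLowerCase().trim().split("\\s+"));
    }
}
```

For example, a statement spread over several lines with tabs comes back as a single line, which makes simple prefix checks on the statement text reliable.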
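7. The real fate of a SQL statement
The runInternal() method of the SparkExecuteStatementOperation object created above calls execute(), and what execute() really calls is result = sparkSession.sql(statement). After all that preparation, the real implementation is the same ss.sql() call we would write in a demo of our own. Quite a detour.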
private def execute(): Unit = {
  statementId = UUID.randomUUID().toString
  logInfo(s"Running query '$statement' with $statementId")
  setState(OperationState.RUNNING)
  // Always use the latest class loader provided by executionHive's state.
  val executionHiveClassLoader = sparkSession.sharedState.jarClassLoader
  Thread.currentThread().setContextClassLoader(executionHiveClassLoader)
  HiveThriftServer2.listener.onStatementStart(
    statementId,
    parentSession.getSessionHandle.getSessionId.toString,
    statement,
    statementId,
    parentSession.getUsername)
  sparkSession.sparkContext.setJobGroup(statementId, statement)
  val pool = sessionToActivePool.get(parentSession.getSessionHandle)
  if (pool != null) {
    sparkSession.sparkContext.setLocalProperty("spark.scheduler.pool", pool)
  }
  try {
    // The real fate of a SQL statement
    result = sparkSession.sql(statement)
    logDebug(result.queryExecution.toString())
    result.queryExecution.logical match {
      case SetCommand(Some((SQLConf.THRIFTSERVER_POOL.key, Some(value)))) =>
        sessionToActivePool.put(parentSession.getSessionHandle, value)
        logInfo(s"Setting spark.scheduler.pool=$value for future statements in this session.")
      case _ =>
    }
    HiveThriftServer2.listener.onStatementParsed(statementId, result.queryExecution.toString())
    iter = {
      val useIncrementalCollect =
        sparkSession.conf.get("spark.sql.thriftServer.incrementalCollect", "false").toBoolean
      if (useIncrementalCollect) {
        result.toLocalIterator.asScala
      } else {
        result.collect().iterator
      }
    }
    val (itra, itrb) = iter.duplicate
    iterHeader = itra
    iter = itrb
    dataTypes = result.queryExecution.analyzed.output.map(_.dataType).toArray
  } catch {
    case e: HiveSQLException =>
      if (getStatus().getState() == OperationState.CANCELED) {
        return
      } else {
        setState(OperationState.ERROR)
        throw e
      }
    // Actually do need to catch Throwable as some failures don't inherit from Exception and
    // HiveServer will silently swallow them.
    case e: Throwable =>
      val currentState = getStatus().getState()
      logError(s"Error executing query, currentState $currentState, ", e)
      setState(OperationState.ERROR)
      HiveThriftServer2.listener.onStatementError(
        statementId, e.getMessage, SparkUtils.exceptionString(e))
      throw new HiveSQLException(e.toString)
  }
  setState(OperationState.FINISHED)
  HiveThriftServer2.listener.onStatementFinish(statementId)
}
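The spark.sql.thriftServer.incrementalCollect switch in execute() chooses between pulling all rows to the driver at once (collect()) and pulling them one partition at a time (toLocalIterator). The difference can be modeled with a toy result set. ToyResult and its fetch counter are hypothetical, and partitions are plain in-memory lists:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;

// Hypothetical result set split into partitions; counts how many were fetched.
class ToyResult {
    private final List<List<Integer>> partitions;
    int partitionsFetched = 0;

    ToyResult(List<List<Integer>> partitions) {
        this.partitions = partitions;
    }

    private List<Integer> fetch(int index) {
        partitionsFetched++;
        return partitions.get(index);
    }

    // Eager, like collect(): every partition is fetched before the first row is served.
    Iterator<Integer> collect() {
        List<Integer> all = new ArrayList<>();
        for (int i = 0; i < partitions.size(); i++) {
            all.addAll(fetch(i));
        }
        return all.iterator();
    }

    // Lazy, like toLocalIterator: the next partition is fetched only when it is needed.
    Iterator<Integer> toLocalIterator() {
        return new Iterator<Integer>() {
            private int nextPartition = 0;
            private Iterator<Integer> current = Collections.emptyIterator();

            @Override
            public boolean hasNext() {
                while (!current.hasNext() && nextPartition < partitions.size()) {
                    current = fetch(nextPartition++).iterator();
                }
                return current.hasNext();
            }

            @Override
            public Integer next() {
                if (!hasNext()) {
                    throw new NoSuchElementException();
                }
                return current.next();
            }
        };
    }
}
```

Reading one row through toLocalIterator() touches a single partition, while collect() touches all of them first. The lazy path keeps driver memory bounded by the largest partition at the price of fetching partitions one after another.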
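II. Summary
This post does not analyze every method and variable in detail. It only walks through, at a high level, how a SQL statement runs in the Spark HiveThriftServer2 environment.
What struck me most is that Spark borrows Hive's shell and, through method overriding in Java/Scala, extends the original Hive functionality to Spark simply and conveniently. It is a clever design.
A JDBC SQL query also passes through many layers of abstraction along the way: HiveCommands/HiveStatement/ThriftCLIService/CLIService/SessionManager/SessionImpl/OperationManager/Operation and so on. Tracing them feels tedious, but these layers are exactly what makes the extension described above possible. It is a design worth remembering and learning from in later work.
Of course, this post only covers the very simple logic at the start of a SQL statement's execution. Later I will keep going down the stack and trace how a SQL statement is turned into tasks that run on executors, working through Spark's logic from end to end.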