Preserving Spark Thrift Server Query Logs

While a Spark Thrift Server instance is running, its web UI shows the details of each SQL query, such as the submitting user and the executed statement.

However, once the instance stops or terminates abnormally, this information is gone: it is nowhere to be found in the Spark History Server web UI.

The root cause is that Spark Thrift Server never serializes this data into the Spark History Server's store; the source code behind this may get its own write-up later.

This post takes a workaround approach to retain this data in log files.

Modify the org.apache.spark.sql.hive.thriftserver.HiveThriftServer2 class in the spark-hive-thriftserver module as follows:

    def onStatementError(id: String, errorMessage: String, errorTrace: String): Unit = {
      synchronized {
        executionList(id).finishTimestamp = System.currentTimeMillis
        executionList(id).detail = errorMessage
        executionList(id).state = ExecutionState.FAILED
        totalRunning -= 1
        // Added: write this execution's info out via SqlListenerUtil (defined below)
        SqlListenerUtil.write(executionList(id))

        trimExecutionIfNecessary()
      }
    }

    def onStatementFinish(id: String): Unit = synchronized {
      executionList(id).finishTimestamp = System.currentTimeMillis
      executionList(id).state = ExecutionState.FINISHED
      totalRunning -= 1
      // Added: write this execution's info out via SqlListenerUtil (defined below)
      SqlListenerUtil.write(executionList(id))
      trimExecutionIfNecessary()
    }
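
For context, the ExecutionInfo records being written hold the per-statement state that the callbacks above mutate. In the Spark 2.x source it looks roughly like this (a simplified sketch; check the HiveThriftServer2 source of your exact version):

    private[thriftserver] class ExecutionInfo(
        val statement: String,
        val sessionId: String,
        val startTimestamp: Long,
        val userName: String) {
      var finishTimestamp: Long = 0L
      var executePlan: String = ""
      var detail: String = ""
      var state: ExecutionState.Value = ExecutionState.STARTED
      // IDs of the Spark jobs launched for this statement; SqlListenerUtil
      // below uses them to look up JobData/StageData in the status store
      val jobId: ArrayBuffer[String] = ArrayBuffer[String]()
    }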

Then add a new org.apache.spark.sql.hive.thriftserver.SqlListenerUtil class:

    package org.apache.spark.sql.hive.thriftserver

    import com.fasterxml.jackson.databind.ObjectMapper
    import com.fasterxml.jackson.module.scala.DefaultScalaModule
    import com.fasterxml.jackson.module.scala.experimental.ScalaObjectMapper
    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2.{ExecutionInfo, uiTab}
    import org.apache.spark.status.api.v1.{JobData, StageData}

    import scala.collection.mutable.ArrayBuffer

    object SqlListenerUtil extends Logging {
      // Jackson mapper with Scala support, used to render each record as JSON
      val mapper: ObjectMapper with ScalaObjectMapper = new ObjectMapper() with ScalaObjectMapper
      mapper.registerModule(DefaultScalaModule)
      val stagesInfo: ArrayBuffer[StageData] = ArrayBuffer[StageData]()
      val jobsInfo: ArrayBuffer[JobData] = ArrayBuffer[JobData]()

      // Collects the job and stage data of a finished statement from the app
      // status store and logs everything as a single JSON line.
      def write(executionInfo: ExecutionInfo): Unit = synchronized {
        stagesInfo.clear()
        jobsInfo.clear()

        val sparkUI = uiTab.get.parent
        val store = sparkUI.store
        executionInfo.jobId.foreach { id =>
          val jobData = store.job(id.toInt)
          jobsInfo += jobData
          jobData.stageIds.foreach { stageId =>
            stagesInfo ++= store.stageData(stageId)
          }
        }

        val sqlInfo = SqlInfo(sparkUI.appId, executionInfo, jobsInfo, stagesInfo)

        log.info(mapper.writeValueAsString(sqlInfo))
      }

      case class SqlInfo(
          appId: String,
          executionInfo: ExecutionInfo,
          jobsInfo: ArrayBuffer[JobData],
          stagesInfo: ArrayBuffer[StageData])
    }
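
One caveat: store.job throws NoSuchElementException for jobs that have already been evicted from the status store (e.g. under spark.ui.retainedJobs pressure), which would make write fail and lose the whole record. A slightly more defensive lookup loop could look like this (a sketch, not part of the original patch; it assumes the same store, jobsInfo, and stagesInfo fields are in scope):

    import java.util.NoSuchElementException

    executionInfo.jobId.foreach { id =>
      try {
        val jobData = store.job(id.toInt)
        jobsInfo += jobData
        jobData.stageIds.foreach(stageId => stagesInfo ++= store.stageData(stageId))
      } catch {
        case _: NoSuchElementException =>
          // The job was already trimmed from the status store; keep the rest
          // of the record instead of failing the whole write.
          log.warn(s"job $id no longer in the status store, skipping")
      }
    }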

Rebuild the module and replace the corresponding jar in your Spark installation.
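
If you are building from a standard upstream Spark source tree, something along the lines of `./build/mvn -Phive -Phive-thriftserver -pl sql/hive-thriftserver -DskipTests package` should rebuild just this module (the profile and module names here follow the upstream Spark build and may differ in vendor distributions); the resulting spark-hive-thriftserver jar then replaces the one under $SPARK_HOME/jars/.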

Next, modify log4j.properties under the Spark installation directory, adding the following:

    # Custom SQL query monitoring
    log4j.logger.org.apache.spark.sql.hive.thriftserver.SqlListenerUtil=INFO,listener
    log4j.additivity.org.apache.spark.sql.hive.thriftserver.SqlListenerUtil=false
    log4j.appender.listener=org.apache.log4j.DailyRollingFileAppender
    log4j.appender.listener.File=/var/log/spark2/spark-sql-listener
    log4j.appender.listener.layout=org.apache.log4j.PatternLayout
    log4j.appender.listener.layout.ConversionPattern=%m%n
    log4j.appender.listener.DatePattern=.yyyy-MM-dd
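
This routes the SqlListenerUtil logger to its own file, rolled daily as spark-sql-listener.yyyy-MM-dd; additivity=false keeps these entries out of the main Spark log, and the bare %m%n pattern means each line is exactly one JSON document, which is convenient for log collectors.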

Restart spark-thrift-server.

The query logs are now recorded in JSON format in /var/log/spark2/spark-sql-listener.
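
Because each line is one self-contained SqlInfo document, downstream tools can consume the file line by line. A minimal reader sketch (the path and the appId/executionInfo.statement fields follow the definitions above; adjust to your Spark version and setup):

    import com.fasterxml.jackson.databind.ObjectMapper
    import scala.io.Source

    object ReadSqlListenerLog {
      def main(args: Array[String]): Unit = {
        val mapper = new ObjectMapper()
        // One SqlInfo JSON object per line, as produced by the %m%n pattern
        for (line <- Source.fromFile("/var/log/spark2/spark-sql-listener").getLines()) {
          val node = mapper.readTree(line)
          val appId = node.get("appId").asText()
          val statement = node.get("executionInfo").get("statement").asText()
          println(s"$appId\t$statement")
        }
      }
    }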

Original author: ron_yang
Original source: https://www.jianshu.com/p/b106ee06580c