原理
大概原理是:自己写个hook,配置在hive里;hive每次运行sql时都会执行这个hook,而我们写的hook会以http请求发送这个hql的相关信息。所以我们还得写一个接口来接收hook发过来的信息;hive的信息里有个文件记录MR的进度,分析这个文件即可得到hql的进度。
过程
1.编写hook, JcRestHook.java
package com.jc.hive;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.apache.commons.lang3.CharEncoding;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.ql.QueryPlan;
import org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext;
import org.apache.hadoop.hive.ql.hooks.HookContext;
import org.apache.hadoop.hive.ql.session.SessionState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.net.URLEncoder;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
public class JcRestHook implements ExecuteWithHookContext {
private static Logger logger = LoggerFactory.getLogger(JcRestHook.class);
public void run(HookContext hookContext) throws Exception {
QueryPlan queryPlan = hookContext.getQueryPlan();
HiveConf conf = hookContext.getConf();
String queryId = queryPlan.getQueryId();
if (StringUtils.isEmpty(queryId)) {
logger.warn("queryId is null or empty, return");
return;
}
logger.info("queryId: " + queryId);
String queryStr = URLEncoder.encode(queryPlan.getQueryStr(),
CharEncoding.UTF_8);
if (StringUtils.isEmpty(queryStr)) {
logger.warn("queryStr is null or empty, return");
return;
}
logger.info("queryStr: " + queryStr);
String jobName = conf.getVar(HiveConf.ConfVars.HADOOPJOBNAME);
logger.info("jobName: " + jobName);
String server = (String) conf.getAllProperties().get("hiveserver.execute.hook.server");
if (StringUtils.isEmpty(server)) {
logger.warn("server is null or empty, return");
return;
}
logger.info("server: " + server);
String rest = (String) conf.getAllProperties().get("hiveserver.execute.hook.rest");
logger.info("rest: " + rest);
if (StringUtils.isEmpty(rest)) {
logger.warn("rest is null or empty, return");
return;
}
Map<String, String> params = new HashMap<String, String>();
params.put("server", server);
params.put("hook", hookContext.getHookType().toString());
params.put("queryId", queryId);
params.put("queryStr", queryStr);
params.put("jobName", jobName);
params.put("timestamp", String.valueOf(new Date().getTime()));
params.put("histFileName", SessionState.get().getHiveHistory().getHistFileName());
try {
HttpSender.doPost(rest, params);
} catch (Exception e) {
logger.error("do post error: "
+ ExceptionUtils.getFullStackTrace(e));
}
}
}
简单的http工具类HttpSender.java
import java.io.*;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Map.Entry;
/**
 * Minimal HTTP helper used by the Hive hook to push query metadata to the REST endpoint.
 */
public class HttpSender {

    /**
     * Sends an HTTP POST with the given raw body and optional extra headers,
     * returning the response body as a single string (line breaks stripped).
     *
     * @param url    target URL
     * @param param  request body, e.g. "k1=v1&k2=v2"; values are assumed to be
     *               already URL-encoded by the caller
     * @param header optional extra request headers; may be null
     * @return concatenated response body lines
     * @throws IOException on connection, write, or read failure
     */
    public static String sendPost(String url, String param, Map<String, String> header) throws UnsupportedEncodingException, IOException {
        URL realUrl = new URL(url);
        URLConnection conn = realUrl.openConnection();
        conn.setConnectTimeout(5000);
        conn.setReadTimeout(15000);
        if (header != null) {
            for (Entry<String, String> entry : header.entrySet()) {
                conn.setRequestProperty(entry.getKey(), entry.getValue());
            }
        }
        conn.setDoOutput(true);
        conn.setDoInput(true);
        // StringBuilder instead of repeated String '+=' (which is O(n^2) on long bodies).
        StringBuilder result = new StringBuilder();
        // Write and read explicitly in UTF-8; the original wrote the request body
        // with the platform-default charset and read with the magic alias "utf8".
        try (PrintWriter out = new PrintWriter(new OutputStreamWriter(conn.getOutputStream(), StandardCharsets.UTF_8))) {
            out.print(param);
            out.flush();
            try (BufferedReader in = new BufferedReader(new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
                String line;
                while ((line = in.readLine()) != null) {
                    result.append(line);
                }
            }
        }
        return result.toString();
    }

    /**
     * Joins params into a form body like "param1=a&param2=b&param3=c".
     * NOTE(review): values are NOT URL-encoded here — the hook pre-encodes queryStr
     * itself; confirm the remaining values never contain '&' or '='.
     */
    static String buildQuery(Map<String, String> params) {
        StringBuilder body = new StringBuilder();
        String delim = "";
        for (Entry<String, String> entry : params.entrySet()) {
            body.append(delim).append(entry.getKey()).append('=').append(entry.getValue());
            delim = "&";
        }
        return body.toString();
    }

    /**
     * POSTs the given parameters as a form-style body to the REST endpoint.
     *
     * @param rest   target URL
     * @param params key/value pairs to send
     * @throws IOException on connection failure
     */
    public static void doPost(String rest, Map<String, String> params) throws IOException {
        sendPost(rest, buildQuery(params), null);
    }
}
2.编写接口
获取hive相关信息
package com.jc.web.controller;
import com.jc.domain.ResultVO;
import com.jc.service.TaskService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.ResponseBody;
@Controller
@RequestMapping(path="/hiveserver2")
/**
 * REST endpoint that receives the query metadata pushed by the Hive hook
 * (JcRestHook) and hands it off to TaskService for progress tracking.
 */
public class Hiveserver2Controller {
    private static final Logger logger = LoggerFactory.getLogger(Hiveserver2Controller.class);

    @Autowired
    private TaskService taskService;

    /**
     * Callback invoked by the hook via HTTP POST; all parameters arrive as
     * form fields with the same names the hook put into its params map.
     *
     * @return the service result, serialized to the response body
     */
    @RequestMapping(path = "/rest", method = RequestMethod.POST)
    @ResponseBody
    public ResultVO rest(String server, String hook, String queryId, String queryStr, String jobName, String timestamp, String histFileName) {
        // Parameterized logging avoids eager string concatenation on every request.
        logger.info("server: {}", server);
        logger.info("hook: {}", hook);
        logger.info("queryId: {}", queryId);
        logger.info("queryStr: {}", queryStr);
        logger.info("jobName: {}", jobName);
        logger.info("timestamp: {}", timestamp);
        logger.info("histFileName: {}", histFileName);
        return taskService.hiveCallback(server, hook, queryId, queryStr, jobName, timestamp, histFileName);
    }
}
由于涉及到敏感数据处理,TaskService.hiveCallback就简要说一下:
其中histFileName是关键,hive会以json形式将进度信息输出到这个histFileName文件中,文件名大概如下:
/data/hiveDataDir/tmpdir/hive/hive_job_log_e3246d8b-8b87-4db7-96f6-34f10fe3e89c_641681466.txt
此文件是每次执行hive的hql时都会生成一个文件。进度信息都会以一行json输出,格式如下:
Counters plan={...}
其中json里有个stageList的值,通过它可以分析出当前hql的进度。如果hql比较大,会有多个stage(stage-1、stage-2、stage-3…等等)。多个stage时,需要把所有stage的进度相加再除以stage的数量,才是这个hql的整体进度。
histFileName文件需用apache的commons-io组件中的TailerListenerAdapter来监听
3.在这里有个小技巧,由于是异步的,客户端只有提交hql,而不知道hql对应的queryId是多少,更加不知道jobName。所以sql需要做一些小动作,封装个子查询,在where语句把id加上去,如:
SELECT * FROM (select phone from t_user where l_date>='20180101' and l_date<'20180201' limit 10
) where 'jc_task_id_17_'='jc_task_id_17_'
这样我们就能解析出id=17,从而找到我们对应的hql(所以每次提交hql时,本地需记录id和hql的映射)。
4.配置hook,就是配置hive-site.xml,重启MetaStore和HiveServer
<property>
<name>hive.exec.pre.hooks</name>
<value>com.jc.hive.JcRestHook</value>
<description>
Comma-separated list of pre-execution hooks to be invoked for each statement.
A pre-execution hook is specified as the name of a Java class which implements the
org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
</description>
</property>
<property>
<name>hive.exec.post.hooks</name>
<value>com.jc.hive.JcRestHook</value>
<description>
Comma-separated list of post-execution hooks to be invoked for each statement.
A post-execution hook is specified as the name of a Java class which implements the
org.apache.hadoop.hive.ql.hooks.ExecuteWithHookContext interface.
</description>
</property>
<property>
<name>hiveserver.execute.hook.server</name>
<value>localhost:10000</value>
</property>
<property>
<name>hiveserver.execute.hook.rest</name>
<value>http://localhost:8034/hiveserver2/rest</value>
</property>