现在数据仓库基本上都采用Hadoop平台了,那么数据仓库里元数据的血缘分析有哪些思路呢?
基本上有下面这两种思路:
1、解析hql脚本,通过正则表达式去匹配每一行字符串
2、采用Hive自带的语法分析类解析
这里比较建议采用第二种,比较直接简单,因为第一种方式比较复杂,需要考虑场景比较多,容易出现遗漏
Hive 自带的类 org.apache.hadoop.hive.ql.tools.LineageInfo
将hql语句解析成抽象语法树(AST),再遍历这棵树获取hive表的源表和目标表,从而达到血缘分析的目的
但是这个类有一点缺陷:对于 create table xx as select ... 这种hql语句,无法解析出目标表
我们稍加修改代码就可以解决了
代码如下:
package com.neo.datamanager;
import org.apache.hadoop.hive.ql.lib.*;
import org.apache.hadoop.hive.ql.parse.*;
import java.io.IOException;
import java.util.*;
public class HiveLineageInfo implements NodeProcessor {
// private static final Logger logger = LoggerFactory.getLogger(HiveLineageInfo.class);
/**
* Stores input tables in sql.
*/
TreeSet inputTableList = new TreeSet();
/**
* Stores output tables in sql.
*/
TreeSet OutputTableList = new TreeSet();
/**
* @return java.util.TreeSet
*/
public TreeSet getInputTableList() {
return inputTableList;
}
/**
* @return java.util.TreeSet
*/
public TreeSet getOutputTableList() {
return OutputTableList;
}
/**
* Implements the process method for the NodeProcessor interface.
*/
public Object process(Node nd, Stack stack, NodeProcessorCtx procCtx,
Object... nodeOutputs) throws SemanticException {
ASTNode pt = (ASTNode) nd;
switch (pt.getToken().getType()) {
case HiveParser.TOK_CREATETABLE:
OutputTableList.add(BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)));
break;
case HiveParser.TOK_TAB:
OutputTableList.add(BaseSemanticAnalyzer.getUnescapedName((ASTNode) pt.getChild(0)));
break;
case HiveParser.TOK_TABREF:
ASTNode tabTree = (ASTNode) pt.getChild(0);
String table_name = (tabTree.getChildCount() == 1) ?
BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) :
BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0)) + "." + tabTree.getChild(1);
inputTableList.add(table_name);
break;
}
return null;
}
/**
* parses given query and gets the lineage info.
*
* @param query
* @throws ParseException
*/
public void getLineageInfo(String query) throws ParseException,
SemanticException {
/*
* Get the AST tree
*/
ParseDriver pd = new ParseDriver();
ASTNode tree = pd.parse(query);
while ((tree.getToken() == null) && (tree.getChildCount() > 0)) {
tree = (ASTNode) tree.getChild(0);
}
/*
* initialize Event Processor and dispatcher.
*/
inputTableList.clear();
OutputTableList.clear();
// create a walker which walks the tree in a DFS manner while maintaining
// the operator stack. The dispatcher
// generates the plan from the operator tree
Map<Rule, NodeProcessor> rules = new LinkedHashMap<Rule, NodeProcessor>();
// The dispatcher fires the processor corresponding to the closest matching
// rule and passes the context along
Dispatcher disp = new DefaultRuleDispatcher(this, rules, null);
GraphWalker ogw = new DefaultGraphWalker(disp);
// Create a list of topop nodes
ArrayList topNodes = new ArrayList();
topNodes.add(tree);
ogw.startWalking(topNodes, null);
}
public static void main(String[] args) throws IOException, ParseException, SemanticException {
String query = "insert into table aa select * from bb union all select * from cc";
HiveLineageInfo lep = new HiveLineageInfo();
lep.getLineageInfo(query);
System.out.println("Input tables = " + lep.getInputTableList());
System.out.println("Output tables = " + lep.getOutputTableList());
}
}
运行之后结果如下:
result | table |
---|---|
input_table | [bb, cc] |
output_table | [aa] |