
A Data Lineage Analysis Tool for the Hive Data Warehouse

Source: compiled from the web     Date: 2015-10-27


I. Problems a data warehouse runs into regularly:
1. Two reports are compared and the results differ widely. Someone must manually verify the dimensions behind each metric, tracing where the numbers come from and what processing conditions were applied, before the root cause can be found.
2. A base table needs a column changed for some reason, and the impact of that change on the warehouse has to be assessed, laboriously, before a migration plan can even be drafted.

II. Analysis:
Data travels a long way through many processing steps and components before it reaches a business user, so tracing it back to its origin is genuinely hard. Yet tracing metadata matters for effective decision-making, policy design, and discrepancy analysis. Both problems above are data lineage problems: the first is data tracing (provenance); the second is impact analysis, which is tracing run in reverse.

III. Solution:
I implemented a data lineage analysis tool on top of the Hive warehouse that works out the relationships between tables and between columns, which solves both problems above.

  • Main goal: parse the HQL statements in computation scripts, extract the input/output tables, input/output columns, and the associated processing conditions, and present the analysis.
  • Approach: depth-first traversal of the AST. When an operation token is met, record the current operation; when a subclause is met, push the current state onto a stack, process the subclause, and pop when done. (A minimal sketch of this traversal follows the list.)
  • Key points:
    1. On TOK_TAB or TOK_TABREF, identify the table being operated on
    2. Use the stack to determine whether we are inside a join, and capture the join condition
    3. On TOK_WHERE, capture the current where condition
    4. On TOK_SELEXPR, determine the input and output columns
    5. On TOK_SUBQUERY, save the subquery's information for use by the parent query
    6. On select * or columns without an explicit source, consult the metastore to resolve them
    7. Validate the parse results
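
To make the traversal concrete before the full listing, here is a minimal standalone sketch (illustrative only; the class name and sample statement are invented) built on the same Hive ParseDriver/ASTNode API the tool imports. It assumes a Hive 0.x/1.x-era hive-exec dependency, where ParseDriver.parse returns the root ASTNode. It parses one statement and prints its AST depth-first; every handler in the tool below (TOK_TAB, TOK_WHERE, TOK_SELEXPR, ...) fires at some point of exactly this kind of walk.

package XXX;

import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.ParseDriver;

public class AstDumpDemo {
    public static void main(String[] args) throws Exception {
        ParseDriver pd = new ParseDriver();
        // Any HQL works here; this statement exercises TOK_TAB, TOK_TABREF and TOK_WHERE.
        ASTNode ast = pd.parse("insert overwrite table t2 select id, name from t1 where id > 0");
        dump(ast, 0);
    }

    // Depth-first walk: print the current token, then recurse into every child.
    private static void dump(ASTNode node, int depth) {
        StringBuilder line = new StringBuilder();
        for (int i = 0; i < depth; i++) {
            line.append("  ");
        }
        line.append(node.getToken() == null ? "nil" : node.getText());
        System.out.println(line);
        for (int i = 0; i < node.getChildCount(); i++) {
            dump((ASTNode) node.getChild(i), depth + 1);
        }
    }
}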

The full implementation follows:


package XXX;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Stack;
import java.util.TreeSet;
import java.util.Map.Entry;

import org.antlr.runtime.tree.Tree;
import org.apache.hadoop.hive.ql.parse.ASTNode;
import org.apache.hadoop.hive.ql.parse.BaseSemanticAnalyzer;
import org.apache.hadoop.hive.ql.parse.HiveParser;
import org.apache.hadoop.hive.ql.parse.ParseDriver;
import org.apache.hadoop.hive.ql.parse.ParseException;

import com.xiaoju.products.Exception.SQLParseException;
import com.xiaoju.products.Exception.VolidateException;
import com.xiaoju.products.util.Check;
import com.xiaoju.products.util.NumberUtil;
import com.xiaoju.products.util.ParseUtil;

/**
 * Hive SQL parsing class.
 * 
 * Purpose: parse HQL statements and extract the input/output tables, columns, and the
 * associated processing conditions, as the foundation for column-level data lineage.
 * Focus: capture the table and column operations inside SELECT statements; other
 * operations are not resolved down to the column level.
 * Approach: depth-first traversal of the AST. When an operation token is met, record the
 * current operation; when a subclause is met, push the current state, process the
 * subclause, and pop when done.
 * Key points:
 *         1. On TOK_TAB or TOK_TABREF, identify the table being operated on
 *         2. Use the stack to determine whether we are inside a join, and capture the join condition
 *         3. On TOK_WHERE, capture the current where condition
 *         4. On TOK_SELEXPR, determine the input and output columns
 *         5. On TOK_SUBQUERY, save the subquery's information for use by the parent query
 *         6. On select * or columns without an explicit source, consult the metastore to resolve them
 *         7. Validate the parse results
 *         
 * @author yangyangthomas     
 *    
 */
public class LineParser {

    private MetaDataDao dao = new MetaDataDao();
    private static final String SPLIT_DOT = ".";
    private static final String SPLIT_COMMA = ",";
    private static final String SPLIT_AND = "&";
    private static final String TOK_EOF = "<EOF>";

    private Map<String, List<ColLineParse>> subQueryMap = new HashMap<String, List<ColLineParse>>();
    private Map<String, List<String>> dbMap = new HashMap<String, List<String>>();
    private List<ColLine> colLineList = new ArrayList<ColLine>();

    private Map<String, String> alias = new HashMap<String, String>();
    private Set<String> conditions = new HashSet<String>();
    private List<ColLineParse> cols = new ArrayList<ColLineParse>();
    private Set<String> outputTables = new HashSet<String>();
    private Set<String> inputTables = new HashSet<String>();

    private Stack<String> tableNameStack = new Stack<String>();
    private Stack<Oper> operStack = new Stack<Oper>();
    private Stack<Boolean> joinStack = new Stack<Boolean>();
    private Stack<ASTNode> joinOnStack = new Stack<ASTNode>();
    private int unionTimes = 0;
    private boolean isStaticUnion = true;

    private String nowQueryTable = "";
    private Oper oper;
    private boolean joinClause = false;
    private ASTNode joinOn = null;
    private String nowQueryDB = "default";

    public List<ColLine> getColLines() {
        return colLineList;
    }
    public Set<String> getOutputTables() {
        return outputTables;
    }
    public Set<String> getInputTables() {
        return inputTables;
    }

    private enum Oper {
        SELECT, INSERT, DROP, TRUNCATE, LOAD, CREATETABLE, ALTER
    }

    private Set<String> parseIteral(ASTNode ast) {
        Set<String> set = new HashSet<String>(); // the tables referenced by the current query
        prepareToParseCurrentNodeAndChilds(ast);
        set.addAll(parseChildNodes(ast));
        set.addAll(parseCurrentNode(ast, set));
        endParseCurrentNode(ast);
        return set;
    }

    /**
     * Parse the current node.
     * @param ast
     * @param set
     * @return
     */
    private Set<String> parseCurrentNode(ASTNode ast, Set<String> set) {
        if (ast.getToken() != null) {
            switch (ast.getToken().getType()) {
            case HiveParser.TOK_CREATETABLE: //outputtable
                outputTables.add(fillDB(BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast.getChild(0))));
                break;
            case HiveParser.TOK_TAB:// outputTable
                String tableTab = BaseSemanticAnalyzer.getUnescapedName((ASTNode) ast.getChild(0));
                if (oper == Oper.SELECT) {
                    nowQueryTable = tableTab;
                }
                outputTables.add(fillDB(tableTab));
                break;
            case HiveParser.TOK_TABREF:// inputTable
                ASTNode tabTree = (ASTNode) ast.getChild(0);
                String tableName = (tabTree.getChildCount() == 1) ? 
                        BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0))
                        : BaseSemanticAnalyzer.getUnescapedName((ASTNode) tabTree.getChild(0))
                                + SPLIT_DOT + tabTree.getChild(1);
                if (oper == Oper.SELECT) {
                    if(joinClause && !"".equals(nowQueryTable) ){
                        nowQueryTable += SPLIT_AND + tableName;
                    }else{
                        nowQueryTable = tableName;
                    }
                    set.add(tableName);
                }
                inputTables.add(fillDB(tableName));
                if (ast.getChild(1) != null) { //(TOK_TABREF (TOK_TABNAME detail usersequence_client) c) 
                    String alia = ast.getChild(1).getText().toLowerCase();
                    alias.put(alia, tableName);
                }
                break;
            case HiveParser.TOK_SUBQUERY:
                if (ast.getChildCount() == 2) {
                    String tableAlias = unescapeIdentifier(ast.getChild(1).getText());
                    String aliaReal = "";
                    for(String table : set){
                        aliaReal+=table+SPLIT_AND;
                    }
                    if(aliaReal.length() !=0){
                        aliaReal = aliaReal.substring(0, aliaReal.length()-1);
                    }
                    alias.put(tableAlias, aliaReal);
                    putSubQueryMap(tableAlias); 
                    cols.clear();
                }
                break;

            case HiveParser.TOK_SELEXPR: // handle input and output columns
                 /**
                  * (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) 
                  *     (TOK_SELECT (TOK_SELEXPR TOK_ALLCOLREF))
                  * 
                  * (TOK_DESTINATION (TOK_DIR TOK_TMP_FILE)) 
                  *     (TOK_SELECT 
                  *         (TOK_SELEXPR (. (TOK_TABLE_OR_COL p) datekey) datekey) 
                  *         (TOK_SELEXPR (TOK_TABLE_OR_COL datekey)) 
                  *         (TOK_SELEXPR (TOK_FUNCTIONDI count (. (TOK_TABLE_OR_COL base) userid)) buyer_count))
                  *         (TOK_SELEXPR (TOK_FUNCTION when (> (. (TOK_TABLE_OR_COL base) userid) 5) (. (TOK_TABLE_OR_COL base) clienttype) (> (. (TOK_TABLE_OR_COL base) userid) 1) (+ (. (TOK_TABLE_OR_COL base) datekey) 5) (+ (. (TOK_TABLE_OR_COL base) clienttype) 1)) bbbaaa)
                  */
                // resolve the table being inserted into
                Tree tok_insert = ast.getParent().getParent();
                Tree child = tok_insert.getChild(0).getChild(0);
                String tName = BaseSemanticAnalyzer.getUnescapedName((ASTNode) child.getChild(0));
                String destTable = "TOK_TMP_FILE".equals(tName) ? "TOK_TMP_FILE" : fillDB(tName);

                // the select * from t case
                if (ast.getChild(0).getType() == HiveParser.TOK_ALLCOLREF) {
                    String tableOrAlias = getColOrData((ASTNode) ast.getChild(0), true, false);
                    String nowTable = fillDB(getRealTable(null, tableOrAlias));
                    String[] tableArr = nowTable.split(SPLIT_AND); //fact.test&test2
                    for (String tables : tableArr) {
                        String[] split = tables.split("\\.");
                        if (split.length > 2) {
                            throw new SQLParseException("parse table:" + nowTable);
                        }
                        String db = split.length == 2 ? split[0] : "";
                        String table = split.length == 2 ? split[1] : split[0];
                        List<String> colByTab = dao.getColumnByDBAndTable(db, table);
                        for (String column : colByTab) {
                            cols.add(new ColLineParse(destTable, column, tables + SPLIT_DOT + column, new HashSet<String>()));
                        }
                    }
                    break;
                }

                // the select c1 from t case
                String columnOrData = filterData(getColOrData((ASTNode) ast.getChild(0), false, false));
                // 2. handle SELECT-side filter conditions
                String condition = getCondition((ASTNode) ast.getChild(0));
                Set<String> clone = filterCondition(columnOrData, condition);
                String column = ast.getChild(1) != null ? parseColOrData((ASTNode) ast.getChild(1), false)
                        : parseColOrData((ASTNode) ast.getChild(0), true); // alias
                cols.add(new ColLineParse(destTable, column, columnOrData, clone));
                break;
            case HiveParser.TOK_WHERE: // 3. handle WHERE-side filter conditions
                conditions.add("WHERE:" + getCondition((ASTNode) ast.getChild(0)));
                break; 
            case HiveParser.TOK_ALTERTABLE_ADDPARTS:
            case HiveParser.TOK_ALTERTABLE_RENAME:
            case HiveParser.TOK_ALTERTABLE_ADDCOLS:
                ASTNode alterTableName = (ASTNode) ast.getChild(0);
                outputTables.add(alterTableName.getText() + "\t" + oper);
                break;
            default:
                /** 
                 * (or 
                 *   (> (. (TOK_TABLE_OR_COL p) orderid) (. (TOK_TABLE_OR_COL c) orderid))
                 *   (and (= (. (TOK_TABLE_OR_COL p) a) (. (TOK_TABLE_OR_COL c) b)) 
                 *        (= (. (TOK_TABLE_OR_COL p) aaa) (. (TOK_TABLE_OR_COL c) bbb))))
                 */
                 // 1. handle JOIN-side filter conditions
                if (joinOn != null && joinOn.getTokenStartIndex() == ast.getTokenStartIndex()
                        && joinOn.getTokenStopIndex() == ast.getTokenStopIndex()) {
                    ASTNode astCon = (ASTNode)ast.getChild(2);
                    conditions.add(ast.getText().substring(4) + ":" + getCondition(astCon));
                    break;  
                }
            }
        }
        return set;
    }
    /**
     * Filter out meaningless conditions (empty, or equal to the column itself) on the SELECT side.
     * E.g. in select col1,col2+1 from table1, the "condition" attached to col1 must be filtered out.
     * @param columnOrData
     * @param condition
     * @return
     */
    private Set<String> filterCondition(String columnOrData, String condition) {
        Set<String> clone = new HashSet<String>();
        if (Check.notEmpty(condition) // skip empty conditions
                && !columnOrData.equals(condition)) { // a condition identical to the column carries no information
            clone.add("COLFUN:" + condition);
        }
        return clone;
    }
    /**
     * 取得处理条件,主要应用在WHERE、JOIN和SELECT端
     * 如: 

where a=1 *

t1 join t2 on t1.col1=t2.col1 and t1.col2=123 *

select count(distinct col1) from t1 * @param ast * @return */ private String getCondition(ASTNode ast) { if (ast.getType() == HiveParser.KW_OR ||ast.getType() == HiveParser.KW_AND) { return "(" + getCondition((ASTNode)ast.getChild(0)) + " " + ast.getText() + " " + getCondition((ASTNode)ast.getChild(1)) + ")"; } else if (ast.getType() == HiveParser.NOTEQUAL //判断条件 > < like in || ast.getType() == HiveParser.EQUAL || ast.getType() == HiveParser.LESSTHAN || ast.getType() == HiveParser.LESSTHANOREQUALTO || ast.getType() == HiveParser.GREATERTHAN || ast.getType() == HiveParser.GREATERTHANOREQUALTO || ast.getType() == HiveParser.KW_LIKE || ast.getType() == HiveParser.DIVIDE || ast.getType() == HiveParser.PLUS || ast.getType() == HiveParser.MINUS || ast.getType() == HiveParser.STAR || ast.getType() == HiveParser.MOD || ast.getType() == HiveParser.AMPERSAND || ast.getType() == HiveParser.TILDE || ast.getType() == HiveParser.BITWISEOR || ast.getType() == HiveParser.BITWISEXOR) { return getColOrData((ASTNode)ast.getChild(0), false, true) + " " + ast.getText() + " " + getColOrData((ASTNode)ast.getChild(1), false, true); } else if (ast.getType() == HiveParser.TOK_FUNCTIONDI) { String condition = ast.getChild(0).getText(); return condition + "(distinct (" + getCondition((ASTNode) ast.getChild(1)) +"))"; } else { return getColOrData(ast, false, true); } } /** * 解析when条件 * @param ast * @return case when c1>100 then col1 when c1>0 col2 else col3 end */ private String getWhenCondition(ASTNode ast) { int cnt = ast.getChildCount(); StringBuilder sb = new StringBuilder(); for (int i = 1; i < cnt; i++) { String condition = getCondition((ASTNode)ast.getChild(i)); if (i == 1) { sb.append("case when " + condition); } else if (i == cnt-1) { //else sb.append(" else " + condition + " end"); } else if (i % 2 == 0){ //then sb.append(" then " + condition); } else { sb.append(" when " + condition); } } return sb.toString(); } /*** * 解析when的字段信息 case when c1>100 then col1 when c1>0 col2 else col3 end * @param ast * @param isSimple 是否是简写 * @return col1,col2,col3 */ private String getWhenColumn(ASTNode ast, boolean isSimple) { int cnt = ast.getChildCount(); Set re = new HashSet(); for (int i = 2; i < cnt; i=i+2) { re.add(getColOrData((ASTNode) ast.getChild(i), isSimple, false)); if (i+1 == cnt-1) { //else re.add(getColOrData((ASTNode) ast.getChild(i+1), isSimple, false)); } } StringBuilder sb = new StringBuilder(); for (String string : re) { sb.append(string).append(SPLIT_COMMA); } sb.setLength(sb.length()-1); return sb.toString(); } private void putSubQueryMap(String tableAlias) { putSubQueryMap(0, tableAlias); //一个sql之间不会有别名相同的情况 } /** * 保存subQuery查询别名和字段信息 * @param sqlIndex * @param tableAlias */ private void putSubQueryMap(int sqlIndex, String tableAlias) { List list = new ArrayList(); if (TOK_EOF.equals(tableAlias) && unionTimes > 0) { //开头是union的处理 int size = cols.size(); int tableNum = unionTimes + 1; //1个union,2个表 int colNum = size / tableNum; for (int i = 0; i < colNum; i++) { //合并字段 ColLineParse col = cols.get(i); for (int j = i + colNum; j < size; j = j + colNum) { ColLineParse col2 = cols.get(j); if (notNormalCol(col.getToNameParse()) && !notNormalCol(col2.getToNameParse())) { col.setToNameParse(col2.getToNameParse()); } col.addFromName(col2.getFromName()); Set conditionSet = col2.getConditionSet(); conditionSet.addAll(conditions); col.addConditionSet(conditionSet); } list.add(col); } } else { for (ColLineParse entry : cols) { Set conditionSet = entry.getConditionSet(); conditionSet.addAll(conditions); 
list.add(new ColLineParse(entry.getToTable(), entry.getToNameParse(), entry.getFromName(), conditionSet)); } } String key = sqlIndex == 0 ? tableAlias : tableAlias + sqlIndex; //没有重名的情况就不用标记 subQueryMap.put(key, list); } /** * 判断正常列, * 正常:a as col, a * 异常:1 ,'a' //数字、字符等作为列名 */ private boolean notNormalCol(String column) { return Check.isEmpty(column) || NumberUtil.isNumeric(column) || column.startsWith("\"") || column.startsWith("\'"); } /** * 从指定索引位置开始解析子树 * @param ast * @param startIndex 开始索引 * @param isSimple 是否简写 * @param withCond 是否包含条件 * @return */ private String processChilds(ASTNode ast,int startIndex, boolean isSimple, boolean withCond) { StringBuilder sb = new StringBuilder(); int cnt = ast.getChildCount(); for (int i = startIndex; i < cnt; i++) { String columnOrData = getColOrData((ASTNode) ast.getChild(i), isSimple, withCond); if (Check.notEmpty(columnOrData)){ sb.append(columnOrData).append(SPLIT_COMMA); } } if (sb.length() > 0) { sb.setLength(sb.length()-1); } return sb.toString(); } /*** * 递归解析获得列名或者字符数字等 * @param ast * @param isSimple 是否是简写, 如isSimple=true:col1;isSimple=false:db1.table1.col1 * @param withCond 是否包含条件,如true:nvl(col1,0)=>nvl(col1,0);false:col1 * @return 解析得到的列名或者字符数字等 */ private String getColOrData(ASTNode ast,boolean isSimple, boolean withCond) { if(ast.getType() == HiveParser.TOK_FUNCTIONDI || ast.getType() == HiveParser.TOK_FUNCTION){ String fun = ast.getChild(0).getText(); String column = getColOrData((ASTNode) ast.getChild(1), isSimple, withCond); if ("when".equalsIgnoreCase(fun)) { return withCond ? getWhenCondition(ast) : getWhenColumn(ast, isSimple); } else if("IN".equalsIgnoreCase(fun)) { String col = getColOrData((ASTNode)ast.getChild(1), false, false); return col + " in (" + processChilds(ast, 2, true, false) + ")"; } else if("TOK_ISNOTNULL".equalsIgnoreCase(fun) //isnull isnotnull || "TOK_ISNULL".equalsIgnoreCase(fun)){ String col = getColOrData((ASTNode)ast.getChild(1), false, false); return col + " " + fun.toLowerCase().substring(4); } else if("CONCAT".equalsIgnoreCase(fun) //CONCAT || "NVL".equalsIgnoreCase(fun) //NVl || "date_sub".equalsIgnoreCase(fun)){ column = processChilds(ast, 1, isSimple, withCond); } return !withCond ? column : fun +"("+ column + ")"; } else if(ast.getType() == HiveParser.LSQUARE){ //map,array String column = getColOrData((ASTNode) ast.getChild(0), isSimple, withCond); String key = getColOrData((ASTNode) ast.getChild(1), isSimple, withCond); return !withCond ? column : column +"["+ key + "]"; } else { String column = parseColOrData(ast, isSimple); if(Check.notEmpty(column)){ return column; } return processChilds(ast, 0, isSimple, withCond); } } /** * 解析获得列名或者字符数字等 * @param ast * @param isSimple * @return */ private String parseColOrData(ASTNode ast, boolean isSimple) { if (ast.getType() == HiveParser.DOT && ast.getChild(0).getType() == HiveParser.TOK_TABLE_OR_COL && ast.getChild(0).getChildCount() == 1 && ast.getChild(1).getType() == HiveParser.Identifier) { String column = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(1) .getText().toLowerCase()); String alia = BaseSemanticAnalyzer.unescapeIdentifier(ast.getChild(0) .getChild(0).getText().toLowerCase()); String realTable = getRealTable(column, alia); return isSimple ? column : fillDB(realTable) + SPLIT_DOT + column; } else if (ast.getType() == HiveParser.TOK_TABLE_OR_COL && ast.getChildCount() == 1 && ast.getChild(0).getType() == HiveParser.Identifier) { String column = ast.getChild(0).getText(); return isSimple ? 
column : fillDB(getRealTable(column, null)) + SPLIT_DOT + column; } else if (ast.getType() == HiveParser.Number || ast.getType() == HiveParser.StringLiteral || ast.getType() == HiveParser.Identifier) { return ast.getText(); } return null; } /** * 获得真实的表 * @param column * @param alia * @return */ private String getRealTable(String column, String alia) { String realTable = nowQueryTable; if (inputTables.contains(alia)) { realTable = alia; } else if (alias.get(alia) != null) { realTable = alias.get(alia); } if (Check.isEmpty(alia)) { alia = fixAlia(realTable); } if (realTable.indexOf(SPLIT_AND) > 0) { realTable = getSubQueryTable(column, alia ,realTable); } else if (Check.isEmpty(realTable)) { throw new SQLParseException("can't parse realTable column:" + column + ",alias:"+alia); } return realTable; } /** * 修正别名 * @param alia * @param realTable * @return */ private String fixAlia(String realTable) { for (Entry entry : alias.entrySet()) { if (entry.getValue().equals(realTable)) { return entry.getKey(); } } return null; } /** * 过滤掉无用的列:如col1,123,'2013',col2 ==>> col1,col2 * @param col * @return */ private String filterData(String col){ String[] split = col.split(SPLIT_COMMA); StringBuilder sb = new StringBuilder(); for (String string : split) { if (!notNormalCol(string)) { sb.append(string).append(SPLIT_COMMA); } } if (sb.length() > 0) { sb.setLength(sb.length()-1); } return sb.toString(); } /** * 获得subquery查询中的table名称 * @param column 对应的列 * @param alia 别名 * @param defaultTable 默认表名 * @return */ private String getSubQueryTable(String column, String alia,String defaultTable) { List list = subQueryMap.get(alia); StringBuilder sb = new StringBuilder(); if (Check.notEmpty(column) && Check.notEmpty(list)) { for (ColLineParse colLine : list) { if (column.equals(colLine.getToNameParse())) { String fromName = colLine.getFromName(); //处理一个字段对应多个字段的情况,如union sb.append(fromName.substring(0, fromName.lastIndexOf(SPLIT_DOT))).append(SPLIT_AND); } } if (sb.length()>0) { sb.setLength(sb.length()-1); } } return sb.length() > 0 ? 
sb.toString() : defaultTable; } /** * 解析所有子节点 * @param ast * @return */ private Set parseChildNodes(ASTNode ast){ Set set= new HashSet(); int numCh = ast.getChildCount(); if (numCh > 0) { for (int num = 0; num < numCh; num++) { ASTNode child = (ASTNode) ast.getChild(num); set.addAll(parseIteral(child)); } } return set; } /** * 准备解析当前节点 * @param ast */ private void prepareToParseCurrentNodeAndChilds(ASTNode ast){ if (ast.getToken() != null) { switch (ast.getToken().getType()) { case HiveParser.TOK_SWITCHDATABASE: System.out.println("nowQueryDB changed " + nowQueryDB+ " to " +ast.getChild(0).getText()); nowQueryDB = ast.getChild(0).getText(); break; case HiveParser.TOK_UNION: //join 从句开始 if (isStaticUnion && (ast.getParent().isNil() || ast.getParent().getType() == HiveParser.TOK_UNION)) { unionTimes++; } else if (ast.getParent().getType() != HiveParser.TOK_UNION) { isStaticUnion = false; } break; case HiveParser.TOK_RIGHTOUTERJOIN: case HiveParser.TOK_LEFTOUTERJOIN: case HiveParser.TOK_JOIN: case HiveParser.TOK_LEFTSEMIJOIN: case HiveParser.TOK_MAPJOIN: case HiveParser.TOK_FULLOUTERJOIN: case HiveParser.TOK_UNIQUEJOIN: joinStack.push(joinClause); joinClause = true; joinOnStack.push(joinOn); joinOn = ast; break; case HiveParser.TOK_QUERY: tableNameStack.push(nowQueryTable); operStack.push(oper); nowQueryTable = "";//sql22 oper = Oper.SELECT; break; case HiveParser.TOK_INSERT: tableNameStack.push(nowQueryTable); operStack.push(oper); oper = Oper.INSERT; break; case HiveParser.TOK_SELECT: tableNameStack.push(nowQueryTable); operStack.push(oper); oper = Oper.SELECT; break; case HiveParser.TOK_DROPTABLE: oper = Oper.DROP; break; case HiveParser.TOK_TRUNCATETABLE: oper = Oper.TRUNCATE; break; case HiveParser.TOK_LOAD: oper = Oper.LOAD; break; case HiveParser.TOK_CREATETABLE: oper = Oper.CREATETABLE; break; } if (ast.getToken() != null && ast.getToken().getType() >= HiveParser.TOK_ALTERDATABASE_PROPERTIES && ast.getToken().getType() <= HiveParser.TOK_ALTERVIEW_RENAME) { oper = Oper.ALTER; } } } /** * 结束解析当前节点 * @param ast */ private void endParseCurrentNode(ASTNode ast){ if (ast.getToken() != null) { switch (ast.getToken().getType()) { //join 从句结束,跳出join case HiveParser.TOK_RIGHTOUTERJOIN: case HiveParser.TOK_LEFTOUTERJOIN: case HiveParser.TOK_JOIN: case HiveParser.TOK_LEFTSEMIJOIN: case HiveParser.TOK_MAPJOIN: case HiveParser.TOK_FULLOUTERJOIN: case HiveParser.TOK_UNIQUEJOIN: joinClause = joinStack.pop(); joinOn = joinOnStack.pop(); break; case HiveParser.TOK_QUERY: break; case HiveParser.TOK_INSERT: case HiveParser.TOK_SELECT: nowQueryTable = tableNameStack.pop(); oper = operStack.pop(); break; } } } /** * 转义标识符 * @param val * @return */ private static String unescapeIdentifier(String val) { if (val == null) { return null; } if (val.charAt(0) == '`' && val.charAt(val.length() - 1) == '`') { val = val.substring(1, val.length() - 1); } return val; } private void parseAST(ASTNode ast) { parseIteral(ast); } public void parse(String sqlAll, boolean validate){ int i = 0; //当前是第几个sql for (String sql : sqlAll.split("(?Exception e) { e.printStackTrace(); throw new SQLParseException(e); } } if (validate) { LineValidater lineValidater = new LineValidater(); lineValidater.validate(inputTables, outputTables, colLineList, dbMap); } } /** * 所有解析完毕之后的后期处理 */ private void endParse(int sqlIndex) { putSubQueryMap(sqlIndex, TOK_EOF); putDBMap(); setColLineList(); } private void setColLineList() { Map> map = new HashMap>(); for (Entry> entry : subQueryMap.entrySet()) { if (entry.getKey().startsWith(TOK_EOF)) { List 
value = entry.getValue(); for (ColLineParse colLineParse : value) { List list = map.get(colLineParse.getToTable()); if (Check.isEmpty(list)) { list = new ArrayList(); map.put(colLineParse.getToTable(), list); } list.add(colLineParse); } } } for (Entry> entry : map.entrySet()) { String table = entry.getKey(); List pList = entry.getValue(); List dList = dbMap.get(table); int metaSize = Check.isEmpty(dList) ? 0 : dList.size(); for (int i = 0; i < pList.size(); i++) { //按顺序插入对应的字段 ColLineParse clp = pList.get(i); String colName = null; if (i < metaSize) { colName = table + SPLIT_DOT + dList.get(i); } ColLine colLine = new ColLine(table, colName , clp.getToNameParse(), clp.getFromName(), clp.getConditionSet()); colLineList.add(colLine); } } } private void putDBMap() { Set outputTables = getOutputTables(); for (String table : outputTables) { String[] pdt = ParseUtil.parseDBTable(table); List list = dao.getColumnByDBAndTable(pdt[0], pdt[1]); dbMap.put(table, list); } } /** * 补全db信息 * table1 ==>> db1.table1 * db1.table1 ==>> db1.table1 * db2.t1&t2 ==>> db2.t1&db1.t2 * @param tables */ private String fillDB(String nowTable) { StringBuilder sb = new StringBuilder(); String[] tableArr = nowTable.split(SPLIT_AND); //fact.test&test2&test3 for (String tables : tableArr) { String[] split = tables.split("\\" + SPLIT_DOT); if (split.length > 2) { System.out.println(tables); throw new SQLParseException("parse table:" + nowTable); } String db = split.length == 2 ? split[0] : nowQueryDB ; String table = split.length == 2 ? split[1] : split[0] ; sb.append(db).append(SPLIT_DOT).append(table).append(SPLIT_AND); } if (sb.length()>0) { sb.setLength(sb.length()-1); } return sb.toString(); } }
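
Usage is a two-step affair: call parse(...) once over the whole script, then read the results through the three getters. Below is a minimal, hypothetical driver sketched under the assumption that the author's helper classes (MetaDataDao, Check, NumberUtil, ParseUtil, LineValidater, and the ColLine/ColLineParse beans), which are not listed in this article, are on the classpath, and that MetaDataDao can reach the Hive metastore. The SQL is illustrative; the ColLine getters are the same ones the tests below use.

package XXX;

import java.util.List;
import java.util.Set;

public class LineParserDemo {
    public static void main(String[] args) {
        LineParser parser = new LineParser();
        // validate=false skips the LineValidater cross-check at the end of parse().
        parser.parse("use app;insert into table dest select id from src where id > 0", false);

        Set<String> inputs = parser.getInputTables();
        Set<String> outputs = parser.getOutputTables();
        List<ColLine> lines = parser.getColLines();

        System.out.println("inputTable:" + inputs);
        System.out.println("outputTable:" + outputs);
        for (ColLine colLine : lines) {
            System.out.println(colLine.getToTable() + "." + colLine.getToNameParse()
                    + " <- " + colLine.getFromName() + " " + colLine.getConditionSet());
        }
    }
}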

Test cases:

package XXX;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.hadoop.hive.ql.parse.ParseException;
import org.apache.hadoop.hive.ql.parse.SemanticException;

import junit.framework.TestCase;

public class LineParserTest extends TestCase {
    LineParser parse = new LineParser();

    /*
     * Supports parsing select * from table
     */
    public void testParseAllColumn() {
        Set<String> inputTablesExpected = new HashSet<String>();
        Set<String> outputTablesExpected = new HashSet<String>();
        Set<String> conditions = new HashSet<String>();
        Set<ColLine> lineSetExpected = new HashSet<ColLine>();
        Set<String> outputTablesActual;
        Set<String> inputTablesActual;
        List<ColLine> lineListActualed;
        String sql1 = "use app;insert into table dest select statid from " +
                "(select * from hand_qq_passenger a join return_benefit_base_foo b on a.statid=b.id where a.channel > 10) base";
        parse.parse(sql1, true);
        inputTablesExpected.add("app.hand_qq_passenger");
        inputTablesExpected.add("app.return_benefit_base_foo");
        outputTablesExpected.add("app.dest");
        conditions.add("WHERE:app.hand_qq_passenger.channel > 10");
        conditions.add("JOIN:app.hand_qq_passenger.statid = app.return_benefit_base_foo.id");

        Set clone1 = clone(conditions);
        ColLine col1 = new ColLine("statid", "app.hand_qq_passenger.statid", clone1);
        lineSetExpected.add(col1);

        outputTablesActual = parse.getOutputTables();
        inputTablesActual = parse.getInputTables();
        lineListActualed = parse.getColLines();
        assertSetEquals(outputTablesExpected, outputTablesActual);
        assertSetEquals(inputTablesExpected, inputTablesActual);
        assertCoLineSetEqual(lineSetExpected, lineListActualed);
        printRestult(outputTablesActual, inputTablesActual, lineListActualed);
    }   

    /*
     * Supports parsing where, >, and, in, etc.
     */
    public void testParseWhere() {
        Set<String> inputTablesExpected = new HashSet<String>();
        Set<String> outputTablesExpected = new HashSet<String>();
        Set<String> conditions = new HashSet<String>();
        Set<ColLine> lineSetExpected = new HashSet<ColLine>();
        Set<String> outputTablesActual;
        Set<String> inputTablesActual;
        List<ColLine> lineListActualed;
        String sql1 = "INSERT OVERWRITE table app.dest PARTITION (year='2015',m

    /*
     * Supports union parsing. A column referenced by the outer query is propagated into every branch:
     *   SELECT u.id, actions.date FROM ( SELECT av.uid AS uid FROM action_video av WHERE av.date = '2010-06-03'
     *       UNION ALL SELECT ac.uid AS uid FROM action_comment ac WHERE ac.date = '2008-06-03' ) actions JOIN users u ON (u.id = actions.uid)
     *   =>> SELECT u.id, actions.date FROM ( SELECT av.uid AS uid, av.date as date FROM action_video av WHERE av.date = '2010-06-03'
     *       UNION ALL SELECT ac.uid AS uid, ac.date as date FROM action_comment ac WHERE ac.date = '2008-06-03' ) actions JOIN users u ON (u.id = actions.uid)
     * The branches must select the same number of columns (e.g. select id from t1 union all select id,userName from t2 is invalid).
     */
    public void testParseUnion() {
        Set<String> inputTablesExpected = new HashSet<String>();
        Set<String> outputTablesExpected = new HashSet<String>();
        Set<String> conditions = new HashSet<String>();
        Set<ColLine> lineSetExpected = new HashSet<ColLine>();
        Set<String> outputTablesActual;
        Set<String> inputTablesActual;
        List<ColLine> lineListActualed;
        String sql = "use default;use app;SELECT u.id, actions.date FROM ( "
                + "SELECT av.uid AS uid, av.date as date "
                + "FROM action_video av "
                + "WHERE av.date = '2010-06-03' "
                + "UNION ALL "
                + "SELECT ac.uid AS uid,ac.date as date "
                + "FROM fact.action_comment ac "
                + "WHERE ac.date = '2008-06-03' "
                + ") actions JOIN users u ON (u.id = actions.uid)";
        parse.parse(sql, false);
        inputTablesExpected.add("app.users");
        inputTablesExpected.add("app.action_video");
        inputTablesExpected.add("fact.action_comment");
        outputTablesExpected.clear();
        conditions.add("WHERE:app.action_video.date = '2010-06-03'");
        conditions.add("WHERE:fact.action_comment.date = '2008-06-03'");
        conditions.add("JOIN:app.users.id = app.action_video&fact.action_comment.uid");
        Set<String> clone1 = clone(conditions);
        ColLine col1 = new ColLine("id", "app.users.id", clone1);
        Set<String> clone2 = clone(conditions);
        ColLine col2 = new ColLine("date", "app.action_video&fact.action_comment.date", clone2);
        lineSetExpected.add(col1);
        lineSetExpected.add(col2);
        outputTablesActual = parse.getOutputTables();
        inputTablesActual = parse.getInputTables();
        lineListActualed = parse.getColLines();
        assertSetEquals(outputTablesExpected, outputTablesActual);
        assertSetEquals(inputTablesExpected, inputTablesActual);
        assertCoLineSetEqual(lineSetExpected, lineListActualed);
        printRestult(outputTablesActual, inputTablesActual, lineListActualed);
    }

    public void testParseUnion2() {
        Set<String> inputTablesExpected = new HashSet<String>();
        Set<String> outputTablesExpected = new HashSet<String>();
        Set<String> conditions = new HashSet<String>();
        Set<ColLine> lineSetExpected = new HashSet<ColLine>();
        Set<String> outputTablesActual;
        Set<String> inputTablesActual;
        List<ColLine> lineListActualed;
        String sql = "INSERT OVERWRITE TABLE target_table "
                + "SELECT name, id, \"Category159\" FROM source_table_1 "
                + "UNION ALL "
                + "SELECT name, id,category FROM source_table_2 "
                + "UNION ALL "
                + "SELECT name, id, \"Category160\" FROM source_table_3 where name=123";
        parse.parse(sql, false);
        inputTablesExpected.add("default.source_table_1");
        inputTablesExpected.add("default.source_table_2");
        inputTablesExpected.add("default.source_table_3");
        outputTablesExpected.add("default.target_table");
        conditions.add("WHERE:default.source_table_3.name = 123");
        Set<String> clone1 = clone(conditions);
        ColLine col1 = new ColLine("name", "default.source_table_1.name,default.source_table_2.name,default.source_table_3.name", clone1);
        Set<String> clone2 = clone(conditions);
        ColLine col2 = new ColLine("id", "default.source_table_1.id,default.source_table_2.id,default.source_table_3.id", clone2);
        Set<String> clone3 = clone(conditions);
        clone3.add("COLFUN:\"Category159\"");
        clone3.add("COLFUN:\"Category160\"");
        ColLine col3 = new ColLine("category", "default.source_table_2.category", clone3);
        lineSetExpected.add(col1);
        lineSetExpected.add(col2);
        lineSetExpected.add(col3);
        outputTablesActual = parse.getOutputTables();
        inputTablesActual = parse.getInputTables();
        lineListActualed = parse.getColLines();
        assertSetEquals(outputTablesExpected, outputTablesActual);
        assertSetEquals(inputTablesExpected, inputTablesActual);
        assertCoLineSetEqual(lineSetExpected, lineListActualed);
        printRestult(outputTablesActual, inputTablesActual, lineListActualed);
    }

    /**
     * Supports parsing:
     *   =, <>, >=, <=, >, <
     *   join, where, case when then else end, +, -, *, /, CONCAT, nvl
     *   is null, is not null
     *   sum, count, max, min, avg, distinct
     *   or, and
     *   to_date(last_sucgrabord_time) > date_sub('$data_desc',7)
     * @throws SemanticException
     * @throws ParseException
     */
    public void testParse() throws SemanticException, ParseException {
        Set<String> inputTablesExpected = new HashSet<String>();
        Set<String> outputTablesExpected = new HashSet<String>();
        Set<String> conditions = new HashSet<String>();
        Set<ColLine> lineSetExpected = new HashSet<ColLine>();
        Set<String> outputTablesActual;
        Set<String> inputTablesActual;
        List<ColLine> lineListActualed;
        String sql25 = "from(select p.datekey datekey, p.userid userid, c.clienttype "
                + "from detail.usersequence_client c join fact.orderpayment p on (p.orderid > c.orderid or p.a = c.b) and p.aaa=c.bbb "
                + "full outer join dim.user du on du.userid = p.userid where p.datekey = '20131118' and (du.userid in (111,222) or hash(p.test) like '%123%')) base "
                + "insert overwrite table test.customer_kpi select CONCAT(base.datekey,1,2) as aaa, "
                + "case when base.userid > 5 then base.clienttype when base.userid > 1 then base.datekey+5 else 1-base.clienttype end bbbaaa,count(distinct hash(base.userid)) buyer_count "
                + "where base.userid is not null group by base.datekey, base.clienttype";
        parse.parse(sql25, false);
        inputTablesExpected.add("detail.usersequence_client");
        inputTablesExpected.add("fact.orderpayment");
        inputTablesExpected.add("dim.user");
        outputTablesExpected.add("test.customer_kpi");
        conditions.add("JOIN:((fact.orderpayment.orderid > detail.usersequence_client.orderid or fact.orderpayment.a = detail.usersequence_client.b) and fact.orderpayment.aaa = detail.usersequence_client.bbb)");
        conditions.add("WHERE:(fact.orderpayment.datekey = '20131118' and (dim.user.userid in (111,222) or hash(fact.orderpayment.test) like '%123%'))");
        conditions.add("WHERE:fact.orderpayment.userid isnotnull");
        conditions.add("FULLOUTERJOIN:dim.user.userid = fact.orderpayment.userid");
        Set<String> clone1 = clone(conditions);
        clone1.add("COLFUN:CONCAT(fact.orderpayment.datekey,1,2)");
        ColLine col1 = new ColLine("aaa", "fact.orderpayment.datekey", clone1);
        Set<String> clone2 = clone(conditions);
        clone2.add("COLFUN:case when fact.orderpayment.userid > 5 then detail.usersequence_client.clienttype when fact.orderpayment.userid > 1 then fact.orderpayment.datekey + 5 else 1 - detail.usersequence_client.clienttype end");
        ColLine col2 = new ColLine("bbbaaa", "detail.usersequence_client.clienttype,detail.usersequence_client.clienttype,fact.orderpayment.datekey", clone2);
        Set<String> clone3 = clone(conditions);
        clone3.add("COLFUN:count(distinct (hash(fact.orderpayment.userid)))");
        ColLine col3 = new ColLine("buyer_count", "fact.orderpayment.userid", clone3);
        lineSetExpected.add(col1);
        lineSetExpected.add(col2);
        lineSetExpected.add(col3);
        outputTablesActual = parse.getOutputTables();
        inputTablesActual = parse.getInputTables();
        lineListActualed = parse.getColLines();
        assertSetEquals(outputTablesExpected, outputTablesActual);
        assertSetEquals(inputTablesExpected, inputTablesActual);
        assertCoLineSetEqual(lineSetExpected, lineListActualed);
        printRestult(outputTablesActual, inputTablesActual, lineListActualed);
    }

    private void assertCoLineSetEqual(Set<ColLine> lineSetExpected, List<ColLine> lineListActualed) {
        assertEquals(lineSetExpected.size(), lineListActualed.size());
        for (ColLine colLine : lineListActualed) {
            int i = 0;
            for (ColLine colLine2 : lineSetExpected) {
                i++;
                if (colLine.getToNameParse().equals(colLine2.getToNameParse())) {
                    assertEquals(colLine2.getFromName(), colLine.getFromName());
                    assertSetEquals(colLine2.getConditionSet(), colLine.getConditionSet());
                    i = 0;
                    break;
                }
                if (i == lineListActualed.size()) {
                    assertFalse(true);
                }
            }
        }
    }

    private void assertSetEquals(Set<String> expected, Set<String> actual) {
        assertEquals(expected.size(), actual.size());
        for (String string : expected) {
            assertTrue(actual.contains(string));
        }
    }

    private Set<String> clone(Set<String> set) {
        Set<String> list2 = new HashSet<String>(set.size());
        for (String string : set) {
            list2.add(string);
        }
        return list2;
    }

    private void printRestult(Set<String> outputTablesActual, Set<String> inputTablesActual, List<ColLine> lineListActualed) {
        System.out.println("inputTable:" + inputTablesActual);
        System.out.println("outputTable:" + outputTablesActual);
        for (ColLine colLine : lineListActualed) {
            System.out.println("ToTable:" + colLine.getToTable()
                    + ",ToNameParse:" + colLine.getToNameParse()
                    + ",ToName:" + colLine.getToName()
                    + ",FromName:" + colLine.getFromName()
                    + ",Condition:" + colLine.getConditionSet());
        }
    }
}


References:
http://tech.meituan.com/hive-sql-to-mapreduce.html
http://www.cnblogs.com/drawwindows/p/4595771.html
https://cwiki.apache.org/confluence/display/Hive/LanguageManual
