This article explores using LineageLogger to obtain column-level lineage from Hive. Hopefully it offers a useful reference for developers facing the same problem.
The Apache Hive source tree ships org.apache.hadoop.hive.ql.hooks.LineageLogger, a post-execution hook that can report the column-level relationships of an INSERT HQL statement. However, org.apache.hadoop.hive.ql.optimizer.Optimizer only adds the lineage Generator transformation (the step that actually builds the lineage information the hooks read) when one of a few hard-coded hook class names is configured, so simply writing our own hook class cannot give us column-level lineage:
```java
if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // added in Hive 4.0+
    || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")
    || postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")
    // added in Hive 2.3
    || postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {
  transformations.add(new Generator(postExecHooks));
}
```
Instead, we can rely on LineageLogger itself, paired with a log-monitoring service, to obtain column-level lineage:
1. Add the hook

conf/hive-site.xml

```xml
<property>
  <name>hive.exec.post.hooks</name>
  <value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value>
</property>
```
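hive.exec.post.hooks accepts a comma-separated list of hook classes, so LineageLogger can simply be appended after any post-execution hooks you already have configured.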
2. Enable the log output

conf/log4j.properties

```properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
```
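Note: on Hive builds that ship Log4j 2 (conf/hive-log4j2.properties, Hive 2.x and later) the property-file syntax above does not apply; configure an equivalent Log4j 2 logger for org.apache.hadoop.hive.ql.hooks.LineageLogger at INFO level instead.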
3. Locate the Hive log directories

```sql
> set system:hive.log.dir;        -- service log
> set hive.querylog.location;     -- query log, e.g. /tmp/hive-{username}/
```
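Once the hook and the logger are enabled, every query that produces lineage appends a line to hive.log containing a JSON document with a 'vertices' array (tables and columns) and an 'edges' array (the dependencies between them). The exact prefix before the JSON depends on your log pattern, which is why the script below uses a configurable search_keyword to locate the lines and strip everything before the JSON.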
4. Write a script that monitors the log
```python
# -*- coding: utf-8 -*-
import hashlib
import json
import os.path
from json import JSONDecodeError

import requests

log_path_list = [
    "/tmp/root/hive.log"
]

# Keyword that marks lineage lines in hive.log; everything after it is taken as the JSON
# document. Assumption: adjust it to your log pattern so the remainder is valid JSON.
search_keyword = "LineageLogger: "
# Partition columns to exclude from the lineage vertices (fill in for your tables).
partition_field = []


def read_hive_log(file_path):
    """Read a Hive log file and return the lines that contain the lineage keyword.

    Args:
        file_path (str): path of the Hive log file.

    Returns:
        content (list): (line_number, json_text) tuples for the matching lines,
        plus the current file size, the last read line index and the file name.
    """
    # Load the position saved by the previous run, if any.
    save_dict = {}
    if os.path.exists('./hash_index.log'):
        try:
            with open("./hash_index.log", 'r') as f:
                file_content = f.read()
                if file_content != '':
                    save_dict = json.loads(file_content)
        except json.JSONDecodeError as e:
            print(f"Cannot parse the index file as JSON: {e}")

    new_file = file_path.split("/")[-1]
    if new_file in save_dict:
        old_size = save_dict.get(new_file).get('size', 0)
        line_index = save_dict.get(new_file).get('index', 0)
    else:
        # Never seen before: start reading from the beginning.
        old_size = 0
        line_index = 0

    is_new_file = False
    try:
        new_size: int = os.path.getsize(file_path)
    except Exception as e:
        print("Failed to read the file size:", e)
        new_size = 0
    # Treat the file as new if it was never seen before or if it shrank (log rotation).
    if (new_file not in save_dict) or (new_size < old_size or old_size == 0):
        is_new_file = True

    content = []
    is_new_file_only_one = is_old_file_only_one = False
    try:
        with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:
            for line_number, line in enumerate(log_file, 1):
                if search_keyword in line:
                    if is_new_file:
                        if not is_new_file_only_one:
                            print("New file, reading from the beginning")
                            is_new_file_only_one = True
                        content.append((line_number, line.split(search_keyword)[-1]))
                        line_index = line_number
                    elif line_number >= line_index:
                        if not is_old_file_only_one:
                            print("Known file, resuming from line: {}".format(line_index))
                            is_old_file_only_one = True
                        content.append((line_number, line.split(search_keyword)[-1]))
                        line_index = line_number
    except Exception as e:
        print(f"Failed to read the Hive log file: {e}")
    return content, new_size, line_index, new_file


def parse_vertice(vertices):
    """Parse the vertex list of a lineage document.

    Args:
        vertices (list): vertex objects from the JSON.

    Returns:
        vertex_dict (dict): vertex id -> {"db", "tb", "col"}; "col" is empty unless
        the vertex type is COLUMN.
    """
    vertex_dict = {}
    for vertex in vertices:
        vertex_id = vertex.get("id", "")
        vertex_type = vertex.get("vertexType", "")
        vertex_names = vertex.get("vertexId", "").split(".")
        if len(vertex_names) >= 3:
            db_name = vertex_names[0]
            tb_name = vertex_names[1]
            col_name = vertex_names[-1] if vertex_type == "COLUMN" else ""
            if col_name not in partition_field:
                vertex_dict.setdefault(vertex_id, {"db": db_name, "tb": tb_name, "col": col_name})
    return vertex_dict


def parse_edge(edges):
    """Parse the edge list of a lineage document.

    Args:
        edges (list): edge objects from the JSON.

    Returns:
        list: one dict per edge with source ids, target ids, expression and edge type.
    """
    edge_elem_list = []
    for edge in edges:
        source_arr = edge.get("sources", [])
        target_arr = edge.get("targets", [])
        expression = edge.get("expression", "")
        edge_type = edge.get("edgeType", "")
        edge_elem_list.append({"source": source_arr, "target": target_arr,
                               "exp": expression, "type": edge_type})
    return edge_elem_list


def parse_lineage_log(content: list):
    column_info_dict = {}  # deduplicate: one entry per target table
    for (line_number, line) in content:
        try:
            lineage_dict = json.loads(line)
            vertex_dict = parse_vertice(lineage_dict.get('vertices', []))
            edge_list = parse_edge(lineage_dict.get('edges', []))
            # get_column_depend is not shown in the original post; see the sketch below.
            tb, column_info = get_column_depend(vertex_dict, edge_list)
            column_info_dict[tb] = column_info
        except JSONDecodeError:
            print("JSON parse error: {}".format(line))
            print("Offending line number: {}".format(line_number))
    return column_info_dict


if __name__ == '__main__':
    print("Starting....")
    log_dict = {}
    for log_path in log_path_list:
        contents, file_size, index, new_file_name = read_hive_log(log_path)
        # Lineage per target table; push it wherever you need (this excerpt only prints progress).
        column_info_dicts = parse_lineage_log(contents)
        print("{} processed".format(log_path))
        log_dict.setdefault(log_path.split('/')[-1],
                            dict(size=file_size, index=index, file=new_file_name))
    # Persist the read position so the next run only scans new lines.
    with open("./hash_index.log", 'w') as f:
        f.write(json.dumps(log_dict))
    print("Done...")
```
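The listing above calls get_column_depend, whose definition is not included in the original post. Below is a minimal sketch of what such a helper could look like, built only on the dictionaries returned by parse_vertice and parse_edge; the name comes from the listing, but the body and the output shape (the target table name plus a {target_column: [source columns]} map) are assumptions, not the author's original implementation.

```python
def get_column_depend(vertex_dict, edge_list):
    """Minimal sketch: resolve edge vertex ids to names and group sources by target column.

    Returns:
        (target_table, column_info) where target_table is "db.table" and column_info maps
        each target column to the list of "db.table.column" sources it depends on.
    """
    target_table = ""
    column_info = {}
    for edge in edge_list:
        # PROJECTION edges describe column-to-column data flow;
        # PREDICATE edges describe filter/join conditions and are skipped here.
        if edge["type"] != "PROJECTION":
            continue
        for target_id in edge["target"]:
            target = vertex_dict.get(target_id)
            if not target:
                continue
            # All targets of one INSERT normally belong to the same table.
            target_table = "{}.{}".format(target["db"], target["tb"])
            sources = [
                "{}.{}.{}".format(src["db"], src["tb"], src["col"])
                for src in (vertex_dict.get(s) for s in edge["source"])
                if src
            ]
            column_info.setdefault(target["col"], []).extend(sources)
    return target_table, column_info
```

Only PROJECTION edges are followed in this sketch; PREDICATE edges (join and filter conditions) could be merged into the same map if indirect lineage is also wanted.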
That concludes this exploration of using LineageLogger to obtain column-level lineage from Hive; hopefully it is helpful to fellow developers!