探索利用 LineageLogger 获取hive的字段级血缘关系

2024-05-09 11:28

本文主要是介绍探索利用 LineageLogger 获取hive的字段级血缘关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

apache hive 源码中有 org.apache.hadoop.hive.ql.hooks.LineageLogger 类可以获取 insert hql 的字段之间的关系。但是又由于 org.apache.hadoop.hive.ql.optimizer.Optimizer的原因,使我们重写 hook 类无法实现字段级血缘。

  if (hiveConf.getBoolVar(HiveConf.ConfVars.HIVE_LINEAGE_INFO) // 版本 4.0+加入|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.PostExecutePrinter")|| postExecHooks.contains("org.apache.hadoop.hive.ql.hooks.LineageLogger")// 版本 2.3 加入|| postExecHooks.contains("org.apache.atlas.hive.hook.HiveHook")) {transformations.add(new Generator(postExecHooks));}

现在考虑通过LineageLogger 搭配日志监测服务来实现字段级血缘

  1. 加入插件 conf/hive-site.xml
  <property><name>hive.exec.post.hooks</name><value>org.apache.hadoop.hive.ql.hooks.LineageLogger</value></property>
  1. 打开日志 conf/log4j.properties
log4j.logger.org.apache.hadoop.hive.ql.hooks.LineageLogger=INFO
  1. hive任务日志目录
>set system:hive.log.dir; # 服务日志>set hive.querylog.location; #查询日志/tmp/hive-{用户名}/

4.写脚本监测

# -*- coding: utf-8 -*-
import hashlib
import json
import os.path
from json import JSONDecodeErrorimport requestslog_path_list = ["/tmp/root/hive.log"
]def read_hive_log(file_path):"""读取Hive日志文件并返回包含关键词的行内容列表参数:file_path (str):Hive日志文件的路径返回:content (list):包含关键词的行内容json列表"""save_dict = {}if os.path.exists('./hash_index.log'):try:with open("./hash_index.log", 'r') as f:file_content = f.read()if file_content != '':save_dict = json.loads(file_content)except json.JSONDecodeError as e:print(f"无法将文件内容转换为JSON:{e}")new_file = log_path.split("/")[-1]if new_file in save_dict.keys():old_size = save_dict.get(new_file).get('size', 0)line_index = save_dict.get('index', 0)else:# print("此为新文件,从头开始读取")old_size = 0line_index = 0is_new_file = Falsetry:new_size: int = os.path.getsize(file_path)except Exception as e:print("读取文件大小失败:", e)new_size = 0if (new_file not in save_dict.keys()) or (new_file in save_dict.keys() and (new_size < old_size or old_size == 0)):is_new_file = Truecontent = []is_new_file_only_one = is_old_file_only_one = is_just_info_only_one = Falsetry:with open(file_path, 'r', encoding='utf-8', errors='replace') as log_file:for line_number, line in enumerate(log_file, 1):if search_keyword in line:if is_new_file:if not is_new_file_only_one:print("是新文件,从头开始读取")is_new_file_only_one = Truecontent.append((line_number, line.split(search_keyword)[-1]))line_index = line_numberelse:if line_number >= line_index:if not is_old_file_only_one:print("是旧文件,从上次读取位置继续读取: {}".format(line_index))is_old_file_only_one = Truecontent.append((line_number, line.split(search_keyword)[-1]))line_index = line_numberexcept Exception as e:print(f"读取Hive日志文件失败:{e}")return content, new_size, line_index, new_filedef parse_vertice(vertices):"""解析顶点数据并返回顶点字典参数:vertices(list): 顶点数据列表返回值:vertex_dict(dict): 顶点字典,键为顶点ID,值为元组,包含数据库名、表名和列名(如果顶点类型为列)"""vertex_dict = {}for vertex in vertices:vertex_id = vertex.get("id", "")vertex_type = vertex.get("vertexType", "")vertex_names = vertex.get("vertexId", "").split(".")if len(vertex_names) >= 3:db_name = vertex_names[0]tb_name = vertex_names[1]col_name = vertex_names[-1] if vertex_type == "COLUMN" else ""if col_name not in partition_field:vertex_dict.setdefault(vertex_id, {"db": db_name, "tb": tb_name, "col": col_name})return vertex_dictdef parse_edge(edges):"""解析边的函数参数:edges (list): 边的列表返回值:list: 边元素的列表,每个元素为一个元组,包含源节点列表、目标节点列表和表达式"""edge_elem_list = []for edge in edges:source_arr = edge.get("sources", [])target_arr = edge.get("targets", [])expression = edge.get("expression", "")edge_type = edge.get("edgeType", "")edge_elem_list.append({"source": source_arr, "target": target_arr, "exp": expression, "type": edge_type})return edge_elem_listdef parse_lineage_log(content: list):column_info_dict = {}# 去重数据for (line_number, line) in content:try:lineage_dict = json.loads(line)vertex_dict = parse_vertice(lineage_dict.get('vertices', []))edge_list = parse_edge(lineage_dict.get('edges', []))tb, column_info = get_column_depend(vertex_dict, edge_list)column_info_dict[tb] = column_infoexcept JSONDecodeError as e:print("json解析错误: {}".format(line))print("该行错误位置: {}".format(line_number))return column_info_dictif __name__ == '__main__':print("开始启动....")log_dict = {}for log_path in log_path_list:contents, file_size, index, new_file_name = read_hive_log(log_path)column_info_dicts = parse_lineage_log(contents)print("{} 文件执行完".format(log_path))log_dict.setdefault(log_path.split('/')[-1], dict(size=file_size, index=index, file=new_file_name))with open("./hash_index.log", 'w') as f:f.write(json.dumps(log_dict))print("执行结束...")

这篇关于探索利用 LineageLogger 获取hive的字段级血缘关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/973283

相关文章

pip install jupyterlab失败的原因问题及探索

《pipinstalljupyterlab失败的原因问题及探索》在学习Yolo模型时,尝试安装JupyterLab但遇到错误,错误提示缺少Rust和Cargo编译环境,因为pywinpty包需要它... 目录背景问题解决方案总结背景最近在学习Yolo模型,然后其中要下载jupyter(有点LSVmu像一个

如何利用Java获取当天的开始和结束时间

《如何利用Java获取当天的开始和结束时间》:本文主要介绍如何使用Java8的LocalDate和LocalDateTime类获取指定日期的开始和结束时间,展示了如何通过这些类进行日期和时间的处... 目录前言1. Java日期时间API概述2. 获取当天的开始和结束时间代码解析运行结果3. 总结前言在J

java获取图片的大小、宽度、高度方式

《java获取图片的大小、宽度、高度方式》文章介绍了如何将File对象转换为MultipartFile对象的过程,并分享了个人经验,希望能为读者提供参考... 目China编程录Java获取图片的大小、宽度、高度File对象(该对象里面是图片)MultipartFile对象(该对象里面是图片)总结java获取图片

Java通过反射获取方法参数名的方式小结

《Java通过反射获取方法参数名的方式小结》这篇文章主要为大家详细介绍了Java如何通过反射获取方法参数名的方式,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1、前言2、解决方式方式2.1: 添加编译参数配置 -parameters方式2.2: 使用Spring的内部工具类 -

Java如何获取视频文件的视频时长

《Java如何获取视频文件的视频时长》文章介绍了如何使用Java获取视频文件的视频时长,包括导入maven依赖和代码案例,同时,也讨论了在运行过程中遇到的SLF4J加载问题,并给出了解决方案... 目录Java获取视频文件的视频时长1、导入maven依赖2、代码案例3、SLF4J: Failed to lo

java如何通过Kerberos认证方式连接hive

《java如何通过Kerberos认证方式连接hive》该文主要介绍了如何在数据源管理功能中适配不同数据源(如MySQL、PostgreSQL和Hive),特别是如何在SpringBoot3框架下通过... 目录Java实现Kerberos认证主要方法依赖示例续期连接hive遇到的问题分析解决方式扩展思考总

使用Java实现获取客户端IP地址

《使用Java实现获取客户端IP地址》这篇文章主要为大家详细介绍了如何使用Java实现获取客户端IP地址,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 首先是获取 IP,直接上代码import org.springframework.web.context.request.Requ

C++实现获取本机MAC地址与IP地址

《C++实现获取本机MAC地址与IP地址》这篇文章主要为大家详细介绍了C++实现获取本机MAC地址与IP地址的两种方式,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 实际工作中,项目上常常需要获取本机的IP地址和MAC地址,在此使用两种方案获取1.MFC中获取IP和MAC地址获取

C/C++通过IP获取局域网网卡MAC地址

《C/C++通过IP获取局域网网卡MAC地址》这篇文章主要为大家详细介绍了C++如何通过Win32API函数SendARP从IP地址获取局域网内网卡的MAC地址,感兴趣的小伙伴可以跟随小编一起学习一下... C/C++通过IP获取局域网网卡MAC地址通过win32 SendARP获取MAC地址代码#i

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加