0101模板生成任务与shell命令执行任务-datax-python工具

2024-03-30 17:36

本文主要是介绍0101模板生成任务与shell命令执行任务-datax-python工具,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • 一、前言
    • 二、分析
      • 2.1 mysql工具
      • 2.2 模板
      • 2.2 执行shell命令
    • 三、代码实现
    • 四、演示
    • 五、待优化
    • 结语

一、前言

最近在学习数仓相关内容,需要把mysql业务数据库gmall中的数据全量同步到hdfs中。使用的工具是datax,虽然datax可以在一个job内放置多个表,但是考虑每个表中数据可能处置方式,存放位置等不同,我们一个表安排一个任务job。最后执行这些job。

gmall库中有几十张表,如果手动创建job的json文件,很费事费力,容易出错;而且通过观察,除了表元数据和存放位置不同外,其他是相同的,那么我们考虑通过模板来生成job。语言选择python。通过subprocess来执行shell命令:datax任务。

下面为相关的环境信息:

软件版本描述
hadoop3.3.4大数组生态基础组件
datax同步异构数据源数据
python3.9python语言
mysql8.x关系型数据库

二、分析

2.1 mysql工具

从mysql读取数据,我们不在重复造轮子了,参考下面链接2和3,工具类代码如下:

# -*- encoding: utf-8 -*-
"""
mysql工具类,主要功能如下:
1. 构造连接mysql
2. 获取设置基础信息:选择数据库、查询数据库版本
3. 查询:查询一条数据、查询多条数据
4. 新增、修改、删除
"""import pymysqlclass MySQLUtil:def __init__(self, host="127.0.0.1", user=None, passwd=None, db=None, charset="utf8", *args, **kwargs):"""构造函数:param host: 主机地址 :param user: 用户名:param passwd: 密码:param db: 数据库名:param charset: 字符集:param args: 参数:param kwargs: """self.__host = hostself.__user = userself.__passwd = passwdself.__db = dbself.__conn = pymysql.connect(host=host, user=user, passwd=passwd, db=db, charset=charset, *args, **kwargs)self.__cursor = self.__conn.cursor()def __del__(self):"""析构函数"""self.__cursor.close()self.__conn.close()def get_conn(self):"""获取连接"""return self.__conndef get_cursor(self, cursor=None):"""获取游标"""return self.__conn.cursor(cursor)def select_db(self, db):"""选择数据库:param db: 数据库名 :return: """self.__conn.select_db(db)def list_databases(self, args=None):"""查询所有数据库"""self.__cursor.execute("SHOW DATABASES", args)dbs = []for db in self.__cursor.fetchall():dbs.append(db[0])return dbsdef list_tables(self, args=None):"""查询所有表"""self.__cursor.execute("SHOW TABLES", args)tables = []for table in self.__cursor.fetchall():tables.append(table[0])return tablesdef execute(self, sql, args=None):"""获取SQL执行结果"""self.__cursor.execute(sql, args)return self.__cursor.fetchall()def get_version(self, args=None):"""获取MySQL版本"""self.__cursor.execute("SELECT VERSION()", args)version = self.__cursor.fetchone()print("MySQL Version : %s" % version)return versiondef list_table_metadata(self, args=None):"""查询所有表的元数据信息"""sql = "SELECT * FROM information_schema.TABLES WHERE TABLE_TYPE !='SYSTEM VIEW' AND TABLE_SCHEMA NOT IN ('sys','mysql','information_schema','performance_schema')"self.__cursor.execute(sql, args)return self.__cursor.fetchall()def get_table_fields(self, db, table, args=None):"""获取表字段信息"""sql = 'SELECT COLUMN_NAME FROM information_schema.COLUMNS WHERE table_schema="' + db + '" AND table_name="' + table + '"'self.__cursor.execute(sql, args)fields = []for field in self.__cursor.fetchall():fields.append(field[0])return fieldsdef table_metadata(self, db, table, args=None):"""查询表字段的元数据信息"""db = "'" + db + "'"table = "'" + table + "'"sql = """SELECT column_name,data_type,ordinal_position,column_comment,column_default FROM information_schema.COLUMNS WHERE table_schema = %s AND table_name = %s;""" % (db, table)self.__cursor.execute(sql, args)return self.__cursor.fetchall()def query_one(self, sql, args=None):"""查询单条数据"""result = Nonetry:self.cursor.execute(sql, args)result = self.cursor.fetchone()except Exception as e:print(e)return resultdef query_all(self, sql, args=None):"""查询多条数据"""list_result = ()try:self.cursor.execute(sql, args)list_result = self.cursor.fetchall()except Exception as e:print(e)return list_resultdef insert(self, sql):"""新增数据"""return self.__edit(sql)def update(self, sql):"""更新数据"""return self.__edit(sql)def delete(self, sql):"""删除数据"""return self.__edit(sql)def __edit(self, sql):count = 0try:count = self.cursor.execute(sql)except Exception as e:print(e)return countif __name__ == "__main__":mysqlUtil = MySQLUtil(host='node1', user="root", passwd="123456", db="gmall")mysqlUtil.get_version()dbs = mysqlUtil.list_databases()print(dbs)conn = mysqlUtil.get_conn()mysqlUtil.select_db("gmall")# print(type(conn.db), conn.db)# databases = mysqlUtil.list_databases()# print(type(databases), databases)# tables = mysqlUtil.list_tables()# print(type(tables), tables)# sql = "SELECT * FROM activity_info"# result = mysqlUtil.execute(sql)# for i in result:#     print(i)result = mysqlUtil.table_metadata("gmall", "activity_info")for i in result:print(i[0],'==',i[1],'===', type(i))# result = mysqlUtil.get_table_fields("gmall", "activity_info")# for i in result:#     print(i)

2.2 模板

datax任务为从mysql读取数据同步到hdfs,可以在datax官网查看相应的示例,这里直接给出我们的模板文件mysql2hdfs.tpl

{"job": {"content": [{"reader": {"name": "mysqlreader","parameter": {"column": ["*"],"connection": [{"jdbcUrl": ["jdbc:mysql://node1:3306/gmall?useUnicode=true&allowKeyRetrieval=tru&characterEncoding=utf-8"],"table": ["$table_name"]}],"password": "123456","splitPk": "","username": "root"}},"writer": {"name": "hdfswriter","parameter": {"column": [$COLUMN],"compress": "gzip","defaultFS": "hdfs://node1:8020","fieldDelimiter": "\t","fileName": "$table_name","fileType": "text","path": "/origin_data/gmall/db/$DIRNAME","writeMode": "append"}}}],"setting": {"speed": {"channel": 1}}}
}

变量说明:

  • $table_name:填充对应的表名
  • $COLUMN:填充表对应的列名和数据类型(hdfs类型)
  • $DIRNAME:填充对应表存储在hdfs中的父路径名称

tips: 模板中相关的配置参数值改成自己的,包括数据库连接相关参数,hdfs连接参数,存储路径等等。

实现使用string中的Template。

2.2 执行shell命令

python执行shell命令,参考最后链接5,这里我们使用subprocess

三、代码实现

完整结构和主要代码,如下所示:

在这里插入图片描述

主要逻辑代码mysql2hdfs.py:

"""
读取mysql元数据,通过模板生成datax任务job:同步全量mysql数据到hdfs,调用shell执行领料
1. 基础配置信息1.1 mysql与hdfs数据类型对应map
2. 读取mysql元数据信息,通过模板文件生成job文件
3. 执行shell命令,运行datax任务
"""import os
import pathlib
import subprocess
from string import Template
from time import sleepimport MySQLUtil as util# mysql与hdfs数据类型对应map
type_mysql_hdfs = {'tinyint': 'tinyint','smallint': 'smallint','int': 'int','bigint': 'bigint','float': 'float','double': 'double','varchar': 'string','bool': 'boolean','timestamp': 'timestamp','datetime': 'string','date': 'date','decimal': 'double','text': 'string'
}def tmp2job(db_util, tmp_file):"""读取mysql数据,用任务模板生成datax 任务job:全量同步mysql数据到hdfs:param db_util: 数据库工具:param tmp_file: 模板文件:return:"""# 获取模版文件tmp_file = pathlib.Path(__file__).parent.joinpath(tmp_file)# 生成文件路径target_dir = pathlib.Path(__file__).parent.joinpath('gmall')# 获取全部表名db_util.select_db("gmall")tables = db_util.list_tables()# tables= [tables[0]]for table in tables:if table == 'z_log':continuetarget_file = target_dir.joinpath(table + '.json')with open(tmp_file, mode="r", encoding="utf-8") as r_f, open(target_file, mode="w", encoding="utf8") as w_f:template_content = r_f.read()# print(f"template_content:{template_content}")template = Template(template_content)columns = db_util.table_metadata(db='gmall', table=table)column_str = ''# 拼接hdfswriter columnfor column in columns:# print(column)type1 = type_mysql_hdfs.get(column[1])# print(type1)column_str += '{\"name\":\"' + column[0] + '\",\"type\":\"' + type1 + '\"},'column_str = column_str[:-1]# print(os.path.split(table)[0])# 替换模板中的文件名,hdfswriter中的column,及hdfs文件存储路径data = template.substitute(table_name=table, COLUMN=column_str, DIRNAME=os.path.splitext(table)[0])w_f.write(data)# 执行job脚本
def execute_shell(db_util):"""执行shell命令:运行datax任务:param db_util: 数据库工具:return:"""# cmd_ls = 'ls /export/server/datax/job/gmall'# name = subprocess.check_output(cmd_ls, shell=True)# names = str(name, encoding='utf-8').split('\n')[:-1]db_util.select_db("gmall")names = db_util.list_tables()# tables= [tables[0]]for name in names:if name == 'z_log':continue# 确保hdfs父路径存在hdfs_mkdir = 'hdfs dfs -mkdir -p /origin_data/gmall/db/' + os.path.splitext(name)[0]# print('-'*5, hdfs_mkdir,'-'*5,name)ret = subprocess.check_call(hdfs_mkdir, shell=True)# print('---ret--',ret)# 执行datax job任务commond = "python /export/server/datax/bin/datax.py  /export/server/datax/job/gmall/" + name + '.json'# print(commond)subprocess.call(commond, shell=True)sleep(1)if __name__ == '__main__':db_util = util.MySQLUtil(host="node1", user="root", passwd="123456", db="gmall")tmp2job(db_util, tmp_file='mysql2hdfs.tpl')execute_shell(db_util)

完整在下面源代码仓库

四、演示

在这里插入图片描述

把相关代码放置在datax的job目录下,创建gmall存放模板生成的任务。

昨天执行过,这里不再执行,我们去web端查看hdfs同步文件,如下图所示:

在这里插入图片描述

五、待优化

  • 我们的数据库名、存储位置写死了,可以改为传参。
  • mysqlreader中column把*改为具体的表中列名。
  • 空值校验,在获取表名、列名等地方进行空值校验,避免生成无意义的文件或者路径。
  • 执行效率:如果硬件配置可以,可以考虑并行执行,每个任务可独立运行,提高效率。

结语

如果小伙伴什么问题或者指教,欢迎交流。

❓QQ:806797785

⭐️源代码仓库地址:https://gitee.com/gaogzhen/smart-utilities

参考链接:

[1]数仓视频-模拟数据生成[CP/OL].2023-12-12.p98.

[2]python3连接MySQL的工具类

[3]python-mysql数据库连接工具类封装

[4]datax 官方文档

[5]python执行shell脚本的几种方法

这篇关于0101模板生成任务与shell命令执行任务-datax-python工具的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/862131

相关文章

python: 多模块(.py)中全局变量的导入

文章目录 global关键字可变类型和不可变类型数据的内存地址单模块(单个py文件)的全局变量示例总结 多模块(多个py文件)的全局变量from x import x导入全局变量示例 import x导入全局变量示例 总结 global关键字 global 的作用范围是模块(.py)级别: 当你在一个模块(文件)中使用 global 声明变量时,这个变量只在该模块的全局命名空

AI一键生成 PPT

AI一键生成 PPT 操作步骤 作为一名打工人,是不是经常需要制作各种PPT来分享我的生活和想法。但是,你们知道,有时候灵感来了,时间却不够用了!😩直到我发现了Kimi AI——一个能够自动生成PPT的神奇助手!🌟 什么是Kimi? 一款月之暗面科技有限公司开发的AI办公工具,帮助用户快速生成高质量的演示文稿。 无论你是职场人士、学生还是教师,Kimi都能够为你的办公文

poj3468(线段树成段更新模板题)

题意:包括两个操作:1、将[a.b]上的数字加上v;2、查询区间[a,b]上的和 下面的介绍是下解题思路: 首先介绍  lazy-tag思想:用一个变量记录每一个线段树节点的变化值,当这部分线段的一致性被破坏我们就将这个变化值传递给子区间,大大增加了线段树的效率。 比如现在需要对[a,b]区间值进行加c操作,那么就从根节点[1,n]开始调用update函数进行操作,如果刚好执行到一个子节点,

C++11第三弹:lambda表达式 | 新的类功能 | 模板的可变参数

🌈个人主页: 南桥几晴秋 🌈C++专栏: 南桥谈C++ 🌈C语言专栏: C语言学习系列 🌈Linux学习专栏: 南桥谈Linux 🌈数据结构学习专栏: 数据结构杂谈 🌈数据库学习专栏: 南桥谈MySQL 🌈Qt学习专栏: 南桥谈Qt 🌈菜鸡代码练习: 练习随想记录 🌈git学习: 南桥谈Git 🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈🌈�

高效录音转文字:2024年四大工具精选!

在快节奏的工作生活中,能够快速将录音转换成文字是一项非常实用的能力。特别是在需要记录会议纪要、讲座内容或者是采访素材的时候,一款优秀的在线录音转文字工具能派上大用场。以下推荐几个好用的录音转文字工具! 365在线转文字 直达链接:https://www.pdf365.cn/ 365在线转文字是一款提供在线录音转文字服务的工具,它以其高效、便捷的特点受到用户的青睐。用户无需下载安装任何软件,只

【Python编程】Linux创建虚拟环境并配置与notebook相连接

1.创建 使用 venv 创建虚拟环境。例如,在当前目录下创建一个名为 myenv 的虚拟环境: python3 -m venv myenv 2.激活 激活虚拟环境使其成为当前终端会话的活动环境。运行: source myenv/bin/activate 3.与notebook连接 在虚拟环境中,使用 pip 安装 Jupyter 和 ipykernel: pip instal

pdfmake生成pdf的使用

实际项目中有时会有根据填写的表单数据或者其他格式的数据,将数据自动填充到pdf文件中根据固定模板生成pdf文件的需求 文章目录 利用pdfmake生成pdf文件1.下载安装pdfmake第三方包2.封装生成pdf文件的共用配置3.生成pdf文件的文件模板内容4.调用方法生成pdf 利用pdfmake生成pdf文件 1.下载安装pdfmake第三方包 npm i pdfma

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

30常用 Maven 命令

Maven 是一个强大的项目管理和构建工具,它广泛用于 Java 项目的依赖管理、构建流程和插件集成。Maven 的命令行工具提供了大量的命令来帮助开发人员管理项目的生命周期、依赖和插件。以下是 常用 Maven 命令的使用场景及其详细解释。 1. mvn clean 使用场景:清理项目的生成目录,通常用于删除项目中自动生成的文件(如 target/ 目录)。共性规律:清理操作

poj 1258 Agri-Net(最小生成树模板代码)

感觉用这题来当模板更适合。 题意就是给你邻接矩阵求最小生成树啦。~ prim代码:效率很高。172k...0ms。 #include<stdio.h>#include<algorithm>using namespace std;const int MaxN = 101;const int INF = 0x3f3f3f3f;int g[MaxN][MaxN];int n