python 将mysql转为csv、json导入到Doris数据库

2024-08-21 19:20

本文主要是介绍python 将mysql转为csv、json导入到Doris数据库,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上一篇文章是将mysql导出成csv文件,适合csv不是很大的情况,以下对脚本进行了优化,采用分块读取csv,降低了内存的使用率,提高了传输速度。

from pydoris.doris_client import DorisClient
import requests
import pandas as pd
import numpy as npfe_host = ''
username = ''
passwd = ''
fe_http_port = ""
fe_query_port = ""doris_client = DorisClient(fe_host=fe_host,fe_query_port=fe_query_port,fe_http_port=fe_http_port,username=username,password=passwd,db='zst_cep_model')# 由于导入的csv文件过大,都是五六G以上,所以使用chunksize 分块获取数据进行操作。
for key, chunk in enumerate(pd.read_csv('xxxx.csv', chunksize=10000, dtype=str)):# 这一步也是很迷,需要读csv,然后转为字符串类型csv = chunk.to_csv(header=False, index=False, sep=',').encode('utf-8')# table_name 必须的是database.table的形式success = doris_client.write('xxx.xxx', csv)if success:print("数据写入成功!")else:print("数据写入失败。", key)breakdel csv

后面需要进行增量更新,使用csv太麻烦,想着使用json传输可能会好点,并直接从mysql中获取数据直接传输到doris,以下是脚本。

import pandas as pd
from pydoris.doris_client import *from tools import *# 配置
config = {// Mysql的配置'mysql_config': {'host': '','port': ,'user': '','password': '','database': ''},// Doris的配置'doris_db_config': {'fe_host': '','username': '','passwd': '','fe_http_port': "",'fe_query_port': "",'db': ''},# 要传输的mysql的目标表'mysql_table': '',# 要接收的Doris的目标表'doris_table': '',
}# MysqlDataBaseClass 是自己编写的Mysql工具类,返回的是Mysql连接对象
yp_apidb = MysqlDataBaseClass(host=config['mysql_config']['host'], port=config['mysql_config']['port'], user=config['mysql_config']['user'],password=config['mysql_config']['password'], database=config['mysql_config']['database'])doris_client = DorisClient(fe_host=config['doris_db_config']['fe_host'],fe_query_port=config['doris_db_config']['fe_query_port'],fe_http_port=config['doris_db_config']['fe_http_port'],username=config['doris_db_config']['username'],password=config['doris_db_config']['passwd'],db=config['doris_db_config']['db'])def get_data_from_mysql(page=1, end_page=None, total_row_num=None, page_size=10000, limit_date='2024-08-12'):result = {'total_page': 0, 'total': 0, 'now_page': page, 'data': [], 'code': False, 'msg': ''}if total_row_num is None:select_res = yp_apidb.ExecuteSQL_Select(sql=f'''SELECT count(1) as total_num FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}';''')totalRowsNum = int(select_res[0]['total_num'])else:totalRowsNum = total_row_numif (totalRowsNum % page_size) == 0:totalPages = int(totalRowsNum / page_size)else:totalPages = int((totalRowsNum / page_size) + 1)result['total_page'] = totalPagesresult['total'] = totalRowsNumif end_page and page > end_page:result['msg'] = '已经达到设置的最后一页'return resultif page > totalPages:result['msg'] = '已经是最后一页'return resultstart_num = int((page - 1) * page_size)limit = f'{start_num}, {page_size}'sql = f'''SELECT * FROM `{config["mysql_table"]}` where collect_batch_date >= '{limit_date}' limit {limit};'''data_list = yp_apidb.ExecuteSQL_Select(sql)result['data'] = data_listresult['code'] = Trueresult['msg'] = '获取成功'return resultdef insert_to_doris(data_list):if len(data_list) > 0:df = pd.DataFrame(data_list)json_data = df.to_json(orient='records')options = WriteOptions()options.set_json_format()options.set_option("strip_outer_array", "true")success = doris_client.write(f"{config['doris_table']}", json_data, options=options)if success:return Trueelse:print("数据写入失败。")return Falseif __name__ == '__main__':page = 1total_row_num = Nonelimit_date = '2024-08-01'# 循环获取下一页,从而达到自动翻页的功能while True:res = get_data_from_mysql(page=page, total_row_num=total_row_num, limit_date=limit_date)print(res['msg'], res['total_page'], res['total'], res['now_page'])total_row_num = res['total']if res['code']:data_list = res['data']flage = insert_to_doris(data_list)if flage is False:breakpage += 1else:print(res['msg'], page)break

以上脚本仅供学习参考,仅为实现临时功能而编写,还有优化的空间。

这篇关于python 将mysql转为csv、json导入到Doris数据库的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1094054

相关文章

MySQL 中的 JSON 查询案例详解

《MySQL中的JSON查询案例详解》:本文主要介绍MySQL的JSON查询的相关知识,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录mysql 的 jsON 路径格式基本结构路径组件详解特殊语法元素实际示例简单路径复杂路径简写操作符注意MySQL 的 J

Spring Boot中JSON数值溢出问题从报错到优雅解决办法

《SpringBoot中JSON数值溢出问题从报错到优雅解决办法》:本文主要介绍SpringBoot中JSON数值溢出问题从报错到优雅的解决办法,通过修改字段类型为Long、添加全局异常处理和... 目录一、问题背景:为什么我的接口突然报错了?二、为什么会发生这个错误?1. Java 数据类型的“容量”限制

如何使用 Python 读取 Excel 数据

《如何使用Python读取Excel数据》:本文主要介绍使用Python读取Excel数据的详细教程,通过pandas和openpyxl,你可以轻松读取Excel文件,并进行各种数据处理操... 目录使用 python 读取 Excel 数据的详细教程1. 安装必要的依赖2. 读取 Excel 文件3. 读

Python的time模块一些常用功能(各种与时间相关的函数)

《Python的time模块一些常用功能(各种与时间相关的函数)》Python的time模块提供了各种与时间相关的函数,包括获取当前时间、处理时间间隔、执行时间测量等,:本文主要介绍Python的... 目录1. 获取当前时间2. 时间格式化3. 延时执行4. 时间戳运算5. 计算代码执行时间6. 转换为指

利用Python调试串口的示例代码

《利用Python调试串口的示例代码》在嵌入式开发、物联网设备调试过程中,串口通信是最基础的调试手段本文将带你用Python+ttkbootstrap打造一款高颜值、多功能的串口调试助手,需要的可以了... 目录概述:为什么需要专业的串口调试工具项目架构设计1.1 技术栈选型1.2 关键类说明1.3 线程模

Python ZIP文件操作技巧详解

《PythonZIP文件操作技巧详解》在数据处理和系统开发中,ZIP文件操作是开发者必须掌握的核心技能,Python标准库提供的zipfile模块以简洁的API和跨平台特性,成为处理ZIP文件的首选... 目录一、ZIP文件操作基础三板斧1.1 创建压缩包1.2 解压操作1.3 文件遍历与信息获取二、进阶技

Python Transformers库(NLP处理库)案例代码讲解

《PythonTransformers库(NLP处理库)案例代码讲解》本文介绍transformers库的全面讲解,包含基础知识、高级用法、案例代码及学习路径,内容经过组织,适合不同阶段的学习者,对... 目录一、基础知识1. Transformers 库简介2. 安装与环境配置3. 快速上手示例二、核心模

Windows 上如果忘记了 MySQL 密码 重置密码的两种方法

《Windows上如果忘记了MySQL密码重置密码的两种方法》:本文主要介绍Windows上如果忘记了MySQL密码重置密码的两种方法,本文通过两种方法结合实例代码给大家介绍的非常详细,感... 目录方法 1:以跳过权限验证模式启动 mysql 并重置密码方法 2:使用 my.ini 文件的临时配置在 Wi

MySQL重复数据处理的七种高效方法

《MySQL重复数据处理的七种高效方法》你是不是也曾遇到过这样的烦恼:明明系统测试时一切正常,上线后却频频出现重复数据,大批量导数据时,总有那么几条不听话的记录导致整个事务莫名回滚,今天,我就跟大家分... 目录1. 重复数据插入问题分析1.1 问题本质1.2 常见场景图2. 基础解决方案:使用异常捕获3.

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转