本文主要是介绍Python实现将MySQL中所有表的数据都导出为CSV文件并压缩,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
《Python实现将MySQL中所有表的数据都导出为CSV文件并压缩》这篇文章主要为大家详细介绍了如何使用Python将MySQL数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到...
python将mysql数据库中所有表的数据都导出为CSV文件到一个目录,并压缩为zip文件到另一个目录下,然后解压缩这个目录中的所有zip文件到第三个目录下。不使用Pandas库,需要考虑SQL结果集是大数据量分批数据导出的情况,通过多线程和异步操作来提高程序性能,程序需要异常处理和输出,输出出错时的错误信息,每次每个查询导出数据的运行状态和表数据行数以及运行时间戳,导出时间,输出每个文件记录数量的日志。
该脚本已在考虑大数据量、异常处理和性能优化的基础上进行了全面设计,能够处理大多数常见场景。根据具体需求可进一步调整批量大小(BATch_size)和线程数(max_workers)以获得最佳性能。
import os import csv import zipfile import logging import mysql.connector from datetime import datetime import time import concurrent.futures import glob # 配置日志 logging.basicConfig( level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s', handlers=[ logging.FileHandler('data_export.log'), logging.StreamHandler() ] ) logger = logging.getLogger(__name__) def export_table_to_csv(table_name, csv_path, db_config, batch_size=1000): """导出单个表的数据到CSV文件,分批处理""" conn = None cursor = None total_rows = 0 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursor() # 获取数据并写入CSV with open(csv_path, 'w', newline='', encoding='utf-8') as csvfile: writer = csv.writer(csvfile) # 执行查询并获取列名 cursor.execute(f"SELECT * FROM `{table_name}`") columns = [col[0] for col in cursor.description] writer.writerow(columns) # 分批获取数据 while True: rows = cursor.fetchmany(batch_size) if not rows: break writer.writerows(rows) total_rows += len(rows) logger.debug(f"{table_name} 已导出 {total_rows} 行") logger.info(f"{table_name} CSV导出完成,总行数:{total_rows}") return total_rows except Exception as e: logger.error(f"导出表 {table_name} 失败: {str(e)}", exc_info=True) raise finally: if cursor: cursor.close() if conn and conn.is_connected(): conn.close() def compress_to_zip(source_path, zip_path): """压缩文件为ZIP格式""" try: China编程 with zipfile.ZipFile(zip_path, 'w', zipfile.ZIP_DEFLATED) as zipf: zipf.write(source_path, arcname=os.path.basename(source_path)) logger.info(f"成功压缩 {source_path} 到 {zip_path}") except Exception as e: logger.error(f"压缩 {source_path} 失败: {str(e)}", exc_info=True) raise def process_table(table_name, db_config, csv_dir, zip_dir): """处理单个表的导出和压缩""" start_time = time.time() logger.info(f"开始处理表: {table_name}") status = "成功" rows_exported = 0 try: # 定义文件路径 csv_filename = f"{table_name}.csv" zip_filename = f"{table_name}.zip" csv_path = os.path.join(csv_dir, csv_filename) zip_path = os.path.join(zip_dir, zip_filename) # 导出CSV rows_exported = export_table_to_csv(table_name, csv_path, db_config) # 压缩文件 compress_to_zip(csv_path, zip_path) except Exception as e: status = f"失败: {str(e)}" # 清理可能存在的中间文件 for path in [csv_path, zip_path]: if path and os.path.exists(path): try: os.remove(path) logger.warning(f"已清理文件: {path}") except Exception as clean_error: logger.error(f"清理文件失败: {clean_error}") finally: duration = time.time() - start_time log_message = ( f"表处理完成 - 表名: {table_name}, " f"状态: {status}, " f"导出行数: {rows_exported}, " f"耗时: {duration:.2f}秒" ) logger.info(log_message) def unzip_files(zip_dir, unzip_dir): """解压指定目录中的所有ZIP文件""" zip_files = glob.glob(os.path.join(zip_dir, '*.zip')) if not zip_files: logger.warning("未找到ZIP文件,跳过解压") return with concurrent.futures.ThreadPoolExecutor() as executor: futures = [] for zip_path in zip_files: futures.append(executor.submit( lambda: extract_zip(zip_path, unzip_dir) )) for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(f"解压过程中发生错误: {str(e)}") def extract_zip(zip_path, unzip_dir): """解压单个ZIP文件""" try: start_time = time.time() with zipfile.ZipFile(zip_path, 'r') as zip_ref: zip_ref.extractall(unzip_dir) duration = time.time() - start_time logger.info(f"解压完成: {zip_path} => {unzip_dir} (耗时: {duration:.2f}秒)") except Exception as e: logger.error(f"解压 {zip_path} 失败: {str(e)}", exc_info=True) raise def main(): # 配置参数 db_config = { 'host': 'localhost', 'user': 'your_username', 'password': 'your_password', 'database': 'your_database' } # 目录配置 base_dir = os.path.dirname(os.path.abspath(__file__)) csv_dir = os.path.join(base_dir, 'csv_exports') zip_dir = os.path.join(base_dir, 'zip_archives') unzip_dir = os.path.join(base_dir, 'unzipped_files') # 创建目录 for dir_path in [csv_dir, zip_dir, unzip_dir]: os.makedirs(dir_path, exist_ok=True) logger.info(f"目录已准备: {dir_path}") # 获取所有表名 try: conn = mysql.connector.connect(**db_config) cursor = conn.cursorjs() cursor.execute("SHOW TABLES") tables = [table[0] for table in cursor.fetchall()] logger.info(f"发现 {len(tables)} 个需要处理的表") except Exception as e: logger.error(f"获取数据库表失败: {str(e)}", exc_info=True) return finally: if 'cursor' in locals(): cursor.close() if 'conn' in locals() and conn.is_connected(): conn.close() # 处理所有表(多线程导出和压缩) with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor: futures = [] for table in tables: futures.append(executor.submit( process_table, table, db_config, China编程 csv_dir, zip_dir )) # 处理任务结果 for future in concurrent.futures.as_completed(futures): try: future.result() except Exception as e: logger.error(f"表处理异常: {str(e)}") # 解压所有ZIP文件(多线程解压) logger.info("开始解压所有ZIP文件") unzip_files(zip_dir, unzip_dir) logger.info("全部处理流程完成") if __name__ == "__main__": main()
关键特性说明:
1.分批处理大数据:
- 使用fetchmany(batch_size)分批获取数据(默认每批1000行)
- 流式处理减少内存占用
2.多线程处理:
- 使用ThreadPoolExecutor并行处理不同表的导出和压缩
- 独立的数据库连接池(每个线程有自己的连接)
- 并行解压处理
3.异常处理:
- 全面的try-http://www.chinasem.cnexcept块覆盖所有关键操作
- 自动清理失败时产生的中间文件
- 详细的错误日志记录(包含堆栈跟踪)
4.日志记录:
- 同时输出到文件和终端
- 记录时间戳、操作类型、状态、耗时等关键信息
- 包含每个表的处理结果统计
5.文件管理:
- 自动创建所需目录
- 使用ZIP_DEFLATED进行高效压缩
- 安全的文件路径处理
6.性能优化:
- 使用服务器端游标避免内存过载
- 可配置的批量大小和线程数
- 异步I/O操作
使用说明:
安装依赖:
pip install mysql-connector-python
修改配置:
更新db_config中的数据库连接信息
根据需要调整目录路径(csv_dir, zip_dir, unzip_dir)
运行脚本:
python script.py
查看日志:
实时终端输出
详细日志文件data_export.log
扩展建议:
通过命令行参数接受数据库配置和目录路径
添加邮件通知功能(处理完成或失败时通知)
实现断点续传功能
添加文件校验(MD5校验和)
支持配置文件(YAML/jsON格式)
添加进度条显示
到此这篇关于PnlHKgwAzbHython实现将MySQL中所有表的数据都导出为CSV文件并压缩的文章就介绍到这了,更多相关Python MySQL数据导出为CSV内容请搜索编程China编程(www.chinasem.cn)以前的文章或继续浏览下面的相关文章希望大家以后多多支持China编程(www.chinasem.cn)!
这篇关于Python实现将MySQL中所有表的数据都导出为CSV文件并压缩的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!