数据同步大冒险:PostgreSQL到MySQL的奇妙之旅

2024-08-29 19:20

本文主要是介绍数据同步大冒险:PostgreSQL到MySQL的奇妙之旅,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

引言:一场跨数据库的浪漫邂逅 💑

在数据的世界里,不同数据库系统就像是来自不同星球的恋人,它们各自拥有独特的魅力,但偶尔也会渴望一场跨越界限的亲密接触。今天,我们就来见证一场PostgreSQL与MySQL之间的浪漫邂逅——定时获取PostgreSQL中的数据,并将其温柔地同步至MySQL的怀抱中。这不仅是一场技术的挑战,更是一次数据流转的艺术展现!

场景设定:数据的星际旅行 🚀

想象一下,你是一位数据守护者,负责管理着两个星球的数据库:PostgreSQL的“科技星球”和MySQL的“人文星球”。每天,你都需要从“科技星球”收集最新的科研成果(数据),然后运送到“人文星球”上,让那里的居民也能享受到科技进步的果实。

思路分析:星际导航图 🗺️

要完成这场星际旅行,我们需要精心规划航线:

启航准备:确保两艘飞船(数据库连接)都已就绪,且飞船上的储物舱(数据表)结构相似,便于数据转移。
坐标定位:首先,从PostgreSQL中读取最新的数据ID,这是我们的“出发坐标”。然后,在MySQL中查询最新的数据ID,作为我们的“目标坐标”。
航线规划:通过比较两个坐标的差距,我们可以确定哪些数据是新增的,需要被“运送”到MySQL。这就像是在浩瀚的数据海洋中,绘制出一条最优的航线。
数据转移:根据规划好的航线,将选定的数据从PostgreSQL中提取出来,并安全地“降落”在MySQL的相应位置。
实战代码:编写星际导航程序 💻
虽然具体的代码实现需要你们自己来完成(因为你们才是这场冒险的主角!),但我可以给你们一个大致的框架,就像是一个星际导航程序的伪代码:

# 假设这是你的星际导航程序  def connect_to_postgresql():  # 连接PostgreSQL数据库,获取连接对象  # ...  return pg_connection  def connect_to_mysql():  # 连接MySQL数据库,获取连接对象  # ...  return mysql_connection  def fetch_latest_ids(connection, table_name):  # 从指定数据库中获取最新数据ID  # 使用SQL查询,如 SELECT MAX(id) FROM table_name  # ...  return latest_id  def sync_data(pg_connection, mysql_connection, source_table, target_table):  # 1. 获取两个数据库的最新ID  pg_latest_id = fetch_latest_ids(pg_connection, source_table)  mysql_latest_id = fetch_latest_ids(mysql_connection, target_table)  # 2. 确定需要同步的数据范围  if pg_latest_id > mysql_latest_id:  # 3. 编写SQL查询,选择ID在mysql_latest_id到pg_latest_id之间的数据  # 4. 执行查询,获取数据  # 5. 编写SQL语句,将获取的数据插入MySQL  # ...  # 定时执行数据同步任务  
# 可以使用APScheduler等库来实现定时任务

结尾:星际旅行的意义 🌟
通过这场PostgreSQL到MySQL的数据同步冒险,我们不仅实现了数据的跨库流动,更深刻体会到了数据在不同系统间共享的重要性。正如星际旅行不仅仅是为了探索未知,更是为了促进不同文明之间的交流与融合。希望这次经历能激发你们对数据世界更多奇妙的想象和探索!

实际源码

"""
功能:
监测数据表是否更新
连接postgresql
连接mysql
比较-同步
"""
import sys
import pymysql
import psycopg2
import pandas as pd
import numpy as npdbname_mysql = 'followup'
table_name_list_mysql = ['s01_issue_table','np_kickoff','np_rfq','s02_np_kickoff','s02_np_rfq','s06_kick_off_list','s06rfqlist']dbname_postgresql = 'lcmbigdata'
table_name_list_postgresql = ['s01_issue_table','np_kickoff','np_rfq','s02_np_kickoff','s02_np_rfq','s06_kick_off_list','s06rfqlist']col_address_lists = [['create_time', 'update_time'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date'],['create_time', 'update_time','kickoffdate'],['create_time', 'update_time','evaluation_date']]modelname_postgresql = 'digitalelf'# 数据库连接参数
conn_params_mysql = {"database": dbname_mysql,"user": "root","password": "root","host": "localhost","port": 3306  # 端口号应该是整数
}
conn_params_postgresql = {"database": dbname_postgresql,"user": "postgres",  # 数据库用户"password": "root",  # 数据库密码"host": "localhost",  # 数据库服务器地址"port": 6666  # 数据库端口
}issue_columns = ['id' ,'model_no' ,'part_no' ,'issue_type', 'issue_from', 'start_date','end_date' ,'status', 'priority', 'fab', 'issue_description', 'root_cause','customer' ,'customer_operation' ,'phase', 'analysis', 'solution', 'progress','lesson_learnt' ,'create_by' ,'create_time', 'update_by', 'update_time','sys_org_code' ,'site', 'issue_owner', 'is_nudd' ,'attachment' ,'issue_dept','issue_update' ,'func', 'material_structure']
kickoff_columns = ['id', 'week', 'jiazhi', 'jishu', 'modelno', 'pn', 'customer', 'technology', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'odm', 'others_feiyong', 'renli_target', 'fy_target', 'mpiowner', 'mpi_bumen', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
rfq_columns = ['id', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code']
s02_kickoff_columns = ['id', 'week', 'customer', 'technology', 'odm', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'renli_target', 'fy_target', 'jiazhi', 'jishu', 'pn', 'cellsite', 'kickoffdate', 'mpdate', 'dvtdate', 'fcst', 'design_processremark', 'yingyong', 'kickoff_gopremium', 'jingzhengduishou', 'kehu_shiyong_fangshi', 'others_feiyong', 'mpiowner', 'mpi_bumen', 'modelno']
s02_rfq_columns = ['id', 'customer', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'evaluation_date', 'rfq_result', 'fail_cause']
s06_kickoff_columns = ['id', 'modelno', 'technology', 'cellsite', 'fcst', 'jingzhengduishou', 'odm', 'mpiowner', 'pm', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'week', 'jiazhi', 'jishu', 'kickoff_gopremium', 'pn', 'customer', 'kickoffdate', 'mpdate', 'dvtdate', 'yingyong', 'kehu_shiyong_fangshi', 'others_feiyong', 'renli_target', 'fy_target', 'design_processremark', 'mpi_bumen']
s06_rfq_columns = ['id', 'create_by', 'create_time', 'update_by', 'update_time', 'sys_org_code', 'model_no', 'customer', 'evaluation_date', 'rfq_result', 'fail_cause']columns_names = [issue_columns,kickoff_columns,rfq_columns,s02_kickoff_columns,s02_rfq_columns,s06_kickoff_columns,s06_rfq_columns]sql_host = 'localhost'
port = 3306
user = 'root'
password = 'root'
sql_db_name = 'followup'
# sql_table_name = 'np_issue'def replace_nan_with_none(value):"""将 numpy.nan 替换为 None,其他值保持不变。"""return None if np.isnan(value) else value# #数据库SQL上传函数
def sql_connect():try:conn = pymysql.connect(host = sql_host,port = port,user = user,password = password,db = sql_db_name,charset='utf8')return connexcept Exception as e:logging.error('SQL CONNECT' + str(e))def sql_upload(raw_data, table_name):"""将DataFrame raw_data写入MySQL数据库指定的table_name表中。"""# 连接数据库conn = sql_connect()cursor = conn.cursor()# 确保表存在,如果不存在则创建表# 注意:这里简化了表的创建过程,实际应用中可能需要根据raw_data的列名和数据类型创建合适的表结构create_table_sql = f"""CREATE TABLE IF NOT EXISTS {table_name} (-- 假设所有列都是VARCHAR类型,实际情况应根据raw_data的列调整flag VARCHAR(255),period VARCHAR(255),dept VARCHAR(255),-- 其他列...bd VARCHAR(255));"""cursor.execute(create_table_sql)# 构建插入语句insert_sql = f"""INSERT INTO {table_name} ({', '.join(raw_data.columns)})VALUES ({', '.join(['%s'] * len(raw_data.columns))});"""# 执行插入语句for index, row in raw_data.iterrows():try:# 处理列表中的每个元素# print("index:\n",index)row_list = list(row)for i in range(len(row_list)):if row_list[i] is np.nan:row_list[i] = replace_nan_with_none(row_list[i])# if np.nan in row:#     cleaned_row = [replace_nan_with_none(x) for x in list(row)]cursor.execute(insert_sql, row_list)conn.commit()except Exception as e:print("上传失败:", e)print(insert_sql,list(row))conn.rollback()cursor.close()conn.close()print("数据成功上传至数据库")def data_update_PostgreSQL_MySQL(data_type_tips):for i in range(0,len(table_name_list_mysql)):# i=1print("🦁"*30)print(f"第{data_type_tips}次更新{table_name_list_mysql[i]}")print("🐎" * 30)table_name_mysql = table_name_list_mysql[i]table_name_postgresql = table_name_list_postgresql[i]col_address_list = col_address_lists[i]data_columns = columns_names[i]id_list_mysql = []id_list_postgresql = []id_difference = []try:# 使用连接参数建立PostgreSQL连接try:conn_postgresql = psycopg2.connect(**conn_params_postgresql)# print("成功连接到PostgreSQL数据库")# 创建一个cursor对象来执行SQL命令cur_postgresql = conn_postgresql.cursor()# 执行SQL查询(例如:选择所有记录)# cur_postgresql.execute(f"SELECT * FROM {dbname}.{your_table_name};")cur_postgresql.execute(f"SELECT id FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql};")# 获取所有查询结果rows = cur_postgresql.fetchall()for row in rows:id_list_postgresql.append(row[0])# print(row)except (Exception, psycopg2.DatabaseError) as error:print(f"PostgreSQL数据库错误:{error}")# 使用连接参数建立MySQL连接try:conn_mysql = pymysql.connect(**conn_params_mysql)# print("成功连接到MySQL数据库")# 创建一个cursor对象来执行SQL命令cur_mysql = conn_mysql .cursor()# 执行SQL查询(例如:选择所有记录)# 修复 SQL 查询字符串中的表名引用#cur.execute(f"SELECT * FROM {your_table_name};")cur_mysql.execute(f"SELECT id FROM {table_name_mysql};")# 获取所有查询结果rows = cur_mysql.fetchall()for row in rows:id_list_mysql.append(row[0])# print(row)for id in id_list_postgresql:if id not in id_list_mysql:# print(id)id_difference.append(id)print(len(id_difference))# print(id_difference)# sys.exit()print("🐒"*20)if len(id_difference):if len(id_difference) == 1:cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id = '{id_difference[0]}';")else:id_difference = tuple(id_difference)cur_postgresql.execute(f"SELECT * FROM {dbname_postgresql}.{modelname_postgresql}.{table_name_postgresql} where id in {id_difference};")"""新入新的数据库"""# 获取所有查询结果rows_new = cur_postgresql.fetchall()# print(len(rows_new),len(data_columns))print("🐅" * 20)rows_pd = pd.DataFrame(rows_new, columns=data_columns)print("🐍"*20)# print(len(rows_pd.columns),len(rows_new),len(data_columns))print("🐉" * 20)# 处理 DataFrame 中的日期时间列for col in data_columns:data_type = rows_pd[col].dtypeif 'datetime64[ns, UTC+08:00]' in str(data_type):# 移除时区信息rows_pd[col] = rows_pd[col].dt.tz_localize(None)# 将日期时间转换为字符串格式rows_pd[col] = rows_pd[col].dt.strftime('%Y-%m-%d %H:%M:%S')sql_upload(rows_pd, table_name_mysql)print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")# print('★' * 10, '\n', 'successs')else:print(f"已同步{len(id_difference)}条记录至{table_name_mysql}")except (Exception, pymysql.DatabaseError) as error:print(f"MySQL数据库错误:{error}")finally:# 关闭cursor和连接if cur_postgresql:cur_postgresql.close()if conn_postgresql:conn_postgresql.close()# print("PostgreSQL数据库连接已关闭")# 关闭cursor和连接if cur_mysql:cur_mysql.close()if conn_mysql :conn_mysql .close()# print("MySQL数据库连接已关闭")except Exception as e:print(f"数据库同步ERROR {e}")def data_update():for i in range(1,3):# print(f"第{i}次更新数据库")data_update_PostgreSQL_MySQL(i)data_update()

运行结果

在这里插入图片描述

增加定时功能

import schedule
import timedef data_update():print("Updating data...")# 每15分钟执行一次
schedule.every(15).minutes.do(data_update)while True:schedule.run_pending()time.sleep(1)

这篇关于数据同步大冒险:PostgreSQL到MySQL的奇妙之旅的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1118690

相关文章

Python将大量遥感数据的值缩放指定倍数的方法(推荐)

《Python将大量遥感数据的值缩放指定倍数的方法(推荐)》本文介绍基于Python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处理,并将所得处理后数据保存为新的遥感影像... 本文介绍基于python中的gdal模块,批量读取大量多波段遥感影像文件,分别对各波段数据加以数值处

Mysql虚拟列的使用场景

《Mysql虚拟列的使用场景》MySQL虚拟列是一种在查询时动态生成的特殊列,它不占用存储空间,可以提高查询效率和数据处理便利性,本文给大家介绍Mysql虚拟列的相关知识,感兴趣的朋友一起看看吧... 目录1. 介绍mysql虚拟列1.1 定义和作用1.2 虚拟列与普通列的区别2. MySQL虚拟列的类型2

使用MongoDB进行数据存储的操作流程

《使用MongoDB进行数据存储的操作流程》在现代应用开发中,数据存储是一个至关重要的部分,随着数据量的增大和复杂性的增加,传统的关系型数据库有时难以应对高并发和大数据量的处理需求,MongoDB作为... 目录什么是MongoDB?MongoDB的优势使用MongoDB进行数据存储1. 安装MongoDB

mysql数据库分区的使用

《mysql数据库分区的使用》MySQL分区技术通过将大表分割成多个较小片段,提高查询性能、管理效率和数据存储效率,本文就来介绍一下mysql数据库分区的使用,感兴趣的可以了解一下... 目录【一】分区的基本概念【1】物理存储与逻辑分割【2】查询性能提升【3】数据管理与维护【4】扩展性与并行处理【二】分区的

MySQL中时区参数time_zone解读

《MySQL中时区参数time_zone解读》MySQL时区参数time_zone用于控制系统函数和字段的DEFAULTCURRENT_TIMESTAMP属性,修改时区可能会影响timestamp类型... 目录前言1.时区参数影响2.如何设置3.字段类型选择总结前言mysql 时区参数 time_zon

Python MySQL如何通过Binlog获取变更记录恢复数据

《PythonMySQL如何通过Binlog获取变更记录恢复数据》本文介绍了如何使用Python和pymysqlreplication库通过MySQL的二进制日志(Binlog)获取数据库的变更记录... 目录python mysql通过Binlog获取变更记录恢复数据1.安装pymysqlreplicat

Linux使用dd命令来复制和转换数据的操作方法

《Linux使用dd命令来复制和转换数据的操作方法》Linux中的dd命令是一个功能强大的数据复制和转换实用程序,它以较低级别运行,通常用于创建可启动的USB驱动器、克隆磁盘和生成随机数据等任务,本文... 目录简介功能和能力语法常用选项示例用法基础用法创建可启动www.chinasem.cn的 USB 驱动

使用SQL语言查询多个Excel表格的操作方法

《使用SQL语言查询多个Excel表格的操作方法》本文介绍了如何使用SQL语言查询多个Excel表格,通过将所有Excel表格放入一个.xlsx文件中,并使用pandas和pandasql库进行读取和... 目录如何用SQL语言查询多个Excel表格如何使用sql查询excel内容1. 简介2. 实现思路3

Oracle数据库使用 listagg去重删除重复数据的方法汇总

《Oracle数据库使用listagg去重删除重复数据的方法汇总》文章介绍了在Oracle数据库中使用LISTAGG和XMLAGG函数进行字符串聚合并去重的方法,包括去重聚合、使用XML解析和CLO... 目录案例表第一种:使用wm_concat() + distinct去重聚合第二种:使用listagg,

Python实现将实体类列表数据导出到Excel文件

《Python实现将实体类列表数据导出到Excel文件》在数据处理和报告生成中,将实体类的列表数据导出到Excel文件是一项常见任务,Python提供了多种库来实现这一目标,下面就来跟随小编一起学习一... 目录一、环境准备二、定义实体类三、创建实体类列表四、将实体类列表转换为DataFrame五、导出Da