异构数据同步 datax (2)-postgres 写扩展

2024-08-21 00:20

本文主要是介绍异构数据同步 datax (2)-postgres 写扩展,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、postgres SQL 支持  插入更新操作(与mysql 语法有一定差异)

可参考下面文章

MySQL + PostgreSQL批量插入更新insertOrUpdate_mysql insert update-CSDN博客

2、datax中,可通过源码调整来实现

参考来源

https://juejin.cn/post/7124899170615296013

3、源码调整注意事项

datax : 版本 

源码下载,自行用idea进行打包编译,修改完如下类,

com.alibaba.datax.plugin.writer.postgresqlwriter.PostgresqlWriter

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil

编译替换jar文件名:

postgresqlwriter-0.0.1-SNAPSHOT.jar

plugin-rdbms-util-0.0.1-SNAPSHOT.jar

目录树如下:(plugin/writer/postgresqlwriter)

find <目录路径> | sed -e 's/[^-][^\/]*\//--/g' -e 's/--/|-/'

|-lib
|-bin
|-job
|-conf
|-log
|-log_perf
|-tmp
|-script
|-plugin
|---writer
|-----postgresqlwriter
|-------plugin_job_template.json
|-------plugin.json
|-------libs
|---------checker-qual-3.5.0.jar
|---------postgresql-42.3.3.jar
|---------commons-collections-3.0.jar
|---------druid-1.0.15.jar
|---------commons-lang3-3.3.2.jar
|---------logback-core-1.0.13.jar
|---------commons-io-2.4.jar
|---------datax-common-0.0.1-SNAPSHOT.jar
|---------guava-r05.jar
|---------plugin-rdbms-util-0.0.1-SNAPSHOT.jar
|---------hamcrest-core-1.3.jar
|---------logback-classic-1.0.13.jar
|---------commons-math3-3.1.1.jar
|---------slf4j-api-1.7.10.jar
|---------fastjson2-2.0.23.jar
|-------postgresqlwriter-0.0.1-SNAPSHOT.jar

4、使用

4.1 、可以支持带有唯一索引的表的新增或者更新

mysql 表结构

CREATE TABLE `sys_test_copy2` (`user_id` bigint NOT NULL DEFAULT '0' COMMENT '用户ID',`email` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '用户邮箱',`iso_country_code` varchar(3) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'ISO国家代码',`country` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '国家',`brand_no` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '品牌',`source` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '来源',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`user_id`),UNIQUE KEY `sys_test_copy2_u1` (`email`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

目标 PG表结构

CREATE TABLE "public"."sys_test_copy2" ("user_id" int8 NOT NULL,"email" varchar(50) COLLATE "pg_catalog"."default","iso_country_code" varchar(3) COLLATE "pg_catalog"."default","country" varchar(50) COLLATE "pg_catalog"."default","brand_no" varchar(30) COLLATE "pg_catalog"."default","source" varchar(50) COLLATE "pg_catalog"."default","create_time" timestamp(6),CONSTRAINT "sys_test_copy2_pkey" PRIMARY KEY ("user_id")
)
;ALTER TABLE "public"."sys_test_copy2" OWNER TO "postgres";CREATE UNIQUE INDEX "sys_test_copy2_u1" ON "public"."sys_test_copy2" USING btree ("email" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);

datax job: 

{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"username": "root","password": "xxxxxx","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"],"querySql": [" SELECT * from  sys_test_copy2"]}}},"writer":{"name":"postgresqlwriter","parameter":{"writeMode": "update!@#(user_id)!@#(email)","column":["id","name"],"connection":[{"jdbcUrl":"jdbc:postgresql://127.0.0.1:5432/postgres","table":["sys_test_copy2"]}],"password":"xxxx","username":"postgres"}}}],"setting":{"speed":{"channel":6}}}
}

执行job,生成的模版语句:

INSERT INTO %s (user_id,email,iso_country_code,country,brand_no,source,create_time) VALUES(?::int8,?::varchar,?::varchar,?::varchar,?::varchar,?::varchar,?::timestamp) ON CONFLICT (user_id) DO UPDATE SET email=EXCLUDED.email

4.2、根据主键进行新增或者更新

INSERT INTO sys_test_copy1(user_id, email) VALUES (5592, 'xxxx5@hotmail.com')  ON CONFLICT (user_id) do nothing;

表结构就不放了,去掉唯一索引

datax job:

{"job": {"setting": {"speed": {"channel": 5},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "数据库密码","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"],"querySql": [" SELECT * from  sys_test_copy1"]}]}},"writer": {"name": "postgresqlwriter","parameter": {"username": "postgres","password": "数据库密码","writeMode": "insert!@#(user_id)","column": ["*"],                      "connection": [{"table": ["sys_test_copy1"],"jdbcUrl": "jdbc:postgresql://192.168.5.190:5432/xxxx",}]}}}]}
}

其实都是写的 insert into on CONFLICT 语句

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil

下面的代码后续调整下规则,

private static String onDuplicateKeyUpdateString(String writeMode, List<String> columnHolders) {String[] writeModeArr = writeMode.split("!@#", -1);int writeModeArrLen = writeModeArr.length;writeMode = writeModeArr[0];StringBuilder sb = new StringBuilder();if ("insert".equals(writeMode) && writeModeArrLen == 2) {sb.append(" ON CONFLICT ").append(writeModeArr[1]).append(" do nothing");}if ("update".equals(writeMode) && writeModeArrLen == 3) {sb.append(" ON CONFLICT ").append(writeModeArr[1]);String[] updateFieldArr = writeModeArr[2].replace("(","").replace(")","").split(",", -1);List<String> updateSqlList = new ArrayList<>();for (String updateField : updateFieldArr) {if (!columnHolders.contains(updateField)) {continue;}updateSqlList.add(updateField + "=EXCLUDED." + updateField);}if (updateSqlList.isEmpty()) {sb.append(" DO NOTHING");} else {sb.append(" DO UPDATE SET ").append(StringUtils.join(updateSqlList, ","));}}return sb.toString();}

小结:

pg插件,目前不支持插入更新操作,需要手工调整源码来适配。适配注意点,是根据你是否配置唯一索引来决定。(insert or update)

下期将简单介绍下,如果通过xxl-job 来执行 脚本

python datax.py ./job/mysql_postgres_job.json


 

这篇关于异构数据同步 datax (2)-postgres 写扩展的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1091592

相关文章

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模