异构数据同步 datax (2)-postgres 写扩展

2024-08-21 00:20

本文主要是介绍异构数据同步 datax (2)-postgres 写扩展,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、postgres SQL 支持  插入更新操作(与mysql 语法有一定差异)

可参考下面文章

MySQL + PostgreSQL批量插入更新insertOrUpdate_mysql insert update-CSDN博客

2、datax中,可通过源码调整来实现

参考来源

https://juejin.cn/post/7124899170615296013

3、源码调整注意事项

datax : 版本 

源码下载,自行用idea进行打包编译,修改完如下类,

com.alibaba.datax.plugin.writer.postgresqlwriter.PostgresqlWriter

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil

编译替换jar文件名:

postgresqlwriter-0.0.1-SNAPSHOT.jar

plugin-rdbms-util-0.0.1-SNAPSHOT.jar

目录树如下:(plugin/writer/postgresqlwriter)

find <目录路径> | sed -e 's/[^-][^\/]*\//--/g' -e 's/--/|-/'

|-lib
|-bin
|-job
|-conf
|-log
|-log_perf
|-tmp
|-script
|-plugin
|---writer
|-----postgresqlwriter
|-------plugin_job_template.json
|-------plugin.json
|-------libs
|---------checker-qual-3.5.0.jar
|---------postgresql-42.3.3.jar
|---------commons-collections-3.0.jar
|---------druid-1.0.15.jar
|---------commons-lang3-3.3.2.jar
|---------logback-core-1.0.13.jar
|---------commons-io-2.4.jar
|---------datax-common-0.0.1-SNAPSHOT.jar
|---------guava-r05.jar
|---------plugin-rdbms-util-0.0.1-SNAPSHOT.jar
|---------hamcrest-core-1.3.jar
|---------logback-classic-1.0.13.jar
|---------commons-math3-3.1.1.jar
|---------slf4j-api-1.7.10.jar
|---------fastjson2-2.0.23.jar
|-------postgresqlwriter-0.0.1-SNAPSHOT.jar

4、使用

4.1 、可以支持带有唯一索引的表的新增或者更新

mysql 表结构

CREATE TABLE `sys_test_copy2` (`user_id` bigint NOT NULL DEFAULT '0' COMMENT '用户ID',`email` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT '' COMMENT '用户邮箱',`iso_country_code` varchar(3) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT 'ISO国家代码',`country` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '国家',`brand_no` varchar(30) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '品牌',`source` varchar(50) CHARACTER SET utf8mb4 COLLATE utf8mb4_general_ci DEFAULT NULL COMMENT '来源',`create_time` datetime DEFAULT NULL COMMENT '创建时间',PRIMARY KEY (`user_id`),UNIQUE KEY `sys_test_copy2_u1` (`email`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci;

目标 PG表结构

CREATE TABLE "public"."sys_test_copy2" ("user_id" int8 NOT NULL,"email" varchar(50) COLLATE "pg_catalog"."default","iso_country_code" varchar(3) COLLATE "pg_catalog"."default","country" varchar(50) COLLATE "pg_catalog"."default","brand_no" varchar(30) COLLATE "pg_catalog"."default","source" varchar(50) COLLATE "pg_catalog"."default","create_time" timestamp(6),CONSTRAINT "sys_test_copy2_pkey" PRIMARY KEY ("user_id")
)
;ALTER TABLE "public"."sys_test_copy2" OWNER TO "postgres";CREATE UNIQUE INDEX "sys_test_copy2_u1" ON "public"."sys_test_copy2" USING btree ("email" COLLATE "pg_catalog"."default" "pg_catalog"."text_ops" ASC NULLS LAST
);

datax job: 

{"job":{"content":[{"reader":{"name":"mysqlreader","parameter":{"username": "root","password": "xxxxxx","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"],"querySql": [" SELECT * from  sys_test_copy2"]}}},"writer":{"name":"postgresqlwriter","parameter":{"writeMode": "update!@#(user_id)!@#(email)","column":["id","name"],"connection":[{"jdbcUrl":"jdbc:postgresql://127.0.0.1:5432/postgres","table":["sys_test_copy2"]}],"password":"xxxx","username":"postgres"}}}],"setting":{"speed":{"channel":6}}}
}

执行job,生成的模版语句:

INSERT INTO %s (user_id,email,iso_country_code,country,brand_no,source,create_time) VALUES(?::int8,?::varchar,?::varchar,?::varchar,?::varchar,?::varchar,?::timestamp) ON CONFLICT (user_id) DO UPDATE SET email=EXCLUDED.email

4.2、根据主键进行新增或者更新

INSERT INTO sys_test_copy1(user_id, email) VALUES (5592, 'xxxx5@hotmail.com')  ON CONFLICT (user_id) do nothing;

表结构就不放了,去掉唯一索引

datax job:

{"job": {"setting": {"speed": {"channel": 5},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "数据库密码","connection": [{"jdbcUrl": ["jdbc:mysql://192.168.5.180:3306/xxxx?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"],"querySql": [" SELECT * from  sys_test_copy1"]}]}},"writer": {"name": "postgresqlwriter","parameter": {"username": "postgres","password": "数据库密码","writeMode": "insert!@#(user_id)","column": ["*"],                      "connection": [{"table": ["sys_test_copy1"],"jdbcUrl": "jdbc:postgresql://192.168.5.190:5432/xxxx",}]}}}]}
}

其实都是写的 insert into on CONFLICT 语句

com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil

下面的代码后续调整下规则,

private static String onDuplicateKeyUpdateString(String writeMode, List<String> columnHolders) {String[] writeModeArr = writeMode.split("!@#", -1);int writeModeArrLen = writeModeArr.length;writeMode = writeModeArr[0];StringBuilder sb = new StringBuilder();if ("insert".equals(writeMode) && writeModeArrLen == 2) {sb.append(" ON CONFLICT ").append(writeModeArr[1]).append(" do nothing");}if ("update".equals(writeMode) && writeModeArrLen == 3) {sb.append(" ON CONFLICT ").append(writeModeArr[1]);String[] updateFieldArr = writeModeArr[2].replace("(","").replace(")","").split(",", -1);List<String> updateSqlList = new ArrayList<>();for (String updateField : updateFieldArr) {if (!columnHolders.contains(updateField)) {continue;}updateSqlList.add(updateField + "=EXCLUDED." + updateField);}if (updateSqlList.isEmpty()) {sb.append(" DO NOTHING");} else {sb.append(" DO UPDATE SET ").append(StringUtils.join(updateSqlList, ","));}}return sb.toString();}

小结:

pg插件,目前不支持插入更新操作,需要手工调整源码来适配。适配注意点,是根据你是否配置唯一索引来决定。(insert or update)

下期将简单介绍下,如果通过xxl-job 来执行 脚本

python datax.py ./job/mysql_postgres_job.json


 

这篇关于异构数据同步 datax (2)-postgres 写扩展的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1091592

相关文章

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

SpringValidation数据校验之约束注解与分组校验方式

《SpringValidation数据校验之约束注解与分组校验方式》本文将深入探讨SpringValidation的核心功能,帮助开发者掌握约束注解的使用技巧和分组校验的高级应用,从而构建更加健壮和可... 目录引言一、Spring Validation基础架构1.1 jsR-380标准与Spring整合1

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

使用Python将JSON,XML和YAML数据写入Excel文件

《使用Python将JSON,XML和YAML数据写入Excel文件》JSON、XML和YAML作为主流结构化数据格式,因其层次化表达能力和跨平台兼容性,已成为系统间数据交换的通用载体,本文将介绍如何... 目录如何使用python写入数据到Excel工作表用Python导入jsON数据到Excel工作表用

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

鸿蒙中Axios数据请求的封装和配置方法

《鸿蒙中Axios数据请求的封装和配置方法》:本文主要介绍鸿蒙中Axios数据请求的封装和配置方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.配置权限 应用级权限和系统级权限2.配置网络请求的代码3.下载在Entry中 下载AxIOS4.封装Htt

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4