Datax 支持增量 postgresql writeMode update

2024-09-04 00:18

本文主要是介绍Datax 支持增量 postgresql writeMode update,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Datax 支持 postgresql update

  • datax介绍
    • 支持增量 postgresql update
    • 修改 PostgresqlWriter.java
    • 修改WriterUtil.java
    • 效果
    • 源码

datax介绍

DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。

支持增量 postgresql update

我们使用datax 希望支持postgresql 增量导入数据:地址:https://gitee.com/cecotw/DataX

链接:https://pan.baidu.com/s/1mbEvLsDZZNWMYrTTTeYkAw 密码:v97c

修改 PostgresqlWriter.java

删除限制:
删除pg写入模式限制

修改WriterUtil.java

添加postgresql 数据插入类型转换:
添加postgresql 数据插入类型转换

    public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| writeMode.trim().toLowerCase().startsWith("update");if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate ||((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update"))) {//update只在mysql下使用writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();} else {if (dataBaseType == DataBaseType.PostgreSQL) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onConFlictDoString(writeMode, columnHolders)).toString();} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (writeMode.trim().toLowerCase().startsWith("update")) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}}return writeDataSqlTemplate;}

增加onConFlictDoString方法:
增加onConFlictDoString方法

    public static String onConFlictDoString(String conflict, List<String> columnHolders) {conflict = conflict.replace("update", "");StringBuilder sb = new StringBuilder();sb.append(" ON CONFLICT ");sb.append(conflict);sb.append(" DO ");if (columnHolders == null || columnHolders.size() < 1) {sb.append("NOTHING");return sb.toString();}sb.append(" UPDATE SET ");boolean first = true;for (String column : columnHolders) {if (!first) {sb.append(",");} else {first = false;}sb.append(column);sb.append("=excluded.");sb.append(column);}return sb.toString();}

效果

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "postgresqlreader","parameter": {"username": "postgres","password": "postgres","connection": [{"querySql":["SELECT seq,userid,name FROM user"],"jdbcUrl": ["jdbc:postgresql://127.0.0.1:5432/postgres"]}]}},"writer": {"name": "postgresqlwriter","parameter": {"username": "thsdb","password": "thsdb_outsev","column": ["seq","userid","name"],"connection": [{"jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/postgres","table": ["user1"]}],"writeMode": "update (seq,userid)"}}}]}
}

源码

  • 关于 DATAX改造后的代码 ,参考 这儿.(https://gitee.com/cecotw/DataX)

这篇关于Datax 支持增量 postgresql writeMode update的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134450

相关文章

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

postgresql数据库基本操作及命令详解

《postgresql数据库基本操作及命令详解》本文介绍了PostgreSQL数据库的基础操作,包括连接、创建、查看数据库,表的增删改查、索引管理、备份恢复及退出命令,适用于数据库管理和开发实践,感兴... 目录1. 连接 PostgreSQL 数据库2. 创建数据库3. 查看当前数据库4. 查看所有数据库

Oracle 数据库数据操作如何精通 INSERT, UPDATE, DELETE

《Oracle数据库数据操作如何精通INSERT,UPDATE,DELETE》在Oracle数据库中,对表内数据进行增加、修改和删除操作是通过数据操作语言来完成的,下面给大家介绍Oracle数... 目录思维导图一、插入数据 (INSERT)1.1 插入单行数据,指定所有列的值语法:1.2 插入单行数据,指

k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)

《k8s上运行的mysql、mariadb数据库的备份记录(支持x86和arm两种架构)》本文记录在K8s上运行的MySQL/MariaDB备份方案,通过工具容器执行mysqldump,结合定时任务实... 目录前言一、获取需要备份的数据库的信息二、备份步骤1.准备工作(X86)1.准备工作(arm)2.手

PostgreSQL数据库密码被遗忘时的操作步骤

《PostgreSQL数据库密码被遗忘时的操作步骤》密码遗忘是常见的用户问题,因此提供一种安全的遗忘密码找回机制是十分必要的,:本文主要介绍PostgreSQL数据库密码被遗忘时的操作步骤的相关资... 目录前言一、背景知识二、Windows环境下的解决步骤1. 找到PostgreSQL安装目录2. 修改p

PostgreSQL 默认隔离级别的设置

《PostgreSQL默认隔离级别的设置》PostgreSQL的默认事务隔离级别是读已提交,这是其事务处理系统的基础行为模式,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价... 目录一 默认隔离级别概述1.1 默认设置1.2 各版本一致性二 读已提交的特性2.1 行为特征2.2

PostgreSQL中MVCC 机制的实现

《PostgreSQL中MVCC机制的实现》本文主要介绍了PostgreSQL中MVCC机制的实现,通过多版本数据存储、快照隔离和事务ID管理实现高并发读写,具有一定的参考价值,感兴趣的可以了解一下... 目录一 MVCC 基本原理python1.1 MVCC 核心概念1.2 与传统锁机制对比二 Postg

华为鸿蒙HarmonyOS 5.1官宣7月开启升级! 首批支持名单公布

《华为鸿蒙HarmonyOS5.1官宣7月开启升级!首批支持名单公布》在刚刚结束的华为Pura80系列及全场景新品发布会上,除了众多新品的发布,还有一个消息也点燃了所有鸿蒙用户的期待,那就是Ha... 在今日的华为 Pura 80 系列及全场景新品发布会上,华为宣布鸿蒙 HarmonyOS 5.1 将于 7

MyBatis Plus 中 update_time 字段自动填充失效的原因分析及解决方案(最新整理)

《MyBatisPlus中update_time字段自动填充失效的原因分析及解决方案(最新整理)》在使用MyBatisPlus时,通常我们会在数据库表中设置create_time和update... 目录前言一、问题现象二、原因分析三、总结:常见原因与解决方法对照表四、推荐写法前言在使用 MyBATis

一文详解PostgreSQL复制参数

《一文详解PostgreSQL复制参数》PostgreSQL作为一款功能强大的开源关系型数据库,其复制功能对于构建高可用性系统至关重要,本文给大家详细介绍了PostgreSQL的复制参数,需要的朋友可... 目录一、复制参数基础概念二、核心复制参数深度解析1. max_wal_seChina编程nders:WAL