DataX-Oracle新增writeMode支持update

2024-03-29 12:52

本文主要是介绍DataX-Oracle新增writeMode支持update,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

前言

第一步下载源码

第二步修改源码

1、Oraclewriter

2、WriterUtil

 2.1、修改getWriteTemplate方法

 2.2、新增onMergeIntoDoString与getStrings方法

3、CommonRdbmsWriter

 3.1、修改startWriteWithConnection

 3.2、修改doBatchInsert

 3.3、修改fillPreparedStatement

第三步打包

第四步脚本修改

修改后jar包地址 




前言

目前 DataX更新到datax_v202309版本还不能支持Oracle写入的update,只通过DataX只能修改源码。

原理:oracle 不支持类似 MySQL的 REPLACE INTO 和 INSERT … ON DUPLICATE KEY UPDATE,所以只支持 insert 配置项。要实现此功能,需要利用 Oracle 的 merge 语句,先来看下 merge 语法。

MERGE INTO [target-table] A USING [source-table sql] B 
ON([conditional expression] and [...]...) 
WHEN MATCHED THEN[UPDATE sql] 
WHEN NOT MATCHED THEN [INSERT sql]

第一步下载源码

 地址:datax_v202309。

第二步修改源码

一共修改3个文件

1、Oraclewriter

 

找到该代码直接注释掉就行。 

2、WriterUtil
 2.1、修改getWriteTemplate方法
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean update = writeMode.trim().toLowerCase().startsWith("update");boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| update;if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate || update) {//update只在mysql下使用if (dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();}//update在Oracle下使用else if (dataBaseType == DataBaseType.Oracle) {writeDataSqlTemplate = onMergeIntoDoString(writeMode, columnHolders, valueHolders) + "INSERT (" +StringUtils.join(columnHolders, ",") +") VALUES(" + StringUtils.join(valueHolders, ",") +")";}else {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("当前数据库不支持 writeMode:%s 模式.", writeMode));}} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (update) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}return writeDataSqlTemplate;}
 2.2、新增onMergeIntoDoString与getStrings方法

代码作用:对Oracle进行update的MERGE拼接

public static String onMergeIntoDoString(String merge, List<String> columnHolders, List<String> valueHolders) {String[] sArray = getStrings(merge);StringBuilder sb = new StringBuilder();sb.append("MERGE INTO %s A USING ( SELECT ");boolean first = true;boolean first1 = true;StringBuilder str = new StringBuilder();StringBuilder update = new StringBuilder();for (String columnHolder : columnHolders) {if (Arrays.asList(sArray).contains(columnHolder)) {if (!first) {sb.append(",");str.append(" AND ");} else {first = false;}str.append("TMP.").append(columnHolder);sb.append("?");str.append(" = ");sb.append(" AS ");str.append("A.").append(columnHolder);sb.append(columnHolder);}}for (String columnHolder : columnHolders) {if (!Arrays.asList(sArray).contains(columnHolder)) {if (!first1) {update.append(",");} else {first1 = false;}update.append(columnHolder);update.append(" = ");update.append("?");}}sb.append(" FROM DUAL ) TMP ON (");sb.append(str);sb.append(" ) WHEN MATCHED THEN UPDATE SET ");sb.append(update);sb.append(" WHEN NOT MATCHED THEN ");return sb.toString();}public static String[] getStrings(String merge) {merge = merge.replace("update", "");merge = merge.replace("(", "");merge = merge.replace(")", "");merge = merge.replace(" ", "");return merge.split(",");}
3、CommonRdbmsWriter
 3.1、修改startWriteWithConnection
        // 替换原先的代码块public void startWriteWithConnection(RecordReceiver recordReceiver, TaskPluginCollector taskPluginCollector, Connection connection) {this.taskPluginCollector = taskPluginCollector;List<String> columns = new LinkedList<>();if (this.dataBaseType == DataBaseType.Oracle && writeMode.trim().toLowerCase().startsWith("update") ) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);this.columns.forEach(column->{if (Arrays.asList(sArray).contains(column)) {columns.add(column);}});this.columns.forEach(column->{if (!Arrays.asList(sArray).contains(column)) {columns.add(column);}});}columns.addAll(this.columns);// 用于写入数据的时候的类型根据目的表字段类型转换this.resultSetMetaData = DBUtil.getColumnMetaData(connection, this.table, StringUtils.join(columns, ","));// 写数据库的SQL语句calcWriteRecordSql();List<Record> writeBuffer = new ArrayList<Record>(this.batchSize);int bufferBytes = 0;try {Record record;while ((record = recordReceiver.getFromReader()) != null) {if (record.getColumnNumber() != this.columnNumber) {// 源头读取字段列数与目的表字段写入列数不相等,直接报错throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,String.format("列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",record.getColumnNumber(),this.columnNumber));}writeBuffer.add(record);bufferBytes += record.getMemorySize();if (writeBuffer.size() >= batchSize || bufferBytes >= batchByteSize) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}}if (!writeBuffer.isEmpty()) {doBatchInsert(connection, writeBuffer);writeBuffer.clear();bufferBytes = 0;}} catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);} finally {writeBuffer.clear();bufferBytes = 0;DBUtil.closeDBResources(null, null, connection);}}
 3.2、修改doBatchInsert
 protected void doBatchInsert(Connection connection, List<Record> buffer)throws SQLException{PreparedStatement preparedStatement = null;try {connection.setAutoCommit(false);preparedStatement = connection.prepareStatement(this.writeRecordSql);if (this.dataBaseType == DataBaseType.Oracle && !"insert".equalsIgnoreCase(this.writeMode)) {String merge = this.writeMode;String[] sArray = WriterUtil.getStrings(merge);for (Record record : buffer) {List<Column> recordOne = new ArrayList<>();for (int j = 0; j < this.columns.size(); j++) {if (Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {if (!Arrays.asList(sArray).contains(this.columns.get(j))) {recordOne.add(record.getColumn(j));}}for (int j = 0; j < this.columns.size(); j++) {recordOne.add(record.getColumn(j));}for (int j = 0; j < recordOne.size(); j++) {record.setColumn(j, recordOne.get(j));}preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}else {for (Record record : buffer) {preparedStatement = fillPreparedStatement(preparedStatement, record);preparedStatement.addBatch();}}preparedStatement.executeBatch();connection.commit();}catch (SQLException e) {LOG.warn("回滚此次写入, 采用每次写入一行方式提交. 因为: {}", e.getMessage());connection.rollback();doOneInsert(connection, buffer);}catch (Exception e) {throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);}finally {DBUtil.closeDBResources(preparedStatement, null);}}
 3.3、修改fillPreparedStatement
  protected PreparedStatement fillPreparedStatement(PreparedStatement preparedStatement, Record record)throws SQLException{for (int i = 0; i < record.getColumnNumber(); i++) {int columnSqltype = this.resultSetMetaData.getMiddle().get(i);String typeName = this.resultSetMetaData.getRight().get(i);preparedStatement = fillPreparedStatementColumnType(preparedStatement, i,columnSqltype, typeName,record.getColumn(i));}return preparedStatement;}

第三步打包

1、只需要在idea里面打包修改的两个程序就可以

 2、打包成功后获取两个jar包

 3、将包替换到datax的插件里面

 将oraclewriter-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter

 将plugin-rdbms-util-0.0.1-SNAPSHOT.jar替换到datax\plugin\writer\oraclewriter\libs

第四步脚本修改

{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "${r_username}","password": "${r_password}","connection": [{	   "querySql": ["SELECT f_year,f_code,f_name,f_order FROM tableName"],"jdbcUrl": ["${r_jdbcUrl}"]}]}},"writer": {"name": "oraclewriter","parameter": {"writeMode": "update(f_year,f_code)","username": "${w_username}","password": "${w_password}","column": ["f_year","f_code","f_name","f_order"],"session": [],"preSql": [],"connection": [{"jdbcUrl": "${w_jdbcUrl}","table": ["tableName"]}]}}		   }]}
}

参数 "writeMode": "update(f_year,f_code)" 里面f_year,f_code就是主键, 参数上不要加/"

update(\"f_year\",\"f_code\")这样是拼不上sql的,这个问题调试了好久才解决。

这时候运行就成功了

参考文章DataX 二次开发支持 Oracle 更新数据icon-default.png?t=N7T8https://blog.csdn.net/xch_yang/article/details/128250190?utm_medium=distribute.pc_relevant.none-task-blog-2~default~baidujs_baidulandingword~default-0-128250190-blog-106881907.235%5Ev43%5Epc_blog_bottom_relevance_base8&spm=1001.2101.3001.4242.1&utm_relevant_index=3Datax oracle 支持增量并且支持全量更新icon-default.png?t=N7T8https://blog.csdn.net/weixin_41250031/article/details/122615271?spm=1001.2101.3001.6650.5&utm_medium=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2~default~CTRLIST~Rate-5-122615271-blog-129723622.235%5Ev43%5Epc_blog_bottom_relevance_base8&utm_relevant_index=7

修改后jar包地址 

懒得修改可以直接下载两个jar替换到你们的datax对应目录。

https://download.csdn.net/download/qq_36802726/89046154icon-default.png?t=N7T8https://download.csdn.net/download/qq_36802726/89046154

这篇关于DataX-Oracle新增writeMode支持update的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/858646

相关文章

oracle数据库索引失效的问题及解决

《oracle数据库索引失效的问题及解决》本文总结了在Oracle数据库中索引失效的一些常见场景,包括使用isnull、isnotnull、!=、、、函数处理、like前置%查询以及范围索引和等值索引... 目录oracle数据库索引失效问题场景环境索引失效情况及验证结论一结论二结论三结论四结论五总结ora

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

Oracle数据库执行计划的查看与分析技巧

《Oracle数据库执行计划的查看与分析技巧》在Oracle数据库中,执行计划能够帮助我们深入了解SQL语句在数据库内部的执行细节,进而优化查询性能、提升系统效率,执行计划是Oracle数据库优化器为... 目录一、什么是执行计划二、查看执行计划的方法(一)使用 EXPLAIN PLAN 命令(二)通过 S

Android13_SystemUI下拉框新增音量控制条

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 Android13_SystemUI下拉框新增音量控制条 一、必备知识二、源码分析对比1.brightness模块分析对比2.statusbar/phone 对应模块对比对比初始化类声明对比构造方法 三、源码修改四、相关资源 一、必备知识 在Android12 版本上面已经完成了功能的实现,目前是在And

Oracle type (自定义类型的使用)

oracle - type   type定义: oracle中自定义数据类型 oracle中有基本的数据类型,如number,varchar2,date,numeric,float....但有时候我们需要特殊的格式, 如将name定义为(firstname,lastname)的形式,我们想把这个作为一个表的一列看待,这时候就要我们自己定义一个数据类型 格式 :create or repla

ORACLE 11g 创建数据库时 Enterprise Manager配置失败的解决办法 无法打开OEM的解决办法

在win7 64位系统下安装oracle11g,在使用Database configuration Assistant创建数据库时,在创建到85%的时候报错,错误如下: 解决办法: 在listener.ora中增加对BlueAeri-PC或ip地址的侦听,具体步骤如下: 1.启动Net Manager,在“监听程序”--Listener下添加一个地址,主机名写计

Oracle Start With关键字

Oracle Start With关键字 前言 旨在记录一些Oracle使用中遇到的各种各样的问题. 同时希望能帮到和我遇到同样问题的人. Start With (树查询) 问题描述: 在数据库中, 有一种比较常见得 设计模式, 层级结构 设计模式, 具体到 Oracle table中, 字段特点如下: ID, DSC, PID; 三个字段, 分别表示 当前标识的 ID(主键), DSC 当

oracle分页和mysql分页

mysql 分页 --查前5 数据select * from table_name limit 0,5 select * from table_name limit 5 --limit关键字的用法:LIMIT [offset,] rows--offset指定要返回的第一行的偏移量,rows第二个指定返回行的最大数目。初始行的偏移量是0(不是1)。   oracle 分页 --查前1-9

详解Tomcat 7的七大新特性和新增功能(1)

http://developer.51cto.com/art/201009/228537.htm http://tomcat.apache.org/tomcat-7.0-doc/index.html  Apache发布首个Tomcat 7版本已经发布了有一段时间了,Tomcat 7引入了许多新功能,并对现有功能进行了增强。很多文章列出了Tomcat 7的新功能,但大多数并没有详细解释它们

Golang支持平滑升级的HTTP服务

前段时间用Golang在做一个HTTP的接口,因编译型语言的特性,修改了代码需要重新编译可执行文件,关闭正在运行的老程序,并启动新程序。对于访问量较大的面向用户的产品,关闭、重启的过程中势必会出现无法访问的情况,从而影响用户体验。 使用Golang的系统包开发HTTP服务,是无法支持平滑升级(优雅重启)的,本文将探讨如何解决该问题。 一、平滑升级(优雅重启)的一般思路 一般情况下,要实现平滑