本文主要是介绍Datax 支持增量 postgresql writeMode update,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Datax 支持 postgresql update
- datax介绍
- 支持增量 postgresql update
- 修改 PostgresqlWriter.java
- 修改WriterUtil.java
- 效果
- 源码
datax介绍
DataX 是阿里巴巴集团内被广泛使用的离线数据同步工具/平台,实现包括 MySQL、Oracle、SqlServer、Postgre、HDFS、Hive、ADS、HBase、TableStore(OTS)、MaxCompute(ODPS)、DRDS 等各种异构数据源之间高效的数据同步功能。
支持增量 postgresql update
我们使用datax 希望支持postgresql 增量导入数据:地址:https://gitee.com/cecotw/DataX
链接:https://pan.baidu.com/s/1mbEvLsDZZNWMYrTTTeYkAw 密码:v97c
修改 PostgresqlWriter.java
删除限制:
修改WriterUtil.java
添加postgresql 数据插入类型转换:
public static String getWriteTemplate(List<String> columnHolders, List<String> valueHolders, String writeMode, DataBaseType dataBaseType, boolean forceUseUpdate) {boolean isWriteModeLegal = writeMode.trim().toLowerCase().startsWith("insert")|| writeMode.trim().toLowerCase().startsWith("replace")|| writeMode.trim().toLowerCase().startsWith("update");if (!isWriteModeLegal) {throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_VALUE,String.format("您所配置的 writeMode:%s 错误. 因为DataX 目前仅支持replace,update 或 insert 方式. 请检查您的配置并作出修改.", writeMode));}// && writeMode.trim().toLowerCase().startsWith("replace")String writeDataSqlTemplate;if (forceUseUpdate ||((dataBaseType == DataBaseType.MySql || dataBaseType == DataBaseType.Tddl) && writeMode.trim().toLowerCase().startsWith("update"))) {//update只在mysql下使用writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onDuplicateKeyUpdateString(columnHolders)).toString();} else {if (dataBaseType == DataBaseType.PostgreSQL) {writeDataSqlTemplate = new StringBuilder().append("INSERT INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").append(onConFlictDoString(writeMode, columnHolders)).toString();} else {//这里是保护,如果其他错误的使用了update,需要更换为replaceif (writeMode.trim().toLowerCase().startsWith("update")) {writeMode = "replace";}writeDataSqlTemplate = new StringBuilder().append(writeMode).append(" INTO %s (").append(StringUtils.join(columnHolders, ",")).append(") VALUES(").append(StringUtils.join(valueHolders, ",")).append(")").toString();}}return writeDataSqlTemplate;}
增加onConFlictDoString方法:
public static String onConFlictDoString(String conflict, List<String> columnHolders) {conflict = conflict.replace("update", "");StringBuilder sb = new StringBuilder();sb.append(" ON CONFLICT ");sb.append(conflict);sb.append(" DO ");if (columnHolders == null || columnHolders.size() < 1) {sb.append("NOTHING");return sb.toString();}sb.append(" UPDATE SET ");boolean first = true;for (String column : columnHolders) {if (!first) {sb.append(",");} else {first = false;}sb.append(column);sb.append("=excluded.");sb.append(column);}return sb.toString();}
效果
{"job": {"setting": {"speed": {"byte": 1048576},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "postgresqlreader","parameter": {"username": "postgres","password": "postgres","connection": [{"querySql":["SELECT seq,userid,name FROM user"],"jdbcUrl": ["jdbc:postgresql://127.0.0.1:5432/postgres"]}]}},"writer": {"name": "postgresqlwriter","parameter": {"username": "thsdb","password": "thsdb_outsev","column": ["seq","userid","name"],"connection": [{"jdbcUrl": "jdbc:postgresql://127.0.0.1:5432/postgres","table": ["user1"]}],"writeMode": "update (seq,userid)"}}}]}
}
源码
- 关于 DATAX改造后的代码 ,参考 这儿.(https://gitee.com/cecotw/DataX)
这篇关于Datax 支持增量 postgresql writeMode update的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!