Streamsets Postgresql 实时同步到Kudu

2023-10-17 20:30

本文主要是介绍Streamsets Postgresql 实时同步到Kudu,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Streamsets提供两种方式同步Postgresql,一种是JDBC、query,另一种是CDC方式,实时同步需要两者结合来首次同步。

首先需要全表同步,采用JDBC方式比较好:

这个比同步Mysql方便,可以写多个模式多个表同时同步。

这个是完成一次同步就触发,不至于没有数据进来报错。下一次事务继续同步。

这个一定要配置,不然_int  json 格式就会报错。 

勾选一下,然后Type转换主要是把时间格式转String。 kudu里面记录时间的字段全部是string格式。直接开始就行。

JDBC Multitable Consumer 里面有性能配置,可以参考官网。

等待全量同步的时候,可以配置PostgreSQL CDC Client pipeline流:

 

 

不认识的格式直接当成string 传入流中。 

 Jython Evaluator 配置(ETL重点):

import time
import datetimefor record in records:try:for change in record.value['change']:newRecord = sdcFunctions.createRecord(record.sourceId + str(time.time()))newRecord.value = {}newRecord.attributes['xid'] = str(record.value['xid'])newRecord.attributes['nextlsn'] = record.value['nextlsn']newRecord.attributes['timestamp'] = record.value['timestamp']newRecord.attributes['kind'] = change['kind']newRecord.attributes['schema'] = change['schema']        newRecord.attributes['jdbc.tables'] = change['table']if change['kind'] == 'insert':newRecord.attributes['sdc.operation.type'] = '1'if change['kind'] == 'delete':newRecord.attributes['sdc.operation.type'] = '2'if change['kind'] == 'update':newRecord.attributes['sdc.operation.type'] = '3'if 'columnnames' in change:columns = change['columnnames']types = change['columntypes']values = change['columnvalues']else:columns = change['oldkeys']['keynames']types = change['oldkeys']['keytypes']values = change['oldkeys']['keyvalues']for j in range(len(columns)):name = columns[j]type = types[j]value = values[j]newRecord.value[name] = valueoutput.write(newRecord)## optional, if we want to keep the original record,## otherwise we just put the new record in the batch.#output.write(record)except Exception as e:# Send record to errorerror.write(record, str(e))

Stream Selector 配置:分流

 kudu端配置比较简单。

当看到JDBC同步的数据很缓慢的时候,就可以直接开启CDC,然后关闭JDBC。有条件的可以先停止原始库写入数据的事务,等JDBC 同步完,开启CDC 再开启原始表的写入数据。

这篇关于Streamsets Postgresql 实时同步到Kudu的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/227753

相关文章

Win11安装PostgreSQL数据库的两种方式详细步骤

《Win11安装PostgreSQL数据库的两种方式详细步骤》PostgreSQL是备受业界青睐的关系型数据库,尤其是在地理空间和移动领域,:本文主要介绍Win11安装PostgreSQL数据库的... 目录一、exe文件安装 (推荐)下载安装包1. 选择操作系统2. 跳转到EDB(PostgreSQL 的

Python 中的异步与同步深度解析(实践记录)

《Python中的异步与同步深度解析(实践记录)》在Python编程世界里,异步和同步的概念是理解程序执行流程和性能优化的关键,这篇文章将带你深入了解它们的差异,以及阻塞和非阻塞的特性,同时通过实际... 目录python中的异步与同步:深度解析与实践异步与同步的定义异步同步阻塞与非阻塞的概念阻塞非阻塞同步

Linux搭建Mysql主从同步的教程

《Linux搭建Mysql主从同步的教程》:本文主要介绍Linux搭建Mysql主从同步的教程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux搭建mysql主从同步1.启动mysql服务2.修改Mysql主库配置文件/etc/my.cnf3.重启主库my

Ubuntu 22.04 服务器安装部署(nginx+postgresql)

《Ubuntu22.04服务器安装部署(nginx+postgresql)》Ubuntu22.04LTS是迄今为止最好的Ubuntu版本之一,很多linux的应用服务器都是选择的这个版本... 目录是什么让 Ubuntu 22.04 LTS 变得安全?更新了安全包linux 内核改进一、部署环境二、安装系统

Java中将异步调用转为同步的五种实现方法

《Java中将异步调用转为同步的五种实现方法》本文介绍了将异步调用转为同步阻塞模式的五种方法:wait/notify、ReentrantLock+Condition、Future、CountDownL... 目录异步与同步的核心区别方法一:使用wait/notify + synchronized代码示例关键

对postgresql日期和时间的比较

《对postgresql日期和时间的比较》文章介绍了在数据库中处理日期和时间类型时的一些注意事项,包括如何将字符串转换为日期或时间类型,以及在比较时自动转换的情况,作者建议在使用数据库时,根据具体情况... 目录PostgreSQL日期和时间比较DB里保存到时分秒,需要和年月日比较db里存储date或者ti

详谈redis跟数据库的数据同步问题

《详谈redis跟数据库的数据同步问题》文章讨论了在Redis和数据库数据一致性问题上的解决方案,主要比较了先更新Redis缓存再更新数据库和先更新数据库再更新Redis缓存两种方案,文章指出,删除R... 目录一、Redis 数据库数据一致性的解决方案1.1、更新Redis缓存、删除Redis缓存的区别二

Nacos集群数据同步方式

《Nacos集群数据同步方式》文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用... 目录引言负责节点(发起同步)DistroProtocolDistroSyncChangeTask获取同步数据getDis

PostgreSQL如何查询表结构和索引信息

《PostgreSQL如何查询表结构和索引信息》文章介绍了在PostgreSQL中查询表结构和索引信息的几种方法,包括使用`d`元命令、系统数据字典查询以及使用可视化工具DBeaver... 目录前言使用\d元命令查看表字段信息和索引信息通过系统数据字典查询表结构通过系统数据字典查询索引信息查询所有的表名可

PostgreSQL如何用psql运行SQL文件

《PostgreSQL如何用psql运行SQL文件》文章介绍了两种运行预写好的SQL文件的方式:首先连接数据库后执行,或者直接通过psql命令执行,需要注意的是,文件路径在Linux系统中应使用斜杠/... 目录PostgreSQ编程L用psql运行SQL文件方式一方式二总结PostgreSQL用psql运