Streamsets Postgresql 实时同步到Kudu

2023-10-17 20:30

本文主要是介绍Streamsets Postgresql 实时同步到Kudu,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Streamsets提供两种方式同步Postgresql,一种是JDBC、query,另一种是CDC方式,实时同步需要两者结合来首次同步。

首先需要全表同步,采用JDBC方式比较好:

这个比同步Mysql方便,可以写多个模式多个表同时同步。

这个是完成一次同步就触发,不至于没有数据进来报错。下一次事务继续同步。

这个一定要配置,不然_int  json 格式就会报错。 

勾选一下,然后Type转换主要是把时间格式转String。 kudu里面记录时间的字段全部是string格式。直接开始就行。

JDBC Multitable Consumer 里面有性能配置,可以参考官网。

等待全量同步的时候,可以配置PostgreSQL CDC Client pipeline流:

 

 

不认识的格式直接当成string 传入流中。 

 Jython Evaluator 配置(ETL重点):

import time
import datetimefor record in records:try:for change in record.value['change']:newRecord = sdcFunctions.createRecord(record.sourceId + str(time.time()))newRecord.value = {}newRecord.attributes['xid'] = str(record.value['xid'])newRecord.attributes['nextlsn'] = record.value['nextlsn']newRecord.attributes['timestamp'] = record.value['timestamp']newRecord.attributes['kind'] = change['kind']newRecord.attributes['schema'] = change['schema']        newRecord.attributes['jdbc.tables'] = change['table']if change['kind'] == 'insert':newRecord.attributes['sdc.operation.type'] = '1'if change['kind'] == 'delete':newRecord.attributes['sdc.operation.type'] = '2'if change['kind'] == 'update':newRecord.attributes['sdc.operation.type'] = '3'if 'columnnames' in change:columns = change['columnnames']types = change['columntypes']values = change['columnvalues']else:columns = change['oldkeys']['keynames']types = change['oldkeys']['keytypes']values = change['oldkeys']['keyvalues']for j in range(len(columns)):name = columns[j]type = types[j]value = values[j]newRecord.value[name] = valueoutput.write(newRecord)## optional, if we want to keep the original record,## otherwise we just put the new record in the batch.#output.write(record)except Exception as e:# Send record to errorerror.write(record, str(e))

Stream Selector 配置:分流

 kudu端配置比较简单。

当看到JDBC同步的数据很缓慢的时候,就可以直接开启CDC,然后关闭JDBC。有条件的可以先停止原始库写入数据的事务,等JDBC 同步完,开启CDC 再开启原始表的写入数据。

这篇关于Streamsets Postgresql 实时同步到Kudu的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/227753

相关文章

Java中将异步调用转为同步的五种实现方法

《Java中将异步调用转为同步的五种实现方法》本文介绍了将异步调用转为同步阻塞模式的五种方法:wait/notify、ReentrantLock+Condition、Future、CountDownL... 目录异步与同步的核心区别方法一:使用wait/notify + synchronized代码示例关键

对postgresql日期和时间的比较

《对postgresql日期和时间的比较》文章介绍了在数据库中处理日期和时间类型时的一些注意事项,包括如何将字符串转换为日期或时间类型,以及在比较时自动转换的情况,作者建议在使用数据库时,根据具体情况... 目录PostgreSQL日期和时间比较DB里保存到时分秒,需要和年月日比较db里存储date或者ti

详谈redis跟数据库的数据同步问题

《详谈redis跟数据库的数据同步问题》文章讨论了在Redis和数据库数据一致性问题上的解决方案,主要比较了先更新Redis缓存再更新数据库和先更新数据库再更新Redis缓存两种方案,文章指出,删除R... 目录一、Redis 数据库数据一致性的解决方案1.1、更新Redis缓存、删除Redis缓存的区别二

Nacos集群数据同步方式

《Nacos集群数据同步方式》文章主要介绍了Nacos集群中服务注册信息的同步机制,涉及到负责节点和非负责节点之间的数据同步过程,以及DistroProtocol协议在同步中的应用... 目录引言负责节点(发起同步)DistroProtocolDistroSyncChangeTask获取同步数据getDis

PostgreSQL如何查询表结构和索引信息

《PostgreSQL如何查询表结构和索引信息》文章介绍了在PostgreSQL中查询表结构和索引信息的几种方法,包括使用`d`元命令、系统数据字典查询以及使用可视化工具DBeaver... 目录前言使用\d元命令查看表字段信息和索引信息通过系统数据字典查询表结构通过系统数据字典查询索引信息查询所有的表名可

PostgreSQL如何用psql运行SQL文件

《PostgreSQL如何用psql运行SQL文件》文章介绍了两种运行预写好的SQL文件的方式:首先连接数据库后执行,或者直接通过psql命令执行,需要注意的是,文件路径在Linux系统中应使用斜杠/... 目录PostgreSQ编程L用psql运行SQL文件方式一方式二总结PostgreSQL用psql运

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

PostgreSQL核心功能特性与使用领域及场景分析

PostgreSQL有什么优点? 开源和免费 PostgreSQL是一个开源的数据库管理系统,可以免费使用和修改。这降低了企业的成本,并为开发者提供了一个活跃的社区和丰富的资源。 高度兼容 PostgreSQL支持多种操作系统(如Linux、Windows、macOS等)和编程语言(如C、C++、Java、Python、Ruby等),并提供了多种接口(如JDBC、ODBC、ADO.NET等