Streamsets Postgresql 实时同步到Kudu

2023-10-17 20:30

本文主要是介绍Streamsets Postgresql 实时同步到Kudu,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Streamsets提供两种方式同步Postgresql,一种是JDBC、query,另一种是CDC方式,实时同步需要两者结合来首次同步。

首先需要全表同步,采用JDBC方式比较好:

这个比同步Mysql方便,可以写多个模式多个表同时同步。

这个是完成一次同步就触发,不至于没有数据进来报错。下一次事务继续同步。

这个一定要配置,不然_int  json 格式就会报错。 

勾选一下,然后Type转换主要是把时间格式转String。 kudu里面记录时间的字段全部是string格式。直接开始就行。

JDBC Multitable Consumer 里面有性能配置,可以参考官网。

等待全量同步的时候,可以配置PostgreSQL CDC Client pipeline流:

 

 

不认识的格式直接当成string 传入流中。 

 Jython Evaluator 配置(ETL重点):

import time
import datetimefor record in records:try:for change in record.value['change']:newRecord = sdcFunctions.createRecord(record.sourceId + str(time.time()))newRecord.value = {}newRecord.attributes['xid'] = str(record.value['xid'])newRecord.attributes['nextlsn'] = record.value['nextlsn']newRecord.attributes['timestamp'] = record.value['timestamp']newRecord.attributes['kind'] = change['kind']newRecord.attributes['schema'] = change['schema']        newRecord.attributes['jdbc.tables'] = change['table']if change['kind'] == 'insert':newRecord.attributes['sdc.operation.type'] = '1'if change['kind'] == 'delete':newRecord.attributes['sdc.operation.type'] = '2'if change['kind'] == 'update':newRecord.attributes['sdc.operation.type'] = '3'if 'columnnames' in change:columns = change['columnnames']types = change['columntypes']values = change['columnvalues']else:columns = change['oldkeys']['keynames']types = change['oldkeys']['keytypes']values = change['oldkeys']['keyvalues']for j in range(len(columns)):name = columns[j]type = types[j]value = values[j]newRecord.value[name] = valueoutput.write(newRecord)## optional, if we want to keep the original record,## otherwise we just put the new record in the batch.#output.write(record)except Exception as e:# Send record to errorerror.write(record, str(e))

Stream Selector 配置:分流

 kudu端配置比较简单。

当看到JDBC同步的数据很缓慢的时候,就可以直接开启CDC,然后关闭JDBC。有条件的可以先停止原始库写入数据的事务,等JDBC 同步完,开启CDC 再开启原始表的写入数据。

这篇关于Streamsets Postgresql 实时同步到Kudu的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/227753

相关文章

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

PostgreSQL核心功能特性与使用领域及场景分析

PostgreSQL有什么优点? 开源和免费 PostgreSQL是一个开源的数据库管理系统,可以免费使用和修改。这降低了企业的成本,并为开发者提供了一个活跃的社区和丰富的资源。 高度兼容 PostgreSQL支持多种操作系统(如Linux、Windows、macOS等)和编程语言(如C、C++、Java、Python、Ruby等),并提供了多种接口(如JDBC、ODBC、ADO.NET等

PostgreSQL中的多版本并发控制(MVCC)深入解析

引言 PostgreSQL作为一款强大的开源关系数据库管理系统,以其高性能、高可靠性和丰富的功能特性而广受欢迎。在并发控制方面,PostgreSQL采用了多版本并发控制(MVCC)机制,该机制为数据库提供了高效的数据访问和更新能力,同时保证了数据的一致性和隔离性。本文将深入解析PostgreSQL中的MVCC功能,探讨其工作原理、使用场景,并通过具体SQL示例来展示其在实际应用中的表现。 一、

MySQL主从同步延迟原理及解决方案

概述 MySQL的主从同步是一个很成熟的架构,优点为: ①在从服务器可以执行查询工作(即我们常说的读功能),降低主服务器压力; ②在从主服务器进行备份,避免备份期间影响主服务器服务; ③当主服务器出现问题时,可以切换到从服务器。 相信大家对于这些好处已经非常了解了,在项目的部署中也采用这种方案。但是MySQL的主从同步一直有从库延迟的问题,那么为什么会有这种问题。这种问题如何解决呢? MyS

PostgreSQL入门介绍

一、PostgreSQL 背景及主要功能介绍 1、背景 PG数据库,全称为PostgreSQL数据库,是一款开源的关系型数据库管理系统(RDBMS)。其起源可以追溯到20世纪80年代末和90年代初,由加拿大的计算机科学家Michael Stonebraker及其团队在加州大学伯克利分校启动。该项目旨在创建一个强大的、开源的关系型数据库管理系统,作为早期关系型数据库系统Ingres的继承者。Mi

使用条件变量实现线程同步:C++实战指南

使用条件变量实现线程同步:C++实战指南 在多线程编程中,线程同步是确保程序正确性和稳定性的关键。条件变量(condition variable)是一种强大的同步原语,用于在线程之间进行协调,避免数据竞争和死锁。本文将详细介绍如何在C++中使用条件变量实现线程同步,并提供完整的代码示例和详细的解释。 什么是条件变量? 条件变量是一种同步机制,允许线程在某个条件满足之前进入等待状态,并在条件满

mysql创建新表,同步数据

import os import argparse import glob import cv2 import numpy as np import onnxruntime import tqdm import pymysql import time import json from datetime import datetime os.environ[“CUDA_VISIBLE_DEVICE

三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris

FlinkCDC 同步Mysql到Doris 参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/get-started/quickstart/mysql-to-doris/ 1.安装Flink 下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用tar -zxvf flink-xxx-