Streamsets-JDBC模式使用更新时间字段数据同步

2024-06-01 23:28

本文主要是介绍Streamsets-JDBC模式使用更新时间字段数据同步,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

StreamSets的开源地址:https://github.com/streamsets/datacollector-oss
Streamsets官网地址:https://streamsets.com/
Streamsets文档地址:https://docs.streamsets.com/portal/datacollector/3.16.x/help/index.html

我又来写Streamsets了,各种原因好久没研究Cassandra了。
本次分享主要介绍Streamsets的JDBC模式、为什么使用时间字段同步数据、遇到的问题和解决方案。
解决方案并不是最完美但也是基于当前条件下最优解,如有疑问,欢迎热烈讨论。
提供的脚本毫无保留,可直接使用。

Streamsets在3.22.2之后就闭源了,更高阶的特性已包装为平台产品。

结合周边讨论和网上的资料来看,Streamsets的活跃度不高,在网上搜的资料太少啦,又随着项目闭源,活跃度更低了,归其原因我分析Streamsets是一个大而全的数据同步工具,整合了市面上基本所有的数据源,但是每个公司不可能用到里面所有的数据源,真正能用到大部分数据源的公司,规模肯定大到不会依赖这种外部的工具,自己手写同步的自由度和效率要更好。

Streamsets对于我们的优势在于开箱即用,相比于手搓代码来实现业务细节,Streamsets将数据同步的每个阶段独立开来,将业务变动最大的数据清洗部分以处理器的形式开放出来,数据的转换和转换的实时配置并生效,直观的监控指标。

版本为Streamsets的3.16.0的离线版本,部署到内网时的最新版本为3.16.0,所以方案和问题的解决方案均以3.16.0为基础。

JDBC模式介绍:

JDBC模式的增量模式只支持新增的数据和不需要修改的数据,且官方建议的offsetColumn为PrimaryKey,如:ID。

Incremental mode
When the JDBC Query Consumer performs an incremental query, it uses the initial offset as the offset value in the first SQL query. As the origin completes processing the results of the first query, it saves the last offset value that it processes. Then it waits the specified query interval before performing a subsequent query.
When the origin performs a subsequent query, it returns data based on the last-saved offset. You can reset the origin to use the initial offset value.
Use incremental mode for append-only tables or when you do not need to capture changes to older rows. By default, JDBC Query Consumer uses incremental mode.

SELECT * FROM <table_name> WHERE <primaryKey> > ${OFFSET} ORDER BY <primaryKey>

这样支持的场景为不断的增量数据,无法捕获数据的更新。
但是正常的业务系统一般不存在只新增不更新的场景。
全量同步模式每次加载所有的数据,当表的数据量较大时,同步所需的时间和延迟不能接受。

修改为通过update_time来捕获数据变化:

SELECT * FROM user WHERE update_time > ${OFFSET} ORDER BY update_time

在配置管道时将OffsetColumn指定为update_time,业务系统使用mybatis-plus在数据新增和更新时补充创建时间和更新时间。数据库的时间精度为秒。

使用update_time的好处是对于开发者和运维人员可读性更好,在进行历史数据的同步和数据对接时更方便。
该方案看似非常合理,业务侧只要控制好update_time的逻辑,每次数据变化时update_time是不断滚动向前的,滚动查询不断的进行数据同步。
但是too young too simple。
按照Streamsets的处理逻辑,在两种场景下会丢数据。
分别是当单次同步的数据量超过maxBatchSize时,概率性丢数据和并发写入数据库时概率性丢数据。
这两种丢数据的场景是不可控的,时间不可控,完全看运气。但是不确定往往是最可怕的。

为什么会丢数据?

第一种场景:单次同步的数据量超过maxBatchSize
Offset的更新逻辑和jdbc-protolib源码中的逻辑:
origin会当根据sql查询的数据读取不超过配置的maxBatchSize的数量,并将最新的update_time赋值给offset。

// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {// ...try (Connection connection = dataSource.getConnection()) {if (null == resultSet || resultSet.isClosed()) {// 执行查询sql语句resultSet = statement.executeQuery(preparedQuery);}// 超过maxBatchSize的数据不发送到下一阶段,留到下次操作时处理。while (continueReading(rowCount, batchSize) && (haveNext = resultSet.next())) {final Record record = processRow(resultSet, rowCount);if (null != record) {// 记录下数据batchMaker.addRecord(record);}// 更新offsetif (isIncrementalMode) {nextSourceOffset = resultSet.getString(offsetColumn);} else {nextSourceOffset = initialOffset;}// 后续收尾工作}}return nextSourceOffset;}

结合Streamsets的Offset的更新逻辑和jdbc-protolib源码中的逻辑,当一秒内出现多条数据时,会因为精度问题导致数据丢失。

第二种场景:数据并发写入数据库时。
业务侧代码使用mybatis-plus作为ORM来处理数据的读写,当有大数据量写入数据时,如:Excel导入或高并发的数据写入。
mybatis-plus的内置处理逻辑为分批次提交,每次提交1000,所以单个线程写入的qps为1000。
以Excel导入为例,如果批量保存方法没有加@Transaction注解,会大大增加数据丢失的概率。

原因为结合mybatis的处理+没加@Transaction注解导致1000个insert语句一次性发给数据库,这1000条sql语句是以非事务的方式执行,每条数据都是一个完整的事务,执行完毕自动提交,立即可见。
这时当Streamsets触发查询操作时,时机恰好出现在一秒内的前半段,而一秒内的后半段还在数据写入,导致后半段的数据丢失。
场景分析

解决方案:

如果你拿到的是Streamsets的安装包,那第一种场景无法通过配置和升级的方式解决,因为使用的方式和增量模式的设计初衷不符。
有一份折中方案,但不保熟:
1.能力范围内update_time的精度越细越好,越细会有一定的性能损耗,但丢数据的概率大大降低。
2.评估每次同步的数据量大小,maxBatchSize的大小要大于单次同步的数据量。注意内存大小,小心OOM,(插一句:oracle的批量更新会存在连接泄露,需注意。如果有源码顺手改之。)
可以下载一份Streamsets的源码,改之。
代码如下:

// com.streamsets.pipeline.stage.origin.jdbc.JdbcSource.java
public String produce(String lastSourceOffset, int maxBatchSize, BatchMaker batchMaker) {// ...try (Connection connection = dataSource.getConnection()) {if (null == resultSet || resultSet.isClosed()) {// 执行查询sql语句resultSet = statement.executeQuery(preparedQuery);}while ((haveNext = resultSet.next())) {if(continueReading(rowCount, batchSize)){final Record record = processRow(resultSet, rowCount);if (null != record) {// 记录下数据batchMaker.addRecord(record);}// 更新offsetif (isIncrementalMode) {nextSourceOffset = resultSet.getString(offsetColumn);} else {nextSourceOffset = initialOffset;}} else {// 当超过maxBatchSize时,继续查找最后一秒的数据。if(!nextSourceOffset.equals(initialOffset) && nextSourceOffset.equals(resultSet.getString(offsetColumn))){if(null != record) batchMaker.addRecord(record);}// 后续收尾工作}}return nextSourceOffset;}

第二种场景出现的原因是在同一秒内同时出现写入和查询操作,查询时无法取出应取出的数据。
解决的思路为错峰,通过配置手段将查询动作和写入动作错开。

// oracle
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < SYSDATE - INTERVAL '1' SECOND order by update_time;
// mysql
select * from user where update_time < '${offset}' and update_time < DATE_SUB(now(), INTERVAL 1 SECOND) order by update_time;
// dm
select * from user where update_time < TO_TIMESTAMP('${offset}','yyyy-MM-dd HH24:mi:ss.ff') and update_time < CURRENT_TIMESTAMP- INTERVAL '1' SECOND order by update_time;
// kingbase
select * from user where update_time < '${offset}' and update_time < current_timestamp - INTERVAL '1' SECOND order by update_time;

需要特别注意:因为数据库中存储的时间有可能为业务服务的时间,要保证数据库和业务服务的时区和时间要保持一致。

通道示意图:

新版Streamsets的布局,我的不长这样。streamsets-jdbc示意图
源:无特殊配置
Jython处理器:根据源传过来的数据查询目标表,对数据进行标记。
流选择器:根据数据的标记分发数据,标记为insert的走新增通道,标记为update的走修改通道。
目标:一个配置为INSERT,另一个配置为UPDATE。
Jython脚本:

import java.sql.DriverManager as DriverManager
import java.lang.Class as Class
import timeurl = "jdbc:mysql://localhost:3306/db?autoReconnect=true&useSSL=false&characterEncoding=utf8"
Class.forName("com.mysql.jdbc.Driver")
username = "root"
password = "passwd"
batch_size = 1000primary_key = "id"
table_name = "t_target"
ids = []
db_ids = set()
records = sdc.records
conn = None
stmt = None
rs = None
if len(records) != 0:try:conn = DriverManager.getConnection(url,username,password)if conn is not None:stmt = conn.createStatement()start_time = time.time()for record in records:id = record.value[primary_key]ids.append(id)num_batches = len(ids) // batch_size + (1 if len(ids) % batch_size != 0 else 0)for i in range(num_batches):start_index = i * batch_sizeend_index = min((i+1) * batch_size,len(ids))batch_ids = ids[start_index,end_index]sql = "select ' + primary_key + ' from " + table_name + " where '+ primary_key +' in ('"for j,id in enumerate(batch_ids):if j != 0:sql += "','"sql += str(id)sql += "')"rs = stmt.executeQuery(sql)while rs.next():id = rs.getString(primary_key)db_ids.add(id)end_time = time.time()sdc.log.info('from '+ table_name + 'query:' + str(len(ids)) + 'rows cost:'+str(end_time - start_time) + 's')for record in records:id = record.value[primary_key]if id in db_ids:record.value['insert_or_update'] = 'update'else:record.value['insert_or_update'] = 'insert'sdc.output.write(record)except Exception as e:raise RuntimeError(e)finally:if rs:rs.close()if stmt:stmt.close()if conn:conn.close()
else:sdc.log.trace('no more data')

结语:

截止到此,也算一套完整的解决方案。拷贝之后可直接食用。
后面有时间会分享一些定位时发现的问题和小技巧。

  • 国产化数据库达梦和人大金仓的适配。
  • 国产化服务器加密环境的打包和部署方案。
  • 为Streamsets减负,轻量化安装包。
  • JDBC模式的性能优化小技巧。
  • 穿插一些Streamsets组件的实现原理。
  • Streamsets CDC模式的配置。
  • 手写一份Streamsets的Stage,用以支撑国产化的需求

这篇关于Streamsets-JDBC模式使用更新时间字段数据同步的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1022396

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

Makefile简明使用教程

文章目录 规则makefile文件的基本语法:加在命令前的特殊符号:.PHONY伪目标: Makefilev1 直观写法v2 加上中间过程v3 伪目标v4 变量 make 选项-f-n-C Make 是一种流行的构建工具,常用于将源代码转换成可执行文件或者其他形式的输出文件(如库文件、文档等)。Make 可以自动化地执行编译、链接等一系列操作。 规则 makefile文件