Debezium发布历史139

2024-02-20 10:28
文章标签 历史 发布 139 debezium

本文主要是介绍Debezium发布历史139,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文地址: https://debezium.io/blog/2023/02/04/ddd-aggregates-via-cdc-cqrs-pipeline-using-kafka-and-debezium/

欢迎关注留言,我是收集整理小能手,工具翻译,仅供参考,笔芯笔芯.

DDD Aggregates via CDC-CQRS Pipeline using Kafka & Debezium
February 4, 2023 by Purnima Jain
ddd cdc cqrs debezium kafka

在这篇文章中,我们将讨论在规范化的关系数据库(mysql)和去规范化的Nosql数据库(蒙戈数据库)之间的CDC-CQRS管道,这两个数据库是查询数据库,其结果是通过Debezim∓卡夫卡-流创建DDD集合。

您可以找到完整的示例源代码 在这里 .参阅 阅读。 关于构建和运行示例代码的详细信息.

这个例子围绕三个微服务:order-write-service ,order-aggregation-service 和order-read-service .这些服务是在java中作为"弹簧靴"应用程序实现的。

…order-write-service 在mysql数据库中各自的表中,提出了两个保留的端点--------------------------------------------------------Debezum对mysqb日志进行跟踪,以捕捉这些表中的任何事件,并向卡夫卡主题发布消息。这些话题是由order-aggregation-service 这是一个卡夫卡流应用程序,它将来自这两个主题的数据连接起来,创建一个订单集合对象,然后发布到第三个主题。蒙戈数据库接收器连接器使用这个主题,数据在蒙戈数据库中进行持久化,由order-read-service .

解决方案的总体架构见下图:
在这里插入图片描述

其他应用程序:订单书写服务
触发工作流启动的第一个组件是order-write-service .这已作为弹簧靴应用程序实现,并公开了两个休息点:

帖子:api/shipping-details 在mysql数据库中持久保存运输细节

帖子:api/item-details 在mysql数据库中保留项目细节

这两个端点都将它们的数据保存在mysql数据库中各自的表中。

命令数据库:mysql
上述休息端点的后端处理最终将数据持久化到mysql中各自的表中。

航运细节存储在一个表格中SHIPPING_DETAILS .物品的细节存储在一个表格中ITEM_DETAILS .

以下是SHIPPING_DETAILS 表格,一栏ORDER_ID 关键是:
在这里插入图片描述

以下是ITEM_DETAILS 表格,一栏ORDER_ID +ITEM_ID 关键是:
在这里插入图片描述

卡夫卡连接源连接器:MySQL
更改数据捕获(ccc)是一种从数据库事务日志中捕获更改事件的解决方案(在mysql的情况下称为宾基日志),并将这些事件转发给下游消费者EX。卡夫卡主题。

Debezum是一个为更改数据捕获提供低延迟数据流平台的平台,它是在阿帕奇卡夫卡之上建立的。它允许将数据库行级更改作为事件捕获并发布到阿帕奇卡夫卡主题。我们设置和配置Debezum来监视我们的数据库,然后我们的应用程序为对数据库进行的每个行级更改消费事件。

在我们的案例中,我们将使用Debezimmysql源连接器来捕捉上述表中的任何新事件,并将其转发给阿帕奇卡夫卡。为了实现这一点,我们将通过将以下JSON请求发送到卡夫卡连接的其余API来注册我们的连接器:

{
“name”: “app-mysql-db-connector”,
“config”: {
“connector.class”: “io.debezium.connector.mysql.MySqlConnector”,
“tasks.max”: “1”,
“database.hostname”: “mysql_db_server”,
“database.port”: “3306”,
“database.user”: “custom_mysql_user”,
“database.password”: “custom_mysql_user_password”,
“database.server.id”: “184054”,
“database.server.name”: “app-mysql-server”,
“database.whitelist”: “app-mysql-db”,
“table.whitelist”: “app-mysql-db.shipping_details,app-mysql-db.item_details”,
“database.history.kafka.bootstrap.servers”: “kafka_server:29092”,
“database.history.kafka.topic”: “dbhistory.app-mysql-db”,
“include.schema.changes”: “true”,
“transforms”: “unwrap”,
“transforms.unwrap.type”: “io.debezium.transforms.ExtractNewRecordState”
}
}
上述配置是基于1.9.5.最后。请注意,如果您试图使用Debezum2.0+的演示,上面的一些配置属性有了新的名称,配置将需要一些调整。

它建立了一个io.debezium.connector.mysql.MySqlConnector ,从指定的mysql实例捕获更改。请注意,通过表格包括清单,只对SHIPPING_DETAILS 和ITEM_DETAILS 可捕捉到表格。它还应用一个命名为单一消息转换(SMT)ExtractNewRecordState 它提取了after 场来自卡夫卡记录中的德贝兹改变事件。SMT只替换了原来的更改事件。after 创建一个简单的卡夫卡记录。

默认情况下,卡夫卡主题名称是"服务器.架构.表名",根据我们的连接器配置,它可以翻译为:

app-mysql-server.app-mysql-db.item_details

app-mysql-server.app-mysql-db.shipping_details

卡夫卡河应用:订单集合服务
卡夫卡河应用,即order-aggregation-service ,将处理来自卡夫卡CDC-主题的数据。这些主题接收疾病预防控制中心事件的基础是在mysql中找到的运输细节和项目细节关系。

在此基础上,可以建立如下用于创建和维护DDD订单的克兰兹拓扑结构。

应用程序读取来自运输细节-CDC主题的数据。由于卡夫卡主题记录是用德贝齐姆JSON格式与未包装的信封,我们需要解析订单标识和运输详细信息,以创建一个以订单标识为键、以运输详细信息为值的KTAD。

// Shipping Details Read
KStream<String, String> shippingDetailsSourceInputKStream = streamsBuilder.stream(shippingDetailsTopicName, Consumed.with(STRING_SERDE, STRING_SERDE));

// Change the Json value of the message to ShippingDetailsDto
KStream<String, ShippingDetailsDto> shippingDetailsDtoWithKeyAsOrderIdKStream = shippingDetailsSourceInputKStream
.map((orderIdJson, shippingDetailsJson) -> new KeyValue<>(parseOrderId(orderIdJson), parseShippingDetails(shippingDetailsJson)));

// Convert KStream to KTable
KTable<String, ShippingDetailsDto> shippingDetailsDtoWithKeyAsOrderIdKTable = shippingDetailsDtoWithKeyAsOrderIdKStream.toTable(
Materialized.<String, ShippingDetailsDto, KeyValueStore<Bytes, byte[]>>as(SHIPPING_DETAILS_DTO_STATE_STORE).withKeySerde(STRING_SERDE).withValueSerde(SHIPPING_DETAILS_DTO_SERDE));
同样,应用程序读取项目细节-CDC-主题的数据,并按一个列表中与同一订单相关的所有项目解析每个邮件到组的订单标识和项目标识-然后将其聚合到一个以订单标识为键、与该特定订单相关的项目列表作为值的KSab中。

// Item Details Read
KStream<String, String> itemDetailsSourceInputKStream = streamsBuilder.stream(itemDetailsTopicName, Consumed.with(STRING_SERDE, STRING_SERDE));

// Change the Key of the message from ItemId + OrderId to only OrderId and parse the Json value to ItemDto
KStream<String, ItemDto> itemDtoWithKeyAsOrderIdKStream = itemDetailsSourceInputKStream
.map((itemIdOrderIdJson, itemDetailsJson) -> new KeyValue<>(parseOrderId(itemIdOrderIdJson), parseItemDetails(itemDetailsJson)));

// Group all the ItemDtos for each OrderId
KGroupedStream<String, ItemDto> itemDtoWithKeyAsOrderIdKGroupedStream = itemDtoWithKeyAsOrderIdKStream.groupByKey(Grouped.with(STRING_SERDE, ITEM_DTO_SERDE));

// Aggregate all the ItemDtos pertaining to each OrderId in a list
KTable<String, ArrayList> itemDtoListWithKeyAsOrderIdKTable = itemDtoWithKeyAsOrderIdKGroupedStream.aggregate(
(Initializer<ArrayList>) ArrayList::new,
(orderId, itemDto, itemDtoList) -> addItemToList(itemDtoList, itemDto),
Materialized.<String, ArrayList, KeyValueStore<Bytes, byte[]>>as(ITEM_DTO_STATE_STORE).withKeySerde(STRING_SERDE).withValueSerde(ITEM_DTO_ARRAYLIST_SERDE));
由于两个KTAS都有订单作为键,使用订单很容易将它们连接起来创建一个叫做订单集合的聚合。订单集合是通过从船舶细节和项目细节中吸收数据而创建的一个复合对象。然后,这个订单集合写到一个订单集合卡夫卡主题。

// Joining the two tables: shippingDetailsDtoWithKeyAsOrderIdKTable and itemDtoListWithKeyAsOrderIdKTable
ValueJoiner<ShippingDetailsDto, ArrayList, OrderAggregate> shippingDetailsAndItemListJoiner = (shippingDetailsDto, itemDtoList) -> instantiateOrderAggregate(shippingDetailsDto, itemDtoList);
KTable<String, OrderAggregate> orderAggregateKTable = shippingDetailsDtoWithKeyAsOrderIdKTable.join(itemDtoListWithKeyAsOrderIdKTable, shippingDetailsAndItemListJoiner);

// Outputting to Kafka Topic
orderAggregateKTable.toStream().to(orderAggregateTopicName, Produced.with(STRING_SERDE, ORDER_AGGREGATE_SERDE));
卡夫卡连接槽连接器:蒙戈布连接器
接收器连接器是一个卡夫卡连接器,它读取来自阿帕奇卡夫卡的数据并将数据写入一些数据库。使用蒙戈数据库接收器连接器,很容易将DDD聚合物写入蒙戈数据库。它所需要的只是一个配置,可以发布到卡夫卡连接的其余API,以便运行连接器。

{
“name”: “app-mongo-sink-connector”,
“config”: {
“connector.class”: “com.mongodb.kafka.connect.MongoSinkConnector”,
“topics”: “order_aggregate”,
“connection.uri”: “mongodb://root_mongo_user:root_mongo_user_password@mongodb_server:27017”,
“key.converter”: “org.apache.kafka.connect.storage.StringConverter”,
“value.converter”: “org.apache.kafka.connect.json.JsonConverter”,
“value.converter.schemas.enable”: false,
“database”: “order_db”,
“collection”: “order”,
“document.id.strategy.overwrite.existing”: “true”,
“document.id.strategy”: “com.mongodb.kafka.connect.sink.processor.id.strategy.ProvidedInKeyStrategy”,
“transforms”: “hk,hv”,
“transforms.hk.type”: “org.apache.kafka.connect.transforms.HoistField K e y " , " t r a n s f o r m s . h k . f i e l d " : " i d " , " t r a n s f o r m s . h v . t y p e " : " o r g . a p a c h e . k a f k a . c o n n e c t . t r a n s f o r m s . H o i s t F i e l d Key", "transforms.hk.field": "_id", "transforms.hv.type": "org.apache.kafka.connect.transforms.HoistField Key","transforms.hk.field":"id","transforms.hv.type":"org.apache.kafka.connect.transforms.HoistFieldValue”,
“transforms.hv.field”: “order”
}
}
查询数据库:
将DDD聚合写入数据库order_db 在收藏中order 在蒙戈德。订单会变成_id 在餐桌上order 列存储订单-集合为JSON。

其他应用:订单-阅读-服务
在蒙戈数据库中持久存在的订单集合通过一个休息端点提供。order-read-service .

获取:api/order/{order-id} 从蒙戈数据库检索订单

执行指令
提供了此博客的完整源代码 在这里 在基特布。先克隆这个存储库然后转换成cdc-cqrs-pipeline 目录。该项目提供一个为所有组件提供服务的码头组合文件:

Mysql

通过浏览器管理mysql(原名为ppin行政人员)

蒙戈德

蒙戈快递,通过浏览器管理蒙戈数据库

饲养员

相融合的卡夫卡

卡夫卡连接

一旦所有服务启动,通过执行Create-MySQL-Debezium-Connector 和Create-MongoDB-Sink-Connector 分别要求cdc-cqrs-pipeline.postman_collection.json .执行请求Get-All-Connectors 验证连接器是否已正确创建。

更改为个别目录,并将三个弹簧靴应用程序展开:

order-write-service:在1号端口运行8070

order-aggregation-service:在1号端口运行8071

order-read-service:在1号端口运行8072

有了这个,我们的设置就完成了。

为了测试应用程序,执行请求Post-Shipping-Details 从邮递员收集到插入货运细节Post-Item-Details 插入特定订单ID的详细项目。

最后,执行Get-Order-By-Order-Id 在邮差集合中请求检索完整的订单聚合。

概括的
阿帕奇卡夫卡是服务间消息传递的高度可扩展和可靠的支柱。将阿帕奇卡夫卡置于整体架构的中心,也确保了所涉服务的脱钩。例如,如果解决方案的单个组件失败或在一段时间内无法使用,则将在稍后处理事件:在重新启动后,Debezum连接器将继续跟踪相关表,从它以前关闭的位置开始。同样,任何消费者将继续处理其先前抵消的主题。通过对已经成功处理的消息进行跟踪,可以检测到副本,并将其排除在重复处理之外。

当然,不同服务之间的此类事件管道最终是一致的,即:订单阅读服务等消费者可能比订单写作服务等生产者落后一些。通常情况下,这很好,可以用应用程序的业务逻辑来处理。此外,整个解决方案的端到端延迟通常较低(秒甚至次秒范围),这要归功于基于日志的变化数据捕获,它允许在接近实时的时间内发布事件。

这篇关于Debezium发布历史139的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/727908

相关文章

高效+灵活,万博智云全球发布AWS无代理跨云容灾方案!

摘要 近日,万博智云推出了基于AWS的无代理跨云容灾解决方案,并与拉丁美洲,中东,亚洲的合作伙伴面向全球开展了联合发布。这一方案以AWS应用环境为基础,将HyperBDR平台的高效、灵活和成本效益优势与无代理功能相结合,为全球企业带来实现了更便捷、经济的数据保护。 一、全球联合发布 9月2日,万博智云CEO Michael Wong在线上平台发布AWS无代理跨云容灾解决方案的阐述视频,介绍了

Vue3项目开发——新闻发布管理系统(六)

文章目录 八、首页设计开发1、页面设计2、登录访问拦截实现3、用户基本信息显示①封装用户基本信息获取接口②用户基本信息存储③用户基本信息调用④用户基本信息动态渲染 4、退出功能实现①注册点击事件②添加退出功能③数据清理 5、代码下载 八、首页设计开发 登录成功后,系统就进入了首页。接下来,也就进行首页的开发了。 1、页面设计 系统页面主要分为三部分,左侧为系统的菜单栏,右侧

查看提交历史 —— Git 学习笔记 11

查看提交历史 查看提交历史 不带任何选项的git log-p选项--stat 选项--pretty=oneline选项--pretty=format选项git log常用选项列表参考资料 在提交了若干更新,又或者克隆了某个项目之后,你也许想回顾下提交历史。 完成这个任务最简单而又有效的 工具是 git log 命令。 接下来的例子会用一个用于演示的 simplegit

maven发布项目到私服-snapshot快照库和release发布库的区别和作用及maven常用命令

maven发布项目到私服-snapshot快照库和release发布库的区别和作用及maven常用命令 在日常的工作中由于各种原因,会出现这样一种情况,某些项目并没有打包至mvnrepository。如果采用原始直接打包放到lib目录的方式进行处理,便对项目的管理带来一些不必要的麻烦。例如版本升级后需要重新打包并,替换原有jar包等等一些额外的工作量和麻烦。为了避免这些不必要的麻烦,通常我们

禅道Docker安装包发布

禅道Docker安装包发布 大家好, 禅道Docker安装包发布。 一、下载地址 禅道开源版:   /dl/zentao/docker/docker_zentao.zip  备用下载地址:https://download.csdn.net/download/u013490585/16271485 数据库用户名: root,默认密码: 123456。运行时,可以设置 MYSQL_ROOT_P

从希腊神话到好莱坞大片,人工智能的七大历史时期值得铭记

本文选自historyextra,机器之心编译出品,参与成员:Angulia、小樱、柒柒、孟婷 你可能听过「技术奇点」,即本世纪某个阶段将出现超级智能,那时,技术将会以人类难以想象的速度飞速发展。同样,黑洞也是一个奇点,在其上任何物理定律都不适用;因此,技术奇点也是超越未来理解范围的一点。 然而,在我们到达那个奇点之前(假设我们能到达),还存在另一个极大的不连续问题,我将它称之

C++编程:ZeroMQ进程间(订阅-发布)通信配置优化

文章目录 0. 概述1. 发布者同步发送(pub)与订阅者异步接收(sub)示例代码可能的副作用: 2. 适度增加缓存和队列示例代码副作用: 3. 动态的IPC通道管理示例代码副作用: 4. 接收消息的超时设置示例代码副作用: 5. 增加I/O线程数量示例代码副作用: 6. 异步消息发送(使用`dontwait`标志)示例代码副作用: 7. 其他可以考虑的优化项7.1 立即发送(ZMQ_IM

风格控制水平创新高!南理工InstantX小红书发布CSGO:简单高效的端到端风格迁移框架

论文链接:https://arxiv.org/pdf/2408.16766 项目链接:https://csgo-gen.github.io/ 亮点直击 构建了一个专门用于风格迁移的数据集设计了一个简单但有效的端到端训练的风格迁移框架CSGO框架,以验证这个大规模数据集在风格迁移中的有益效果。引入了内容对齐评分(Content Alignment Score,简称CAS)来评估风格迁移

Eclipse发布Maven项目到tomcat,无法加载到lib文件夹下的jar包

BMS 解决方法: 当我们发布web项目到tomcat时,访问地址时会报一个classnotfound的错误,但是eclipse中的项目中都已经添加了相应的类,有一种比较容易犯的错误是,你没有把额外所需的jar包加到tomcat中的lib文件夹中,在这里介绍一种在项目中直接添加jar包到lib目录下:  右键已创建的web项目——properties属性——点击Deployment Assem

1-3 微信小程序协同工作和发布

协同工作和发布 🥟🥞以权限管理需求为例 一个项目组,一般有不同的岗位,不同角色的员工同时参与项目成员 流程 成员管理的两个方面 不同项目成员对应的权限 版本