CloudCanal x Debezium 打造实时数据流动新范式

2023-12-23 15:01

本文主要是介绍CloudCanal x Debezium 打造实时数据流动新范式,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

简述

Debezium 是一个开源的数据订阅工具,主要功能为捕获数据库变更事件发送到 Kafka。

CloudCanal 近期实现了从 Kafka 消费 Debezium 格式数据,将其 同步到 StarRocks、Doris、Elasticsearch、MongoDB、ClickHouse 等 12 种数据库和数仓,补全其数据到达能力。

本文将先简单介绍该项技术实现的背景,再通过 MySQL -> Kafka -> Starrocks 示例展示此功能。

image.png

为什么要消费 Debezium 格式数据

高流行度

Debezium 是一个高质量、被大量项目集成的开源项目,社区用户活跃,官方维护积极,修复 bug、增加新特性,不断更新版本。

作为 Kafka Connect 生态系统的一部分,Debezium 能够无缝与 Kafka 进行对接,为用户后端数据处理提供了强大的 实时数据准备 能力。

由此形成的高流行度,让每一个数据行业从业者不能忽视其影响力。

合理的消息结构

Schema(数据结构) 遵循 Kafka Connect 标准,提供了详细的字段信息。

"schema": {"type": "struct","fields": [{"type": "int32", "optional": false, "field": "id"},{"type": "string", "optional": false, "field": "name"},{"type": "int32", "optional": false, "field": "age"}],"optional": false, "name": "my_database.user.Value"
}

Payload(数据)包含实际的数据库变更数据,与 Schema 中定义的字段对应。

"payload": {"id": 123,"name": "John Doe","age": 30,"source": {...}
}

此外消息还携带了源端数据源全面的关联信息,包括库、表、时间戳、位点等信息。整体格式实用、简洁。

支持 Schema 演进

Debezium 不仅捕获数据库模式的当前状态,还能感知和记录每次模式变更细节。

当数据库表结构发生变化时(如添加、删除、修改字段等),Debezium 能够 实时捕获这些结构变更,确保变更事件的精准传递。

另外 Debezium 会为每个捕获的变更事件 记录包含当前和先前 Schema 的历史记录

这意味着 可追溯任何时刻数据库 Schema,了解特定时间点表字段、数据类型等信息, 并且可精准还原数据库在某一时刻的结构,无需额外的查询或推测。

CDC 数据格式标准

Debezium 数据 Schema 基于 Kafka Connect 标准设计,这使 Debezium 产生的变更事件能够轻松地集成到各种 Kafka Connect 连接器中,实现了与 Kafka 生态系统的顺畅对接。

这个设计使得 Debezium 数据 Schema 有望成为 CDC(Change Data Capture) 领域标准,为实时数据流的流动提供了基础设施。

端到端的缺憾

Debezium 集如此众多的优点,但是其官方缺少消息到对端的能力(目前有在补充),这让一部分用户感觉束手无策,CloudCanal 支持消费 Debezium 数据即解决这个问题,为用户实时数据生态建设贡献绵薄之力。

支持 Debezium 的主流 CDC 技术比较

对于使用 Debezium 的用户来说,消费 Kafka 中的 Debezium 数据并将其写入其他数据源,有几种主流 CDC 技术可选,如下表。

Kafka-ConnectFlink-CDCCloudCanal
同步配置配置文件代码/配置(新版本)可视化
同步性能(延迟)优秀优秀优秀
社区支持一般积极积极
大规模部署使用一般优秀优秀
消息格式符合其标准的 JSON、Avro…Debezium JSON、Canal JSON、Maxwell JSONDebezium JSON、Canal JSON、CloudCanal JSON 等
插件支持Oracle、MySQL、SqlServer…Oracle、MySQL、SqlServer…StarRocks、Doris、Elasticsearch 等 12 种

CloudCanal 支持 Debezium 做了那些事

CloudCanal 之前即实现了将数据库数据以 Debezium 格式写入目标端 Kafka 的能力,并在兼容性方面做了大量优化。

此次版本更新则支持从 Kafka 消费 Debezium 格式数据,并同步到对端数据库或数仓, 形成基于 Kafka 中转的端到端数据迁移同步能力,同时可平滑对接上/下游已使用其他工具且以 Debezium 数据格式载体的需求。

操作示例

Debezium 环境准备

  • 相关资源一键部署 (Docker): debezium-test.tar.gz
    • Kafka 集群 + Kafka UI
    • Debezium
    • MySQL (源端)
    • Starrocks (目标端)
    tar -xzvf debezium-test.tar.gz
    sh install.sh
    

创建 MySQL Source Connector

  • 源端是 MySQL,通过下面的表进行创建。

    CREATE DATABASE `inventory`;CREATE TABLE `inventory`.`customer` (`c_int` int NOT NULL,`c_bigint` bigint NOT NULL, `c_decimal` decimal(10,3) NOT NULL,`c_date` date NOT NULL,`c_datetime` datetime NOT NULL,`c_timestamp` datetime NOT NULL DEFAULT CURRENT_TIMESTAMP,`c_year` int NOT NULL,`c_varchar` varchar(10) NOT NULL,`c_text` text NOT NULL,PRIMARY KEY (`c_int`)
    );
    
  • 通过 Debezium 的 Api 接口创建 Connector 订阅 MySQL 的变更事件。

    curl -i -X POST http://127.0.0.1:7750/connectors \-H 'Content-Type: application/json' \-d '{"name": "connector-test-mx","config": {"connector.class": "io.debezium.connector.mysql.MySqlConnector","database.hostname": "112.124.38.87","database.port": "25000","database.user": "root","database.password": "123456","database.server.id": "1","database.server.name": "mx","database.include.list": "inventory","topic.prefix": "mx","table.include.list": "inventory.customer","snapshot.mode": "never","database.history.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092","schema.history.internal.kafka.bootstrap.servers": "112.124.38.87:19092,112.124.38.87:29092,112.124.38.87:39092","schema.history.internal.kafka.topic": "mx.schemahistory.customer","database.history.kafka.topic": "mx.mx_history_schema","include.schema.changes": "false"     }}'
    
  • 创建后,查看 Connetor 的状态。

    curl -s http://127.0.0.1:7750/connectors/connector-test-mx/status
    

CloudCanal 订阅 Kafka 的数据变更

准备 CloudCanal

  • 下载安装 CloudCanal 私有部署版本

添加数据源

  • 数据源管理 -> 添加数据源, 添加 Kafka、Starrocks、MySQL
    image.png
    image.png
    image.png

创建同步任务

  • 任务管理-> 新建任务

  • Kafka选择 Debezium Envelope Json Format格式

  • 该消息格式的说明,参见:源端 Kafka Debezium Json 使用说明
    image.png
    image.png

  • Kafka 消息中如果有 Schema,需要在 任务详细 -> 参数修改 -> 源数据源配置 中修改 envelopSchemaIncludetrue
    image.png

同步测试

  • 源端数据库做数据变更,Debezium 将数据写入 Kafka 后,CloudCanal 会写入到 Starrocks 中。
    image.png

  • 数据同步结束后校验 MySQL 和 Starrocks 的数据,40 万左右的数据是一致的。
    image.png

总结

本文介绍了 CloudCanal 支持消费 Debezium 格式数据的背景,以及通过 MySQL -> Kafka -> Starrocks 示例介绍其使用。

这篇关于CloudCanal x Debezium 打造实时数据流动新范式的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/528443

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题

题库来源:安全生产模拟考试一点通公众号小程序 2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题是由安全生产模拟考试一点通提供,流动式起重机司机证模拟考试题库是根据流动式起重机司机最新版教材,流动式起重机司机大纲整理而成(含2024年流动式起重机司机证模拟考试题库及流动式起重机司机理论考试试题参考答案和部分工种参考解析),掌握本资料和学校方法,考试容易。流动式起重机司机考试技

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

C#实战|大乐透选号器[6]:实现实时显示已选择的红蓝球数量

哈喽,你好啊,我是雷工。 关于大乐透选号器在前面已经记录了5篇笔记,这是第6篇; 接下来实现实时显示当前选中红球数量,蓝球数量; 以下为练习笔记。 01 效果演示 当选择和取消选择红球或蓝球时,在对应的位置显示实时已选择的红球、蓝球的数量; 02 标签名称 分别设置Label标签名称为:lblRedCount、lblBlueCount

烟火目标检测数据集 7800张 烟火检测 带标注 voc yolo

一个包含7800张带标注图像的数据集,专门用于烟火目标检测,是一个非常有价值的资源,尤其对于那些致力于公共安全、事件管理和烟花表演监控等领域的人士而言。下面是对此数据集的一个详细介绍: 数据集名称:烟火目标检测数据集 数据集规模: 图片数量:7800张类别:主要包含烟火类目标,可能还包括其他相关类别,如烟火发射装置、背景等。格式:图像文件通常为JPEG或PNG格式;标注文件可能为X