Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?

2024-04-17 06:28

本文主要是介绍Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《大数据平台架构与原型实现:数据中台建设实战》博主历时三年精心创作的《大数据平台架构与原型实现:数据中台建设实战》一书现已由知名IT图书品牌电子工业出版社博文视点出版发行,点击《重磅推荐:建大数据平台太难了!给我发个工程原型吧!》了解图书详情,京东购书链接:https://item.jd.com/12677623.html,扫描左侧二维码进入京东手机购书页面。

这是一个很容易混淆和误解的问题,值得拿出来讨论对比一下。我们知道 Debezium 是专门用于捕获 CDC 数据的开源框架,它对接了多种数据库,同时也定义了自己的 CDC 数据交换格式,也就是常说的 debezium 格式。而Flink CDC 复用了 Debezium 的部分功能,也就是说:Debezium 是 Flink CDC 的底层采集工具,Flink CDC 的工程依赖会用使用到 Debezium 的 Jar 包,然后 Flink CDC 在 Debezium 基础之上,封装了额外的功能,例如:无锁读取,并发读取(全量数据的读取性能可以水平扩展),断点续传,这些功能是 Debezium 所不具备的,也是 Flink CDC 存在的意义。

同时,Flink 还有一种专门的数据格式 debezium-json,从名称上看,它似乎就是 debezium 格式的 json 表达形式,那 debezium-json 格式和 debezium 原生格式是一回事吗?

首先,我们要主要到这样一个细节:mysql-cdc 作为一个 source connector,并不要求指定 format,实际上,它的 format 是不可配置的,因为 Flink CDC 在内部实现依赖 debezium,获得的原始的数据格式就是 debezium 格式,对外,这不可配置,也不可见,只有向下游传导数据时,才会涉及到解析和转换的问题。

其次,我们还要先澄清一种误解:debezium-json 并不是跟 Flink CDC(例如mysql-cdc)绑定在一起的,作为一种独立的、可描述 changelog 的格式,实际上,它可以应用到任何动态表上,例如:如果上游表是:connector=upsert-kafka,format=json,下游依旧可以使用: connector=kafka,format=debezium-json,关于这一点,可以参考本文的实测 《Flink SQL:debezium-json 格式的表一定是数据库的 CDC 数据吗?》,这个测试给出了这样一个非常明确的结论:

使用 debezium-json 格式的表不一定是数据库的 CDC 数据,但一定是上游动态表的 changelog,然后使用 debezium-json 格式描述。

Flink CDC 从数据库 binlog 中提取数据时使用了 debezium,获得的原始的数据格式也是 debezium 格式,但是,这都是发生在 Flink CDC 内部的,对外是不可见的!当需要把 CDC 数据传给下游时,才会针对下游指定的格式进行转换,这种转换也是根据目标表 DDL 中定义的 Schema 自动地隐式地完成的。

我们还是靠举例和试验来说明这个问题吧。再次看一下 《Flink CDC 与 Kafka 集成:Snapshot 还是 Changelog?Upsert Kafka 还是 Kafka?》 一文的 ”测试组合(1):connector=kafka,format=debezium-json“ 一节给出的案例。

原生 Debezium 格式(样例)

使用如下 SQL 创建一个 mysql-cdc 的源表:

SET 'sql-client.execution.result-mode' = 'TABLEAU';DROP TABLE IF EXISTS orders_mysql_cdc;CREATE TABLE IF NOT EXISTS orders_mysql_cdc (`order_number` INT NOT NULL,`order_date` DATE NOT NULL,`purchaser` INT NOT NULL,`quantity` INT NOT NULL,`product_id` INT NOT NULL,CONSTRAINT `PRIMARY` PRIMARY KEY (`order_number`) NOT ENFORCED
) WITH ('connector' = 'mysql-cdc','hostname' = '10.0.13.30','port' = '3307','username' = 'root','password' = 'Admin1234!','database-name' = 'inventory','table-name' = 'orders'
);

那从 Flink CDC 源表提取出来的数据应该是什么样子呢?前面我们已经说过,这个动作发生在 Flink CDC 内部,提取的数据也是外部不可见的,那我们能不能从其他渠道确定实际的数据格式吗?能,如果说 Flink CDC 就是通过 Debezium 来采集数据,那么采集到的最原始的数据格式就是标准的 Debezium 格式,通常,这是这个样子的:

{"before": null,"after": {"osci.mysql-server-3.inventory.orders.Value": {"order_number": 10006,"order_date": 16852,"purchaser": 1003,"quantity": 1,"product_id": 107}},"source": {"version": "2.2.0.Final","connector": "mysql","name": "osci.mysql-server-3","ts_ms": 1705645511000,"snapshot": {"string": "false"},"db": "inventory","sequence": null,"table": {"string": "orders"},"server_id": 223344,"gtid": null,"file": "mysql-bin.000004","pos": 640,"row": 0,"thread": {"long": 10},"query": null},"op": "c","ts_ms": {"long": 1705645511455},"transaction": null
}

再次强调,上述格式的数据在 Flink CDC 中是不可见的,发生于 Flink CDC 内部,以上格式是标准的 debezium 数据格式,Flink CDC一定是率先拿到了这种格式的数据然后再经处理转发给下游的,比如:如果 DDL 中提取了某些元数据,也是从上面这种原始的 Debezium 数据中获取的。

debezium-json 格式(样例)

如下的 SQL 在 Kafka 上创建了一个 debezium-json 格式的目标表,然后使用 INSERT INTO ... SELECT ... 把源表和目标表的数据流驱动起来:

DROP TABLE IF EXISTS orders_kafka_debezium_json;CREATE TABLE IF NOT EXISTS orders_kafka_debezium_json (order_number int,order_date   date,purchaser    int,quantity     int,product_id   int
) WITH ('connector' = 'kafka','topic' = 'orders_kafka_debezium_json','properties.bootstrap.servers' = 'b-2.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-3.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092,b-1.oscimskcluster1.cedsl9.c20.kafka.us-east-1.amazonaws.com:9092','properties.group.id' = 'orders_kafka_debezium_json','scan.startup.mode' = 'earliest-offset','format' = 'debezium-json'
);-- 提交持续查询,驱动整个 Pipelineinsert into orders_kafka_debezium_json select * from orders_mysql_cdc;

这时,写入 Kafka 中的 debezium-json 格式的数据是这样的:

{"before": {"order_number": 10003,"order_date": "2016-02-19","purchaser": 1002,"quantity": 2,"product_id": 106},"after": null,"op": "d"
}

结论

比较上述两种消息格式就能看出:

debezium-json 格式并不等于原生的 debezium 格式,两者有很多相似之处,都有 before,after,op,原生 debezium 格式仅发生并存在于 Flink CDC 内部,对外不可见,debezium-json 格式可用于表达任何动态表的 changelog,与数据库 CDC 数据已无必然的绑定关系。

这篇关于Flink CDC 的 debezium-json 格式和 debezium 原生格式是一回事吗?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/911026

相关文章

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

MyBatis-plus处理存储json数据过程

《MyBatis-plus处理存储json数据过程》文章介绍MyBatis-Plus3.4.21处理对象与集合的差异:对象可用内置Handler配合autoResultMap,集合需自定义处理器继承F... 目录1、如果是对象2、如果需要转换的是List集合总结对象和集合分两种情况处理,目前我用的MP的版本

C#下Newtonsoft.Json的具体使用

《C#下Newtonsoft.Json的具体使用》Newtonsoft.Json是一个非常流行的C#JSON序列化和反序列化库,它可以方便地将C#对象转换为JSON格式,或者将JSON数据解析为C#对... 目录安装 Newtonsoft.json基本用法1. 序列化 C# 对象为 JSON2. 反序列化

Python中Json和其他类型相互转换的实现示例

《Python中Json和其他类型相互转换的实现示例》本文介绍了在Python中使用json模块实现json数据与dict、object之间的高效转换,包括loads(),load(),dumps()... 项目中经常会用到json格式转为object对象、dict字典格式等。在此做个记录,方便后续用到该方

GSON框架下将百度天气JSON数据转JavaBean

《GSON框架下将百度天气JSON数据转JavaBean》这篇文章主要为大家详细介绍了如何在GSON框架下实现将百度天气JSON数据转JavaBean,文中的示例代码讲解详细,感兴趣的小伙伴可以了解下... 目录前言一、百度天气jsON1、请求参数2、返回参数3、属性映射二、GSON属性映射实战1、类对象映

Python进行JSON和Excel文件转换处理指南

《Python进行JSON和Excel文件转换处理指南》在数据交换与系统集成中,JSON与Excel是两种极为常见的数据格式,本文将介绍如何使用Python实现将JSON转换为格式化的Excel文件,... 目录将 jsON 导入为格式化 Excel将 Excel 导出为结构化 JSON处理嵌套 JSON:

SpringBoot 异常处理/自定义格式校验的问题实例详解

《SpringBoot异常处理/自定义格式校验的问题实例详解》文章探讨SpringBoot中自定义注解校验问题,区分参数级与类级约束触发的异常类型,建议通过@RestControllerAdvice... 目录1. 问题简要描述2. 异常触发1) 参数级别约束2) 类级别约束3. 异常处理1) 字段级别约束

详解MySQL中JSON数据类型用法及与传统JSON字符串对比

《详解MySQL中JSON数据类型用法及与传统JSON字符串对比》MySQL从5.7版本开始引入了JSON数据类型,专门用于存储JSON格式的数据,本文将为大家简单介绍一下MySQL中JSON数据类型... 目录前言基本用法jsON数据类型 vs 传统JSON字符串1. 存储方式2. 查询方式对比3. 索引

C#解析JSON数据全攻略指南

《C#解析JSON数据全攻略指南》这篇文章主要为大家详细介绍了使用C#解析JSON数据全攻略指南,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录一、为什么jsON是C#开发必修课?二、四步搞定网络JSON数据1. 获取数据 - HttpClient最佳实践2. 动态解析 - 快速

MySQL 8 中的一个强大功能 JSON_TABLE示例详解

《MySQL8中的一个强大功能JSON_TABLE示例详解》JSON_TABLE是MySQL8中引入的一个强大功能,它允许用户将JSON数据转换为关系表格式,从而可以更方便地在SQL查询中处理J... 目录基本语法示例示例查询解释应用场景不适用场景1. ‌jsON 数据结构过于复杂或动态变化‌2. ‌性能要