Flink系列之:Table API Connectors之Debezium

2023-12-20 10:20

本文主要是介绍Flink系列之:Table API Connectors之Debezium,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Flink系列之:Table API Connectors之Debezium

  • 一、Debezium
  • 二、依赖
  • 三、使用Debezium Format
  • 四、可用元数据
  • 五、Format参数
  • 六、重复的变更事件
  • 七、消费 Debezium Postgres Connector 产生的数据
  • 八、数据类型映射

一、Debezium

Debezium 是一个 CDC(Changelog Data Capture,变更数据捕获)的工具,可以把来自 MySQL、PostgreSQL、Oracle、Microsoft SQL Server 和许多其他数据库的更改实时流式传输到 Kafka 中。 Debezium 为变更日志提供了统一的格式结构,并支持使用 JSON 和 Apache Avro 序列化消息。

Flink 支持将 Debezium JSON 和 Avro 消息解析为 INSERT / UPDATE / DELETE 消息到 Flink SQL 系统中。在很多情况下,利用这个特性非常的有用,例如

  • 将增量数据从数据库同步到其他系统
  • 日志审计
  • 数据库的实时物化视图
  • 关联维度数据库的变更历史,等等。

Flink 还支持将 Flink SQL 中的 INSERT / UPDATE / DELETE 消息编码为 Debezium 格式的 JSON 或 Avro 消息,输出到 Kafka 等存储中。 但需要注意的是,目前 Flink 还不支持将 UPDATE_BEFORE 和 UPDATE_AFTER 合并为一条 UPDATE 消息。因此,Flink 将 UPDATE_BEFORE 和 UPDATE_AFTER 分别编码为 DELETE 和 INSERT 类型的 Debezium 消息。

二、依赖

Debezium Avro

为了使用 Debezium 格式,使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL 客户端都需要以下依赖项。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-avro-confluent-registry</artifactId><version>1.18.0</version>
</dependency>

Debezium Json

为了使用 Debezium 格式,使用构建自动化工具(例如 Maven 或 SBT)的项目和带有 SQL JAR 包的 SQL 客户端都需要以下依赖项。

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-json</artifactId><version>1.18.0</version>
</dependency>

三、使用Debezium Format

Debezium 为变更日志提供了统一的格式,这是一个 JSON 格式的从 MySQL product 表捕获的更新操作的简单示例:

{"before": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.18},"after": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.15},"source": {...},"op": "u","ts_ms": 1589362330904,"transaction": null
}

MySQL 产品表有4列(id、name、description、weight)。上面的 JSON 消息是 products 表上的一条更新事件,其中 id = 111 的行的 weight 值从 5.18 更改为 5.15。假设此消息已同步到 Kafka 主题 products_binlog,则可以使用以下 DDL 来使用此主题并解析更改事件。

CREATE TABLE topic_products (-- schema 与 MySQL 的 products 表完全相同id BIGINT,name STRING,description STRING,weight DECIMAL(10, 2)
) WITH ('connector' = 'kafka','topic' = 'products_binlog','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup',-- 使用 'debezium-json' format 来解析 Debezium 的 JSON 消息-- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent''format' = 'debezium-json'  -- 如果 Debezium 用 Avro 编码消息,请使用 'debezium-avro-confluent'
)

在某些情况下,用户在设置 Debezium Kafka Connect 时,可能会开启 Kafka 的配置 ‘value.converter.schemas.enable’,用来在消息体中包含 schema 信息。然后,Debezium JSON 消息可能如下所示:

{"schema": {...},"payload": {"before": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.18},"after": {"id": 111,"name": "scooter","description": "Big 2-wheel scooter","weight": 5.15},"source": {...},"op": "u","ts_ms": 1589362330904,"transaction": null}
}

为了解析这一类信息,你需要在上述 DDL WITH 子句中添加选项 ‘debezium-json.schema-include’ = ‘true’(默认为 false)。通常情况下,建议不要包含 schema 的描述,因为这样会使消息变得非常冗长,并降低解析性能。

在将主题注册为 Flink 表之后,可以将 Debezium 消息用作变更日志源。

-- MySQL "products" 的实时物化视图
-- 计算相同产品的最新平均重量
SELECT name, AVG(weight) FROM topic_products GROUP BY name;-- 将 MySQL "products" 表的所有数据和增量更改同步到
-- Elasticsearch "products" 索引,供将来查找
INSERT INTO elasticsearch_products
SELECT * FROM topic_products;

四、可用元数据

以下格式元数据可以在表定义中公开为只读(虚拟)列。

注意 仅当相应的连接器转发格式元数据时,格式元数据字段才可用。目前,只有 Kafka 连接器能够公开其值格式的元数据字段。

KeyData TypeDescription
schemaSTRING NULL描述负载模式的 JSON 字符串。如果架构未包含在 Debezium 记录中,则为 Null。
ingestion-timestampTIMESTAMP_LTZ(3) NULL连接器处理事件的时间戳。对应于 Debezium 记录中的 ts_ms 字段。
source.timestampTIMESTAMP_LTZ(3) NULL源系统创建事件的时间戳。对应于 Debezium 记录中的 source.ts_ms 字段。
source.databaseSTRING NULL原始数据库。对应于 Debezium 记录中的 source.db 字段(如果可用)。
source.schemaSTRING NULL原始数据库架构。对应于 Debezium 记录中的 source.schema 字段(如果可用)。
source.tableSTRING NULL原始数据库表。对应于 Debezium 记录中的 source.table 或 source.collection 字段(如果可用)。
source.propertiesMAP<STRING, STRING> NULL各种源属性的映射。对应于 Debezium 记录中的源字段。

以下示例展示了如何访问 Kafka 中的 Debezium 元数据字段:

CREATE TABLE KafkaTable (origin_ts TIMESTAMP(3) METADATA FROM 'value.ingestion-timestamp' VIRTUAL,event_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,origin_database STRING METADATA FROM 'value.source.database' VIRTUAL,origin_schema STRING METADATA FROM 'value.source.schema' VIRTUAL,origin_table STRING METADATA FROM 'value.source.table' VIRTUAL,origin_properties MAP<STRING, STRING> METADATA FROM 'value.source.properties' VIRTUAL,user_id BIGINT,item_id BIGINT,behavior STRING
) WITH ('connector' = 'kafka','topic' = 'user_behavior','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','scan.startup.mode' = 'earliest-offset','value.format' = 'debezium-json'
);

五、Format参数

Flink 提供了 debezium-avro-confluent 和 debezium-json 两种 format 来解析 Debezium 生成的 JSON 格式和 Avro 格式的消息。 请使用 debezium-avro-confluent 来解析 Debezium 的 Avro 消息,使用 debezium-json 来解析 Debezium 的 JSON 消息。

Debezium Avro

参数是否必选默认值类型描述
formatrequired(none)String指定使用什么格式,这里应该是“debezium-avro-confluence”。
debezium-avro-confluent.basic-auth.credentials-sourceoptional(none)String架构注册表的基本身份验证凭据源
debezium-avro-confluent.basic-auth.user-infooptional(none)String架构注册表的基本身份验证用户信息
debezium-avro-confluent.bearer-auth.credentials-sourceoptional(none)String架构注册表的承载身份验证凭据源
debezium-avro-confluent.bearer-auth.tokenoptional(none)String架构注册表的承载身份验证令牌
debezium-avro-confluent.propertiesoptional(none)Map转发到底层架构注册表的属性映射。这对于未通过 Flink 配置选项正式公开的选项很有用。但请注意,Flink 选项具有更高的优先级。
debezium-avro-confluent.ssl.keystore.locationoptional(none)StringSSL 密钥库的位置/文件
debezium-avro-confluent.ssl.keystore.passwordoptional(none)StringSSL 密钥库的密码
debezium-avro-confluent.ssl.truststore.locationoptional(none)String
debezium-avro-confluent.ssl.truststore.passwordoptional(none)StringSSL 信任库的密码
debezium-avro-confluent.schemaoptional(none)String已在 Confluence 模式注册表中注册或将要注册的模式。如果没有提供 schema,Flink 会将表 schema 转换为 avro schema。提供的模式必须与 Debezium 模式匹配,Debezium 模式是可以为空的记录类型,包括字段“before”、“after”、“op”。
debezium-avro-confluent.subjectoptional(none)String融合模式注册表主题,在序列化期间注册此格式使用的模式。默认情况下,如果将此格式用作值或键格式,则“kafka”和“upsert-kafka”连接器使用“<topic_name>-value”或“<topic_name>-key”作为默认主题名称。但对于其他连接器(例如“文件系统”),用作接收器时需要主题选项。
debezium-avro-confluent.urlrequired(none)String用于获取/注册模式的 Confluence 模式注册表的 URL。

Debezium Json

参数是否必选默认值类型描述
format必选(none)String指定要使用的格式,此处应为 ‘debezium-json’。
debezium-json.schema-include可选falseBoolean设置 Debezium Kafka Connect 时,用户可以启用 Kafka 配置 ‘value.converter.schemas.enable’ 以在消息中包含 schema。此选项表明 Debezium JSON 消息是否包含 schema。
debezium-json.ignore-parse-errors可选falseBoolean当解析异常时,是跳过当前字段或行,还是抛出错误失败(默认为 false,即抛出错误失败)。如果忽略字段的解析异常,则会将该字段值设置为null。
debezium-json.timestamp-format.standard可选‘SQL’String声明输入和输出的时间戳格式。当前支持的格式为’SQL’ 以及 ‘ISO-8601’:可选参数 ‘SQL’ 将会以 “yyyy-MM-dd HH:mm:ss.s{precision}” 的格式解析时间戳, 例如 ‘2020-12-30 12:13:14.123’,且会以相同的格式输出。可选参数 ‘ISO-8601’ 将会以 “yyyy-MM-ddTHH:mm:ss.s{precision}” 的格式解析输入时间戳, 例如 ‘2020-12-30T12:13:14.123’ ,且会以相同的格式输出。
debezium-json.map-null-key.mode选填‘FAIL’String指定处理 Map 中 key 值为空的方法. 当前支持的值有 ‘FAIL’, ‘DROP’ 和 ‘LITERAL’:Option ‘FAIL’ 将抛出异常,如果遇到 Map 中 key 值为空的数据。Option ‘DROP’ 将丢弃 Map 中 key 值为空的数据项。Option ‘LITERAL’ 将使用字符串常量来替换 Map 中的空 key 值。字符串常量的值由 ‘debezium-json.map-null-key.literal’ 定义。
debezium-json.map-null-key.literal选填‘null’String当 ‘debezium-json.map-null-key.mode’ 是 LITERAL 的时候,指定字符串常量替换 Map 中的空 key 值。
debezium-json.encode.decimal-as-plain-number选填falseBoolean将所有 DECIMAL 类型的数据保持原状,不使用科学计数法表示。例:0.000000027 默认会表示为 2.7E-8。当此选项设为 true 时,则会表示为 0.000000027。

六、重复的变更事件

在正常的操作环境下,Debezium 应用能以 exactly-once 的语义投递每条变更事件。在这种情况下,Flink 消费 Debezium 产生的变更事件能够工作得很好。 然而,当有故障发生时,Debezium 应用只能保证 at-least-once 的投递语义。 这也意味着,在非正常情况下,Debezium 可能会投递重复的变更事件到 Kafka 中,当 Flink 从 Kafka 中消费的时候就会得到重复的事件。 这可能会导致 Flink query 的运行得到错误的结果或者非预期的异常。因此,建议在这种情况下,将作业参数 table.exec.source.cdc-events-duplicate 设置成 true,并在该 source 上定义 PRIMARY KEY。 框架会生成一个额外的有状态算子,使用该 primary key 来对变更事件去重并生成一个规范化的 changelog 流。

“table.exec.source.cdc-events-duplicate” 是 Flink 的一个配置选项,用于处理 Change Data Capture (CDC) 事件的重复数据。CDC 是一种常见的数据库技术,用于捕获数据库中的数据更改,并将其作为事件流进行处理。

在 Flink 中,“table.exec.source.cdc-events-duplicate” 选项用于指定如何处理 CDC 事件流中的重复数据。具体来说,它可以接受以下两个值:

  1. “strict”:严格模式。如果启用了严格模式,Flink 会严格检查事件流中是否存在重复的 CDC 事件。如果检测到重复事件出现,Flink 会抛出一个异常并停止任务执行。

  2. “lenient”:宽松模式。如果启用了宽松模式,Flink 会使用一个简单的算法来检测重复事件,并尽量过滤掉这些重复事件。但是,它不能保证完全消除重复事件的可能性。

使用时,可以在 Flink 的配置文件或在代码中通过设置相应的属性来指定 “table.exec.source.cdc-events-duplicate” 的值。例如,在 Flink 的配置文件中,可以添加以下行来启用宽松模式:

table.exec.source.cdc-events-duplicate: lenient

七、消费 Debezium Postgres Connector 产生的数据

如果你正在使用 Debezium PostgreSQL Connector 捕获变更到 Kafka,请确保被监控表的 REPLICA IDENTITY 已经被配置成 FULL 了,默认值是 DEFAULT。 否则,Flink SQL 将无法正确解析 Debezium 数据。

当配置为 FULL 时,更新和删除事件将完整包含所有列的之前的值。当为其他配置时,更新和删除事件的 “before” 字段将只包含 primary key 字段的值,或者为 null(没有 primary key)。 你可以通过运行 ALTER TABLE REPLICA IDENTITY FULL 来更改 REPLICA IDENTITY 的配置。

八、数据类型映射

目前,Debezium Format 使用 JSON Format 进行序列化和反序列化。

当前,JSON schema 将会自动从 table schema 之中自动推导得到。不支持显式地定义 JSON schema。

在 Flink 中,JSON Format 使用 jackson databind API 去解析和生成 JSON。

下表列出了 Flink 中的数据类型与 JSON 中的数据类型的映射关系。

Flink SQL类型JSON类型
CHAR/VARCHAR/STRINGstring
BOOLEANboolean
BINARY/VARBINARYstring with encoding: base64
DECIMALnumber
TINYINTnumber
SMALLINTnumber
INTnumber
BIGINTnumber
FLOATnumber
DOUBLEnumber
DATEstring with format: date
TIMEstring with format: time
TIMESTAMPstring with format: date-time
TIMESTAMP_WITH_LOCAL_TIME_ZONEstring with format: date-time (with UTC time zone)
INTERVALnumber
ARRAYarray
MAP / MULTISETobject
ROWobject

这篇关于Flink系列之:Table API Connectors之Debezium的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/515661

相关文章

基于Flask框架添加多个AI模型的API并进行交互

《基于Flask框架添加多个AI模型的API并进行交互》:本文主要介绍如何基于Flask框架开发AI模型API管理系统,允许用户添加、删除不同AI模型的API密钥,感兴趣的可以了解下... 目录1. 概述2. 后端代码说明2.1 依赖库导入2.2 应用初始化2.3 API 存储字典2.4 路由函数2.5 应

GORM中Model和Table的区别及使用

《GORM中Model和Table的区别及使用》Model和Table是两种与数据库表交互的核心方法,但它们的用途和行为存在著差异,本文主要介绍了GORM中Model和Table的区别及使用,具有一... 目录1. Model 的作用与特点1.1 核心用途1.2 行为特点1.3 示例China编程代码2. Tab

C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)

《C#集成DeepSeek模型实现AI私有化的流程步骤(本地部署与API调用教程)》本文主要介绍了C#集成DeepSeek模型实现AI私有化的方法,包括搭建基础环境,如安装Ollama和下载DeepS... 目录前言搭建基础环境1、安装 Ollama2、下载 DeepSeek R1 模型客户端 ChatBo

Debezium 与 Apache Kafka 的集成方式步骤详解

《Debezium与ApacheKafka的集成方式步骤详解》本文详细介绍了如何将Debezium与ApacheKafka集成,包括集成概述、步骤、注意事项等,通过KafkaConnect,D... 目录一、集成概述二、集成步骤1. 准备 Kafka 环境2. 配置 Kafka Connect3. 安装 D

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Deepseek R1模型本地化部署+API接口调用详细教程(释放AI生产力)

《DeepseekR1模型本地化部署+API接口调用详细教程(释放AI生产力)》本文介绍了本地部署DeepSeekR1模型和通过API调用将其集成到VSCode中的过程,作者详细步骤展示了如何下载和... 目录前言一、deepseek R1模型与chatGPT o1系列模型对比二、本地部署步骤1.安装oll

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

一分钟带你上手Python调用DeepSeek的API

《一分钟带你上手Python调用DeepSeek的API》最近DeepSeek非常火,作为一枚对前言技术非常关注的程序员来说,自然都想对接DeepSeek的API来体验一把,下面小编就来为大家介绍一下... 目录前言免费体验API-Key申请首次调用API基本概念最小单元推理模型智能体自定义界面总结前言最

JAVA调用Deepseek的api完成基本对话简单代码示例

《JAVA调用Deepseek的api完成基本对话简单代码示例》:本文主要介绍JAVA调用Deepseek的api完成基本对话的相关资料,文中详细讲解了如何获取DeepSeekAPI密钥、添加H... 获取API密钥首先,从DeepSeek平台获取API密钥,用于身份验证。添加HTTP客户端依赖使用Jav

C#使用DeepSeek API实现自然语言处理,文本分类和情感分析

《C#使用DeepSeekAPI实现自然语言处理,文本分类和情感分析》在C#中使用DeepSeekAPI可以实现多种功能,例如自然语言处理、文本分类、情感分析等,本文主要为大家介绍了具体实现步骤,... 目录准备工作文本生成文本分类问答系统代码生成翻译功能文本摘要文本校对图像描述生成总结在C#中使用Deep