Flink新增特性 | CDC(Change Data Capture) 原理和实践应用

2024-09-06 20:18

本文主要是介绍Flink新增特性 | CDC(Change Data Capture) 原理和实践应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

大数据真好玩

点击右侧关注,大数据真好玩!

CDC简介

CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。

用户可以在以下的场景下使用CDC:

  • 使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。

  • 可以在源数据库上实时的物化一个聚合视图

  • 因为只是增量同步,所以可以实时的低延迟的同步数据

  • 使用EventTime join 一个temporal表以便可以获取准确的结果

Flink 1.11 将这些changelog提取并转化为Table API和SQL,目前支持两种格式:Debezium和Canal,这就意味着源表不仅仅是append操作,而且还有upsert、delete操作。

Flink CDC 功能适用的一些场景:

  • 数据库之间的增量数据同步

  • 审计日志

  • 数据库之上的实时物化视图

  • 基于CDC的维表join

Flink  CDC使用方式


目前Flink支持两种内置的connector,PostgreSQL和mysql,接下来我们以mysql为例。

Flink 1.11仅支持Kafka作为现成的变更日志源和JSON编码的变更日志,而Avro(Debezium)和Protobuf(Canal)计划在将来的版本中使用。还计划支持MySQL二进制日志和Kafka压缩主题作为源,并将扩展日志支持扩展到批处理执行。

Flink CDC当作监听器获取增量变更

传统的实时链路如何实现业务数据的同步,我们以canal为例,传统业务数据实时同步会涉及到canal处理mysql的binlog然后同步到kafka,在通过计算引擎spark,flink或storm计算转化,再结果数据传输到第三方存储(hbase,es)如下图所示主要分为三个模块E(Extract) ,T(Transform), L(Load).可以看到涉及的组件很多,链路很长。

我们可以直接Flink CDC消费数据库的增量日志,替代了原来作为数据采集层的canal,然后直接进行计算,经过计算之后,将计算结果 发送到下游。整体架构如下:

使用这种架构是好处有:

  • 减少canal和kafka的维护成本,链路更短,延迟更低

  • flink提供了exactly once语义

  • 可以从指定position读取

  • 去掉了kafka,减少了消息的存储成本

我们需要引入相应的pom,mysql的pom如下:

<dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.1.0</version>
</dependency>

如果是sql客户端使用,需要下载 flink-sql-connector-mysql-cdc-1.1.0.jar 并且放到<FLINK_HOME>/lib/下面

连接mysql数据库的示例sql如下:

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (id INT NOT NULL,name STRING,description STRING,weight DECIMAL(10,3)
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'inventory','table-name' = 'products'
);

使用API的方式:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;public class MySqlBinlogSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("inventory") // monitor all tables under inventory database.username("flinkuser").password("flinkpw").deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute();}
}

Flink CDC 当作转换工具

如果需要Flink承担的角色是计算层,那么目前Flink提供的format有两种格式:canal-json和debezium-json,下面我们简单的介绍下。

如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version>
</dependency>

我们可以直接消费canal-json数据:

CREATE TABLE topic_products (id BIGINT,name STRING,description STRING,weight DECIMAL(10, 2)
) WITH ('connector' = 'kafka','topic' = 'products_binlog','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'canal-json'  -- using canal-json as the format
)

changelog format

如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖:

<dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-format-changelog-json</artifactId><version>1.0.0</version>
</dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

-- assuming we have a user_behavior logs
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3)
) WITH ('connector' = 'kafka',  -- using kafka connector'topic' = 'user_behavior',  -- kafka topic'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address'format' = 'json'  -- the data format is json
);-- we want to store the the UV aggregation result in kafka using changelog-json format
create table day_uv (day_str STRING,uv BIGINT
) WITH ('connector' = 'kafka','topic' = 'day_uv','scan.startup.mode' = 'earliest-offset',  -- reading from the beginning'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address'format' = 'changelog-json'  -- the data format is json
);-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');-- reading the changelog back again
SELECT * FROM day_uv;

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|import_bigdata

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Flink新增特性 | CDC(Change Data Capture) 原理和实践应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143013

相关文章

Python结合PyWebView库打造跨平台桌面应用

《Python结合PyWebView库打造跨平台桌面应用》随着Web技术的发展,将HTML/CSS/JavaScript与Python结合构建桌面应用成为可能,本文将系统讲解如何使用PyWebView... 目录一、技术原理与优势分析1.1 架构原理1.2 核心优势二、开发环境搭建2.1 安装依赖2.2 验

Java字符串操作技巧之语法、示例与应用场景分析

《Java字符串操作技巧之语法、示例与应用场景分析》在Java算法题和日常开发中,字符串处理是必备的核心技能,本文全面梳理Java中字符串的常用操作语法,结合代码示例、应用场景和避坑指南,可快速掌握字... 目录引言1. 基础操作1.1 创建字符串1.2 获取长度1.3 访问字符2. 字符串处理2.1 子字

Java Optional的使用技巧与最佳实践

《JavaOptional的使用技巧与最佳实践》在Java中,Optional是用于优雅处理null的容器类,其核心目标是显式提醒开发者处理空值场景,避免NullPointerExce... 目录一、Optional 的核心用途二、使用技巧与最佳实践三、常见误区与反模式四、替代方案与扩展五、总结在 Java

Spring Boot循环依赖原理、解决方案与最佳实践(全解析)

《SpringBoot循环依赖原理、解决方案与最佳实践(全解析)》循环依赖指两个或多个Bean相互直接或间接引用,形成闭环依赖关系,:本文主要介绍SpringBoot循环依赖原理、解决方案与最... 目录一、循环依赖的本质与危害1.1 什么是循环依赖?1.2 核心危害二、Spring的三级缓存机制2.1 三

C#中async await异步关键字用法和异步的底层原理全解析

《C#中asyncawait异步关键字用法和异步的底层原理全解析》:本文主要介绍C#中asyncawait异步关键字用法和异步的底层原理全解析,本文给大家介绍的非常详细,对大家的学习或工作具有一... 目录C#异步编程一、异步编程基础二、异步方法的工作原理三、代码示例四、编译后的底层实现五、总结C#异步编程

SpringShell命令行之交互式Shell应用开发方式

《SpringShell命令行之交互式Shell应用开发方式》本文将深入探讨SpringShell的核心特性、实现方式及应用场景,帮助开发者掌握这一强大工具,具有很好的参考价值,希望对大家有所帮助,如... 目录引言一、Spring Shell概述二、创建命令类三、命令参数处理四、命令分组与帮助系统五、自定

SpringBoot应用中出现的Full GC问题的场景与解决

《SpringBoot应用中出现的FullGC问题的场景与解决》这篇文章主要为大家详细介绍了SpringBoot应用中出现的FullGC问题的场景与解决方法,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录Full GC的原理与触发条件原理触发条件对Spring Boot应用的影响示例代码优化建议结论F

Python 中的 with open文件操作的最佳实践

《Python中的withopen文件操作的最佳实践》在Python中,withopen()提供了一个简洁而安全的方式来处理文件操作,它不仅能确保文件在操作完成后自动关闭,还能处理文件操作中的异... 目录什么是 with open()?为什么使用 with open()?使用 with open() 进行

MySQL 分区与分库分表策略应用小结

《MySQL分区与分库分表策略应用小结》在大数据量、复杂查询和高并发的应用场景下,单一数据库往往难以满足性能和扩展性的要求,本文将详细介绍这两种策略的基本概念、实现方法及优缺点,并通过实际案例展示如... 目录mysql 分区与分库分表策略1. 数据库水平拆分的背景2. MySQL 分区策略2.1 分区概念

Spring Shell 命令行实现交互式Shell应用开发

《SpringShell命令行实现交互式Shell应用开发》本文主要介绍了SpringShell命令行实现交互式Shell应用开发,能够帮助开发者快速构建功能丰富的命令行应用程序,具有一定的参考价... 目录引言一、Spring Shell概述二、创建命令类三、命令参数处理四、命令分组与帮助系统五、自定义S