Flink新增特性 | CDC(Change Data Capture) 原理和实践应用

2024-09-06 20:18

本文主要是介绍Flink新增特性 | CDC(Change Data Capture) 原理和实践应用,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

点击上方蓝色字体,选择“设为星标

回复”资源“获取更多资源

大数据技术与架构

点击右侧关注,大数据开发领域最强公众号!

大数据真好玩

点击右侧关注,大数据真好玩!

CDC简介

CDC,Change Data Capture,变更数据获取的简称,使用CDC我们可以从数据库中获取已提交的更改并将这些更改发送到下游,供下游使用。这些变更可以包括INSERT,DELETE,UPDATE等。

用户可以在以下的场景下使用CDC:

  • 使用flink sql进行数据同步,可以将数据从一个数据同步到其他的地方,比如mysql、elasticsearch等。

  • 可以在源数据库上实时的物化一个聚合视图

  • 因为只是增量同步,所以可以实时的低延迟的同步数据

  • 使用EventTime join 一个temporal表以便可以获取准确的结果

Flink 1.11 将这些changelog提取并转化为Table API和SQL,目前支持两种格式:Debezium和Canal,这就意味着源表不仅仅是append操作,而且还有upsert、delete操作。

Flink CDC 功能适用的一些场景:

  • 数据库之间的增量数据同步

  • 审计日志

  • 数据库之上的实时物化视图

  • 基于CDC的维表join

Flink  CDC使用方式


目前Flink支持两种内置的connector,PostgreSQL和mysql,接下来我们以mysql为例。

Flink 1.11仅支持Kafka作为现成的变更日志源和JSON编码的变更日志,而Avro(Debezium)和Protobuf(Canal)计划在将来的版本中使用。还计划支持MySQL二进制日志和Kafka压缩主题作为源,并将扩展日志支持扩展到批处理执行。

Flink CDC当作监听器获取增量变更

传统的实时链路如何实现业务数据的同步,我们以canal为例,传统业务数据实时同步会涉及到canal处理mysql的binlog然后同步到kafka,在通过计算引擎spark,flink或storm计算转化,再结果数据传输到第三方存储(hbase,es)如下图所示主要分为三个模块E(Extract) ,T(Transform), L(Load).可以看到涉及的组件很多,链路很长。

我们可以直接Flink CDC消费数据库的增量日志,替代了原来作为数据采集层的canal,然后直接进行计算,经过计算之后,将计算结果 发送到下游。整体架构如下:

使用这种架构是好处有:

  • 减少canal和kafka的维护成本,链路更短,延迟更低

  • flink提供了exactly once语义

  • 可以从指定position读取

  • 去掉了kafka,减少了消息的存储成本

我们需要引入相应的pom,mysql的pom如下:

<dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>1.1.0</version>
</dependency>

如果是sql客户端使用,需要下载 flink-sql-connector-mysql-cdc-1.1.0.jar 并且放到<FLINK_HOME>/lib/下面

连接mysql数据库的示例sql如下:

-- creates a mysql cdc table source
CREATE TABLE mysql_binlog (id INT NOT NULL,name STRING,description STRING,weight DECIMAL(10,3)
) WITH ('connector' = 'mysql-cdc','hostname' = 'localhost','port' = '3306','username' = 'flinkuser','password' = 'flinkpw','database-name' = 'inventory','table-name' = 'products'
);

使用API的方式:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.alibaba.ververica.cdc.debezium.StringDebeziumDeserializationSchema;
import com.alibaba.ververica.cdc.connectors.mysql.MySQLSource;public class MySqlBinlogSourceExample {public static void main(String[] args) throws Exception {SourceFunction<String> sourceFunction = MySQLSource.<String>builder().hostname("localhost").port(3306).databaseList("inventory") // monitor all tables under inventory database.username("flinkuser").password("flinkpw").deserializer(new StringDebeziumDeserializationSchema()) // converts SourceRecord to String.build();StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.addSource(sourceFunction).print().setParallelism(1); // use parallelism 1 for sink to keep message orderingenv.execute();}
}

Flink CDC 当作转换工具

如果需要Flink承担的角色是计算层,那么目前Flink提供的format有两种格式:canal-json和debezium-json,下面我们简单的介绍下。

如果要使用Kafka的canal-json,对于程序而言,需要添加如下依赖:

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_2.11</artifactId><version>1.11.0</version>
</dependency>

我们可以直接消费canal-json数据:

CREATE TABLE topic_products (id BIGINT,name STRING,description STRING,weight DECIMAL(10, 2)
) WITH ('connector' = 'kafka','topic' = 'products_binlog','properties.bootstrap.servers' = 'localhost:9092','properties.group.id' = 'testGroup','format' = 'canal-json'  -- using canal-json as the format
)

changelog format

如果要使用Kafka的changelog-json Format,对于程序而言,需要添加如下依赖:

<dependency><groupId>com.alibaba.ververica</groupId><artifactId>flink-format-changelog-json</artifactId><version>1.0.0</version>
</dependency>

如果要使用Flink SQL Client,需要添加如下jar包:flink-format-changelog-json-1.0.0.jar,将该jar包放在Flink安装目录的lib文件夹下即可。

-- assuming we have a user_behavior logs
CREATE TABLE user_behavior (user_id BIGINT,item_id BIGINT,category_id BIGINT,behavior STRING,ts TIMESTAMP(3)
) WITH ('connector' = 'kafka',  -- using kafka connector'topic' = 'user_behavior',  -- kafka topic'scan.startup.mode' = 'earliest-offset',  -- reading from the beginning'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address'format' = 'json'  -- the data format is json
);-- we want to store the the UV aggregation result in kafka using changelog-json format
create table day_uv (day_str STRING,uv BIGINT
) WITH ('connector' = 'kafka','topic' = 'day_uv','scan.startup.mode' = 'earliest-offset',  -- reading from the beginning'properties.bootstrap.servers' = 'localhost:9092',  -- kafka broker address'format' = 'changelog-json'  -- the data format is json
);-- write the UV results into kafka using changelog-json format
INSERT INTO day_uv
SELECT DATE_FORMAT(ts, 'yyyy-MM-dd') as date_str, count(distinct user_id) as uv
FROM user_behavior
GROUP BY DATE_FORMAT(ts, 'yyyy-MM-dd');-- reading the changelog back again
SELECT * FROM day_uv;

版权声明:

本文为大数据技术与架构整理,原作者独家授权。未经原作者允许转载追究侵权责任。

编辑|冷眼丶

微信公众号|import_bigdata

欢迎点赞+收藏+转发朋友圈素质三连

文章不错?点个【在看】吧! ????

这篇关于Flink新增特性 | CDC(Change Data Capture) 原理和实践应用的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1143013

相关文章

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

csu 1446 Problem J Modified LCS (扩展欧几里得算法的简单应用)

这是一道扩展欧几里得算法的简单应用题,这题是在湖南多校训练赛中队友ac的一道题,在比赛之后请教了队友,然后自己把它a掉 这也是自己独自做扩展欧几里得算法的题目 题意:把题意转变下就变成了:求d1*x - d2*y = f2 - f1的解,很明显用exgcd来解 下面介绍一下exgcd的一些知识点:求ax + by = c的解 一、首先求ax + by = gcd(a,b)的解 这个

hdu1394(线段树点更新的应用)

题意:求一个序列经过一定的操作得到的序列的最小逆序数 这题会用到逆序数的一个性质,在0到n-1这些数字组成的乱序排列,将第一个数字A移到最后一位,得到的逆序数为res-a+(n-a-1) 知道上面的知识点后,可以用暴力来解 代码如下: #include<iostream>#include<algorithm>#include<cstring>#include<stack>#in

zoj3820(树的直径的应用)

题意:在一颗树上找两个点,使得所有点到选择与其更近的一个点的距离的最大值最小。 思路:如果是选择一个点的话,那么点就是直径的中点。现在考虑两个点的情况,先求树的直径,再把直径最中间的边去掉,再求剩下的两个子树中直径的中点。 代码如下: #include <stdio.h>#include <string.h>#include <algorithm>#include <map>#

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

【区块链 + 人才服务】可信教育区块链治理系统 | FISCO BCOS应用案例

伴随着区块链技术的不断完善,其在教育信息化中的应用也在持续发展。利用区块链数据共识、不可篡改的特性, 将与教育相关的数据要素在区块链上进行存证确权,在确保数据可信的前提下,促进教育的公平、透明、开放,为教育教学质量提升赋能,实现教育数据的安全共享、高等教育体系的智慧治理。 可信教育区块链治理系统的顶层治理架构由教育部、高校、企业、学生等多方角色共同参与建设、维护,支撑教育资源共享、教学质量评估、

AI行业应用(不定期更新)

ChatPDF 可以让你上传一个 PDF 文件,然后针对这个 PDF 进行小结和提问。你可以把各种各样你要研究的分析报告交给它,快速获取到想要知道的信息。https://www.chatpdf.com/