Flink-cdc更好的流式数据集成工具

2024-05-24 06:12

本文主要是介绍Flink-cdc更好的流式数据集成工具,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

What’s Flink-cdc?

在这里插入图片描述

Flink CDC 是基于Apache Flink的一种数据变更捕获技术,用于从数据源(如数据库)中捕获和处理数据的变更事件。CDC技术允许实时地捕获数据库中的增、删、改操作,将这些变更事件转化为流式数据,并能够对这些事件进行实时处理和分析。

Flink CDC提供了与各种数据源集成的功能,包括常见的关系型数据库(如MySQL、PostgreSQL、Oracle等)以及NoSQL数据库(如MongoDB、HBase等)。它通过监控数据库的日志或轮询方式来捕获数据变更,并将变更事件作为数据流发送到Flink的任务中进行处理。

Flink CDC 深度集成并由 Apache Flink 驱动,提供以下核心功能:

✅ 端到端的数据集成框架
✅ 为数据集成的用户提供了易于构建作业的 API
✅ 支持在 Source 和 Sink 中处理多个表
✅ 整库同步
✅具备表结构变更自动同步的能力(Schema Evolution)

在使用者的角度,就是Flink-cdc可以简化流处理的流程:

  • 引入Flink-cdc之前流处理流程
    在这里插入图片描述

  • 引入Flink-cdc之后后流处理流程
    在这里插入图片描述
    如上所示,在flink-cdc被引入后大大简化了流处理流程

Flink-cdc支持的链接及对应的版本

Pipeline Connectors
在这里插入图片描述
Source Connectors
在这里插入图片描述截止目前(2024-05-23)

Flink-cdc与Flink对应对影版本的关系

在这里插入图片描述截止目前(2024-05-23)

flink-connector-mysql-cdc 实例分析

示例代码

demo代码:

import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;public class MySqlSourceDemo {public static void main(String[] args) throws Exception {MySqlSource<String> mySqlSource = MySqlSource.<String>builder().hostname("mysql-server-host").port(3306).databaseList("mydb") // 设置捕获的数据库.tableList("mydb.products") // 设置捕获的表,如果需要同步整个数据库,请将 tableList 设置为 ".*".
//                .tableList(".*") // 捕获整个数据库的表
//                .tableList("^(?!mysql|information_schema|performance_schema).*") // 设置捕获的表,排除系统库
//                .tableList("mydb.(?!products|orders).*") // 同步排除products和orders表之外的整个my_db库.username("flink-cdc").password("xxx").serverId("5400-5405").deserializer(new JsonDebeziumDeserializationSchema()) // 将 SourceRecord 转换为 JSON 字符串.serverTimeZone("Asia/Shanghai") // 设置时区.startupOptions(StartupOptions.initial()).scanNewlyAddedTableEnabled(true) // 启用扫描新添加的表功能
//                .includeSchemaChanges(true) // 包括 schema 变更.build();org.apache.flink.configuration.Configuration config = new org.apache.flink.configuration.Configuration();config.setString("rest.port", "8081");
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(config); //本地环境,调试用StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 设置 3s 的 checkpoint 间隔env.enableCheckpointing(3000);env.setStateBackend(new HashMapStateBackend());env.getCheckpointConfig().setCheckpointStorage("file:///tmp/ck");//本地文件系统
//        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); 1.14.0 版本开始支持env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")// 设置 source 节点的并行度为 4.setParallelism(5).print().setParallelism(1); // 设置 sink 节点并行度为 1env.execute("Print MySQL Snapshot + Binlog");}
}

maven依赖:

<properties><maven.compiler.source>8</maven.compiler.source><maven.compiler.target>8</maven.compiler.target><project.build.sourceEncoding>UTF-8</project.build.sourceEncoding><flink.version>1.14.5</flink.version><scala.binary.version>2.12</scala.binary.version></properties><dependencies><dependency><groupId>junit</groupId><artifactId>junit</artifactId><scope>test</scope></dependency><!-- 将 Apache Flink 的 Web 运行时模块添加到项目中 --><dependency><groupId>org.apache.flink</groupId><artifactId>flink-runtime-web_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_${scala.binary.version}</artifactId><version>${flink.version}</version><scope>provided</scope> <!--provided生命周期在test模式才可以运行,在main模式会找不到包--></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.3.0</version><scope>compile</scope></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-common</artifactId><version>${flink.version}</version><scope>compile</scope></dependency><dependency><groupId>org.slf4j</groupId><artifactId>slf4j-log4j12</artifactId><version>1.7.25</version><scope>provided</scope></dependency></dependencies>

日志配置文件:
log4j.properties

log4j.rootCategory=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss} %p %c{1}:%L - %m%n

启动standalone Flink级群

# jobmanager
docker run -d \
--name flink-jm \
--hostname flink-jm \
-p 8082:8081 \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8  \
jobmanager# taskmanager
docker run -d \
--name flink-tm \
--hostname flink-tm \
--env FLINK_PROPERTIES="jobmanager.rpc.address: flink-jm" \
--network flink-network-standalone \
ponylee/flink:1.15.0-java8 \
taskmanager \
-Dtaskmanager.memory.process.size=1024m \
-Dtaskmanager.numberOfTaskSlots=5 \
-Drest.flamegraph.enabled=true

分析说明

为每个 Reader 设置不同的 Server id

每个用于读取 binlog 的 MySQL 数据库客户端都应该有一个唯一的 id,称为 Server id。 MySQL 服务器将使用此 id 来维护网络连接和 binlog 位置。 因此,如果不同的作业共享相同的 Server id, 则可能导致从错误的 binlog 位置读取数据。 因此,建议通过为每个 Reader 设置不同的 Server id , 假设 Source 并行度为 4,server id 配置必须:serverId(“5400-5405”),5405-5400=5 >= 4。来为 4 个 Source readers 中的每一个分配唯一的 Server id。

查看mysql链接发现
select * from information_schema.processlist where user = ‘flink-cdc’;
在这里插入图片描述Flink-cdc对mysql的影响
正常情况下,Flink-cdc是No-lock Read,主库可以继续处理事务和查询,而不会导致主库进程阻塞,对主库产生直接影响。但是,在某些情况下数据同步的过程中可能会对主库产生一些间接影响,比如:网络、IO、CPU负载以及mysql的并发连接数等资源消耗。但这些对主库的开销影响相对较小(全量同步阶段可能比较耗能,但时间相对比较短)。

断点续传

通过从checkpoint/savepoint 恢复,flink-cdc可以保证断点续传。

  • 从checkpoint/savepoint恢复,缩小同步范围,例如:从tableList(“mydb.products,mydb.orders”)或tableList(“.*”) 缩小到 tableList(“mydb.products”),应用更新生效。

  • 应用从checkpoint/savepoint恢复,扩大同步范围的部分不会生效,例如:从tableList(“mydb.products”) 到 tableList(“mydb.products,mydb.orders”)或tableList(“.*”),应用更新不生效生效。若想使动态加表生效,可以显示制定scanNewlyAddedTableEnabled(true) ,来启用扫描新添加的表功能。如没有特殊情况,建议在开发环境开启此配置。

参考:
flink-cdc
flink-cdc docs

这篇关于Flink-cdc更好的流式数据集成工具的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/997598

相关文章

Spring 请求之传递 JSON 数据的操作方法

《Spring请求之传递JSON数据的操作方法》JSON就是一种数据格式,有自己的格式和语法,使用文本表示一个对象或数组的信息,因此JSON本质是字符串,主要负责在不同的语言中数据传递和交换,这... 目录jsON 概念JSON 语法JSON 的语法JSON 的两种结构JSON 字符串和 Java 对象互转

C++如何通过Qt反射机制实现数据类序列化

《C++如何通过Qt反射机制实现数据类序列化》在C++工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作,所以本文就来聊聊C++如何通过Qt反射机制实现数据类序列化吧... 目录设计预期设计思路代码实现使用方法在 C++ 工程中经常需要使用数据类,并对数据类进行存储、打印、调试等操作。由于数据类

SpringBoot使用GZIP压缩反回数据问题

《SpringBoot使用GZIP压缩反回数据问题》:本文主要介绍SpringBoot使用GZIP压缩反回数据问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录SpringBoot使用GZIP压缩反回数据1、初识gzip2、gzip是什么,可以干什么?3、Spr

Spring Boot 集成 Quartz并使用Cron 表达式实现定时任务

《SpringBoot集成Quartz并使用Cron表达式实现定时任务》本篇文章介绍了如何在SpringBoot中集成Quartz进行定时任务调度,并通过Cron表达式控制任务... 目录前言1. 添加 Quartz 依赖2. 创建 Quartz 任务3. 配置 Quartz 任务调度4. 启动 Sprin

MySql match against工具详细用法

《MySqlmatchagainst工具详细用法》在MySQL中,MATCH……AGAINST是全文索引(Full-Textindex)的查询语法,它允许你对文本进行高效的全文搜素,支持自然语言搜... 目录一、全文索引的基本概念二、创建全文索引三、自然语言搜索四、布尔搜索五、相关性排序六、全文索引的限制七

基于Java实现回调监听工具类

《基于Java实现回调监听工具类》这篇文章主要为大家详细介绍了如何基于Java实现一个回调监听工具类,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录监听接口类 Listenable实际用法打印结果首先,会用到 函数式接口 Consumer, 通过这个可以解耦回调方法,下面先写一个

springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法

《springboot整合阿里云百炼DeepSeek实现sse流式打印的操作方法》:本文主要介绍springboot整合阿里云百炼DeepSeek实现sse流式打印,本文给大家介绍的非常详细,对大... 目录1.开通阿里云百炼,获取到key2.新建SpringBoot项目3.工具类4.启动类5.测试类6.测

使用Python构建一个Hexo博客发布工具

《使用Python构建一个Hexo博客发布工具》虽然Hexo的命令行工具非常强大,但对于日常的博客撰写和发布过程,我总觉得缺少一个直观的图形界面来简化操作,下面我们就来看看如何使用Python构建一个... 目录引言Hexo博客系统简介设计需求技术选择代码实现主框架界面设计核心功能实现1. 发布文章2. 加

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

JS+HTML实现在线图片水印添加工具

《JS+HTML实现在线图片水印添加工具》在社交媒体和内容创作日益频繁的今天,如何保护原创内容、展示品牌身份成了一个不得不面对的问题,本文将实现一个完全基于HTML+CSS构建的现代化图片水印在线工具... 目录概述功能亮点使用方法技术解析延伸思考运行效果项目源码下载总结概述在社交媒体和内容创作日益频繁的