详解 Flink CDC 的介绍和入门案例

2024-06-14 15:28

本文主要是介绍详解 Flink CDC 的介绍和入门案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、Flink CDC 简介

1. CDC 介绍

​ CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2. CDC 种类

基于查询的 CDC基于 Binlog 的 CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3. Flink CDC 介绍

​ Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors

二、Flink CDC 案例实操

1. DataStream 实现

1.1 导入依赖
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
1.2 编写程序代码
public class FlinkCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序//1.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//1.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//1.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//1.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//1.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//1.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "lgb");//2. 创建 FlinkCDC Source/*StartupOptions 有 5 种类型:1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog3. latest:从 binlog 的最近位置监控读取4. specificOffset:从 binlog 的指定位置读取5. timestamp:从 binlog 的指定时间戳读取*/DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder().hostname("hadoop102") //Mysql所在主机名.port(3306) //mysql端口号.username("root") //登录mysql用户名.password("123456") //登录mysql密码.databaseList("cdc_test") //监控的数据库列表,可变参数.tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表.deserializer(new StringDebeziumDeserializationSchema()) //反序列化器.startupOptions(StartupOptions.initial()) //指定读取策略.build();//3. 通过 FlinkCDC Source 创建 DataStreamDataStream<String> dataStream = env.addSource(mysqlSource);//4. 打印输出流dataStream.print();//5. 启动任务env.execute("FlinkCDC");}
}
1.3 测试
1.3.1 本地测试
  • 开启 MySQL Binlog 并重启 MySQL
  • 在 Mysql 中创建对应的数据库和数据表并插入一条数据
  • 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
  • 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
  • 将 FlinkCDC 程序进行打包并上传到集群

  • 启动 Hadoop、zookeeper 和 Flink 集群

  • 运行 FlinkCDC 程序

    bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    
  • 给当前的 Flink 程序创建 Savepoint

    bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
  • 停止 FlinkCDC 程序

  • 在Mysql数据表中进行增删改操作

  • 从 Savepoint 重启程序查看程序输出结果

    bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
    

2. Flink SQL 实现

2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持

public class FlinkSQLCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2. 创建 FlinkSQL 表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offsettableEnv.executeSql("create table user_info (" +"id String primary key, name String, sex String) with (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'initial'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'cdc_test'," +" 'table-name' = 'user_info'" +")");//4. 查询输出表中数据Table table = tableEnv.sqlQuery("select * from user_info");DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);dataStream.print();//5. 启动任务env.execute("FlinkSqlCDC");}
}

3. 自定义反序列化器

规范化数据输出格式,方便后续解析

/**自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法 
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {/*想要展示的数据格式:{"dbName":"","tableName":"","before":{"field1":"value1",...},"after":{"field1":"value1",...},"op":""}*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {JSONObject result = new JSONObject();//1.获取库名和表名String topic = sourceRecord.topic();String[] fields = topic.split("\\.");//2. 获取 before 数据Struct value = (Struct) sourceRecord.value();Struct before = value.getStruct("before");JSONObject beforeJSON = new JSONObject();if(before != null) {Schema schema = before.schema();List<Field> fields = schema.fields();for(Field field : fields) {beforeJSON.put(field.name(), before.get(field));}}//3. 获取 after 数据Struct after = value.getStruct("after");JSONObject afterJSON = new JSONObject();if(after != null) {Schema schema = after.schema();List<Field> fields = schema.fields();for(Field field : fields) {afterJSON.put(field.name(), after.get(field));}}//4. 获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);result.put("dbName", fields[1]);result.put("tableName", fields[2]);result.put("before", beforeJSON);result.put("after", afterJSON);result.put("op", operation);collcetor.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}	
}

这篇关于详解 Flink CDC 的介绍和入门案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1060788

相关文章

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

MyBatis分页查询实战案例完整流程

《MyBatis分页查询实战案例完整流程》MyBatis是一个强大的Java持久层框架,支持自定义SQL和高级映射,本案例以员工工资信息管理为例,详细讲解如何在IDEA中使用MyBatis结合Page... 目录1. MyBATis框架简介2. 分页查询原理与应用场景2.1 分页查询的基本原理2.1.1 分

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

Redis 的 SUBSCRIBE命令详解

《Redis的SUBSCRIBE命令详解》Redis的SUBSCRIBE命令用于订阅一个或多个频道,以便接收发送到这些频道的消息,本文给大家介绍Redis的SUBSCRIBE命令,感兴趣的朋友跟随... 目录基本语法工作原理示例消息格式相关命令python 示例Redis 的 SUBSCRIBE 命令用于订

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志

Java中的抽象类与abstract 关键字使用详解

《Java中的抽象类与abstract关键字使用详解》:本文主要介绍Java中的抽象类与abstract关键字使用详解,本文通过实例代码给大家介绍的非常详细,感兴趣的朋友跟随小编一起看看吧... 目录一、抽象类的概念二、使用 abstract2.1 修饰类 => 抽象类2.2 修饰方法 => 抽象方法,没有

MySQL8 密码强度评估与配置详解

《MySQL8密码强度评估与配置详解》MySQL8默认启用密码强度插件,实施MEDIUM策略(长度8、含数字/字母/特殊字符),支持动态调整与配置文件设置,推荐使用STRONG策略并定期更新密码以提... 目录一、mysql 8 密码强度评估机制1.核心插件:validate_password2.密码策略级

从入门到精通详解Python虚拟环境完全指南

《从入门到精通详解Python虚拟环境完全指南》Python虚拟环境是一个独立的Python运行环境,它允许你为不同的项目创建隔离的Python环境,下面小编就来和大家详细介绍一下吧... 目录什么是python虚拟环境一、使用venv创建和管理虚拟环境1.1 创建虚拟环境1.2 激活虚拟环境1.3 验证虚