本文主要是介绍详解 Flink CDC 的介绍和入门案例,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一、Flink CDC 简介
1. CDC 介绍
CDC 是 Change Data Capture(变更数据获取)的简称。核心思想是监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。
2. CDC 种类
基于查询的 CDC | 基于 Binlog 的 CDC | |
---|---|---|
开源产品 | Sqoop、Kafka JDBC Source | Canal、Maxwell、Debezium |
执行模式 | Batch | Streaming |
是否可以捕获所有数据变化 | 否 | 是 |
延迟性 | 高延迟 | 低延迟 |
是否增加数据库压力 | 是 | 否 |
3. Flink CDC 介绍
Flink CDC 是一个内置了 Debezium 的基于 Binlog 的可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。开源地址:https://github.com/ververica/flink-cdc-connectors
二、Flink CDC 案例实操
1. DataStream 实现
1.1 导入依赖
<dependencies><dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version></dependency><dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>5.1.49</version></dependency><dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version></dependency><dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version></dependency><dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version></dependency>
</dependencies>
<build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-assembly-plugin</artifactId><version>3.0.0</version><configuration><descriptorRefs><descriptorRef>jar-with-dependencies</descriptorRef></descriptorRefs></configuration><executions><execution><id>make-assembly</id><phase>package</phase><goals><goal>single</goal></goals></execution></executions></plugin></plugins>
</build>
1.2 编写程序代码
public class FlinkCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//Flink-CDC 将读取 binlog 的位置信息以状态的方式保存在 CK,如果想要做到断点续传,需要从 Checkpoint 或者 Savepoint 启动程序//1.1 开启 Checkpoint,每隔 5 秒钟做一次 CKenv.enableCheckpointing(5000L);//1.2 指定 CK 的一致性语义env.getCheckpointConfig().setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);//1.3 设置任务关闭的时候保留最后一次 CK 数据env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);//1.4 指定从 CK 自动重启策略env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3, 2000L));//1.5 设置状态后端env.setStateBackend(new FsStateBackend("hdfs://hadoop102:8020/flinkCDC"));//1.6 设置访问 HDFS 的用户名System.setProperty("HADOOP_USER_NAME", "lgb");//2. 创建 FlinkCDC Source/*StartupOptions 有 5 种类型:1. initial:默认,先使用查询的方式读取表中所有的数据,然后再从 binlog 的最近位置监控读取2. earliest:从 binlog 最开始的位置读取,要求在数据库创建之前就开启了 binlog3. latest:从 binlog 的最近位置监控读取4. specificOffset:从 binlog 的指定位置读取5. timestamp:从 binlog 的指定时间戳读取*/DebeziumSourceFunction<String> mysqlSource = MysqlSource.<String>builder().hostname("hadoop102") //Mysql所在主机名.port(3306) //mysql端口号.username("root") //登录mysql用户名.password("123456") //登录mysql密码.databaseList("cdc_test") //监控的数据库列表,可变参数.tableList("cdc_test.user_info") //监控的数据表,不指定则监控数据库下所有表.deserializer(new StringDebeziumDeserializationSchema()) //反序列化器.startupOptions(StartupOptions.initial()) //指定读取策略.build();//3. 通过 FlinkCDC Source 创建 DataStreamDataStream<String> dataStream = env.addSource(mysqlSource);//4. 打印输出流dataStream.print();//5. 启动任务env.execute("FlinkCDC");}
}
1.3 测试
1.3.1 本地测试
- 开启 MySQL Binlog 并重启 MySQL
- 在 Mysql 中创建对应的数据库和数据表并插入一条数据
- 启动 FlinkCDC 程序,查看控制台结果,可以看到通过查询的方式获取到了数据表里的所有数据
- 在数据表中进行增删改操作,查看程序控制台输出结果
1.3.2 集群测试
-
将 FlinkCDC 程序进行打包并上传到集群
-
启动 Hadoop、zookeeper 和 Flink 集群
-
运行 FlinkCDC 程序
bin/flink run -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
-
给当前的 Flink 程序创建 Savepoint
bin/flink savepoint [JobId] hdfs://hadoop102:8020/flink/save
-
停止 FlinkCDC 程序
-
在Mysql数据表中进行增删改操作
-
从 Savepoint 重启程序查看程序输出结果
bin/flink run -s hdfs://hadoop102:8020/flink/save/[JobId] -c com.atguigu.FlinkCDC flink-1.0-SNAPSHOT-jar-with-dependencies.jar
2. Flink SQL 实现
2.0.0 版本的 FlinkCDC 通过 FlinkSQL 实现需要 1.13+ 版本的 Flink 支持
public class FlinkSQLCDC {public static void main(String[] args) throws Exception {//1. 创建 Flink 执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);//2. 创建 FlinkSQL 表环境StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);//3. 配置 FlinkSQLCDC 监控单表(只能监控单表),不需要指定反序列化器,读取模式只有 initial 和 latest-offsettableEnv.executeSql("create table user_info (" +"id String primary key, name String, sex String) with (" +" 'connector' = 'mysql-cdc'," +" 'scan.startup.mode' = 'initial'," +" 'hostname' = 'hadoop102'," +" 'port' = '3306'," +" 'username' = 'root'," +" 'password' = '123456'," +" 'database-name' = 'cdc_test'," +" 'table-name' = 'user_info'" +")");//4. 查询输出表中数据Table table = tableEnv.sqlQuery("select * from user_info");DataStream<Tuple2<Boolean, Row>> dataStream = tableEnv.toRetractStream(table, Row.class);dataStream.print();//5. 启动任务env.execute("FlinkSqlCDC");}
}
3. 自定义反序列化器
规范化数据输出格式,方便后续解析
/**自定义反序列化器:实现 DebeziumDeserializationSchema<T> 接口并实现 deserialize 和 getProducedType 方法
*/
public class MyDeserializationSchema implements DebeziumDeserializationSchema<String> {/*想要展示的数据格式:{"dbName":"","tableName":"","before":{"field1":"value1",...},"after":{"field1":"value1",...},"op":""}*/@Overridepublic void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {JSONObject result = new JSONObject();//1.获取库名和表名String topic = sourceRecord.topic();String[] fields = topic.split("\\.");//2. 获取 before 数据Struct value = (Struct) sourceRecord.value();Struct before = value.getStruct("before");JSONObject beforeJSON = new JSONObject();if(before != null) {Schema schema = before.schema();List<Field> fields = schema.fields();for(Field field : fields) {beforeJSON.put(field.name(), before.get(field));}}//3. 获取 after 数据Struct after = value.getStruct("after");JSONObject afterJSON = new JSONObject();if(after != null) {Schema schema = after.schema();List<Field> fields = schema.fields();for(Field field : fields) {afterJSON.put(field.name(), after.get(field));}}//4. 获取操作类型 READ DELETE UPDATE CREATEEnvelope.Operation operation = Envelope.operationFor(sourceRecord);result.put("dbName", fields[1]);result.put("tableName", fields[2]);result.put("before", beforeJSON);result.put("after", afterJSON);result.put("op", operation);collcetor.collect(result.toJSONString());}@Overridepublic TypeInformation<String> getProducedType() {return TypeInformation.of(String.class);}
}
这篇关于详解 Flink CDC 的介绍和入门案例的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!