使用Flink CDC实时监控MySQL数据库变更

2024-06-24 01:28

本文主要是介绍使用Flink CDC实时监控MySQL数据库变更,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在现代数据架构中,实时数据处理变得越来越重要。Flink CDC(Change Data Capture)是一种强大的工具,可以帮助我们实时捕获数据库的变更,并进行处理。本文将介绍如何使用Flink CDC从MySQL数据库中读取变更数据,并将其打印到控制台。

环境准备

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-java</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-streaming-java_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-clients_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>3.1.3</version>
</dependency>
<dependency><groupId>org.apache.flink</groupId><artifactId>flink-table-planner-blink_2.12</artifactId><version>1.12.0</version>
</dependency>
<dependency><groupId>com.ververica</groupId><artifactId>flink-connector-mysql-cdc</artifactId><version>2.0.0</version>
</dependency>
<dependency><groupId>com.alibaba</groupId><artifactId>fastjson</artifactId><version>1.2.75</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.23</version>
</dependency>
  1. 获取Flink执行环境

首先,我们需要获取Flink的执行环境。这是所有Flink作业的起点。

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
  1. 启用检查点和设置并行度

为了确保作业的容错性和状态恢复,我们需要启用检查点,并设置作业的并行度。

env.enableCheckpointing(500); // 每500毫秒创建一个检查点
env.setParallelism(1); // 设置作业的并行度为1
  1. 使用Debezium Source读取MySQL的binlog

接下来,我们使用Debezium Source读取MySQL的binlog。我们需要配置MySQL的连接信息、监控的数据库和表、反序列化器以及启动选项。

DebeziumSourceFunction<String> sourceFunction = MySqlSource.<String>builder().serverTimeZone("Asia/Shanghai") // 设置时区为亚洲/上海.hostname("localhost") // MySQL的IP地址.port(3306) // MySQL的端口.username("root") // MySQL的用户名.password("123456") // MySQL的密码.databaseList("my_db") // 监控的数据库.tableList("my_db.user") // 监控的数据库下的表.deserializer(new JsonDebeziumDeserializationSchema()) // 反序列化.startupOptions(StartupOptions.initial()) // 启动选项.build();

这里 JsonDebeziumDeserializationSchema类的代码如下:

import com.alibaba.fastjson.JSONObject;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.util.Collector;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;import java.util.List;/**
*  自定义DeserializationSchema进行反序列化。
*/public class JsonDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {@Overridepublic void deserialize(SourceRecord sourceRecord, Collector collector) throws Exception {//创建JSON对象用于存储最终数据JSONObject result = new JSONObject();String topic = sourceRecord.topic();String[] fields = topic.split("\\.");String database = fields[1];String tableName = fields[2];Struct value  = (Struct)sourceRecord.value();//获取before数据Struct before = value.getStruct("before");JSONObject beforeJson = getJson(before);//获取after数据Struct after = value.getStruct("after");JSONObject afterJson = getJson(after);//获取操作类型Envelope.Operation operation = Envelope.operationFor(sourceRecord);//将字段写入JSON对象result.put("database",database);result.put("tableName",tableName);result.put("type",operation);result.put("before",beforeJson);result.put("after",afterJson);//输出数据collector.collect(result.toJSONString());}/***  获取字段值并写入result对象* @param before* @return*/private JSONObject getJson(Struct before) {JSONObject jsonObject = new JSONObject();if(before != null){Schema beforeSchema = before.schema();List<Field> beforeFields = beforeSchema.fields();for (Field field : beforeFields) {Object beforeValue = before.get(field);jsonObject.put(field.name(), beforeValue);}}return jsonObject;}@Overridepublic TypeInformation getProducedType() {return BasicTypeInfo.STRING_TYPE_INFO;}
}
  1. 添加数据源并打印数据

将Debezium源函数添加到Flink环境中,生成一个数据流,并将数据流中的数据打印到控制台。

DataStream<String> dataStreamSource = env.addSource(sourceFunction, TypeInformation.of(String.class));
DataStreamSink<String> print = dataStreamSource.print();
  1. 启动任务

最后,启动Flink作业,开始处理数据流。

env.execute("Flink-CDC");

6.测试

在这里插入图片描述

总结

通过上述步骤,我们可以使用Flink CDC实时监控MySQL数据库的变更,并将变更数据以JSON格式打印出来。这种方法不仅适用于数据监控,还可以用于实时数据处理和分析。

这篇关于使用Flink CDC实时监控MySQL数据库变更的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1088804

相关文章

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

Tolua使用笔记(上)

目录   1.准备工作 2.运行例子 01.HelloWorld:在C#中,创建和销毁Lua虚拟机 和 简单调用。 02.ScriptsFromFile:在C#中,对一个lua文件的执行调用 03.CallLuaFunction:在C#中,对lua函数的操作 04.AccessingLuaVariables:在C#中,对lua变量的操作 05.LuaCoroutine:在Lua中,

Vim使用基础篇

本文内容大部分来自 vimtutor,自带的教程的总结。在终端输入vimtutor 即可进入教程。 先总结一下,然后再分别介绍正常模式,插入模式,和可视模式三种模式下的命令。 目录 看完以后的汇总 1.正常模式(Normal模式) 1.移动光标 2.删除 3.【:】输入符 4.撤销 5.替换 6.重复命令【. ; ,】 7.复制粘贴 8.缩进 2.插入模式 INSERT

mysql索引四(组合索引)

单列索引,即一个索引只包含单个列,一个表可以有多个单列索引,但这不是组合索引;组合索引,即一个索引包含多个列。 因为有事,下面内容全部转自:https://www.cnblogs.com/farmer-cabbage/p/5793589.html 为了形象地对比单列索引和组合索引,为表添加多个字段:    CREATE TABLE mytable( ID INT NOT NULL, use

mysql索引三(全文索引)

前面分别介绍了mysql索引一(普通索引)、mysql索引二(唯一索引)。 本文学习mysql全文索引。 全文索引(也称全文检索)是目前搜索引擎使用的一种关键技术。它能够利用【分词技术】等多种算法智能分析出文本文字中关键词的频率和重要性,然后按照一定的算法规则智能地筛选出我们想要的搜索结果。 在MySql中,创建全文索引相对比较简单。例如:我们有一个文章表(article),其中有主键ID(

mysql索引二(唯一索引)

前文中介绍了MySQL中普通索引用法,和没有索引的区别。mysql索引一(普通索引) 下面学习一下唯一索引。 创建唯一索引的目的不是为了提高访问速度,而只是为了避免数据出现重复。唯一索引可以有多个但索引列的值必须唯一,索引列的值允许有空值。如果能确定某个数据列将只包含彼此各不相同的值,在为这个数据列创建索引的时候就应该使用关键字UNIQUE,把它定义为一个唯一索引。 添加数据库唯一索引的几种

mysql索引一(普通索引)

mysql的索引分为两大类,聚簇索引、非聚簇索引。聚簇索引是按照数据存放的物理位置为顺序的,而非聚簇索引则不同。聚簇索引能够提高多行检索的速度、非聚簇索引则对单行检索的速度很快。         在这两大类的索引类型下,还可以降索引分为4个小类型:         1,普通索引:最基本的索引,没有任何限制,是我们经常使用到的索引。         2,唯一索引:与普通索引

Lipowerline5.0 雷达电力应用软件下载使用

1.配网数据处理分析 针对配网线路点云数据,优化了分类算法,支持杆塔、导线、交跨线、建筑物、地面点和其他线路的自动分类;一键生成危险点报告和交跨报告;还能生成点云数据采集航线和自主巡检航线。 获取软件安装包联系邮箱:2895356150@qq.com,资源源于网络,本介绍用于学习使用,如有侵权请您联系删除! 2.新增快速版,简洁易上手 支持快速版和专业版切换使用,快速版界面简洁,保留主

如何免费的去使用connectedpapers?

免费使用connectedpapers 1. 打开谷歌浏览器2. 按住ctrl+shift+N,进入无痕模式3. 不需要登录(也就是访客模式)4. 两次用完,关闭无痕模式(继续重复步骤 2 - 4) 1. 打开谷歌浏览器 2. 按住ctrl+shift+N,进入无痕模式 输入网址:https://www.connectedpapers.com/ 3. 不需要登录(也就是

关于如何更好管理好数据库的一点思考

本文尝试从数据库设计理论、ER图简介、性能优化、避免过度设计及权限管理方面进行思考阐述。 一、数据库范式 以下通过详细的示例说明数据库范式的概念,将逐步规范化一个例子,逐级说明每个范式的要求和变换过程。 示例:学生课程登记系统 初始表格如下: 学生ID学生姓名课程ID课程名称教师教师办公室1张三101数学王老师101室2李四102英语李老师102室3王五101数学王老师101室4赵六103物理陈