flinkcdc 3.0 尝鲜

2024-01-22 00:44
文章标签 尝鲜 3.0 flinkcdc

本文主要是介绍flinkcdc 3.0 尝鲜,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

本文会将从环境搭建到demo来全流程体验flinkcdc 3.0
包含了如下内容

  1. flink1.18 standalone搭建
  2. doris 1fe1be 搭建
  3. 整库数据同步
  4. 测试各同步场景
  5. 从检查点重启同步任务

环境搭建

flink环境(Standalone模式)

下载flink 1.18.0 链接 : https://archive.apache.org/dist/flink/flink-1.18.0/flink-1.18.0-bin-scala_2.12.tgz

解压 :

tar -zxvf flink-1.18.0-bin-scala_2.12.tgz

修改checkpoint 时间间隔 为3秒

vim conf/flink-conf.yaml 
# 94 行(set nu 显示行)
taskmanager.numberOfTaskSlots: 2
# 148 行
execution.checkpointing.interval: 3000

启动

./bin/start-cluster.sh

访问页面 : http://127.0.0.1:8081
image.png

doris环境(1fe1be)

修改环境宿主机的内存映射

# 因为mac内部实现容器的方式不同,直接修改max_map_count值可能无法成功,所以在容器中进行修改
docker run -it --privileged --pid=host --name=change_count debian nsenter -t 1 -m -u -n -i sh
# 修改内存映射值(这个值通常用于限制一个进程打开的文件数量,默认是65530)
sysctl -w vm.max_map_count=2000000
# 退出容器
exit

使用docker compose 搭建doris 1fe1be集群

version: '3'
services:docker-fe-01:image: "apache/doris:1.2.2-fe-arm"container_name: "doris-fe-01"hostname: "fe-01"environment:- FE_SERVERS=fe1:172.20.80.2:9010- FE_ID=1ports:- 8031:8030- 9031:9030volumes:- /Users/antg/docker/doris_1fe_1be/data/fe-01/doris-meta:/opt/apache-doris/fe/doris-meta- /Users/antg/docker/doris_1fe_1be/data/fe-01/conf:/opt/apache-doris/fe/conf- /Users/antg/docker/doris_1fe_1be/data/fe-01/log:/opt/apache-doris/fe/lognetworks:doris_net:ipv4_address: 172.20.80.2docker-be-01:image: "apache/doris:1.2.2-be-arm"container_name: "doris-be-01"hostname: "be-01"depends_on:- docker-fe-01environment:- FE_SERVERS=fe1:172.20.80.2:9010- BE_ADDR=172.20.80.5:9050ports:- 8041:8040volumes:- /Users/antg/docker/doris_1fe_1be/data/be-01/storage:/opt/apache-doris/be/storage- /Users/antg/docker/doris_1fe_1be/data/be-01/conf:/opt/apache-doris/be/conf- /Users/antg/docker/doris_1fe_1be/data/be-01/script:/docker-entrypoint-initdb.d- /Users/antg/docker/doris_1fe_1be/data/be-01/log:/opt/apache-doris/be/lognetworks:doris_net:ipv4_address: 172.20.80.5
networks:doris_net:ipam:config:- subnet: 172.20.80.0/24

启动并验证是否启动成功

# 启动
docker-compose -f 1fe_1be.yaml up -d
# 连接doris
mysql -h127.0.0.1 -P9031 -uroot -p
# 创建数据库 doris_sync
> create database doris_sync;

mysql环境及测试数据准备

使用本机之前安装的mysql

建测试库测试表

create database doris_sync;
CREATE TABLE `a_0` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE `a_1` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE `abc` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE `table_0` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=11 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE `table_1` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=101 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

其中 a_0,a_1 是分表,table_0,table_1是另外一个分表,abc是一个单独的表

初始化插入一些测试数据

INSERT INTO `a_0` (`id`, `name`) VALUES (1, 'a');
INSERT INTO `a_1` (`id`, `name`) VALUES (2, 'b');
BEGIN;
INSERT INTO `abc` (`id`, `name`) VALUES (1, 'Luo Rui');
INSERT INTO `abc` (`id`, `name`) VALUES (2, 'Yung Wing Kuen');
INSERT INTO `abc` (`id`, `name`) VALUES (3, 'Chiang Chun Yu');
INSERT INTO `abc` (`id`, `name`) VALUES (4, 'Tang Ming');
INSERT INTO `abc` (`id`, `name`) VALUES (5, 'Man Wai Lam');
INSERT INTO `abc` (`id`, `name`) VALUES (6, 'Tin Tsz Ching');
INSERT INTO `abc` (`id`, `name`) VALUES (7, 'Doris Moore');
INSERT INTO `abc` (`id`, `name`) VALUES (8, 'Abe Mitsuki');
INSERT INTO `abc` (`id`, `name`) VALUES (9, 'Du Shihan');
INSERT INTO `abc` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen');
COMMIT;
BEGIN;
INSERT INTO `table_0` (`id`, `name`) VALUES (1, 'Luo Rui');
INSERT INTO `table_0` (`id`, `name`) VALUES (2, 'Yung Wing Kuen');
INSERT INTO `table_0` (`id`, `name`) VALUES (3, 'Chiang Chun Yu');
INSERT INTO `table_0` (`id`, `name`) VALUES (4, 'Tang Ming');
INSERT INTO `table_0` (`id`, `name`) VALUES (5, 'Man Wai Lam');
INSERT INTO `table_0` (`id`, `name`) VALUES (6, 'Tin Tsz Ching');
INSERT INTO `table_0` (`id`, `name`) VALUES (7, 'Doris Moore');
INSERT INTO `table_0` (`id`, `name`) VALUES (8, 'Abe Mitsuki');
INSERT INTO `table_0` (`id`, `name`) VALUES (9, 'Du Shihan');
INSERT INTO `table_0` (`id`, `name`) VALUES (10, 'Chiang Chi Yuen');
COMMIT;
INSERT INTO `table_1` (`id`, `name`) VALUES (100, 'tom');

配置容器路由转发

我们在代码中开发过程中可能会用到容器的ip地址,例如上面的172.20.80.0/24这个网段,但是你会发现你是ping不通的,这里设计到了一些docker网络的一些知识,可以在网上看一下资料,这里只给出解决方法
安装路由转发镜像

# 现在连接器
brew install wenjunxiao/brew/docker-connector
# 加入路由
docker network ls --filter driver=bridge --format "{{.ID}}" | xargs docker network inspect --format "route {{range .IPAM.Config}}{{.Subnet}}{{end}}" >> /opt/homebrew/etc/docker-connector.conf
# 启动路由器
sudo /opt/homebrew/opt/docker-connector/bin/docker-connector -config /opt/homebrew/etc/docker-connector.conf
# 启动镜像
docker run -it -d --restart always --net host --cap-add NET_ADMIN --name connector wenjunxiao/mac-docker-connector

如果还是ping不通就重启一下上面的转发容器
这一步很重要,想要通过访问容器的ip就要完成这一步

依赖包准备

下载flinkcdc 的依赖包放到flink目录下并解压
flinkcdc 依赖 : flink-cdc-3.0.0-bin.tar.gz
下载连接器 的依赖包放到flinkcdc的lib目录下
connector 依赖 :

  • MySQL pipeline connector 3.0.0
  • Apache Doris pipeline connector 3.0.0

配置FLINK_HOME环境变量

pwd
/Users/antg/software/flink-1.18.0/
export FLINK_HOME=/Users/antg/software/flink-1.18.0/

数据同步

整库同步

编写yaml文件 mysql-to-doris.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 12345678tables: doris_sync.\.*server-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: dorisfenodes: 127.0.0.1:8031username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 2

启动任务

bash bin/flink-cdc.sh mysql-to-doris.yaml

查看页面效果image.png
这里可以看到同步的数据条数及大小

查看doris的数据及建表情况
image.png
可以看到表被自动创建并且数据也同步过来了

新增数据

INSERT INTO `a_0` (`id`, `name`) VALUES (3, 'jack');

Kapture 2024-01-21 at 15.04.36.gif

更新数据

update a_0 set name='tom' where id=3;

Kapture 2024-01-21 at 15.09.41.gif

删除数据

delete from a_0 where id=1;

没成功同步(已咨询社区是1.2.2的bug,在1.2.3修复了,正常来说会同步)

新增字段

alter table a_0 add column age int;

在这里插入图片描述

修改字段

# 修改名称
alter table a_0 change age age_range int;
# 修改字段类型
alter table a_0 modify column age_range varchar(100);
# 字段字段长度
alter table a_0 modify column age_range varchar(1200);

以上语句不会被同步

删除字段

alter table a_0 drop column age_range;

以上语句不会被同步

删除表

drop table a_0;

不会被同步

结论 :
1.新增数据,新增字段,修改数据会被实时同步到doris
2.delete数据不会被同步(已咨询社区是1.2.2的bug,在1.2.3修复了,正常来说会同步)
3.修改字段名称,类型,长度不会被同步(可能有参数可以开启)
4.删除字段不会被同步
5.删除表不会被同步

路由变更

这里将使用flinkcdc3.0 新增的路由功能来实现分表合一的效果,而且也可以做到同步到doris的库名和表名换成自己想要的名称
将之前的mysql端数据清理,表重新建立

需求 :
将mysql端doris_sync同步到doris的ods库中
a_0,a_1 合并到ods_a表
abc 同步到 ods_abc表
table_0,table_1同步到 ods_table表

任务配置 route.yaml

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 12345678tables: doris_sync.\.*server-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: dorisfenodes: 127.0.0.1:8031username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: doris_sync.a_\.*sink-table: ods.ods_a- source-table: doris_sync.abcsink-table: ods.ods_abc- source-table: doris_sync.table_\.*sink-table: ods.ods_tablepipeline:name: Sync MySQL Database to Dorisparallelism: 2

创建doris端ods库(不会自动创建库,必须手动创建)

create database ods;

将之前的任务停掉,启动这个任务
image.png
在这里插入图片描述

可以看到
1.多个分表在doris只创建了一个目标表
2.多个分表的数据都同步到了一个表中
非常棒的功能 👍👍👍

测试一下新增一个分表是否会自动同步到目标表

CREATE TABLE `a_2` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;INSERT INTO `a_2` (`id`, `name`) VALUES (1000, 'a');

image.png
新增分表后,分表不会被自动同步

重启任务
image.png
重启后数据可以被正常同步

从checkpoint恢复任务并新增分表

先修改一下flink-conf.yaml,否则任务cancel的时候ck不会被保留,还需要修改一下ck存储的路径

# 在flink目录下创建一个路径存储ck
mkdir ckdata

image.png
image.png

启动任务

bash bin/flink-cdc.sh route.yaml

image.png
看一下ck是否正常存储
image.png
新增表,cancel任务,然后从ck处重启

CREATE TABLE `a_4` (`id` int NOT NULL AUTO_INCREMENT,`name` varchar(255) DEFAULT NULL,PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;INSERT INTO `a_4` (`id`, `name`) VALUES (1000000, 'a');
################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 12345678tables: doris_sync.\.*server-id: 5400-5404server-time-zone: Asia/Shanghaisink:type: dorisfenodes: 127.0.0.1:8031username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: doris_sync.a_\.*sink-table: ods.ods_a- source-table: doris_sync.abcsink-table: ods.ods_abc- source-table: doris_sync.table_\.*sink-table: ods.ods_tablepipeline:name: Sync MySQL Database to Dorisparallelism: 2

在flink-conf最后加上ck的重启路径

# 查看当前路径
pwd
/Users/antg/software/flink-1.18.0/flink-cdc-3.0.0# 找到最新的ck存储路径
ll -rth ../ckdata
drwxr-xr-x@ 5 antg  staff   160B Jan 21 16:27 436dfeb839b2c877d6e49023e3e099b5
drwxr-xr-x@ 5 antg  staff   160B Jan 21 17:12 d519a3f930d9f410e048f63a883e1dce
drwxr-xr-x@ 5 antg  staff   160B Jan 21 18:59 b0ed22a804ad34336ab3e9b328d13257
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:01 394d7a89885bbd319e8ab92043283de9
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:05 1547d3cf60ed278ccd3787025bb4b5f6
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:07 51ff313e98fb9882f20f57bc697a8ae6
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:08 f10623b642135002499775274c078b9e
drwxr-xr-x@ 5 antg  staff   160B Jan 21 19:09 73b47091ca00547a5d8121474b3dbd79ll ../ckdata/73b47091ca00547a5d8121474b3dbd79
drwxr-xr-x@ 3 antg  staff    96B Jan 21 19:09 chk-172
drwxr-xr-x@ 2 antg  staff    64B Jan 21 19:09 shared
drwxr-xr-x@ 2 antg  staff    64B Jan 21 19:09 taskowned# 将ck路径加到flink-conf的最后一行
vim ../conf/flink-conf.yaml
execution.savepoint.path: file:///Users/antg/software/flink-1.18.0/ckdata/73b47091ca00547a5d8121474b3dbd79/chk-172# 启动任务
bin/flink-cdc.sh route.yaml

image.png
可以看到任务从检查点重启了
image.png
数据也正常同步

这里从ck重启是修改了flink-conf,但是感觉这样很不方便,尝试过在yaml的pipeline下加上这个属性,但是不起作用,其他位置也没找到加ck路径的地方,如果各位大神有其他好的方法欢迎评论区留言,也欢迎加我的个人微信一起交流各种技术.

参考

[基于 Flink CDC 3.0 构建 MySQL 到 Doris 的 Streaming ELT] : https://ververica.github.io/flink-cdc-connectors/release-3.0/content/%E5%BF%AB%E9%80%9F%E4%B8%8A%E6%89%8B/mysql-doris-pipeline-tutorial-zh.html
[vm.max_map_count参数详解] : https://blog.csdn.net/a772304419/article/details/132585239

这篇关于flinkcdc 3.0 尝鲜的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/631364

相关文章

4B参数秒杀GPT-3.5:MiniCPM 3.0惊艳登场!

​ 面壁智能 在 AI 的世界里,总有那么几个时刻让人惊叹不已。面壁智能推出的 MiniCPM 3.0,这个仅有4B参数的"小钢炮",正在以惊人的实力挑战着 GPT-3.5 这个曾经的AI巨人。 MiniCPM 3.0 MiniCPM 3.0 MiniCPM 3.0 目前的主要功能有: 长上下文功能:原生支持 32k 上下文长度,性能完美。我们引入了

Cmake之3.0版本重要特性及用法实例(十三)

简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【原创干货持续更新中……】🚀 优质视频课程:AAOS车载系统+AOSP14系统攻城狮入门视频实战课 🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧

三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris

FlinkCDC 同步Mysql到Doris 参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/get-started/quickstart/mysql-to-doris/ 1.安装Flink 下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用tar -zxvf flink-xxx-

【C-实践】文件服务器(3.0)

文件服务器1.0文件服务器2.0文件服务器4.0 概述 使用了 tcp + epoll + 线程池 + 生产者消费者模型,实现文件服务器 有两个进程,主进程负责接收退出信号用来退出整个程序;子进程负责管理线程池、客户端连接以及线程池的退出 子进程中的主线程生产任务,其他子线程消费任务 功能 主要功能:客户端连接服务器,然后自动下载文件 注意 实际传输速度

如何为 MongoDB 3.0.4 以下版本选择合适的 PyMongo 版本

在使用 MongoDB 时,开发者通常会使用 Python 的 pymongo 库来与 MongoDB 进行交互。然而,不同版本的 MongoDB 需要匹配相应版本的 pymongo 才能正常运行。如果你的 MongoDB 版本较低(例如 3.0.4 以下),而使用了不兼容的 pymongo 版本,就会遇到连接失败或功能异常的问题。 在这篇文章中,我们将介绍如何为 MongoDB 3.0.4 以

面壁小钢炮3.0发布:端侧ChatGPT时代的技术飞跃

一、面壁小钢炮3.0模型介绍  ➤  MiniCPM 3.0 开源地址: 🔗 https://github.com/OpenBMB/MiniCPM 🔗 https://huggingface.co/openbmb/MiniCPM3-4B         2024年9月5日,面壁智能发布 MiniCPM3-4B!该模型的表现超越 Phi-3.5-mini-instruct 和 GPT-3

数据倾斜?Spark 3.0 AQE专治各种不服

Spark3.0已经发布半年之久,这次大版本的升级主要是集中在性能优化和文档丰富上,其中46%的优化都集中在Spark SQL上,SQL优化里最引人注意的非Adaptive Query Execution莫属了。 Adaptive Query Execution(AQE)是英特尔大数据技术团队和百度大数据基础架构部工程师在Spark 社区版本的基础上,改进并实现的自适应执行引擎。近些年来,S

尝鲜!Flink1.12.2+Hudi0.9.0集成开发

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 Hudi社区最近发生了一些有趣的变化,Hudi集成Flink的方案也已经发布,我个人在官网根据文档试验了一把,整体感觉还不错。我们目前并没有在生产环境中使用,但是随着社区发展和功能越来越完善,相信会有更多的业务开始尝试使用Hudi。本文在此做一个Flink和Hudi集成的分享,作者明喆sama。 一、组件下载 1.1、Flink1.

Cocos2dx 3.0 过渡篇(五) 随机数的获取

1、简单的随机数用法:CCRANDOM_0_1 示例如下: [cpp] int HelloWorld::getRand(int start,int end)  {   float i = CCRANDOM_0_1()*(end-start+1)+start;  //产生一个从start到end间的随机数   return (int)i;  }   2、上述的方法虽然简便,但是运行

Cocos2dx 3.0 过渡篇(三) 触摸机制

尊重原创,转载请注明来自:star特530的CSDN博客 http://blog.csdn.net/start530/article/details/18325493 本来在中午休息时间打算大展拳脚,好好写一篇新触摸机制相关的博文,结果,等真正下手的时候才发现无从下手,很多地方自己都说不清,赶紧看了下testCpp,才发现原来是这样,还可以这样,哦?这样都行?哎,我还是太年轻了。   咱也只能