三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris

本文主要是介绍三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

FlinkCDC 同步Mysql到Doris

参考:https://nightlies.apache.org/flink/flink-cdc-docs-release-3.0/zh/docs/get-started/quickstart/mysql-to-doris/

1.安装Flink

下载 Flink 1.18.0,下载后把压缩包上传到服务器,使用tar -zxvf flink-xxx-bin-scala_2.12.tgz 解压后得到 flink-1.18.0 目录

cd flink-1.18.1

然后需要配置FLINK_HOME ,执行vi /etc/profile,增加如下内容

export FLINK_HOME=/root/flink/flink-1.18.1 #你的安装目录
export PATH=$PATH:$FLINK_HOME/bin

执行:source /etc/profile 让其生效,然后通过在 conf/flink-conf.yaml 配置文件追加下列参数开启 checkpoint,每隔 3 秒做一次 checkpoint。

execution.checkpointing.interval: 3000

使用下面的命令启动 Flink 集群,

./bin/start-cluster.sh

启动成功的话,可以在 http://localhost:8081/访问到 Flink Web UI,多次执行 start-cluster.sh 可以拉起多个 TaskManager。如下所示:

在这里插入图片描述
访问之前记得开放防火墙端口

firewall-cmd --zone=public --add-port=8081/tcp --permanent;
firewall-cmd --zone=public --add-port=8030/tcp --permanent;
firewall-cmd --zone=public --add-port=8040/tcp --permanent;
firewall-cmd --zone=public --add-port=9030/tcp --permanent;
firewall-cmd --reload ;

2.准备同步的数据库

准备好Mysql数据库,创建数据库 app_db 和表 orders,products,shipments,并插入数据

-- 创建数据库
CREATE DATABASE app_db;USE app_db;-- 创建 orders 表
CREATE TABLE `orders` (
`id` INT NOT NULL,
`price` DECIMAL(10,2) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `orders` (`id`, `price`) VALUES (1, 4.00);
INSERT INTO `orders` (`id`, `price`) VALUES (2, 100.00);-- 创建 shipments 表
CREATE TABLE `shipments` (
`id` INT NOT NULL,
`city` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `shipments` (`id`, `city`) VALUES (1, 'beijing');
INSERT INTO `shipments` (`id`, `city`) VALUES (2, 'xian');-- 创建 products 表
CREATE TABLE `products` (
`id` INT NOT NULL,
`product` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`)
);-- 插入数据
INSERT INTO `products` (`id`, `product`) VALUES (1, 'Beer');
INSERT INTO `products` (`id`, `product`) VALUES (2, 'Cap');
INSERT INTO `products` (`id`, `product`) VALUES (3, 'Peanut');

给doris创建数据库,通过 Web UI 创建 app_db 数据库 :create database app_db;

在这里插入图片描述

3.安装FlinkCDC

下载 flink cdc-3.0.0 的二进制压缩包 flink-cdc-3.0.0-bin.tar.gz,并解压得到目录 flink cdc-3.0.0 ':. flink-cdc-3.0.0 下会包含 bin、lib、log、conf 四个目录

在这里插入图片描述

然后把下面2个 connector 包,并且移动到 lib 目录下

  • MySQL pipeline connector 3.0.0 : mysql的驱动
  • Apache Doris pipeline connector 3.0.0 : doris的驱动

在这里插入图片描述
编写任务配置 yaml 文件 文件可以放到config目录下。 下面给出了一个整库同步的示例文件 mysql-to-doris.yaml,

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: 192.168.220.253port: 3307username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030username: rootpassword: 123456table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1pipeline:name: Sync MySQL Database to Dorisparallelism: 1

其中: source 中的 tables: app_db..* 通过正则匹配同步 app_db 下的所有表。 sink 添加table.create.properties.replication_num :1 参数是由于 只有一个 Doris BE 节点。

最后,进入到 flink-cdc-3.0.0 目录,通过命令行提交任务到 Flink Standalone cluster :bash bin/flink-cdc.sh mysql-to-doris.yaml

[root@localhost flink-cdc-3.0.0]# bash bin/flink-cdc.sh conf/mysql-to-doris.yaml 
Pipeline has been submitted to cluster.
Job ID: 13e2925fd46e5840243c9523cd093e11
Job Description: Sync MySQL Database to Doris

执行之后查看flink的控制台界面 : 访问 8081端口
在这里插入图片描述
点击 Job Name 进入任务,可以看到同步的情况,还可以查看任务日志如下
在这里插入图片描述
登录doris的控制台,查看数据是否同步进去,访问:8030端口
在这里插入图片描述
当我们修改了Mysql中的数据后就会自动同步到Doris

4.表结构同步

Flink CDC 提供了将源表的表结构/数据路由到其他表名的配置,借助这种能力,我们能够实现表名库名替换,整库同步等功能。 下面提供一个配置文件说明:

################################################################################
# Description: Sync MySQL all tables to Doris
################################################################################
source:type: mysqlhostname: localhostport: 3306username: rootpassword: 123456tables: app_db.\.*server-id: 5400-5404server-time-zone: UTCsink:type: dorisfenodes: 127.0.0.1:8030benodes: 127.0.0.1:8040username: rootpassword: ""table.create.properties.light_schema_change: truetable.create.properties.replication_num: 1route:- source-table: app_db.orderssink-table: ods_db.ods_orders- source-table: app_db.shipmentssink-table: ods_db.ods_shipments- source-table: app_db.productssink-table: ods_db.ods_productspipeline:name: Sync MySQL Database to Dorisparallelism: 1

通过上面的 route 配置,会将 app_db.orders 表的结构和数据同步到 ods_db.ods_orders 中。从而实现数据库迁移的功能。 特别地,source-table 支持正则表达式匹配多表,从而实现分库分表同步的功能,例如下面的配置:

route:- source-table: app_db.order\.*sink-table: ods_db.ods_orders

这样,就可以将诸如 app_db.order01、app_db.order02、app_db.order03 的表汇总到 ods_db.ods_orders 中。注意,目前还不支持多表中存在相同主键数据的场景,将在后续版本支持。

文章到这就结束了 ,如果对你有帮助请给个好评

这篇关于三.海量数据实时分析-FlinkCDC实现Mysql数据同步到Doris的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1147685

相关文章

大模型研发全揭秘:客服工单数据标注的完整攻略

在人工智能(AI)领域,数据标注是模型训练过程中至关重要的一步。无论你是新手还是有经验的从业者,掌握数据标注的技术细节和常见问题的解决方案都能为你的AI项目增添不少价值。在电信运营商的客服系统中,工单数据是客户问题和解决方案的重要记录。通过对这些工单数据进行有效标注,不仅能够帮助提升客服自动化系统的智能化水平,还能优化客户服务流程,提高客户满意度。本文将详细介绍如何在电信运营商客服工单的背景下进行

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

服务器集群同步时间手记

1.时间服务器配置(必须root用户) (1)检查ntp是否安装 [root@node1 桌面]# rpm -qa|grep ntpntp-4.2.6p5-10.el6.centos.x86_64fontpackages-filesystem-1.41-1.1.el6.noarchntpdate-4.2.6p5-10.el6.centos.x86_64 (2)修改ntp配置文件 [r

关于数据埋点,你需要了解这些基本知识

产品汪每天都在和数据打交道,你知道数据来自哪里吗? 移动app端内的用户行为数据大多来自埋点,了解一些埋点知识,能和数据分析师、技术侃大山,参与到前期的数据采集,更重要是让最终的埋点数据能为我所用,否则可怜巴巴等上几个月是常有的事。   埋点类型 根据埋点方式,可以区分为: 手动埋点半自动埋点全自动埋点 秉承“任何事物都有两面性”的道理:自动程度高的,能解决通用统计,便于统一化管理,但个性化定

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

使用SecondaryNameNode恢复NameNode的数据

1)需求: NameNode进程挂了并且存储的数据也丢失了,如何恢复NameNode 此种方式恢复的数据可能存在小部分数据的丢失。 2)故障模拟 (1)kill -9 NameNode进程 [lytfly@hadoop102 current]$ kill -9 19886 (2)删除NameNode存储的数据(/opt/module/hadoop-3.1.4/data/tmp/dfs/na

异构存储(冷热数据分离)

异构存储主要解决不同的数据,存储在不同类型的硬盘中,达到最佳性能的问题。 异构存储Shell操作 (1)查看当前有哪些存储策略可以用 [lytfly@hadoop102 hadoop-3.1.4]$ hdfs storagepolicies -listPolicies (2)为指定路径(数据存储目录)设置指定的存储策略 hdfs storagepolicies -setStoragePo

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu