Flink构造宽表实时入库案例介绍

2024-01-10 14:28

本文主要是介绍Flink构造宽表实时入库案例介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. 安装包准备

Flink 1.15.4 安装包

Flink cdc的mysql连接器

Flink sql的sdb连接器

MySQL驱动

SDB驱动

Flink jdbc的mysql连接器

 

2. 入库流程图

3. Flink安装部署

  1. 上传Flink压缩包到服务器,并解压

tar -zxvf  flink-1.14.5-bin-scala_2.11.tgz  -C /opt/

  1. 复制依赖至Flink中

cp sdb-flink-connector-3.4.8-jar-with-dependencies.jar /opt/flink-1.14.5/lib
cp sequoiadb-driver-3.4.8.jre8.jar /opt/flink-1.14.5/lib
cp flink-sql-connector-mysql-cdc-2.2.1.jar /opt/flink-1.14.5/lib
cp flink-connector-jdbc_2.11-1.14.6.jar /opt/flink-1.14.5/lib

  1. 修改flink-conf.yaml文件

vi conf/flink-conf.yaml

### 配置Master的机器名(IP地址)
jobmanager.rpc.address: sdb1
### 配置每个taskmanager 生成的临时文件夹
io.tmp.dirs: /opt/flink-1.14.5/tmp

  1. 修改master文件

vi conf/masters

#作为master的ip和端口号
upgrade1:8081

  1. 修改worker文件

vi conf/workers

#集群主机名
upgrade1
upgrade2
upgrade3

  1. 拷贝到集群其他机器

scp -r /opt/flink-1.14.5 sdbadmin@upgrade2:/opt/
scp -r /opt/flink-1.14.5 sdbadmin@upgrade3:/opt/

  1. 启动flink集群

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/start-cluster.sh

  1. 启动flink-SQL

[sdbadmin@upgrade1 flink-1.14.5]$ ./bin/sql-client.sh

4. 实时入库

编写造数程序进行造数

4.1 环境准备

4.1.1 开启mysql的binlog

  1. 创建binlog文件夹

[sdbadmin@upgrade1 mysql]$ mkdir /opt/sequoiasql/mysql/database/3306/binlog

  1. 开启binlog

vim /opt/sequoiasql/mysql/database/3306/auto.cnf

>>配置以下内容:
log_bin=/opt/sequoiasql/mysql/database/3306/binlog
binlog_format=ROW
expire_logs_days=1
server_id=1

配置完成之后,重启mysql

[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl stop myinst
[sdbadmin@upgrade1 mysql]$ ./bin/sdb_mysql_ctl start myinst

4.1.2 创建mysql表

创建库

create database sbtest;
use sbtest;

创建表

CREATE TABLE sbtest1 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest2 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name2 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

CREATE TABLE sbtest3 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

创建flink入库表

CREATE TABLE sbtest4 (
    id INT UNSIGNED AUTO_INCREMENT,
    uuid INT(10),
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT(4),
    time1 DATETIME,
    PRIMARY KEY(id)
);

4.1.3 创建flink映射表

需要用到flink-sql-connector-mysql-cdc-2.2.1.jar

CREATE TABLE sbtest1_mysql (
    id INT,
    uuid INT,
    name1 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest1'
);

CREATE TABLE sbtest2_mysql (
    id INT,
    uuid INT,
    name2 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest2'
);

CREATE TABLE sbtest3_mysql (
    id INT,
    uuid INT,
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'mysql-cdc',
    'hostname' = '192.168.223.135',
    'port' = '3306',
    'username' = 'root',
    'password' = 'root',
    'database-name' = 'sbtest',
    'table-name' = 'sbtest3'
);

创建flink -->  mysql入库映射表

需要用到flink-connector-jdbc_2.11-1.14.6.jar

CREATE TABLE sbtest4_mysql (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://192.168.223.135:3306/sbtest',
    'username' = 'root',
    'password' = 'root',
    'table-name' = 'sbtest4'
);

创建flink -->  mysql入库映射表

需要用到sdb-flink-connector-3.4.8-jar-with-dependencies.jar

CREATE TABLE sbtest_sdb (
    id BIGINT,
    uuid INT,
    name1 CHAR(120),
    name2 CHAR(120),
    name3 CHAR(120),
    age INT,
    time1 TIMESTAMP,
    PRIMARY KEY (id) NOT ENFORCED
) WITH (
    'connector' = 'sequoiadb',
    'bulksize' = '1',
    'hosts' = '192.168.223.135:11810',
    'collectionspace' = 'sbtest',
    'collection' = 'sbtest4'
);

4.2 MySQL实时入库

4.2.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.2.2 mysql实时入库

insert into sbtest4_mysql select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

查看可以成功入库

4.3 SDB实时入库

4.3.1 Flink left join

select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

4.3.2 sdb实时入库

insert into sbtest_sdb select sdb1.id, sdb1.uuid, sdb1.name1, sdb2.name2, sdb3.name3, sdb1.age, sdb1.time1
from sbtest1_mysql sdb1
left join sbtest2_mysql sdb2
on sdb1.id = sdb2.id
left join sbtest3_mysql sdb3
on sdb1.id = sdb3.id;

查看Flink任务

显示已经成功入库

这篇关于Flink构造宽表实时入库案例介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/591107

相关文章

使用Navicat工具比对两个数据库所有表结构的差异案例详解

《使用Navicat工具比对两个数据库所有表结构的差异案例详解》:本文主要介绍如何使用Navicat工具对比两个数据库test_old和test_new,并生成相应的DDLSQL语句,以便将te... 目录概要案例一、如图两个数据库test_old和test_new进行比较:二、开始比较总结概要公司存在多

四种Flutter子页面向父组件传递数据的方法介绍

《四种Flutter子页面向父组件传递数据的方法介绍》在Flutter中,如果父组件需要调用子组件的方法,可以通过常用的四种方式实现,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录方法 1:使用 GlobalKey 和 State 调用子组件方法方法 2:通过回调函数(Callb

SpringBoot实现动态插拔的AOP的完整案例

《SpringBoot实现动态插拔的AOP的完整案例》在现代软件开发中,面向切面编程(AOP)是一种非常重要的技术,能够有效实现日志记录、安全控制、性能监控等横切关注点的分离,在传统的AOP实现中,切... 目录引言一、AOP 概述1.1 什么是 AOP1.2 AOP 的典型应用场景1.3 为什么需要动态插

Python进阶之Excel基本操作介绍

《Python进阶之Excel基本操作介绍》在现实中,很多工作都需要与数据打交道,Excel作为常用的数据处理工具,一直备受人们的青睐,本文主要为大家介绍了一些Python中Excel的基本操作,希望... 目录概述写入使用 xlwt使用 XlsxWriter读取修改概述在现实中,很多工作都需要与数据打交

Golang操作DuckDB实战案例分享

《Golang操作DuckDB实战案例分享》DuckDB是一个嵌入式SQL数据库引擎,它与众所周知的SQLite非常相似,但它是为olap风格的工作负载设计的,DuckDB支持各种数据类型和SQL特性... 目录DuckDB的主要优点环境准备初始化表和数据查询单行或多行错误处理和事务完整代码最后总结Duck

java脚本使用不同版本jdk的说明介绍

《java脚本使用不同版本jdk的说明介绍》本文介绍了在Java中执行JavaScript脚本的几种方式,包括使用ScriptEngine、Nashorn和GraalVM,ScriptEngine适用... 目录Java脚本使用不同版本jdk的说明1.使用ScriptEngine执行javascript2.

Python实现NLP的完整流程介绍

《Python实现NLP的完整流程介绍》这篇文章主要为大家详细介绍了Python实现NLP的完整流程,文中的示例代码讲解详细,具有一定的借鉴价值,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1. 编程安装和导入必要的库2. 文本数据准备3. 文本预处理3.1 小写化3.2 分词(Tokenizatio

MySQL不使用子查询的原因及优化案例

《MySQL不使用子查询的原因及优化案例》对于mysql,不推荐使用子查询,效率太差,执行子查询时,MYSQL需要创建临时表,查询完毕后再删除这些临时表,所以,子查询的速度会受到一定的影响,本文给大家... 目录不推荐使用子查询和JOIN的原因解决方案优化案例案例1:查询所有有库存的商品信息案例2:使用EX

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,