Fink CDC数据同步(六)数据入湖Hudi

2024-02-05 17:20
文章标签 数据 同步 fink hudi cdc 入湖

本文主要是介绍Fink CDC数据同步(六)数据入湖Hudi,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

数据入湖Hudi

Apache Hudi(简称:Hudi)使得您能在hadoop兼容的存储之上存储大量数据,同时它还提供两种原语,使得除了经典的批处理之外,还可以在数据湖上进行流处理。这两种原语分别是:

  • Update/Delete记录:Hudi使用细粒度的文件/记录级别索引来支持Update/Delete记录,同时还提供写操作的事务保证。查询会处理最后一个提交的快照,并基于此输出结果。
  • 变更流:Hudi对获取数据变更提供了一流的支持:可以从给定的时间点获取给定表中已updated/inserted/deleted的所有记录的增量流,并解锁新的查询姿势(类别)。

配置

将hudi相关jar包放在flink安装目录的lib下

hudi-flink1.16-bundle-0.13.0.jar

hudi-hadoop-mr-0.13.0.jar

hudi-hive-sync-0.13.0.jar

确保/etc/profile配置了hadoop和hive的环境变量

#HADOOP_HOME
export HADOOP_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_CONF_DIR=/usr/hdp/3.1.5.0-152/hadoop/etc/hadoop
export HADOOP_COMMON_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_HDFS_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_YARN_HOME=/usr/hdp/3.1.5.0-152/hadoop
export HADOOP_MAPRED_HOME=/usr/hdp/3.1.5.0-152/hadoop
export PATH=$PATH:$HADOOP_HOME/bin:$HADOOP_HOME/sbin
export HADOOP_CLASSPATH=`hadoop classpath`#HIVE HOME
export HIVE_HOME=/usr/hdp/3.1.5.0-152/hive
export PATH=$PATH:$HIVE_HOME/bin:$HIVE_HOME/sbin

测试插入hudi表

set sql-client.execution.result-mode = tableau;
set execution.checkpointing.interval=30sec;
SET table.sql-dialect=default;CREATE TABLE hudi_test(uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,name VARCHAR(10),age INT,ts TIMESTAMP(3),`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH ('connector' = 'hudi',  -- 连接器指定hudi'path' = 'hdfs://bigdata101:8020/hudi/hudi_test',  -- 数据存储地址'table.type' = 'MERGE_ON_READ' -- 表类型,默认COPY_ON_WRITE,可选MERGE_ON_READ
);INSERT INTO hudi_test VALUES('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

MySql数据写入Hudi表

建hudi表

create table hudi_user(id string not null,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id'
);

将MySql映射表的数据插入hudi表,此时会生成一个flink任务

insert into ods.hudi_user select * from mysql_user;

流式查询

上面的查询方式是非流式查询,流式查询会生成一个flink作业,并且实时显示数据源变更的数据。

流式查询(Streaming Query)需要设置read.streaming.enabled = true。再设置read.start-commit,如果想消费所有数据,设置值为earliest。

使用参数如下:

参数名称

是否必填

默认值

备注

read.streaming.enabled

FALSE

FALSE

设置为true,开启stream query

read.start-commit

FALSE

the latest commit

Instant time的格式为:’yyyyMMddHHmmss’

read.streaming_skip_compaction

FALSE

FALSE

是否不消费compaction commit,消费compaction commit会出现重复数据

clean.retain_commits

FALSE

10

当开启change log mode,保留的最大commit数量。如果checkpoint interval为5分钟,则保留50分钟的change log

建表:

create table hudi_user_read_streaming(id int not null ,name string,birth string,gender string,primary key (id) not enforced
)
with ('connector' = 'hudi','path' = 'hdfs://bigdata101:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','write.option' = 'bulk_insert','write.precombine.field' = 'id','read.streaming.enabled' = 'true',  -- 默认值false,设置为true,开启stream query'read.start-commit' = '20231008134557', -- start-commit之前提交的数据不显示,'read.streaming.check-interval' = '4'  -- 检查间隔,默认60s);insert into hudi_user_read_streaming select * from mysql_user;select * from hudi_user_read_streaming;

此时,执行select 语句就会生成一个flink 作业

源端变更数据会实时展示出来


 

 系列文章

Fink CDC数据同步(一)环境部署icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017355?spm=1001.2014.3001.5502​​​​​​​
Fink CDC数据同步(二)MySQL数据同步icon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017472?spm=1001.2014.3001.5501
Fink CDC数据同步(三)Flink集成Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136017571?spm=1001.2014.3001.5501
Fink CDC数据同步(四)Mysql数据同步到Kafkaicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023747?spm=1001.2014.3001.5501
Fink CDC数据同步(五)Kafka数据同步Hiveicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023837?spm=1001.2014.3001.5501

Fink CDC数据同步(六)数据入湖Hudiicon-default.png?t=N7T8https://blog.csdn.net/weixin_44586883/article/details/136023939?spm=1001.2014.3001.5502

这篇关于Fink CDC数据同步(六)数据入湖Hudi的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/681629

相关文章

SpringBoot集成Milvus实现数据增删改查功能

《SpringBoot集成Milvus实现数据增删改查功能》milvus支持的语言比较多,支持python,Java,Go,node等开发语言,本文主要介绍如何使用Java语言,采用springboo... 目录1、Milvus基本概念2、添加maven依赖3、配置yml文件4、创建MilvusClient

SpringValidation数据校验之约束注解与分组校验方式

《SpringValidation数据校验之约束注解与分组校验方式》本文将深入探讨SpringValidation的核心功能,帮助开发者掌握约束注解的使用技巧和分组校验的高级应用,从而构建更加健壮和可... 目录引言一、Spring Validation基础架构1.1 jsR-380标准与Spring整合1

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

使用Python将JSON,XML和YAML数据写入Excel文件

《使用Python将JSON,XML和YAML数据写入Excel文件》JSON、XML和YAML作为主流结构化数据格式,因其层次化表达能力和跨平台兼容性,已成为系统间数据交换的通用载体,本文将介绍如何... 目录如何使用python写入数据到Excel工作表用Python导入jsON数据到Excel工作表用

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

鸿蒙中Axios数据请求的封装和配置方法

《鸿蒙中Axios数据请求的封装和配置方法》:本文主要介绍鸿蒙中Axios数据请求的封装和配置方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.配置权限 应用级权限和系统级权限2.配置网络请求的代码3.下载在Entry中 下载AxIOS4.封装Htt

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分