将Kafka流式数据摄取至Hudi

2023-11-07 01:59
文章标签 数据 kafka hudi 流式 摄取

本文主要是介绍将Kafka流式数据摄取至Hudi,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能
  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

一、将Kafka流式数据摄取至Hudi: DeltaStreamer

Hudi自带的DeltaStreamer工具写数据到Hudi,

开启--enable-hive-sync 即可同步数据到hive表。

HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。

  • Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件
    增量导入
  • 支持json、avro或自定义记录类型的传入数据
  • 管理检查点,回滚和恢复
  • 利用DFS或Confluent schema注册表的Avro模式。
  • 支持自定义转换操作

一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译

<hadoop.version>3.0.0</hadoop.version>

2.1 DeltaStreamer启动命令

spark-submit --master yarn   \--driver-memory 1G \--num-executors 2 \--executor-memory 1G \--executor-cores 4 \--deploy-mode cluster \--conf spark.yarn.executor.memoryOverhead=512 \--conf spark.yarn.driver.memoryOverhead=512 \--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \--props hdfs://../kafka.properties \--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \--target-base-path hdfs://../business \--op UPSERT \--target-table business  \    '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'--enable-hive-sync \          '开启同步至hive'--table-type MERGE_ON_READ \--source-ordering-field create_time \--source-limit 5000000

2.2 kafka.properties配置实例

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

3.1.1 使用Spark查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

 如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询

#快照视图
spark.sql("select count(*) from dwd.test_rt").show() 
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show() 
#增量视图
saprk sql不支持

3.1.2 使用Hive查询 

beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx  -p t3cx  \--hiveconf hive.stats.autogather=false \#读优化查询select * from dwd.test_ro;#快照查询select * from dwd.test_rt;#增量查询set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;set hoodie.test.consume.mode=INCREMENTAL;set hoodie.test.consume.max.commits=3;set hoodie.test.consume.start.timestamp=20200427114546;select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';#注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,clouder用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 
最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图 

#快照视图 spark.sql("select count(*) from dwd.test").show()

4. 总结

DeltaStreamer是Hudi提供的非常实用的工具,通过DeltaStreamer可以将Kafka、DFS上的数据导入Hudi,而本篇博文主要讲解了如何使用DeltaStreamer将数据从Kafka导入Hudi,并演示了如何使用Spark和Hive查询Hudi数据。

二、将Kafka流式数据摄取至Hudi:SparkStreaming

Hudi 提供了Hudi 表的概念,这些表支持CRUD操作。我们可以基于这个特点,将Mysql Binlog的数据重放至Hudi表,然后基于Hive对Hudi表进行查询分析。数据流向架构如下

binlog数据写入Hudi表

  • binlog-consumer分支使用Spark streaming消费kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \--name hudi__goods \--master yarn \--deploy-mode cluster \--driver-memory 1G \--executor-memory 4G \--executor-cores 1 \--num-executors 40 \--queue hudi \--conf spark.executor.memoryOverhead=2048 \--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \--conf spark.core.connection.ack.wait.timeout=300 \--conf spark.locality.wait=100 \--conf spark.streaming.backpressure.enabled=true \--conf spark.streaming.receiver.maxRate=500 \--conf spark.streaming.kafka.maxRatePerPartition=200 \--conf spark.ui.retainedJobs=10 \--conf spark.ui.retainedStages=10 \--conf spark.ui.retainedTasks=10 \--conf spark.worker.ui.retainedExecutors=10 \--conf spark.worker.ui.retainedDrivers=10 \--conf spark.sql.ui.retainedExecutions=10 \--conf spark.yarn.submit.waitAppCompletion=false \--conf spark.yarn.maxAppAttempts=4 \--conf spark.yarn.am.attemptFailuresValidityInterval=1h \--conf spark.yarn.max.executor.failures=20 \--conf spark.yarn.executor.failuresValidityInterval=1h \--conf spark.task.maxFailures=8 \/data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar  --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200

历史数据同步以及表元数据同步至hive

history_import_and_meta_sync 分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作

同步历史数据至hudi表

这里采用的思路是

  • 将mysql全量数据通过注入sqoop等工具,导入到hive表。
  • 然后采用分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表

HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为

一个程序执行demo

nohup java -jar hudi-learn-1.0-SNAPSHOT.jar 
--sync-hive-db-name hudi_temp 
--sync-hive-table-name crm__wx_user_info 
--base-save-path hdfs://192.168.2.2:8020/hudi_table/ 
--mapping-mysql-db-name crm 
--mapping-mysql-table-name "order" 
--primary-key "id" 
--partition-key created_date 
--hive-site-path /etc/lib/hive/conf/hive-site.xml 
--tmp-data-path /data/tmp > order.log &

同步hudi表结构至hive meta

需要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并通过sql进行查询分析。Hudi本身在消费Binlog进行存储时,可以顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,所以同步工具可以一天执行一次即可。参数配置如下

参数名含义是否必填默认值
 -hive-db-name指定hudi表同步至哪个hive数据库
 -hive-table-name指定hudi表同步至哪个hive表
 -hive-jdbc-url指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000
 -hive-user-name指定hive meta的链接用户名默认hive
 -hive-pwd指定hive meta的链接密码默认hive
 -hudi-table-path指定hudi表所在hdfs的文件路径
 -hive-site-path指定hive的hive-site.xml路径

一个程序执行demo

java -jar hudi-learn-1.0-SNAPSHOT.jar 
--hive-db-name streaming 
--hive-table-name crm__order 
--hive-user-name hive 
--hive-pwd hive 
--hive-jdbc-url jdbc:hive2://192.168.16.181:10000 
--hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order 
--hive-site-path /lib/hive/conf/hive-site.xml

一些踩坑、hive相关配置

有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive外表读取到所有Hudi的Parquet数据,从而导致最终的读取结果重复。

需要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat,为了避免在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

三、将Kafka流式数据摄取至Hudi:Flink sql 

7.1 启动kafka生产者,生产数据

 1.1 启动user生产者,生产数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic user
{"user_id":"a0001","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a0002","order_amount":12.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a0003","order_amount":13.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a0004","order_amount":14.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a0005","order_amount":15.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a0006","order_amount":16.0,"log_ts":"2020-11-26 12:12:13"}
 
1.2 启动user_hobby生产者,生产数据

bin/kafka-console-producer.sh --broker-list node1:9092 --topic user_hobby
{"user_id":"a0001","name":"yangge","hobby":"足球"}
{"user_id":"a0002","name":"baba","hobby":"电影"}
{"user_id":"a0003","name":"mama","hobby":"游戏"}
{"user_id":"a0004","name":"dudu","hobby":"动画片"}
{"user_id":"a0005","name":"gege","hobby":"手机"}
{"user_id":"a0006","name":"jiejie","hobby":"睡觉"}

7.2 在Flink SQL客户端创建kafka对应的映射表

2.1 在Flink SQL客户端创建user表

CREATE TABLE user_ODS(user_id STRING,order_amount BIGINT,log_ts TIMESTAMP(3))WITH('connector' = 'kafka','topic' = 'user','properties.bootstrap.servers' = 'node1:9092','scan.startup.mode'='earliest-offset','properties.group.id' = 'testGroup','format' = 'json'
);

select  *  from user_ODS; 

2.2 在flink SQL客户端创建user_hobby表

CREATE TABLE user_hobby_ODS(user_id STRING,name STRING,hobby STRING)WITH('connector' = 'kafka','topic' = 'user_hobby','properties.bootstrap.servers' = 'node1:9092','scan.startup.mode'='earliest-offset','properties.group.id' = 'testGroup','format' = 'json'
);


select  *  from user_hobby_ODS;

7.3 Flink sql使用 hudi connector 创建hudi表

3.1 使用 hudi connector 创建hudi表

CREATE TABLE hudi_user(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
) WITH ('connector' = 'hudi','path' = 'hdfs://node1:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','write.precombine.field' = 'log_ts','hoodie.datasource.write.recordkey.field' = 'user_id','compaction.async.enabled' = 'false'
);

网Hudi表插入数据 

insert into hudi_user select  * from  user_ODS; select *  from  hudi_user ;

3.2 使用 hudi connector 创建hudi表

CREATE TABLE hudi_user_hobby(user_id STRING,name STRING,hobby STRING
) WITH ('connector' = 'hudi','path' = 'hdfs://node1:8020/hudi/hudi_user_hobby','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','hoodie.datasource.write.recordkey.field' = 'user_id','write.precombine.field' = 'user_id','compaction.async.enabled' = 'false'
);
insert into hudi_user_hobby select  * from  user_hobby_ODS;

select * from  hudi_user_hobby;

7.4 使用 hudi connector 创建hudi DWD表 

4.1 在Flink SQL 创建DWD输出表

CREATE TABLE user_hobby_DWD (user_id STRING,name STRING,hobby STRING,order_amount BIGINT,log_ts TIMESTAMP(3)
)WITH('connector' = 'hudi','path' = 'hdfs://node1:8020/hudi/user_hobby_DWD','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','hoodie.datasource.write.recordkey.field' = 'user_id','write.precombine.field' = 'user_id','compaction.async.enabled' = 'false'
);
INSERT INTO user_hobby_DWD
SELECT A.user_id, B.name, B.hobby, A.order_amount, A.log_ts
FROM hudi_user AJOIN hudi_user_hobby B ON A.user_id = B.user_id;

 注意事项:字段的顺序和最终写入表的字段顺序必须一致,不一致会报错.

出现这样的结果,说明join完成.

这篇关于将Kafka流式数据摄取至Hudi的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360496

相关文章

SQL中如何添加数据(常见方法及示例)

《SQL中如何添加数据(常见方法及示例)》SQL全称为StructuredQueryLanguage,是一种用于管理关系数据库的标准编程语言,下面给大家介绍SQL中如何添加数据,感兴趣的朋友一起看看吧... 目录在mysql中,有多种方法可以添加数据。以下是一些常见的方法及其示例。1. 使用INSERT I

Python使用vllm处理多模态数据的预处理技巧

《Python使用vllm处理多模态数据的预处理技巧》本文深入探讨了在Python环境下使用vLLM处理多模态数据的预处理技巧,我们将从基础概念出发,详细讲解文本、图像、音频等多模态数据的预处理方法,... 目录1. 背景介绍1.1 目的和范围1.2 预期读者1.3 文档结构概述1.4 术语表1.4.1 核

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MyBatisPlus如何优化千万级数据的CRUD

《MyBatisPlus如何优化千万级数据的CRUD》最近负责的一个项目,数据库表量级破千万,每次执行CRUD都像走钢丝,稍有不慎就引起数据库报警,本文就结合这个项目的实战经验,聊聊MyBatisPl... 目录背景一、MyBATis Plus 简介二、千万级数据的挑战三、优化 CRUD 的关键策略1. 查

python实现对数据公钥加密与私钥解密

《python实现对数据公钥加密与私钥解密》这篇文章主要为大家详细介绍了如何使用python实现对数据公钥加密与私钥解密,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录公钥私钥的生成使用公钥加密使用私钥解密公钥私钥的生成这一部分,使用python生成公钥与私钥,然后保存在两个文

mysql中的数据目录用法及说明

《mysql中的数据目录用法及说明》:本文主要介绍mysql中的数据目录用法及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1、背景2、版本3、数据目录4、总结1、背景安装mysql之后,在安装目录下会有一个data目录,我们创建的数据库、创建的表、插入的

Navicat数据表的数据添加,删除及使用sql完成数据的添加过程

《Navicat数据表的数据添加,删除及使用sql完成数据的添加过程》:本文主要介绍Navicat数据表的数据添加,删除及使用sql完成数据的添加过程,具有很好的参考价值,希望对大家有所帮助,如有... 目录Navicat数据表数据添加,删除及使用sql完成数据添加选中操作的表则出现如下界面,查看左下角从左

SpringBoot中4种数据水平分片策略

《SpringBoot中4种数据水平分片策略》数据水平分片作为一种水平扩展策略,通过将数据分散到多个物理节点上,有效解决了存储容量和性能瓶颈问题,下面小编就来和大家分享4种数据分片策略吧... 目录一、前言二、哈希分片2.1 原理2.2 SpringBoot实现2.3 优缺点分析2.4 适用场景三、范围分片

Redis分片集群、数据读写规则问题小结

《Redis分片集群、数据读写规则问题小结》本文介绍了Redis分片集群的原理,通过数据分片和哈希槽机制解决单机内存限制与写瓶颈问题,实现分布式存储和高并发处理,但存在通信开销大、维护复杂及对事务支持... 目录一、分片集群解android决的问题二、分片集群图解 分片集群特征如何解决的上述问题?(与哨兵模

浅析如何保证MySQL与Redis数据一致性

《浅析如何保证MySQL与Redis数据一致性》在互联网应用中,MySQL作为持久化存储引擎,Redis作为高性能缓存层,两者的组合能有效提升系统性能,下面我们来看看如何保证两者的数据一致性吧... 目录一、数据不一致性的根源1.1 典型不一致场景1.2 关键矛盾点二、一致性保障策略2.1 基础策略:更新数