将Kafka流式数据摄取至Hudi

2023-11-07 01:59
文章标签 数据 kafka hudi 流式 摄取

本文主要是介绍将Kafka流式数据摄取至Hudi,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 Hudi支持以下存储数据的视图

  • 读优化视图 : 在此视图上的查询将查看给定提交或压缩操作中数据集的最新快照该视图仅将最新parquet文件暴露给查询,所以它有可能看不到最新的数据,并保证与非Hudi列式数据集相比,具有相同的列式查询性能
  • 增量视图 : 对该视图的查询只能看到从某个提交/压缩后写入数据集的新数据。该视图有效地提供了更改流,来支持增量数据管道。
  • 实时视图 : 在此视图上的查询将查看某个增量提交操作中数据集的最新快照。该视图通过动态合并最新的基本文件和增量文件来提供近实时数据集。

一、将Kafka流式数据摄取至Hudi: DeltaStreamer

Hudi自带的DeltaStreamer工具写数据到Hudi,

开启--enable-hive-sync 即可同步数据到hive表。

HoodieDeltaStreamer实用工具 (hudi-utilities-bundle中的一部分) 提供了从DFS或Kafka等不同来源进行摄取的方式,并具有以下功能。

  • Kafka单次摄取新事件,从Sqoop、HiveIncrementalPuller输出或DFS文件夹中的多个文件
    增量导入
  • 支持json、avro或自定义记录类型的传入数据
  • 管理检查点,回滚和恢复
  • 利用DFS或Confluent schema注册表的Avro模式。
  • 支持自定义转换操作

一、首先从 https://github.com/apache/incubator-hudi.git 将hudi clone到自己本地idea 使用clean install -DskipTests -DskipITs -Dcheckstyle.skip=true进行编译 注意: 1、目前hudi使用的是hadoop2.7.3版本,如果使用hadoop3.x版本,请修改pom重新编译

<hadoop.version>3.0.0</hadoop.version>

2.1 DeltaStreamer启动命令

spark-submit --master yarn   \--driver-memory 1G \--num-executors 2 \--executor-memory 1G \--executor-cores 4 \--deploy-mode cluster \--conf spark.yarn.executor.memoryOverhead=512 \--conf spark.yarn.driver.memoryOverhead=512 \--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer `ls /.../hudi-utilities-bundle_2.11-0.5.2-SNAPSHOT.jar` \--props hdfs://../kafka.properties \--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \--target-base-path hdfs://../business \--op UPSERT \--target-table business  \    '这里其实并不是hive表的名称,实际表名是在kafka.properties中配置'--enable-hive-sync \          '开启同步至hive'--table-type MERGE_ON_READ \--source-ordering-field create_time \--source-limit 5000000

2.2 kafka.properties配置实例

hoodie.upsert.shuffle.parallelism=2
hoodie.insert.shuffle.parallelism=2
hoodie.bulkinsert.shuffle.parallelism=2
hoodie.datasource.write.recordkey.field=uuid
hoodie.datasource.write.partitionpath.field=create_time
hoodie.datasource.write.precombine.field=update_time
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://../t_business.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://../t3_trip.t_business.avsc
hoodie.deltastreamer.source.kafka.topic=t_business_topic
group.id=t_business_group
bootstrap.servers=localhost
auto.offset.reset=latest
hoodie.parquet.max.file.size=134217728
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.keygen.TimestampBasedKeyGenerator
hoodie.deltastreamer.keygen.timebased.timestamp.type=DATE_STRING
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss
hoodie.deltastreamer.keygen.timebased.output.dateformat=yyyy/MM/dd
hoodie.datasource.hive_sync.database=dwd
hoodie.datasource.hive_sync.table=test
hoodie.datasource.hive_sync.username=用户名
hoodie.datasource.hive_sync.password=密码
hoodie.datasource.hive_sync.jdbcurl=jdbc:hive2://.....
hoodie.datasource.hive_sync.partition_fields=分区字段

3.1.1 使用Spark查询

spark-shell --master yarn \
--driver-memory 1G \
--num-executors 1 \
--executor-memory 1G \
--executor-cores 1 \
--jars /home/t3cx/apps/hudi/hudi-spark-bundle_2.11-0.5.2-SNAPSHOT.jar \
--conf spark.sql.hive.convertMetastoreParquet=false   '在进行快照视图查询的时候需要添加此配置'

 如果使用MOR模式写入数据会在hive的dwd库下面生成两张表。分别是test_ro 和 test_rt test_rt表支持:快照视图和增量视图查询 test_ro表支持:读优化视图查询

#快照视图
spark.sql("select count(*) from dwd.test_rt").show() 
#读优化视图
spark.sql("select count(*) from dwd.test_ro").show() 
#增量视图
saprk sql不支持

3.1.2 使用Hive查询 

beeline -u jdbc:hive2://incubator-t3-infra04:10000 -n t3cx  -p t3cx  \--hiveconf hive.stats.autogather=false \#读优化查询select * from dwd.test_ro;#快照查询select * from dwd.test_rt;#增量查询set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;set hoodie.test.consume.mode=INCREMENTAL;set hoodie.test.consume.max.commits=3;set hoodie.test.consume.start.timestamp=20200427114546;select count(*) from  dwd.test_rt where `_hoodie_commit_time` > '20200427114546';#注意:
#1、hudi中parquet做了shaded,我在测试中发现(CDH6.3.0)下必须加载hudi-hadoop-mr中的parquet-avro包才行,clouder用户需要必须要重新安装mr所需要的jar
#2、set hive.input.format=org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat 
最好显示设置,否则有可能在某种情况下无法加载到hive.input.formate,即便在create-table的时候已经指定

如果使用COW模式写入数据,会在hive的dwd库下面生成一张表,test test表支持:快照视图和增量视图 

#快照视图 spark.sql("select count(*) from dwd.test").show()

4. 总结

DeltaStreamer是Hudi提供的非常实用的工具,通过DeltaStreamer可以将Kafka、DFS上的数据导入Hudi,而本篇博文主要讲解了如何使用DeltaStreamer将数据从Kafka导入Hudi,并演示了如何使用Spark和Hive查询Hudi数据。

二、将Kafka流式数据摄取至Hudi:SparkStreaming

Hudi 提供了Hudi 表的概念,这些表支持CRUD操作。我们可以基于这个特点,将Mysql Binlog的数据重放至Hudi表,然后基于Hive对Hudi表进行查询分析。数据流向架构如下

binlog数据写入Hudi表

  • binlog-consumer分支使用Spark streaming消费kafka中的Binlog数据,并写入Hudi表。Kafka中的binlog是通过阿里的Canal工具同步拉取的。程序入口是CanalKafkaImport2Hudi,它提供了一系列参数,配置程序的执行行为
/data/opt/spark-2.4.4-bin-hadoop2.6/bin/spark-submit --class com.niceshot.hudi.CanalKafkaImport2Hudi \--name hudi__goods \--master yarn \--deploy-mode cluster \--driver-memory 1G \--executor-memory 4G \--executor-cores 1 \--num-executors 40 \--queue hudi \--conf spark.executor.memoryOverhead=2048 \--conf "spark.executor.extraJavaOptions=-XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=\tmp\hudi-debug" \--conf spark.core.connection.ack.wait.timeout=300 \--conf spark.locality.wait=100 \--conf spark.streaming.backpressure.enabled=true \--conf spark.streaming.receiver.maxRate=500 \--conf spark.streaming.kafka.maxRatePerPartition=200 \--conf spark.ui.retainedJobs=10 \--conf spark.ui.retainedStages=10 \--conf spark.ui.retainedTasks=10 \--conf spark.worker.ui.retainedExecutors=10 \--conf spark.worker.ui.retainedDrivers=10 \--conf spark.sql.ui.retainedExecutions=10 \--conf spark.yarn.submit.waitAppCompletion=false \--conf spark.yarn.maxAppAttempts=4 \--conf spark.yarn.am.attemptFailuresValidityInterval=1h \--conf spark.yarn.max.executor.failures=20 \--conf spark.yarn.executor.failuresValidityInterval=1h \--conf spark.task.maxFailures=8 \/data/opt/spark-applications/hudi_canal_consumer/hudi-canal-import-1.0-SNAPSHOT-jar-with-dependencies.jar  --kafka-server local:9092 --kafka-topic dt_streaming_canal_xxx --base-save-path hdfs://192.168.2.1:8020/hudi_table/ --mapping-mysql-db-name crm --mapping-mysql-table-name order --primary-key id --partition-key createDate --duration-seconds 1200

历史数据同步以及表元数据同步至hive

history_import_and_meta_sync 分支提供了将历史数据同步至hudi表,以及将hudi表数据结构同步至hive meta的操作

同步历史数据至hudi表

这里采用的思路是

  • 将mysql全量数据通过注入sqoop等工具,导入到hive表。
  • 然后采用分支代码中的工具HiveImport2HudiConfig,将数据导入Hudi表

HiveImport2HudiConfig提供了如下一些参数,用于配置程序执行行为

一个程序执行demo

nohup java -jar hudi-learn-1.0-SNAPSHOT.jar 
--sync-hive-db-name hudi_temp 
--sync-hive-table-name crm__wx_user_info 
--base-save-path hdfs://192.168.2.2:8020/hudi_table/ 
--mapping-mysql-db-name crm 
--mapping-mysql-table-name "order" 
--primary-key "id" 
--partition-key created_date 
--hive-site-path /etc/lib/hive/conf/hive-site.xml 
--tmp-data-path /data/tmp > order.log &

同步hudi表结构至hive meta

需要将hudi的数据结构和分区,以hive外表的形式同步至Hive meta,才能是Hive感知到hudi数据,并通过sql进行查询分析。Hudi本身在消费Binlog进行存储时,可以顺带将相关表元数据信息同步至hive。但考虑到每条写入Apache Hudi表的数据,都要读写Hive Meta ,对Hive的性能可能影响很大。所以我单独开发了HiveMetaSyncConfig工具,用于同步hudi表元数据至Hive。考虑到目前程序只支持按天分区,所以同步工具可以一天执行一次即可。参数配置如下

参数名含义是否必填默认值
 -hive-db-name指定hudi表同步至哪个hive数据库
 -hive-table-name指定hudi表同步至哪个hive表
 -hive-jdbc-url指定hive meta的jdbc链接地址,例如jdbc:hive2://192.168.16.181:10000
 -hive-user-name指定hive meta的链接用户名默认hive
 -hive-pwd指定hive meta的链接密码默认hive
 -hudi-table-path指定hudi表所在hdfs的文件路径
 -hive-site-path指定hive的hive-site.xml路径

一个程序执行demo

java -jar hudi-learn-1.0-SNAPSHOT.jar 
--hive-db-name streaming 
--hive-table-name crm__order 
--hive-user-name hive 
--hive-pwd hive 
--hive-jdbc-url jdbc:hive2://192.168.16.181:10000 
--hudi-table-path hdfs://192.168.16.181:8020/hudi_table/crm__order 
--hive-site-path /lib/hive/conf/hive-site.xml

一些踩坑、hive相关配置

有些hive集群的hive.input.format配置,默认是org.apache.hadoop.hive.ql.io.CombineHiveInputFormat,这会导致挂载Hudi数据的Hive外表读取到所有Hudi的Parquet数据,从而导致最终的读取结果重复。

需要将hive的format改为org.apache.hadoop.hive.ql.io.HiveInputFormat,为了避免在整个集群层面上更改对其余离线Hive Sql造成不必要的影响,建议只对当前hive session设置set hive.input.format=org.apache.hadoop.hive.ql.io.HiveInputFormat;

三、将Kafka流式数据摄取至Hudi:Flink sql 

7.1 启动kafka生产者,生产数据

 1.1 启动user生产者,生产数据
bin/kafka-console-producer.sh --broker-list node1:9092 --topic user
{"user_id":"a0001","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a0002","order_amount":12.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a0003","order_amount":13.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a0004","order_amount":14.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a0005","order_amount":15.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a0006","order_amount":16.0,"log_ts":"2020-11-26 12:12:13"}
 
1.2 启动user_hobby生产者,生产数据

bin/kafka-console-producer.sh --broker-list node1:9092 --topic user_hobby
{"user_id":"a0001","name":"yangge","hobby":"足球"}
{"user_id":"a0002","name":"baba","hobby":"电影"}
{"user_id":"a0003","name":"mama","hobby":"游戏"}
{"user_id":"a0004","name":"dudu","hobby":"动画片"}
{"user_id":"a0005","name":"gege","hobby":"手机"}
{"user_id":"a0006","name":"jiejie","hobby":"睡觉"}

7.2 在Flink SQL客户端创建kafka对应的映射表

2.1 在Flink SQL客户端创建user表

CREATE TABLE user_ODS(user_id STRING,order_amount BIGINT,log_ts TIMESTAMP(3))WITH('connector' = 'kafka','topic' = 'user','properties.bootstrap.servers' = 'node1:9092','scan.startup.mode'='earliest-offset','properties.group.id' = 'testGroup','format' = 'json'
);

select  *  from user_ODS; 

2.2 在flink SQL客户端创建user_hobby表

CREATE TABLE user_hobby_ODS(user_id STRING,name STRING,hobby STRING)WITH('connector' = 'kafka','topic' = 'user_hobby','properties.bootstrap.servers' = 'node1:9092','scan.startup.mode'='earliest-offset','properties.group.id' = 'testGroup','format' = 'json'
);


select  *  from user_hobby_ODS;

7.3 Flink sql使用 hudi connector 创建hudi表

3.1 使用 hudi connector 创建hudi表

CREATE TABLE hudi_user(
user_id STRING,
order_amount BIGINT,
log_ts TIMESTAMP(3)
) WITH ('connector' = 'hudi','path' = 'hdfs://node1:8020/hudi/hudi_user','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','write.precombine.field' = 'log_ts','hoodie.datasource.write.recordkey.field' = 'user_id','compaction.async.enabled' = 'false'
);

网Hudi表插入数据 

insert into hudi_user select  * from  user_ODS; select *  from  hudi_user ;

3.2 使用 hudi connector 创建hudi表

CREATE TABLE hudi_user_hobby(user_id STRING,name STRING,hobby STRING
) WITH ('connector' = 'hudi','path' = 'hdfs://node1:8020/hudi/hudi_user_hobby','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','hoodie.datasource.write.recordkey.field' = 'user_id','write.precombine.field' = 'user_id','compaction.async.enabled' = 'false'
);
insert into hudi_user_hobby select  * from  user_hobby_ODS;

select * from  hudi_user_hobby;

7.4 使用 hudi connector 创建hudi DWD表 

4.1 在Flink SQL 创建DWD输出表

CREATE TABLE user_hobby_DWD (user_id STRING,name STRING,hobby STRING,order_amount BIGINT,log_ts TIMESTAMP(3)
)WITH('connector' = 'hudi','path' = 'hdfs://node1:8020/hudi/user_hobby_DWD','table.type' = 'MERGE_ON_READ','changelog.enabled' = 'true','hoodie.datasource.write.recordkey.field' = 'user_id','write.precombine.field' = 'user_id','compaction.async.enabled' = 'false'
);
INSERT INTO user_hobby_DWD
SELECT A.user_id, B.name, B.hobby, A.order_amount, A.log_ts
FROM hudi_user AJOIN hudi_user_hobby B ON A.user_id = B.user_id;

 注意事项:字段的顺序和最终写入表的字段顺序必须一致,不一致会报错.

出现这样的结果,说明join完成.

这篇关于将Kafka流式数据摄取至Hudi的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/360496

相关文章

MySQL 中查询 VARCHAR 类型 JSON 数据的问题记录

《MySQL中查询VARCHAR类型JSON数据的问题记录》在数据库设计中,有时我们会将JSON数据存储在VARCHAR或TEXT类型字段中,本文将详细介绍如何在MySQL中有效查询存储为V... 目录一、问题背景二、mysql jsON 函数2.1 常用 JSON 函数三、查询示例3.1 基本查询3.2

SpringBatch数据写入实现

《SpringBatch数据写入实现》SpringBatch通过ItemWriter接口及其丰富的实现,提供了强大的数据写入能力,本文主要介绍了SpringBatch数据写入实现,具有一定的参考价值,... 目录python引言一、ItemWriter核心概念二、数据库写入实现三、文件写入实现四、多目标写入

使用Python将JSON,XML和YAML数据写入Excel文件

《使用Python将JSON,XML和YAML数据写入Excel文件》JSON、XML和YAML作为主流结构化数据格式,因其层次化表达能力和跨平台兼容性,已成为系统间数据交换的通用载体,本文将介绍如何... 目录如何使用python写入数据到Excel工作表用Python导入jsON数据到Excel工作表用

Mysql如何将数据按照年月分组的统计

《Mysql如何将数据按照年月分组的统计》:本文主要介绍Mysql如何将数据按照年月分组的统计方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录mysql将数据按照年月分组的统计要的效果方案总结Mysql将数据按照年月分组的统计要的效果方案① 使用 DA

鸿蒙中Axios数据请求的封装和配置方法

《鸿蒙中Axios数据请求的封装和配置方法》:本文主要介绍鸿蒙中Axios数据请求的封装和配置方法,本文给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录1.配置权限 应用级权限和系统级权限2.配置网络请求的代码3.下载在Entry中 下载AxIOS4.封装Htt

Python获取中国节假日数据记录入JSON文件

《Python获取中国节假日数据记录入JSON文件》项目系统内置的日历应用为了提升用户体验,特别设置了在调休日期显示“休”的UI图标功能,那么问题是这些调休数据从哪里来呢?我尝试一种更为智能的方法:P... 目录节假日数据获取存入jsON文件节假日数据读取封装完整代码项目系统内置的日历应用为了提升用户体验,

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

Python Dash框架在数据可视化仪表板中的应用与实践记录

《PythonDash框架在数据可视化仪表板中的应用与实践记录》Python的PlotlyDash库提供了一种简便且强大的方式来构建和展示互动式数据仪表板,本篇文章将深入探讨如何使用Dash设计一... 目录python Dash框架在数据可视化仪表板中的应用与实践1. 什么是Plotly Dash?1.1