二百零七、Flume——Flume实时采集5分钟频率的Kafka数据直接写入ODS层表的HDFS文件路径下

本文主要是介绍二百零七、Flume——Flume实时采集5分钟频率的Kafka数据直接写入ODS层表的HDFS文件路径下,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、目的

在离线数仓中,需要用Flume去采集Kafka中的数据,然后写入HDFS中。

由于每种数据类型的频率、数据大小、数据规模不同,因此每种数据的采集需要不同的Flume配置文件。玩了几天Flume,感觉Flume的使用难点就是配置文件

二、使用场景

转向比数据是数据频率为5分钟的数据类型代表,数据量很小、频率不高,因此搞定了转向比数据的采集就搞定了这一类低频率数据的实时采集问题

1台设备每日的转向比数据规模是30KB,25台设备的数据规模则是750KB

三、转向比数据ODS层建表

create external table  if not exists  ods_turnratio(turnratio_json  string
)
comment '转向比数据外部表——静态分区'
partitioned by (day string)
row format delimited fields terminated by '\x001'
lines terminated by '\n'
stored as SequenceFile
tblproperties("skip.header.line.count"="1");

四、转向比数据的配置文件

## agent a1
a1.sources = s1
a1.channels = c1
a1.sinks = k1

## configure source s1
a1.sources.s1.type = org.apache.flume.source.kafka.KafkaSource
a1.sources.s1.kafka.bootstrap.servers = 192.168.0.27:9092
a1.sources.s1.kafka.topics = topic_b_turnratio
a1.sources.s1.kafka.consumer.group.id = turnratio_group
a1.sources.s1.kafka.consumer.auto.offset.reset = latest
a1.sources.s1.batchSize = 1000

## configure channel c1
## a1.channels.c1.type = memory
## a1.channels.c1.capacity = 10000
## a1.channels.c1.transactionCapacity = 1000
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /home/data/flumeData/checkpoint/turnratio
a1.channels.c1.dataDirs = /home/data/flumeData/flumedata/turnratio

## configure sink k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.hdfs.path = hdfs://hurys23:8020/user/hive/warehouse/hurys_dc_ods.db/ods_turnratio/day=%Y-%m-%d/
a1.sinks.k1.hdfs.filePrefix = turnratio
a1.sinks.k1.hdfs.fileSuffix = .log
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 62500
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 600
a1.sinks.k1.hdfs.minBlockReplicas = 1

## Bind the source and sink to the channel
a1.sources.s1.channels = c1
a1.sinks.k1.channel = c1

注意:62500约为61KB

五、Flume写入HDFS结果

Flume根据时间戳按照ODS层表的分区,将数据写入对应HDFS文件

25台设备,50分钟1个文件,文件大小66.18 KB 

六、ODS表刷新分区后查验数据

(一)刷新表分区

MSCK REPAIR TABLE ods_turnratio;

(二)查看表数据

select * from ods_turnratio;

(三)验证数据完整性

--2023-11-19 数据基本完整  23时297条 标准300  少3条
--2023-11-20 数据基本完整  23时299条 标准300  少1条

数据基本完整,尤其是调度文件大小之后

19日a1.sinks.k1.hdfs.rollSize = 31250        数据基本完整 23时297条 标准300 少3条

20日a1.sinks.k1.hdfs.rollSize = 62500        数据基本完整 23时299条 标准300 少1条

七、注意点

(一)配置文件中的重点是红色标记的几点

a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = second
a1.sinks.k1.hdfs.rollSize = 62500
a1.sinks.k1.hdfs.rollCount = 0
a1.sinks.k1.hdfs.rollInterval = 0
a1.sinks.k1.hdfs.idleTimeout = 600
a1.sinks.k1.hdfs.minBlockReplicas = 1

(二)任务配置文件中rollSize参数设置可大不可小

rollSize参数小的话数据会丢失,大的话没问题

配置文件的参数还是不断调试中,争取调到最优的状态。能够及时、完整的消费Kafka数据,并且能够最大化的利用HDFS资源。

目前就先这样,如果有问题的话后面再更新!!!

这篇关于二百零七、Flume——Flume实时采集5分钟频率的Kafka数据直接写入ODS层表的HDFS文件路径下的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/418823

相关文章

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

5分钟获取deepseek api并搭建简易问答应用

《5分钟获取deepseekapi并搭建简易问答应用》本文主要介绍了5分钟获取deepseekapi并搭建简易问答应用,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需... 目录1、获取api2、获取base_url和chat_model3、配置模型参数方法一:终端中临时将加

Java中注解与元数据示例详解

《Java中注解与元数据示例详解》Java注解和元数据是编程中重要的概念,用于描述程序元素的属性和用途,:本文主要介绍Java中注解与元数据的相关资料,文中通过代码介绍的非常详细,需要的朋友可以参... 目录一、引言二、元数据的概念2.1 定义2.2 作用三、Java 注解的基础3.1 注解的定义3.2 内