Flume RegexHbaseEventSerializer自定义rowKey

2024-04-16 18:48

本文主要是介绍Flume RegexHbaseEventSerializer自定义rowKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上篇Flume谈到setwritewal出错的问题,通过注释了3行代码。但是由于rowkey默认是自动产生的,产生的规则通过源代码可以看出,规则是:
String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());

如果要自定义rowkey,修改源代码是唯一的办法,RegexHbaseEventSerializer.java就是我们要修改的文件。我们可以新建一个类来继承,原始文件不要去修改。不过今天我测试的时候是直接修改源文件的。

我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54

String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());

前3个字段都在文件名中,这就意味着,我必须解析文件名获得这3个字段,那么配置文件必须添加header:

a1.sources.r1.type = spooldir  
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1  

OK,来整理一下思路,我需要解析文件名来获取3个字段作为rowkey组成部分,那么配置需要添加header,然后把源代码自动生成rowkey的规则替换成我们自己的规则,就是这么简单。

1. 新建解析文件名的方法:

	public String splitFileName() {for (Map.Entry<String, String> entry : headers.entrySet()) {return entry.getValue();}return null;}

既然是要解析文件名,很显然要知道怎么获取文件名,从代码可以知道headers.entrySet就是获取header的方法,因为我就一个header,所以一次循环就return结果。

2.  替换默认rowkey生成规则

	protected byte[] getRowKey(Calendar cal) {/** NOTE: This key generation strategy has the following properties:* * 1) Within a single JVM, the same row key will never be duplicated. 2)* Amongst any two JVM's operating at different time periods (according* to their respective clocks), the same row key will never be* duplicated. 3) Amongst any two JVM's operating concurrently* (according to their respective clocks), the odds of duplicating a* row-key are non-zero but infinitesimal. This would require* simultaneous collision in (a) the timestamp (b) the respective nonce* and (c) the random string. The string is necessary since (a) and (b)* could collide if a fleet of Flume agents are restarted in tandem.* * Row-key uniqueness is important because conflicting row-keys will* cause data loss.*/this.fileName = splitFileName();this.machineNo = fileName.split("_")[1];this.fileTimeStamp = fileName.split("_")[2];this.fileNo = fileName.split("_")[3].split("\\.")[0];String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());//String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());return rowKey.getBytes(charset);}

注释掉的那行就是默认的规则,新的是我自己要的规则。

就这样完成了,打个包替换之前的包,消费一个文件来测试,结果正如我们所期望的:

 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0                                                            B13612145#1529637655#3#1530085147016#55   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147016#55   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147017#56   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0                                                           B13612145#1529637655#3#1530085147017#56   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147017#56   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147018#57   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0                                                            B13612145#1529637655#3#1530085147018#57   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147018#57   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147018#58   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0                                                            B13612145#1529637655#3#1530085147018#58   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147018#58   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147019#59   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0                                                            B13612145#1529637655#3#1530085147019#59   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147019#59   column=cf:ext_toolno, timestamp=1530085147229, value=30         


今天测试的时候碰到2个问题:

1. 消费文件有几次出现文件名已经修改为.COMPLETE,但是我HBASE数据没有任何增加,而且没有报任何错误,。给我的感觉就是没有消费。测试了几次,都是如此,很是困惑,后来突然想起来之前有人提到过如果一个很大的文件需要放到spooldir目录会发生错误,因为文件一进去就会消费,但是文件又在拷贝过程。后来我改成先把原始文件名添加.COMPLETE,拷贝完成之后,再修改文件名去掉.COMPLETE.

2. 时间冲突

rowkey的规则里有时间,我有一个文件60行数据,消费之后只有48条,因为之前我同过spark 消费也出现过这个问题,因此很容易知道这是因为rowkey冲突了,导致数据覆盖了,因此把源文件的nonce.getAndIncrement()加到ROWKEY即可。

简单说就是循环的过程cal.getTimeInMillis()这个玩意会可能重复,很多人觉得微秒级别不应该出现重复,事实上我碰到过2次,因此现在对通过时间作为rowkey格外小心。



这篇关于Flume RegexHbaseEventSerializer自定义rowKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/909604

相关文章

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

自定义类型:结构体(续)

目录 一. 结构体的内存对齐 1.1 为什么存在内存对齐? 1.2 修改默认对齐数 二. 结构体传参 三. 结构体实现位段 一. 结构体的内存对齐 在前面的文章里我们已经讲过一部分的内存对齐的知识,并举出了两个例子,我们再举出两个例子继续说明: struct S3{double a;int b;char c;};int mian(){printf("%zd\n",s

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

Spring 源码解读:自定义实现Bean定义的注册与解析

引言 在Spring框架中,Bean的注册与解析是整个依赖注入流程的核心步骤。通过Bean定义,Spring容器知道如何创建、配置和管理每个Bean实例。本篇文章将通过实现一个简化版的Bean定义注册与解析机制,帮助你理解Spring框架背后的设计逻辑。我们还将对比Spring中的BeanDefinition和BeanDefinitionRegistry,以全面掌握Bean注册和解析的核心原理。

Oracle type (自定义类型的使用)

oracle - type   type定义: oracle中自定义数据类型 oracle中有基本的数据类型,如number,varchar2,date,numeric,float....但有时候我们需要特殊的格式, 如将name定义为(firstname,lastname)的形式,我们想把这个作为一个表的一列看待,这时候就要我们自己定义一个数据类型 格式 :create or repla

HTML5自定义属性对象Dataset

原文转自HTML5自定义属性对象Dataset简介 一、html5 自定义属性介绍 之前翻译的“你必须知道的28个HTML5特征、窍门和技术”一文中对于HTML5中自定义合法属性data-已经做过些介绍,就是在HTML5中我们可以使用data-前缀设置我们需要的自定义属性,来进行一些数据的存放,例如我们要在一个文字按钮上存放相对应的id: <a href="javascript:" d

一步一步将PlantUML类图导出为自定义格式的XMI文件

一步一步将PlantUML类图导出为自定义格式的XMI文件 说明: 首次发表日期:2024-09-08PlantUML官网: https://plantuml.com/zh/PlantUML命令行文档: https://plantuml.com/zh/command-line#6a26f548831e6a8cPlantUML XMI文档: https://plantuml.com/zh/xmi

argodb自定义函数读取hdfs文件的注意点,避免FileSystem已关闭异常

一、问题描述 一位同学反馈,他写的argo存过中调用了一个自定义函数,函数会加载hdfs上的一个文件,但有些节点会报FileSystem closed异常,同时有时任务会成功,有时会失败。 二、问题分析 argodb的计算引擎是基于spark的定制化引擎,对于自定义函数的调用跟hive on spark的是一致的。udf要通过反射生成实例,然后迭代调用evaluate。通过代码分析,udf在

鸿蒙开发中实现自定义弹窗 (CustomDialog)

效果图 #思路 创建带有 @CustomDialog 修饰的组件 ,并且在组件内部定义controller: CustomDialogController 实例化CustomDialogController,加载组件,open()-> 打开对话框 , close() -> 关闭对话框 #定义弹窗 (CustomDialog)是什么? CustomDialog是自定义弹窗,可用于广告、中

mybatis框架基础以及自定义插件开发

文章目录 框架概览框架预览MyBatis框架的核心组件MyBatis框架的工作原理MyBatis框架的配置MyBatis框架的最佳实践 自定义插件开发1. 添加依赖2. 创建插件类3. 配置插件4. 启动类中注册插件5. 测试插件 参考文献 框架概览 MyBatis是一个优秀的持久层框架,它支持自定义SQL、存储过程以及高级映射,为开发者提供了极大的灵活性和便利性。以下是关于M