Flume RegexHbaseEventSerializer自定义rowKey

2024-04-16 18:48

本文主要是介绍Flume RegexHbaseEventSerializer自定义rowKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

上篇Flume谈到setwritewal出错的问题,通过注释了3行代码。但是由于rowkey默认是自动产生的,产生的规则通过源代码可以看出,规则是:
String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());

如果要自定义rowkey,修改源代码是唯一的办法,RegexHbaseEventSerializer.java就是我们要修改的文件。我们可以新建一个类来继承,原始文件不要去修改。不过今天我测试的时候是直接修改源文件的。

我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54

String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());

前3个字段都在文件名中,这就意味着,我必须解析文件名获得这3个字段,那么配置文件必须添加header:

a1.sources.r1.type = spooldir  
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1  

OK,来整理一下思路,我需要解析文件名来获取3个字段作为rowkey组成部分,那么配置需要添加header,然后把源代码自动生成rowkey的规则替换成我们自己的规则,就是这么简单。

1. 新建解析文件名的方法:

	public String splitFileName() {for (Map.Entry<String, String> entry : headers.entrySet()) {return entry.getValue();}return null;}

既然是要解析文件名,很显然要知道怎么获取文件名,从代码可以知道headers.entrySet就是获取header的方法,因为我就一个header,所以一次循环就return结果。

2.  替换默认rowkey生成规则

	protected byte[] getRowKey(Calendar cal) {/** NOTE: This key generation strategy has the following properties:* * 1) Within a single JVM, the same row key will never be duplicated. 2)* Amongst any two JVM's operating at different time periods (according* to their respective clocks), the same row key will never be* duplicated. 3) Amongst any two JVM's operating concurrently* (according to their respective clocks), the odds of duplicating a* row-key are non-zero but infinitesimal. This would require* simultaneous collision in (a) the timestamp (b) the respective nonce* and (c) the random string. The string is necessary since (a) and (b)* could collide if a fleet of Flume agents are restarted in tandem.* * Row-key uniqueness is important because conflicting row-keys will* cause data loss.*/this.fileName = splitFileName();this.machineNo = fileName.split("_")[1];this.fileTimeStamp = fileName.split("_")[2];this.fileNo = fileName.split("_")[3].split("\\.")[0];String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());//String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());return rowKey.getBytes(charset);}

注释掉的那行就是默认的规则,新的是我自己要的规则。

就这样完成了,打个包替换之前的包,消费一个文件来测试,结果正如我们所期望的:

 B13612145#1529637655#3#1530085147016#55   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147016#55   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0                                                            B13612145#1529637655#3#1530085147016#55   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147016#55   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147017#56   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147017#56   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0                                                           B13612145#1529637655#3#1530085147017#56   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147017#56   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147018#57   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147018#57   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0                                                            B13612145#1529637655#3#1530085147018#57   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147018#57   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147018#58   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147018#58   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0                                                            B13612145#1529637655#3#1530085147018#58   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147018#58   column=cf:ext_toolno, timestamp=1530085147229, value=30                                                                     B13612145#1529637655#3#1530085147019#59   column=cf:cnc_exeprgname, timestamp=1530085147229, value=418                                                                B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0                                                                B13612145#1529637655#3#1530085147019#59   column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0                                                            B13612145#1529637655#3#1530085147019#59   column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3                                                                 B13612145#1529637655#3#1530085147019#59   column=cf:ext_toolno, timestamp=1530085147229, value=30         


今天测试的时候碰到2个问题:

1. 消费文件有几次出现文件名已经修改为.COMPLETE,但是我HBASE数据没有任何增加,而且没有报任何错误,。给我的感觉就是没有消费。测试了几次,都是如此,很是困惑,后来突然想起来之前有人提到过如果一个很大的文件需要放到spooldir目录会发生错误,因为文件一进去就会消费,但是文件又在拷贝过程。后来我改成先把原始文件名添加.COMPLETE,拷贝完成之后,再修改文件名去掉.COMPLETE.

2. 时间冲突

rowkey的规则里有时间,我有一个文件60行数据,消费之后只有48条,因为之前我同过spark 消费也出现过这个问题,因此很容易知道这是因为rowkey冲突了,导致数据覆盖了,因此把源文件的nonce.getAndIncrement()加到ROWKEY即可。

简单说就是循环的过程cal.getTimeInMillis()这个玩意会可能重复,很多人觉得微秒级别不应该出现重复,事实上我碰到过2次,因此现在对通过时间作为rowkey格外小心。



这篇关于Flume RegexHbaseEventSerializer自定义rowKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/909604

相关文章

使用Sentinel自定义返回和实现区分来源方式

《使用Sentinel自定义返回和实现区分来源方式》:本文主要介绍使用Sentinel自定义返回和实现区分来源方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Sentinel自定义返回和实现区分来源1. 自定义错误返回2. 实现区分来源总结Sentinel自定

如何自定义Nginx JSON日志格式配置

《如何自定义NginxJSON日志格式配置》Nginx作为最流行的Web服务器之一,其灵活的日志配置能力允许我们根据需求定制日志格式,本文将详细介绍如何配置Nginx以JSON格式记录访问日志,这种... 目录前言为什么选择jsON格式日志?配置步骤详解1. 安装Nginx服务2. 自定义JSON日志格式各

Android自定义Scrollbar的两种实现方式

《Android自定义Scrollbar的两种实现方式》本文介绍两种实现自定义滚动条的方法,分别通过ItemDecoration方案和独立View方案实现滚动条定制化,文章通过代码示例讲解的非常详细,... 目录方案一:ItemDecoration实现(推荐用于RecyclerView)实现原理完整代码实现

基于Spring实现自定义错误信息返回详解

《基于Spring实现自定义错误信息返回详解》这篇文章主要为大家详细介绍了如何基于Spring实现自定义错误信息返回效果,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录背景目标实现产出背景Spring 提供了 @RestConChina编程trollerAdvice 用来实现 HTT

SpringSecurity 认证、注销、权限控制功能(注销、记住密码、自定义登入页)

《SpringSecurity认证、注销、权限控制功能(注销、记住密码、自定义登入页)》SpringSecurity是一个强大的Java框架,用于保护应用程序的安全性,它提供了一套全面的安全解决方案... 目录简介认识Spring Security“认证”(Authentication)“授权” (Auth

SpringBoot自定义注解如何解决公共字段填充问题

《SpringBoot自定义注解如何解决公共字段填充问题》本文介绍了在系统开发中,如何使用AOP切面编程实现公共字段自动填充的功能,从而简化代码,通过自定义注解和切面类,可以统一处理创建时间和修改时间... 目录1.1 问题分析1.2 实现思路1.3 代码开发1.3.1 步骤一1.3.2 步骤二1.3.3

dubbo3 filter(过滤器)如何自定义过滤器

《dubbo3filter(过滤器)如何自定义过滤器》dubbo3filter(过滤器)类似于javaweb中的filter和springmvc中的intercaptor,用于在请求发送前或到达前进... 目录dubbo3 filter(过滤器)简介dubbo 过滤器运行时机自定义 filter第一种 @A

CSS自定义浏览器滚动条样式完整代码

《CSS自定义浏览器滚动条样式完整代码》:本文主要介绍了如何使用CSS自定义浏览器滚动条的样式,包括隐藏滚动条的角落、设置滚动条的基本样式、轨道样式和滑块样式,并提供了完整的CSS代码示例,通过这些技巧,你可以为你的网站添加个性化的滚动条样式,从而提升用户体验,详细内容请阅读本文,希望能对你有所帮助...

SpringBoot 自定义消息转换器使用详解

《SpringBoot自定义消息转换器使用详解》本文详细介绍了SpringBoot消息转换器的知识,并通过案例操作演示了如何进行自定义消息转换器的定制开发和使用,感兴趣的朋友一起看看吧... 目录一、前言二、SpringBoot 内容协商介绍2.1 什么是内容协商2.2 内容协商机制深入理解2.2.1 内容

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06