本文主要是介绍Flume RegexHbaseEventSerializer自定义rowKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
上篇Flume谈到setwritewal出错的问题,通过注释了3行代码。但是由于rowkey默认是自动产生的,产生的规则通过源代码可以看出,规则是:String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());
如果要自定义rowkey,修改源代码是唯一的办法,RegexHbaseEventSerializer.java就是我们要修改的文件。我们可以新建一个类来继承,原始文件不要去修改。不过今天我测试的时候是直接修改源文件的。
我的需要的rowkey是:B13612145#1529637655#3#1530085147015#54
String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());
前3个字段都在文件名中,这就意味着,我必须解析文件名获得这3个字段,那么配置文件必须添加header:
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /data/flume/r1/data
a1.sources.r1.batchSize = 100
#a1.sources.r1.fileHeader = true
a1.sources.r1.basenameHeader = true
a1.sources.r1.channels = c1
OK,来整理一下思路,我需要解析文件名来获取3个字段作为rowkey组成部分,那么配置需要添加header,然后把源代码自动生成rowkey的规则替换成我们自己的规则,就是这么简单。
1. 新建解析文件名的方法:
public String splitFileName() {for (Map.Entry<String, String> entry : headers.entrySet()) {return entry.getValue();}return null;}
既然是要解析文件名,很显然要知道怎么获取文件名,从代码可以知道headers.entrySet就是获取header的方法,因为我就一个header,所以一次循环就return结果。
2. 替换默认rowkey生成规则
protected byte[] getRowKey(Calendar cal) {/** NOTE: This key generation strategy has the following properties:* * 1) Within a single JVM, the same row key will never be duplicated. 2)* Amongst any two JVM's operating at different time periods (according* to their respective clocks), the same row key will never be* duplicated. 3) Amongst any two JVM's operating concurrently* (according to their respective clocks), the odds of duplicating a* row-key are non-zero but infinitesimal. This would require* simultaneous collision in (a) the timestamp (b) the respective nonce* and (c) the random string. The string is necessary since (a) and (b)* could collide if a fleet of Flume agents are restarted in tandem.* * Row-key uniqueness is important because conflicting row-keys will* cause data loss.*/this.fileName = splitFileName();this.machineNo = fileName.split("_")[1];this.fileTimeStamp = fileName.split("_")[2];this.fileNo = fileName.split("_")[3].split("\\.")[0];String rowKey = String.format("%s#%s#%s#%s#%s", machineNo, fileTimeStamp , fileNo , cal.getTimeInMillis(), nonce.getAndIncrement());//String rowKey = String.format("%s-%s-%s", cal.getTimeInMillis(), randomKey, nonce.getAndIncrement());return rowKey.getBytes(charset);}
注释掉的那行就是默认的规则,新的是我自己要的规则。
就这样完成了,打个包替换之前的包,消费一个文件来测试,结果正如我们所期望的:
B13612145#1529637655#3#1530085147016#55 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418 B13612145#1529637655#3#1530085147016#55 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0 B13612145#1529637655#3#1530085147016#55 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,7,93,0 B13612145#1529637655#3#1530085147016#55 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3 B13612145#1529637655#3#1530085147016#55 column=cf:ext_toolno, timestamp=1530085147229, value=30 B13612145#1529637655#3#1530085147017#56 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418 B13612145#1529637655#3#1530085147017#56 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0 B13612145#1529637655#3#1530085147017#56 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,11,92,0 B13612145#1529637655#3#1530085147017#56 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3 B13612145#1529637655#3#1530085147017#56 column=cf:ext_toolno, timestamp=1530085147229, value=30 B13612145#1529637655#3#1530085147018#57 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418 B13612145#1529637655#3#1530085147018#57 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0 B13612145#1529637655#3#1530085147018#57 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=6,7,93,0 B13612145#1529637655#3#1530085147018#57 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3 B13612145#1529637655#3#1530085147018#57 column=cf:ext_toolno, timestamp=1530085147229, value=30 B13612145#1529637655#3#1530085147018#58 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418 B13612145#1529637655#3#1530085147018#58 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0 B13612145#1529637655#3#1530085147018#58 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=5,8,93,0 B13612145#1529637655#3#1530085147018#58 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3 B13612145#1529637655#3#1530085147018#58 column=cf:ext_toolno, timestamp=1530085147229, value=30 B13612145#1529637655#3#1530085147019#59 column=cf:cnc_exeprgname, timestamp=1530085147229, value=418 B13612145#1529637655#3#1530085147019#59 column=cf:cnc_rdspmeter[0], timestamp=1530085147229, value=0 B13612145#1529637655#3#1530085147019#59 column=cf:cnc_rdsvmeter, timestamp=1530085147229, value=7,8,93,0 B13612145#1529637655#3#1530085147019#59 column=cf:cnc_statinfo[3], timestamp=1530085147229, value=3 B13612145#1529637655#3#1530085147019#59 column=cf:ext_toolno, timestamp=1530085147229, value=30
今天测试的时候碰到2个问题:
1. 消费文件有几次出现文件名已经修改为.COMPLETE,但是我HBASE数据没有任何增加,而且没有报任何错误,。给我的感觉就是没有消费。测试了几次,都是如此,很是困惑,后来突然想起来之前有人提到过如果一个很大的文件需要放到spooldir目录会发生错误,因为文件一进去就会消费,但是文件又在拷贝过程。后来我改成先把原始文件名添加.COMPLETE,拷贝完成之后,再修改文件名去掉.COMPLETE.
2. 时间冲突
rowkey的规则里有时间,我有一个文件60行数据,消费之后只有48条,因为之前我同过spark 消费也出现过这个问题,因此很容易知道这是因为rowkey冲突了,导致数据覆盖了,因此把源文件的nonce.getAndIncrement()加到ROWKEY即可。
简单说就是循环的过程cal.getTimeInMillis()这个玩意会可能重复,很多人觉得微秒级别不应该出现重复,事实上我碰到过2次,因此现在对通过时间作为rowkey格外小心。
这篇关于Flume RegexHbaseEventSerializer自定义rowKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!