flume iterceptor

2024-08-28 09:58
文章标签 flume iterceptor

本文主要是介绍flume iterceptor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

对于flume拦截器,我的理解是:在app(应用程序日志)和 source 之间的,对app日志进行拦截处理的。也即在日志进入到source之前,对日志进行一些包装、清新过滤等等动作。

官方上提供的已有的拦截器有:

Timestamp Interceptor
Host Interceptor
Static Interceptor
Regex Filtering Interceptor
Regex Extractor Interceptor

像很多java的开源项目如springmvc中的拦截器一样,flume的拦截器也是chain形式的,可以对一个source指定多个拦截器,按先后顺序依次处理。
Timestamp Interceptor :在event的header中添加一个key叫:timestamp,value为当前的时间戳。这个拦截器在sink为hdfs 时很有用,后面会举例说到
Host Interceptor:在event的header中添加一个key叫:host,value为当前机器的hostname或者ip。
Static Interceptor:可以在event的header中添加自定义的key和value。
Regex Filtering Interceptor:通过正则来清洗或包含匹配的events。
Regex Extractor Interceptor:通过正则表达式来在header中添加指定的key,value则为正则匹配的部分

下面举例说明这些拦截器的用法,首先我们调整一下第一篇文章中的那个WriteLog类:


public class WriteLog {
protected static final Log logger = LogFactory.getLog(WriteLog.class);

/**
* @param args
* @throws InterruptedException
*/
public static void main(String[] args) throws InterruptedException {
// TODO Auto-generated method stub
while (true) {
logger.info(new Date().getTime());
logger.info("{\"requestTime\":"
+ System.currentTimeMillis()
+ ",\"requestParams\":{\"timestamp\":1405499314238,\"phone\":\"02038824941\",\"cardName\":\"测试商家名称\",\"provinceCode\":\"440000\",\"cityCode\":\"440106\"},\"requestUrl\":\"/reporter-api/reporter/reporter12/init.do\"}");
Thread.sleep(2000);

}
}
}



又多输出了一行日志信息,现在每次循环都会输出两行日志信息,第一行是一个时间戳信息,第二行是一行JSON格式的字符串信息。

接下来我们用regex_filter和 timestamp这两个拦截器来实现这样一个功能:
1 过滤掉LOG4J输出的第一行那个时间戳日志信息,只收集JSON格式的日志信息
2 将收集的日志信息保存到HDFS上,每天的日志保存到以该天命名的目录下面,如2014-7-25号的日志,保存到/flume/events/14-07-25目录下面。

修改后的flume.conf如下:


tier1.sources=source1
tier1.channels=channel1
tier1.sinks=sink1

tier1.sources.source1.type=avro
tier1.sources.source1.bind=0.0.0.0
tier1.sources.source1.port=44444
tier1.sources.source1.channels=channel1

tier1.sources.source1.interceptors=i1 i2
tier1.sources.source1.interceptors.i1.type=regex_filter
tier1.sources.source1.interceptors.i1.regex=\\{.*\\}
tier1.sources.source1.interceptors.i2.type=timestamp

tier1.channels.channel1.type=memory
tier1.channels.channel1.capacity=10000
tier1.channels.channel1.transactionCapacity=1000
tier1.channels.channel1.keep-alive=30

tier1.sinks.sink1.type=hdfs
tier1.sinks.sink1.channel=channel1
tier1.sinks.sink1.hdfs.path=hdfs://master68:8020/flume/events/%y-%m-%d
tier1.sinks.sink1.hdfs.fileType=DataStream
tier1.sinks.sink1.hdfs.writeFormat=Text
tier1.sinks.sink1.hdfs.rollInterval=0
tier1.sinks.sink1.hdfs.rollSize=10240
tier1.sinks.sink1.hdfs.rollCount=0
tier1.sinks.sink1.hdfs.idleTimeout=60


我们对source1添加了两个拦截器i1和i2,i1为regex_filter,过滤的正则为\\{.*\\},注意正则的写法用到了转义字符,不然source1无法启动,会报错。
i2为timestamp,在header中添加了一个timestamp的key,然后我们修改了sink1.hdfs.path在后面加上了/%y-%m-%d这一串字符,这一串字符要求event的header中必须有timestamp这个key,这就是为什么我们需要添加一个timestamp拦截器的原因,如果不添加这个拦截器,无法使用这样的占位符,会报错。还有很多占位符,请参考官方文档。

然后运行WriteLog,去hdfs上查看对应目录下面的文件,会发现内容只有JSON字符串的日志,与我们的功能描述一致。

这篇关于flume iterceptor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1114443

相关文章

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

flume系列之:记录一次flume agent进程被异常oom kill -9的原因定位

flume系列之:记录一次flume agent进程被异常oom kill -9的原因定位 一、背景二、定位问题三、解决方法 一、背景 flume系列之:定位flume没有关闭某个时间点生成的tmp文件的原因,并制定解决方案在博主上面这篇文章的基础上,在机器内存、cpu资源、flume agent资源都足够的情况下,flume agent又出现了tmp文件无法关闭的情况 二、

打通实时流处理log4j-flume-kafka-structured-streaming

大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 模拟产生log4j日志 jar包依赖 pom.xml 12345678910111213<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId></dependency><depe

Spark Streaming整合log4j、Flume与Kafka的案例

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 来源:作者TAI_SPARK,http://suo.im/5w7LF8 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 1.框架 2.log4j完成模拟日志输出 设置模拟日志格式,log4j.properties: log4j.rootLogger = INFO,stdo

基于实际业务场景下的Flume部署

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 有这样一个场景,我们要基于某个web服务实时持续收集用户行为数据; 再实施方案前,我们做了以下的准备工作 (不细说) web服务端部署nginx,用于收集用户行为并有形成log (172.17.111.111)我们数据平台是部

记一种常用的实时数据同步方案:Canal+Kafka+Flume

记一种常用的实时数据同步方案:Canal+Kafka+Flume 在当今数据驱动的业务环境中,数据同步是确保系统间数据一致性的关键环节。一种高效、稳定且可扩展的数据同步方案对于支撑企业的数据处理和分析需求至关重要。本文将介绍一种结合了Canal、Kafka和Flume的数据同步方案,探讨其架构设计、实现原理以及为何它能在多种场景下提供卓越的性能。通过深入分析这一方案的组件和工作流程,我们将展示其

Spark实战(五)spark streaming + flume(Python版)

一、flume安装 (一)概述    Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中,一般的采集需求,通过对flume的简单配置即可实现,Flume针对特殊场景也具备良好的自定义扩展能力,因此flume可以适用于大部分的日

flume日志采集方案

1、去官网下载flume.tar包,解压。 报:tar: apache-flume-1.8.0-bin/docs/team-list.html:时间戳 2017-09-15 20:47:53 是未来的 1708496.58232717 秒之后 顺手改下日期吧。 date -s "2019-08-27 19:58"。记得要加引号,不然报: date: 参数"19:58" 缺少前导的"+"; 2、

flume系列之:批量并行启动、停止、重启flume agent组

Flume系列之:批量并行启动、停止、重启flume agent组 批量启动flume agent组 批量启动flume agent组 import subprocessimport threadingdef run_command(command):process = subprocess.Popen(command, shell=True)process

大数据技术之Flume 企业开发案例——聚合(7)

目录 聚合 1)案例需求: 2)需求分析  3)实现步骤: 准备工作 创建 flume1-logger-flume.conf 创建 flume2-netcat-flume.conf 创建 flume3-flume-logger.conf 执行配置文件 聚合 1)案例需求: hadoop12 上的 Flume-1 监控文件 /opt/module/group.log,