flume文件名interceptor

2024-06-16 20:08
文章标签 文件名 interceptor flume

本文主要是介绍flume文件名interceptor,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

从文件名提取日期、小时信息,决定数据发送到hdfs哪天哪小时的分区目录。

需要自定义一个拦截器

package interceptor;import java.util.List;  
import java.util.Map;  
import java.util.regex.Matcher;  
import java.util.regex.Pattern;  import org.apache.commons.lang.StringUtils;  
import org.apache.flume.Context;  
import org.apache.flume.Event;
import org.apache.flume.interceptor.Interceptor;
import org.apache.flume.interceptor.RegexExtractorInterceptorPassThroughSerializer;  
import org.apache.flume.interceptor.RegexExtractorInterceptorSerializer;  
import org.slf4j.Logger;  
import org.slf4j.LoggerFactory;  import com.google.common.base.Charsets;  
import com.google.common.base.Preconditions;  
import com.google.common.base.Throwables;  
import com.google.common.collect.Lists; /*** Interceptor that extracts matches using a specified regular expression and* appends the matches to the event headers using the specified serializers</p>* Note that all regular expression matching occurs through Java's built in* java.util.regex package</p>. Properties:* <p>* regex: The regex to use* <p>* serializers: Specifies the group the serializer will be applied to, and the* name of the header that will be added. If no serializer is specified for a* group the default {@link RegexExtractorInterceptorPassThroughSerializer} will* be used* <p>* Sample config:* <p>* agent.sources.r1.channels = c1* <p>* agent.sources.r1.type = SEQ* <p>* agent.sources.r1.interceptors = i1* <p>* agent.sources.r1.interceptors.i1.type = REGEX_EXTRACTOR* <p>* agent.sources.r1.interceptors.i1.regex = (WARNING)|(ERROR)|(FATAL)* <p>* agent.sources.r1.interceptors.i1.serializers = s1 s2* agent.sources.r1.interceptors.i1.serializers.s1.type = com.blah.SomeSerializer* agent.sources.r1.interceptors.i1.serializers.s1.name = warning* agent.sources.r1.interceptors.i1.serializers.s2.type = org.apache.flume.interceptor.RegexExtractorInterceptorTimestampSerializer* agent.sources.r1.interceptors.i1.serializers.s2.name = error* agent.sources.r1.interceptors.i1.serializers.s2.dateFormat = yyyy-MM-dd* </code>* </p>* <pre>* Example 1:* </p>* EventBody: 1:2:3.4foobar5</p> Configuration:* agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)* </p>* agent.sources.r1.interceptors.i1.serializers = s1 s2 s3* agent.sources.r1.interceptors.i1.serializers.s1.name = one* agent.sources.r1.interceptors.i1.serializers.s2.name = two* agent.sources.r1.interceptors.i1.serializers.s3.name = three* </p>* results in an event with the the following** body: 1:2:3.4foobar5 headers: one=>1, two=>2, three=3** Example 2:** EventBody: 1:2:3.4foobar5** Configuration: agent.sources.r1.interceptors.i1.regex = (\\d):(\\d):(\\d)* <p>* agent.sources.r1.interceptors.i1.serializers = s1 s2* agent.sources.r1.interceptors.i1.serializers.s1.name = one* agent.sources.r1.interceptors.i1.se

这篇关于flume文件名interceptor的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1067423

相关文章

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

批处理以当前时间为文件名创建文件

批处理以当前时间为文件名创建文件 批处理创建空文件 有时候,需要创建以当前时间命名的文件,手动输入当然可以,但是有更省心的方法吗? 假设我是 windows 操作系统,打开命令行。 输入以下命令试试: echo %date:~0,4%_%date:~5,2%_%date:~8,2%_%time:~0,2%_%time:~3,2%_%time:~6,2% 输出类似: 2019_06

flume系列之:记录一次flume agent进程被异常oom kill -9的原因定位

flume系列之:记录一次flume agent进程被异常oom kill -9的原因定位 一、背景二、定位问题三、解决方法 一、背景 flume系列之:定位flume没有关闭某个时间点生成的tmp文件的原因,并制定解决方案在博主上面这篇文章的基础上,在机器内存、cpu资源、flume agent资源都足够的情况下,flume agent又出现了tmp文件无法关闭的情况 二、

java读取resource/通过文件名获取文件类型

java读取resource java读取resource目录下文件的方法: 借助Guava库的Resource类 Resources.getResource("test.txt") 通过文件名获取文件类型 mongodb java

关于WebZip乱码目录文件名修改

先引用一段 来描述问题:  在IT行业中,我们经常遇到与编码和字符集有关的问题,特别是在处理包含中文字符的文件或目录时。"WebZip乱码目录文件名修改"这个问题就是一个典型的例子,涉及到Webzip工具在下载包含中文路径的文件时出现的编码问题。Webzip是一款用于网站离线浏览的工具,它能够抓取整个网站并保存到本地,以便在没有网络连接的情况下访问。然而,当Webzip处理含有非

Interceptor拦截器无法拦截根目录的解决方法

今天发现了一个bug,首页home.jsp的某一个值是通过拦截器拦截所有页面,然后赋值的,然而我们的首页是通过index.jsp直接引用首页home.jsp代码(如下),拦截器无法拦截。 <%@ include file="./WEB-INF/jsp/home.jsp" %> 首先,第一个解决方法就是,将首页的引用文件改为跳转即可 <html><head><meta http-equiv

打通实时流处理log4j-flume-kafka-structured-streaming

大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 模拟产生log4j日志 jar包依赖 pom.xml 12345678910111213<dependency><groupId>log4j</groupId><artifactId>log4j</artifactId></dependency><depe

Spark Streaming整合log4j、Flume与Kafka的案例

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 来源:作者TAI_SPARK,http://suo.im/5w7LF8 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 1.框架 2.log4j完成模拟日志输出 设置模拟日志格式,log4j.properties: log4j.rootLogger = INFO,stdo

基于实际业务场景下的Flume部署

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 有这样一个场景,我们要基于某个web服务实时持续收集用户行为数据; 再实施方案前,我们做了以下的准备工作 (不细说) web服务端部署nginx,用于收集用户行为并有形成log (172.17.111.111)我们数据平台是部

过滤器(Filter)和拦截器(Interceptor)

在Web开发中,过滤器(Filter)和拦截器(Interceptor)都是重要的组件,它们都可以对HTTP请求进行预处理、后处理以及一些额外的操作。然而,它们之间在多个方面存在明显的区别 1. 运行位置 过滤器(Filter):运行在Web服务器和Servlet容器之间的组件,可以拦截所有进出该容器的请求和响应。过滤器是Servlet规范的一部分,不依赖于特定的框架。拦截器(Intercep