Flume_Flume常用配置5_header + filter taildir.source_memory.channel_hdfs.sink

2024-05-03 06:32

本文主要是介绍Flume_Flume常用配置5_header + filter taildir.source_memory.channel_hdfs.sink,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

以下配置基于版本 apache-flume-1.8.0-bin


我们假定已经对Flume有一定了解,并且对Flume 的各个组件有一定了解。

我们演示一个基本的 
source  为 taildir源
channel 为 memory
sink 为 hdfs 类型
的配置示例:


上一个配置中,我们对spooldir 源进行了简单的讲解,也提出了spooldir 中存在的问题,这一章我们对 1.8新推出的

taildir 进行下讲解,taildir 可以完美解决 spooldir 中存在的问题。


我们对 taildir 的原理进行下简单的介绍,

taildir 对一个目录进行监测,目录不支持 正则,等表达式,

会生成一个json文件,其中记录了每个文件的消费偏移量。

1. 宕机后,会从偏移量继续消费 ,所以一般不存在数据重复发送的问题。


但是该功能为 1.8 的实验性功能,目前只支持linux 环境,下面是示例配置

我们在解压好的目录下创建 2个子目录  my-conf, my-bin

my-conf 存放了 对 agent (source, sink, channel) 的配置

my-bin 存放了  agent 的启动脚本


在上一篇的基础上,我们增加了 过滤器 文件压缩 的配置


这里我们要说一说文件压缩,

如果我们配置了文件压缩,在sink 中 就不需要配置后缀了,因为文件压缩默认会有一个后缀,所以我们不需要额外的后缀



my-conf



my-bin



配置文件

my-conf/flume-taildir-memory-hdfs_withhead-codec.properties

# example.conf: A single-node Flume configuration# Name the components on this agent
hdfs_agent.sources = r1
hdfs_agent.sinks = k1
hdfs_agent.channels = c1# Describe/configure the source
hdfs_agent.sources.r1.type = TAILDIR
hdfs_agent.sources.r1.filegroups = f1
hdfs_agent.sources.r1.filegroups.f1 = /tmp/logs/tailDir/.*\.log
hdfs_agent.sources.r1.positionFile = /tmp/logs/tailDir/.flume/taildir_position.jsonhdfs_agent.sources.r1.interceptors = i1 i2 i3#拦截器配置
hdfs_agent.sources.r1.interceptors.i1.type = timestamp
hdfs_agent.sources.r1.interceptors.i1.preserveExisting = truehdfs_agent.sources.r1.interceptors.i2.type = host
hdfs_agent.sources.r1.interceptors.i2.preserveExisting = truehdfs_agent.sources.r1.interceptors.i3.type = static
hdfs_agent.sources.r1.interceptors.i3.key = country
hdfs_agent.sources.r1.interceptors.i3.value = China# Describe the sink
hdfs_agent.sinks.k1.type = hdfs
hdfs_agent.sinks.k1.hdfs.path = hdfs://192.168.75.128:9000/test/flume/hdfs_filegroups_source_header_codec/%{host}/%Y-%m-%d/
#文件转存属性
hdfs_agent.sinks.k1.hdfs.rollInterval = 3600
hdfs_agent.sinks.k1.hdfs.rollSize = 1048576
hdfs_agent.sinks.k1.hdfs.rollCount = 20
#文件的名字
hdfs_agent.sinks.k1.hdfs.filePrefix = %{host}_%{country}_log_%Y%m%d_%H
#当设置压缩属性的时候,不应该指定 文件后缀,否则会覆盖默认的压缩格式的后缀
#hdfs_agent.sinks.k1.hdfs.fileSuffix = .txt
#使得上面按天分目录的设置起作用
hdfs_agent.sinks.k1.hdfs.useLocalTimeStamp = true
#序列化方式
hdfs_agent.sinks.k1.hdfs.serializer = text_with_headers
#输出方式
hdfs_agent.sinks.k1.hdfs.codeC = bzip2
#输出方式
hdfs_agent.sinks.k1.hdfs.fileType = CompressedStream
#下面属性 只针对于 avro_event
#hdfs_agent.sinks.k1.hdfs.serializer.compressionCodec = bzip2# Use a channel which buffers events in memory
hdfs_agent.channels.c1.type = memory
hdfs_agent.channels.c1.capacity = 1000
hdfs_agent.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
hdfs_agent.sources.r1.channels = c1
hdfs_agent.sinks.k1.channel = c1

执行脚本

my-bin/start_taildir_memory_hdfs_withhead_codec.sh

#!/bin/bashROOT_PATH=$(dirname $(dirname $(readlink -f $0)))
cd $ROOT_PATHbin/flume-ng agent --conf ./conf/ -f my-conf/flume-taildir-memory-hdfs_withhead-codec.properties -Dflume.root.logger=INFO,console -n hdfs_agent



这篇关于Flume_Flume常用配置5_header + filter taildir.source_memory.channel_hdfs.sink的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/956049

相关文章

Spring配置扩展之JavaConfig的使用小结

《Spring配置扩展之JavaConfig的使用小结》JavaConfig是Spring框架中基于纯Java代码的配置方式,用于替代传统的XML配置,通过注解(如@Bean)定义Spring容器的组... 目录JavaConfig 的概念什么是JavaConfig?为什么使用 JavaConfig?Jav

Spring Boot Interceptor的原理、配置、顺序控制及与Filter的关键区别对比分析

《SpringBootInterceptor的原理、配置、顺序控制及与Filter的关键区别对比分析》本文主要介绍了SpringBoot中的拦截器(Interceptor)及其与过滤器(Filt... 目录前言一、核心功能二、拦截器的实现2.1 定义自定义拦截器2.2 注册拦截器三、多拦截器的执行顺序四、过

springboot的controller中如何获取applicatim.yml的配置值

《springboot的controller中如何获取applicatim.yml的配置值》本文介绍了在SpringBoot的Controller中获取application.yml配置值的四种方式,... 目录1. 使用@Value注解(最常用)application.yml 配置Controller 中

springboot中配置logback-spring.xml的方法

《springboot中配置logback-spring.xml的方法》文章介绍了如何在SpringBoot项目中配置logback-spring.xml文件来进行日志管理,包括如何定义日志输出方式、... 目录一、在src/main/resources目录下,也就是在classpath路径下创建logba

解决idea启动项目报错java: OutOfMemoryError: insufficient memory

《解决idea启动项目报错java:OutOfMemoryError:insufficientmemory》:本文主要介绍解决idea启动项目报错java:OutOfMemoryError... 目录原因:解决:总结 原因:在Java中遇到OutOfMemoryError: insufficient me

MyBatis配置文件中最常用的设置

《MyBatis配置文件中最常用的设置》文章主要介绍了MyBatis配置的优化方法,包括引用外部的properties配置文件、配置外置以实现环境解耦、配置文件中最常用的6个核心设置以及三种常用的Ma... 目录MyBATis配置优化mybatis的配置中引用外部的propertis配置文件⚠️ 注意事项X

JavaWeb 中的 Filter组件详解

《JavaWeb中的Filter组件详解》本文详细介绍了JavaWeb中的Filter组件,包括其基本概念、工作原理、核心接口和类、配置方式以及常见应用示例,Filter可以实现请求预处理、响应后... 目录JavaWeb 中的 Filter 详解1. Filter 基本概念1.1 什么是 Filter1.

C++多线程开发环境配置方法

《C++多线程开发环境配置方法》文章详细介绍了如何在Windows上安装MinGW-w64和VSCode,并配置环境变量和编译任务,使用VSCode创建一个C++多线程测试项目,并通过配置tasks.... 目录下载安装 MinGW-w64下载安装VS code创建测试项目配置编译任务创建 tasks.js

Nginx概念、架构、配置与虚拟主机实战操作指南

《Nginx概念、架构、配置与虚拟主机实战操作指南》Nginx是一个高性能的HTTP服务器、反向代理服务器、负载均衡器和IMAP/POP3/SMTP代理服务器,它支持高并发连接,资源占用低,功能全面且... 目录Nginx 深度解析:概念、架构、配置与虚拟主机实战一、Nginx 的概念二、Nginx 的特点

一文详解Java常用包有哪些

《一文详解Java常用包有哪些》包是Java语言提供的一种确保类名唯一性的机制,是类的一种组织和管理方式、是一组功能相似或相关的类或接口的集合,:本文主要介绍Java常用包有哪些的相关资料,需要的... 目录Java.langjava.utiljava.netjava.iojava.testjava.sql