Flume Source, Sink, and Channel Tests

2024-05-03 23:32

This article walks through hands-on tests of Flume sources, sinks, and channels, as a practical reference for developers working with Flume.

3. A simple example

#Set up the configuration file

[root@cc-staging-loginmgr2 conf]# cat example.conf

# example.conf: A single-node Flume configuration



# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = netcat

a1.sources.r1.bind = localhost

a1.sources.r1.port = 44444



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



#Command-line options explained

-c conf: use conf as the configuration directory

-f conf/example.conf: use conf/example.conf as the configuration file

-n a1: name the agent a1; this must match the agent name used in example.conf

-Dflume.root.logger=INFO,console: set the root logger to INFO level and write the log output to the console



#Start the agent

cd /usr/local/apache-flume-1.3.1-bin

flume-ng agent -c conf -f conf/example.conf -n a1 -Dflume.root.logger=INFO,console



2013-05-24 00:00:09,288 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:150)] Source starting

2013-05-24 00:00:09,303 (lifecycleSupervisor-1-0) [INFO - org.apache.flume.source.NetcatSource.start(NetcatSource.java:164)] Created serverSocket:sun.nio.ch.ServerSocketChannelImpl[/127.0.0.1:44444]



#Run a test from another terminal

[root@cc-staging-loginmgr2 conf]# telnet 127.0.0.1 44444

Trying 127.0.0.1...

Connected to localhost.localdomain (127.0.0.1).

Escape character is '^]'.

hello world!

OK



#Check the console output in the terminal where the agent was started

2013-05-24 00:00:24,306 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 21 0D hello world!. }



#The test succeeded; Flume is working normally





4. Flume source tests

Test 1:

The Avro source receives events over the Avro RPC mechanism; the flume-ng avro-client command can be used to send a given file to it.

#Set up the Avro configuration file

[root@cc-staging-loginmgr2 conf]# cat avro.conf

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = avro

a1.sources.r1.channels = c1

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 4141



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



#Start Flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console



#Create the file to send

echo "hello world" > /usr/logs/log.10



#Send the file with avro-client

flume-ng avro-client -c . -H localhost -p 4141 -F /usr/logs/log.10



#Check the console output in the agent terminal

2013-05-27 01:11:45,852 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }



Test 2:

Exec source runs a given Unix command on start-up and expects that process to continuously produce data on standard out



#Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat exec.conf

# Describe/configure the source

a1.sources.r1.type = exec

a1.sources.r1.command = cat /usr/logs/log.10

a1.sources.r1.channels = c1





#Start Flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console



#Append content to the file

echo "exec test" >> /usr/logs/log.10



#Check the console output in the agent terminal

2013-05-27 01:50:12,825 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 68 65 6C 6C 6F 20 77 6F 72 6C 64 hello world }

2013-05-27 01:50:12,826 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 exec test }



#To use the tail command, the file must contain enough data before any output appears on the console

a1.sources.r1.command = tail -F /usr/logs/log.10



#Generate enough content in the file

for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done



#The output can be seen on the console

2013-05-27 19:17:18,157 (lifecycleSupervisor-1-1) [INFO - org.apache.flume.source.ExecSource.start(ExecSource.java:155)] Exec source starting with command:tail -n 5 -F /usr/logs/log.10

2013-05-27 19:19:50,334 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{} body: 65 78 65 63 20 74 65 73 74 37 exec test7 }



Test 3:

Spooling directory source

This source lets you ingest data by dropping files in a spooling directory on disk. Unlike other asynchronous sources, this source avoids data loss even if Flume is restarted or fails.

The spooling directory source watches the configured directory for new files and reads the data out of them. Two things to note: 1) files copied into the spool directory must not be opened and edited afterwards;



2) the spool directory must not contain subdirectories.


#Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat spool.conf

# Describe/configure the source

a1.sources.r1.type = spooldir

a1.sources.r1.spoolDir = /usr/logs/flumeSpool

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1



#Start Flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f spool.conf -n a1 -Dflume.root.logger=INFO,console



#Add a file to the spool directory

[root@cc-staging-loginmgr2 ~]# echo "spool test1" > /usr/logs/flumeSpool/spool1.log



#Check the console output in the agent terminal

2013-05-27 22:49:06,098 (pool-4-thread-1) [INFO - org.apache.flume.client.avro.SpoolingFileLineReader.retireCurrentFile(SpoolingFileLineReader.java:229)] Preparing to move file /usr/logs/flumeSpool/spool1.log to /usr/logs/flumeSpool/spool1.log.COMPLETED

2013-05-27 22:49:06,101 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{file=/usr/logs/flumeSpool/spool1.log} body: 73 70 6F 6F 6C 20 74 65 73 74 31 spool test1 }



Test 4

Netcat source: see the simple example in section 3.



Test 5

Syslog tcp source



#Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat syslog.conf

# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



#Start Flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f syslog.conf -n a1 -Dflume.root.logger=INFO,console



#Generate a test syslog message. The <37> priority prefix is required because the source expects syslog wire-format data; without it the source reports "Failed to extract syslog wire entry"

echo "<37>hello via syslog" | nc localhost 5140



#Check the console output in the agent terminal

2013-05-27 23:39:10,755 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 73 79 73 6C 6F 67 hello via syslog }



#For UDP, the configuration needs to change to:

a1.sources.r1.type = syslogudp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



#Generate a test syslog message

echo "<37>hello via syslog" | nc -u localhost 5140



Test 6

HTTP source JSONHandler



#Modified configuration file

[root@cc-staging-loginmgr2 conf]# cat post.conf

# Describe/configure the source

a1.sources = r1

a1.channels = c1

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1



#Start Flume agent a1

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f post.conf -n a1 -Dflume.root.logger=INFO,console



#Send a JSON-formatted POST request

curl -X POST -d '[{ "headers" :{"namenode" : "namenode.example.com","datanode" : "random_datanode.example.com"},"body" : "really_random_body"}]' http://localhost:5140



#Check the console output in the agent terminal

2013-05-28 01:17:47,186 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{namenode=namenode.example.com, datanode=random_datanode.example.com} body: 72 65 61 6C 6C 79 5F 72 61 6E 64 6F 6D 5F 62 6F really_random_bo }






5. Flume sink tests

Test 1: HDFS sink
Using this sink requires Hadoop to be installed so that Flume can use the Hadoop jars to communicate with the HDFS cluster.


Add the following to /usr/local/apache-flume-1.3.1-bin/conf/flume-env.sh:
export HADOOP_HOME=/usr/local/hadoop

#Modify the configuration file
a1.sources.r1.type = syslogtcp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/
a1.sinks.k1.hdfs.filePrefix = Syslog
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#Start Flume agent a1
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f hdfs.conf -n a1 -Dflume.root.logger=INFO,console

#Generate a test syslog message
echo "<37> hello via syslog to hdfs testing one " | nc -u localhost 5140

#Check the console output in the agent terminal; the file is created successfully
2013-05-29 00:53:58,078 (hdfs-k1-call-runner-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected//Syslog.1369814037714.tmp
2013-05-29 00:54:28,220 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714.tmp to hdfs://master:9000/user/hadoop/flume/collected/Syslog.1369814037714

#View the file on Hadoop
./hadoop dfs -cat hdfs://172.25.4.35:9000/user/hadoop/flume/collected/Syslog.1369814037714
SEQ!org.apache.hadoop.io.LongWritable"org.apache.hadoop.io.BytesWritable^;>Gv$hello  via syslog to hdfs testing one

#Modify the configuration so that directories are generated automatically from the event timestamp
a1.sources.r1.type = org.apache.flume.source.http.HTTPSource
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 5140
a1.sources.r1.channels = c1

# Describe the sink
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = Syslog.%{host}
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute

#Send a JSON-formatted POST request; if the timestamp header has the wrong format it cannot be parsed.
#A 13-digit timestamp (including milliseconds) is needed for the time to be resolved correctly.
#Generate the current time as a 10-digit Unix timestamp on Linux
date +%s
#Generate the current time as a 13-digit Unix timestamp on Linux
date +%s%N|awk '{print substr($0,1,13)}'

curl -X POST -d '[{ "headers":{"timestamp":"1369818213654","host":"cc-staging-loginmgr2"},"body": "hello via post"}]' http://localhost:5140
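
#The 13-digit timestamp can also be generated inline when sending the request. This is only a sketch and assumes GNU date (which supports the %N / %3N millisecond format); taking the host value from hostname is just an example:
curl -X POST -d "[{\"headers\":{\"timestamp\":\"$(date +%s%3N)\",\"host\":\"$(hostname)\"},\"body\":\"hello via post\"}]" http://localhost:5140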

#Check the console output in the agent terminal; the file is created successfully
2013-05-29 02:03:38,646 (hdfs-k1-call-runner-4) [INFO - org.apache.flume.sink.hdfs.BucketWriter.doOpen(BucketWriter.java:208)] Creating hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp
2013-05-29 02:04:08,714 (hdfs-k1-roll-timer-0) [INFO - org.apache.flume.sink.hdfs.BucketWriter.renameBucket(BucketWriter.java:427)] Renaming hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614.tmp to hdfs://master:9000/user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#View the file on Hadoop
./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-05-29/0203
Found 1 items
-rw-r--r-- 3 root supergroup 129 2013-05-29 02:04 /user/hadoop/flume/collected/2013-05-29/0203/cc-staging-loginmgr2..1369818218614

#Test 2: logger sink
Logs events at INFO level. Typically useful for testing/debugging purposes.
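
#The logger sink is what the earlier examples already use; as a minimal sketch, it needs nothing beyond the type and the channel binding:
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1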

#Test 3: Avro sink
Flume events sent to this sink are turned into Avro events and sent to the configured hostname / port pair

#Avro source configuration file
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4545

#Avro sink configuration file
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 172.25.4.23
a1.sinks.k1.port = 4545

#Start the Avro source agent first so it is listening on the port
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro.conf -n a1 -Dflume.root.logger=INFO,console

#Then start the agent with the Avro sink
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f avro_sink.conf -n a1 -Dflume.root.logger=INFO,console

#The connection being established can be seen in the log
2013-06-02 19:23:00,237 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x7a0e28bf, /172.25.4.32:14894 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:14894

#Generate a test log entry on the Avro sink side
echo "<37>hello via avro sink" | nc localhost 5140

#On the Avro source side the log entry shows up
2013-06-02 19:24:13,740 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 61 76 72 6F 20 73 hello via avro s }

#Test 4: file roll sink
Stores events on the local filesystem

#Modify the configuration file
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume

#Start the agent with the file roll configuration
cd /usr/local/apache-flume-1.3.1-bin/conf
flume-ng agent -c . -f file_roll.conf -n a1 -Dflume.root.logger=INFO,console

#Generate test log entries
echo "<37>hello via file roll" | nc localhost 5140
echo "<37>hello via file roll 2" | nc localhost 5140

#Check the files created under /var/log/flume; by default a new file is rolled every 30 seconds
-rw-r--r-- 1 root root 20 Jun 2 19:44 1370227443397-1
-rw-r--r-- 1 root root 0 Jun 2 19:44 1370227443397-2
-rw-r--r-- 1 root root 22 Jun 2 19:45 1370227443397-3

cat 1370227443397-1 1370227443397-3
hello via file roll
hello via file roll 2






6. Flume channel tests

#Memory Channel

The events are stored in an in-memory queue with configurable max size. It's ideal for flows that need higher throughput and are prepared to lose the staged data in the event of agent failures.



#flume channel selectors

# Replicating channel selector (channel replication) test

#Configuration file with 2 channels and 2 sinks

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



#Check that both connections are established

2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] BOUND: /172.25.4.23:4545

2013-06-04 00:01:53,467 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x122a0fad, /172.25.4.32:55518 => /172.25.4.23:4545] CONNECTED: /172.25.4.32:55518



2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] BOUND: /172.25.4.33:4545

2013-06-04 00:01:53,773 (pool-5-thread-1) [INFO - org.apache.avro.ipc.NettyServer$NettyServerAvroHandler.handleUpstream(NettyServer.java:171)] [id: 0x021881a7, /172.25.4.32:23731 => /172.25.4.33:4545] CONNECTED: /172.25.4.32:23731



#Generate a test log entry

echo "<37> hello via channel selector " | nc localhost 5140



#Check that both sinks received the data

2013-06-04 00:02:06,479 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65  hello via channe  }



2013-06-04 00:02:09,788 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 68 65 6C 6C 6F 20 76 69 61 20 63 68 61 6E 6E 65  hello via channe  }



#flume channel selectors

# Multiplexing channel selector (channel multiplexing) test

#Configuration file with 2 channels and 2 sinks

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2



# Describe/configure the source

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.host = 0.0.0.0

a1.sources.r1.selector.type = multiplexing

a1.sources.r1.channels = c1 c2



a1.sources.r1.selector.header = state

a1.sources.r1.selector.mapping.CZ = c1

a1.sources.r1.selector.mapping.US = c2

a1.sources.r1.selector.default = c1



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



#Send test POST requests whose state header matches the mappings in the configuration

curl -X POST -d '[{"headers":{"state":"CZ"},"body":"TEST1"}]' http://localhost:5140

curl -X POST -d '[{"headers":{"state":"US"},"body":"TEST2"}]' http://localhost:5140

curl -X POST -d '[{"headers":{"state":"SH"},"body":"TEST3"}]' http://localhost:5140



#Check that the data received by the two sinks matches the configuration

Sink1:

2013-06-04 23:45:35,296 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=CZ} body: 54 45 53 54 31 TEST1 }

2013-06-04 23:45:50,309 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=SH} body: 54 45 53 54 33 TEST3 }



Sink2:

2013-06-04 23:45:42,293 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{state=US} body: 54 45 53 54 32 TEST2 }





7. Flume sink processor tests

#Failover Sink Processor

Failover Sink Processor maintains a prioritized list of sinks, guaranteeing that so long as one is available events will be processed (delivered)

#Configuration file

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1 c2



a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = failover

a1.sinkgroups.g1.processor.priority.k1 = 5

a1.sinkgroups.g1.processor.priority.k2 = 10

a1.sinkgroups.g1.processor.maxpenalty = 10000



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.selector.type = replicating

a1.sources.r1.channels = c1 c2



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c2

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545

# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



a1.channels.c2.type = memory

a1.channels.c2.capacity = 1000

a1.channels.c2.transactionCapacity = 100



#Generate a test log entry

echo "<37> test1 failover " | nc localhost 5140



#The log entry appears on sink2; sink1 has the lower priority, so nothing appears there

2013-06-05 00:10:51,194 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72  test1 failover  }



#Shut down sink2 and generate another test log entry

echo "<37> test2 failover " | nc localhost 5140



#Sink1 now receives both test1 and test2

2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 31 20 66 61 69 6C 6F 76 65 72  test1 failover  }

2013-06-05 00:11:14,312 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 66 61 69 6C 6F 76 65 72  test2 failover  }



#Bring sink2 back up; new log entries go to sink2 again according to priority

echo "<37> test4 failover " | nc localhost 5140

echo "<37> test5 failover " | nc localhost 5140



2013-06-05 00:12:33,071 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 66 61 69 6C 6F 76 65 72  test4 failover  }

2013-06-05 00:12:55,088 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 66 61 69 6C 6F 76 65 72  test5 failover  }



#Load balancing sink processor test

Load balancing sink processor provides the ability to load-balance flow over multiple sinks. It maintains an indexed list of active sinks on which the load must be distributed.



#Configuration file. Note: with the load_balance processor type, the different sinks must be attached to the same channel, otherwise load balancing does not take effect

# Name the components on this agent

a1.sources = r1

a1.sinks = k1 k2

a1.channels = c1



a1.sinkgroups = g1

a1.sinkgroups.g1.sinks = k1 k2

a1.sinkgroups.g1.processor.type = load_balance

a1.sinkgroups.g1.processor.backoff = true

a1.sinkgroups.g1.processor.selector = round_robin



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



# Describe the sink

a1.sinks.k1.type = avro

a1.sinks.k1.channel = c1

a1.sinks.k1.hostname = 172.25.4.23

a1.sinks.k1.port = 4545



a1.sinks.k2.type = avro

a1.sinks.k2.channel = c1

a1.sinks.k2.hostname = 172.25.4.33

a1.sinks.k2.port = 4545



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



#Generate 4 test log entries

[root@cc-staging-loginmgr2 ~]# echo "<37> test2 loadbalance " | nc localhost 5140

[root@cc-staging-loginmgr2 ~]# echo "<37> test3 loadbalance " | nc localhost 5140

[root@cc-staging-loginmgr2 ~]# echo "<37> test4 loadbalance " | nc localhost 5140

[root@cc-staging-loginmgr2 ~]# echo "<37> test5 loadbalance " | nc localhost 5140



#Check that the sink output follows the round-robin pattern

Sink1:

2013-06-06 01:36:03,516 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 32 20 6C 6F 61 64 62 61 6C 61 6E 63  test2 loadbalanc  }

2013-06-06 01:36:09,769 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 34 20 6C 6F 61 64 62 61 6C 61 6E 63  test4 loadbalanc  }



Sink2:

2013-06-06 01:36:05,809 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 33 20 6C 6F 61 64 62 61 6C 61 6E 63  test3 loadbalanc  }

2013-06-06 01:36:37,057 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4} body: 74 65 73 74 35 20 6C 6F 61 64 62 61 6C 61 6E 63  test5 loadbalanc  }




8. Event serializer tests


Body Text Serializer

Alias: text. This serializer writes the body of the event to an output stream without any transformation or modification (i.e. the event body is written out as plain text).



#Configuration file

a1.sources.r1.type = org.apache.flume.source.http.HTTPSource

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1



# Describe the sink

a1.sinks.k1.type = file_roll

a1.sinks.k1.channel = c1

a1.sinks.k1.sink.directory = /var/log/flume

a1.sinks.k1.sink.serializer = text

a1.sinks.k1.sink.serializer.appendNewline = false



#Generate test log entries

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : " TEST1 BODY TEXT "}]' http://localhost:5140

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : " TEST2 BODY TEXT "}]' http://localhost:5140

curl -X POST -d '[{ "headers" :{"host":"cc-staging-loginmgr2"},"body" : " TEST3 BODY TEXT "}]' http://localhost:5140



#Check the text content in the file roll output file

cat /var/log/flume/1370675739270-1

TEST1 BODY TEXT


TEST2 BODY TEXT


TEST3 BODY TEXT



#Avro Event Serializer

Alias: avro_event. This serializer serializes Flume events into an Avro container file.

It turns Flume events into records inside an Avro container file.
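
As a minimal sketch of how this could be configured, reusing the file roll sink from the body text serializer test above (and assuming the rest of the agent configuration stays the same), only the serializer setting needs to change:

# Describe the sink (hypothetical avro_event variant of the file roll test)
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
a1.sinks.k1.sink.serializer = avro_event

The rolled files are then Avro container files rather than plain text.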








9. Flume interceptor tests

Timestamp Interceptor

This interceptor inserts into the event headers the time in millis at which it processes the event, i.e. a header with key timestamp whose value is the relevant timestamp.



Host Interceptor

This interceptor inserts the hostname or IP address of the host that this agent is running on. It inserts a header with key host or a configured key whose value is the hostname or IP address of the host



#Configuration file

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.bind = 0.0.0.0

a1.sources.r1.port = 5140

a1.sources.r1.channels = c1



a1.sources.r1.interceptors = i1 i2

a1.sources.r1.interceptors.i1.preserveExisting = false

a1.sources.r1.interceptors.i1.type = timestamp

a1.sources.r1.interceptors.i2.type = host

a1.sources.r1.interceptors.i2.hostHeader = hostname

a1.sources.r1.interceptors.i2.useIP = false



# Describe the sink

a1.sinks.k1.type = hdfs

a1.sinks.k1.channel = c1

a1.sinks.k1.hdfs.path = hdfs://master:9000/user/hadoop/flume/collected/%Y-%m-%d/%H%M

a1.sinks.k1.hdfs.filePrefix = %{hostname}.



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



#Start the agent

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console



#Generate a test log entry

echo "<37>test dynamic interceptor" | nc localhost 5140



#Check the file generated on HDFS; the timestamp and hostname have been added to the event headers, so directories and file names can be generated from them according to the custom format

./hadoop dfs -ls hdfs://172.25.4.35:9000/user/hadoop/flume/collected/2013-06-16/2331/

Found 1 items

-rw-r--r-- 3 root supergroup 140 2013-06-16 23:32 /user/hadoop/flume/collected/2013-06-16/2331/cc-staging-loginmgr2..1371450697118



Static Interceptor

The static interceptor allows the user to append a static header with a static value to all events.



#Configuration file

# Name the components on this agent

a1.sources = r1

a1.sinks = k1

a1.channels = c1



# Describe/configure the source

a1.sources.r1.type = syslogtcp

a1.sources.r1.port = 5140

a1.sources.r1.host = localhost

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

a1.sources.r1.interceptors.i1.key = datacenter


a1.sources.r1.interceptors.i1.value = NEW_YORK



# Describe the sink

a1.sinks.k1.type = logger



# Use a channel which buffers events in memory

a1.channels.c1.type = memory

a1.channels.c1.capacity = 1000

a1.channels.c1.transactionCapacity = 100



# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1



#Start the agent

cd /usr/local/apache-flume-1.3.1-bin/conf

flume-ng agent -c . -f dynamic_intercept.conf -n a1 -Dflume.root.logger=INFO,console



#Generate a test log entry

echo "<37> test1 static interceptor " | nc localhost 5140



#Check the console output

2013-06-17 00:15:38,453 (SinkRunner-PollingRunner-DefaultSinkProcessor) [INFO - org.apache.flume.sink.LoggerSink.process(LoggerSink.java:70)] Event: { headers:{Severity=5, Facility=4, datacenter=NEW_YORK } body: 74 65 73 74 31 20 73 74 61 74 69 63 20 69 6E 74 test1 static int }





10. Monitoring Flume with Zabbix

#JVM performance monitoring

Young GC counts

sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $6}'



Full GC counts

sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java)|tail -1|awk '{print $8}'



JVM total memory usage

sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $3}'



JVM total instances usage

sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java)|grep Total|awk '{print $2}'



#Monitoring Flume application metrics

Start the agent with the JSON reporting options added, so the metrics can be accessed at http://localhost:34545/metrics

flume-ng agent -c . -f exec.conf -n a1 -Dflume.root.logger=INFO,console  -Dflume.monitoring.type=http -Dflume.monitoring.port=34545



#Generate some data

for i in {1..100};do echo "exec test$i" >> /usr/logs/log.10;echo $i;done



#Pretty-print the JSON output with a shell one-liner

[root@cc-staging-loginmgr2 conf]# curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'



CHANNEL.c1:

EventPutSuccessCount:100

ChannelFillPercentage:0.0

Type:CHANNEL

StopTime:0

EventPutAttemptCount:100

ChannelSize:0

StartTime:1371709073310

EventTakeSuccessCount:100

ChannelCapacity:1000

EventTakeAttemptCount:115



#The script used to monitor Flume

[root@cc-staging-loginmgr2 conf]#cat /opt/scripts/monitor_flume.sh

curl http://localhost:34545/metrics 2>/dev/null|sed -e 's/\([,]\)\s*/\1\n/g' -e 's/[{}]/\n/g' -e 's/[",]//g'|grep $1|awk -F: '{print $2}'
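
#For example, with the metrics shown above, querying a single counter would look like this (the value of course depends on the current state of the agent):

[root@cc-staging-loginmgr2 conf]# /opt/scripts/monitor_flume.sh EventPutSuccessCount

100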



#Deploy the checks in the Zabbix agent configuration file

cat /etc/zabbix/zabbix_agentd/zabbix_agentd.userparams.conf

UserParameter=ygc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $6}'

UserParameter=fgc.counts,sudo /usr/local/jdk1.7.0_21/bin/jstat -gcutil $(pgrep java|head -1)|tail -1|awk '{print $8}'

UserParameter=jvm.memory.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $3}'

UserParameter=jvm.instances.usage,sudo /usr/local/jdk1.7.0_21/bin/jmap -histo $(pgrep java|head -1)|grep Total|awk '{print $2}'

UserParameter=flume.monitor[*],/bin/bash /opt/scripts/monitor_flume.sh $1
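
#These items can be verified quickly from the Zabbix server with zabbix_get; a sketch where 172.25.4.32 is assumed to be the host running the Flume and Zabbix agents, adjust as needed:

zabbix_get -s 172.25.4.32 -k 'flume.monitor[EventPutSuccessCount]'

zabbix_get -s 172.25.4.32 -k ygc.counts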
