大数据技术之Flume 企业开发案例——聚合(7)

2024-08-28 21:52

本文主要是介绍大数据技术之Flume 企业开发案例——聚合(7),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

聚合

1)案例需求:

2)需求分析 

3)实现步骤:

准备工作

创建 flume1-logger-flume.conf

创建 flume2-netcat-flume.conf

创建 flume3-flume-logger.conf

执行配置文件


聚合

1)案例需求:

  • hadoop12 上的 Flume-1 监控文件 /opt/module/group.log
  • hadoop13 上的 Flume-2 监控某个端口的数据流,
  • Flume-1 与 Flume-2 将数据发送给 hadoop14 上的 Flume-3Flume-3 将最终数据打印到控制台。

2)需求分析 

多数据源汇总案例

 

3)实现步骤:

  1. 准备工作
    • 分发 Flume

    • [lzl@hadoop12 module]$ xsync flumexsync 是集群同步文件脚本,也就是在一台服务器分发文件给其他台服务器,脚本内容如下:
      #!/bin/bash
      #1. 判断参数个数
      if [ $# -lt 1 ]
      thenecho Not Enough Arguement!exit;
      fi
      #2. 遍历集群所有机器
      for host in hadoop12 hadoop13 hadoop14
      doecho ====================  $host  ====================#3. 遍历所有目录,挨个发送for file in $@do#4 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
      done
    • hadoop12hadoop13 以及 hadoop14/opt/module/flume/job 目录下创建一个 group3 文件夹。

      [lzl@hadoop12 job]$ mkdir group3
      [lzl@hadoop13 job]$ mkdir group3
      [lzl@hadoop14 job]$ mkdir group3
  2. 创建 flume1-logger-flume.conf
    • 配置 Source 用于监控 /opt/module/group.log 文件,配置 Sink 输出数据到下一级 Flume。

      hadoop12 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume1-logger-flume.conf

      添加如下内容

      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1# Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /opt/module/group.log
      a1.sources.r1.shell = /bin/bash -c# Describe the sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop14
      a1.sinks.k1.port = 4141# Describe the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
  3. 创建 flume2-netcat-flume.conf
    • 配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume。

       

      hadoop13 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume2-netcat-flume.conf

      添加如下内容

      # Name the components on this agent
      a2.sources = r1
      a2.sinks = k1
      a2.channels = c1# Describe/configure the source
      a2.sources.r1.type = netcat
      a2.sources.r1.bind = hadoop13
      a2.sources.r1.port = 44444# Describe the sink
      a2.sinks.k1.type = avro
      a2.sinks.k1.hostname = hadoop14
      a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 1000
      a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a2.sources.r1.channels = c1
      a2.sinks.k1.channel = c1
  4. 创建 flume3-flume-logger.conf
    • 配置 source 用于接收 flume1flume2 发送过来的数据流,最终合并后 sink 到控制台。

      hadoop14 上编辑配置文件

      [lzl@hadoop14 group3]$ touch flume3-flume-logger.conf
      [lzl@hadoop14 group3]$ vim flume3-flume-logger.conf

      添加如下内容

      # Name the components on this agent
      a3.sources = r1
      a3.sinks = k1
      a3.channels = c1# Describe/configure the source
      a3.sources.r1.type = avro 
      a3.sources.r1.bind = hadoop14
      a3.sources.r1.port = 4141# Describe the sink
      a3.sinks.k1.type = logger# Describe the channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 1000
      a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a3.sources.r1.channels = c1
      a3.sinks.k1.channel = c1
  5. 执行配置文件
    • 分别开启对应配置文件:flume3-flume-logger.confflume2-netcat-flume.confflume1-logger-flume.conf

      [lzl@hadoop14 flume]$ bin/flume-ng agent --conf conf/ --name 
      a3 --conf-file job/group3/flume3-flume-logger.conf 
      -Dflume.root.logger=INFO,console
      [lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name 
      a2 --conf-file job/group3/flume1-logger-flume.conf
      [lzl@hadoop13 flume]$ bin/flume-ng agent --conf conf/ --name 
      a1 --conf-file job/group3/flume2-netcat-flume.conf
  6. hadoop13 上向 /opt/module 目录下的 group.log 追加内容

    [lzl@hadoop13 module]$ echo 'hello' >> group.log
  7. hadoop12 上向 44444 端口发送数据

    [lzl@hadoop12 flume]$ telnet hadoop13 44444
  8. 检查 hadoop14 上数据

 

这篇关于大数据技术之Flume 企业开发案例——聚合(7)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115984

相关文章

Android 悬浮窗开发示例((动态权限请求 | 前台服务和通知 | 悬浮窗创建 )

《Android悬浮窗开发示例((动态权限请求|前台服务和通知|悬浮窗创建)》本文介绍了Android悬浮窗的实现效果,包括动态权限请求、前台服务和通知的使用,悬浮窗权限需要动态申请并引导... 目录一、悬浮窗 动态权限请求1、动态请求权限2、悬浮窗权限说明3、检查动态权限4、申请动态权限5、权限设置完毕后

使用Navicat工具比对两个数据库所有表结构的差异案例详解

《使用Navicat工具比对两个数据库所有表结构的差异案例详解》:本文主要介绍如何使用Navicat工具对比两个数据库test_old和test_new,并生成相应的DDLSQL语句,以便将te... 目录概要案例一、如图两个数据库test_old和test_new进行比较:二、开始比较总结概要公司存在多

Redis的数据过期策略和数据淘汰策略

《Redis的数据过期策略和数据淘汰策略》本文主要介绍了Redis的数据过期策略和数据淘汰策略,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录一、数据过期策略1、惰性删除2、定期删除二、数据淘汰策略1、数据淘汰策略概念2、8种数据淘汰策略

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

Python给Excel写入数据的四种方法小结

《Python给Excel写入数据的四种方法小结》本文主要介绍了Python给Excel写入数据的四种方法小结,包含openpyxl库、xlsxwriter库、pandas库和win32com库,具有... 目录1. 使用 openpyxl 库2. 使用 xlsxwriter 库3. 使用 pandas 库

SpringBoot定制JSON响应数据的实现

《SpringBoot定制JSON响应数据的实现》本文主要介绍了SpringBoot定制JSON响应数据的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们... 目录前言一、如何使用@jsonView这个注解?二、应用场景三、实战案例注解方式编程方式总结 前言

使用Python在Excel中创建和取消数据分组

《使用Python在Excel中创建和取消数据分组》Excel中的分组是一种通过添加层级结构将相邻行或列组织在一起的功能,当分组完成后,用户可以通过折叠或展开数据组来简化数据视图,这篇博客将介绍如何使... 目录引言使用工具python在Excel中创建行和列分组Python在Excel中创建嵌套分组Pyt

在Rust中要用Struct和Enum组织数据的原因解析

《在Rust中要用Struct和Enum组织数据的原因解析》在Rust中,Struct和Enum是组织数据的核心工具,Struct用于将相关字段封装为单一实体,便于管理和扩展,Enum用于明确定义所有... 目录为什么在Rust中要用Struct和Enum组织数据?一、使用struct组织数据:将相关字段绑

在Mysql环境下对数据进行增删改查的操作方法

《在Mysql环境下对数据进行增删改查的操作方法》本文介绍了在MySQL环境下对数据进行增删改查的基本操作,包括插入数据、修改数据、删除数据、数据查询(基本查询、连接查询、聚合函数查询、子查询)等,并... 目录一、插入数据:二、修改数据:三、删除数据:1、delete from 表名;2、truncate

基于Python开发PPTX压缩工具

《基于Python开发PPTX压缩工具》在日常办公中,PPT文件往往因为图片过大而导致文件体积过大,不便于传输和存储,所以本文将使用Python开发一个PPTX压缩工具,需要的可以了解下... 目录引言全部代码环境准备代码结构代码实现运行结果引言在日常办公中,PPT文件往往因为图片过大而导致文件体积过大,