大数据技术之Flume 企业开发案例——聚合(7)

2024-08-28 21:52

本文主要是介绍大数据技术之Flume 企业开发案例——聚合(7),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

聚合

1)案例需求:

2)需求分析 

3)实现步骤:

准备工作

创建 flume1-logger-flume.conf

创建 flume2-netcat-flume.conf

创建 flume3-flume-logger.conf

执行配置文件


聚合

1)案例需求:

  • hadoop12 上的 Flume-1 监控文件 /opt/module/group.log
  • hadoop13 上的 Flume-2 监控某个端口的数据流,
  • Flume-1 与 Flume-2 将数据发送给 hadoop14 上的 Flume-3Flume-3 将最终数据打印到控制台。

2)需求分析 

多数据源汇总案例

 

3)实现步骤:

  1. 准备工作
    • 分发 Flume

    • [lzl@hadoop12 module]$ xsync flumexsync 是集群同步文件脚本,也就是在一台服务器分发文件给其他台服务器,脚本内容如下:
      #!/bin/bash
      #1. 判断参数个数
      if [ $# -lt 1 ]
      thenecho Not Enough Arguement!exit;
      fi
      #2. 遍历集群所有机器
      for host in hadoop12 hadoop13 hadoop14
      doecho ====================  $host  ====================#3. 遍历所有目录,挨个发送for file in $@do#4 判断文件是否存在if [ -e $file ]then#5. 获取父目录pdir=$(cd -P $(dirname $file); pwd)#6. 获取当前文件的名称fname=$(basename $file)ssh $host "mkdir -p $pdir"rsync -av $pdir/$fname $host:$pdirelseecho $file does not exists!fidone
      done
    • hadoop12hadoop13 以及 hadoop14/opt/module/flume/job 目录下创建一个 group3 文件夹。

      [lzl@hadoop12 job]$ mkdir group3
      [lzl@hadoop13 job]$ mkdir group3
      [lzl@hadoop14 job]$ mkdir group3
  2. 创建 flume1-logger-flume.conf
    • 配置 Source 用于监控 /opt/module/group.log 文件,配置 Sink 输出数据到下一级 Flume。

      hadoop12 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume1-logger-flume.conf

      添加如下内容

      # Name the components on this agent
      a1.sources = r1
      a1.sinks = k1
      a1.channels = c1# Describe/configure the source
      a1.sources.r1.type = exec
      a1.sources.r1.command = tail -F /opt/module/group.log
      a1.sources.r1.shell = /bin/bash -c# Describe the sink
      a1.sinks.k1.type = avro
      a1.sinks.k1.hostname = hadoop14
      a1.sinks.k1.port = 4141# Describe the channel
      a1.channels.c1.type = memory
      a1.channels.c1.capacity = 1000
      a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a1.sources.r1.channels = c1
      a1.sinks.k1.channel = c1
  3. 创建 flume2-netcat-flume.conf
    • 配置 Source 监控端口 44444 数据流,配置 Sink 数据到下一级 Flume。

       

      hadoop13 上编辑配置文件

      [lzl@hadoop12 group3]$ vim flume2-netcat-flume.conf

      添加如下内容

      # Name the components on this agent
      a2.sources = r1
      a2.sinks = k1
      a2.channels = c1# Describe/configure the source
      a2.sources.r1.type = netcat
      a2.sources.r1.bind = hadoop13
      a2.sources.r1.port = 44444# Describe the sink
      a2.sinks.k1.type = avro
      a2.sinks.k1.hostname = hadoop14
      a2.sinks.k1.port = 4141# Use a channel which buffers events in memory
      a2.channels.c1.type = memory
      a2.channels.c1.capacity = 1000
      a2.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a2.sources.r1.channels = c1
      a2.sinks.k1.channel = c1
  4. 创建 flume3-flume-logger.conf
    • 配置 source 用于接收 flume1flume2 发送过来的数据流,最终合并后 sink 到控制台。

      hadoop14 上编辑配置文件

      [lzl@hadoop14 group3]$ touch flume3-flume-logger.conf
      [lzl@hadoop14 group3]$ vim flume3-flume-logger.conf

      添加如下内容

      # Name the components on this agent
      a3.sources = r1
      a3.sinks = k1
      a3.channels = c1# Describe/configure the source
      a3.sources.r1.type = avro 
      a3.sources.r1.bind = hadoop14
      a3.sources.r1.port = 4141# Describe the sink
      a3.sinks.k1.type = logger# Describe the channel
      a3.channels.c1.type = memory
      a3.channels.c1.capacity = 1000
      a3.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
      a3.sources.r1.channels = c1
      a3.sinks.k1.channel = c1
  5. 执行配置文件
    • 分别开启对应配置文件:flume3-flume-logger.confflume2-netcat-flume.confflume1-logger-flume.conf

      [lzl@hadoop14 flume]$ bin/flume-ng agent --conf conf/ --name 
      a3 --conf-file job/group3/flume3-flume-logger.conf 
      -Dflume.root.logger=INFO,console
      [lzl@hadoop12 flume]$ bin/flume-ng agent --conf conf/ --name 
      a2 --conf-file job/group3/flume1-logger-flume.conf
      [lzl@hadoop13 flume]$ bin/flume-ng agent --conf conf/ --name 
      a1 --conf-file job/group3/flume2-netcat-flume.conf
  6. hadoop13 上向 /opt/module 目录下的 group.log 追加内容

    [lzl@hadoop13 module]$ echo 'hello' >> group.log
  7. hadoop12 上向 44444 端口发送数据

    [lzl@hadoop12 flume]$ telnet hadoop13 44444
  8. 检查 hadoop14 上数据

 

这篇关于大数据技术之Flume 企业开发案例——聚合(7)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1115984

相关文章

MySQL快速复制一张表的四种核心方法(包括表结构和数据)

《MySQL快速复制一张表的四种核心方法(包括表结构和数据)》本文详细介绍了四种复制MySQL表(结构+数据)的方法,并对每种方法进行了对比分析,适用于不同场景和数据量的复制需求,特别是针对超大表(1... 目录一、mysql 复制表(结构+数据)的 4 种核心方法(面试结构化回答)方法 1:CREATE

详解C++ 存储二进制数据容器的几种方法

《详解C++存储二进制数据容器的几种方法》本文主要介绍了详解C++存储二进制数据容器,包括std::vector、std::array、std::string、std::bitset和std::ve... 目录1.std::vector<uint8_t>(最常用)特点:适用场景:示例:2.std::arra

Springboot3 ResponseEntity 完全使用案例

《Springboot3ResponseEntity完全使用案例》ResponseEntity是SpringBoot中控制HTTP响应的核心工具——它能让你精准定义响应状态码、响应头、响应体,相比... 目录Spring Boot 3 ResponseEntity 完全使用教程前置准备1. 项目基础依赖(M

Python+wxPython开发一个文件属性比对工具

《Python+wxPython开发一个文件属性比对工具》在日常的文件管理工作中,我们经常会遇到同一个文件存在多个版本,或者需要验证备份文件与源文件是否一致,下面我们就来看看如何使用wxPython模... 目录引言项目背景与需求应用场景核心需求运行结果技术选型程序设计界面布局核心功能模块关键代码解析文件大

C++11中的包装器实战案例

《C++11中的包装器实战案例》本文给大家介绍C++11中的包装器实战案例,本文结合实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录引言1.std::function1.1.什么是std::function1.2.核心用法1.2.1.包装普通函数1.2.

C++多线程开发环境配置方法

《C++多线程开发环境配置方法》文章详细介绍了如何在Windows上安装MinGW-w64和VSCode,并配置环境变量和编译任务,使用VSCode创建一个C++多线程测试项目,并通过配置tasks.... 目录下载安装 MinGW-w64下载安装VS code创建测试项目配置编译任务创建 tasks.js

MySQL中的DELETE删除数据及注意事项

《MySQL中的DELETE删除数据及注意事项》MySQL的DELETE语句是数据库操作中不可或缺的一部分,通过合理使用索引、批量删除、避免全表删除、使用TRUNCATE、使用ORDERBY和LIMI... 目录1. 基本语法单表删除2. 高级用法使用子查询删除删除多表3. 性能优化策略使用索引批量删除避免

MySQL 数据库进阶之SQL 数据操作与子查询操作大全

《MySQL数据库进阶之SQL数据操作与子查询操作大全》本文详细介绍了SQL中的子查询、数据添加(INSERT)、数据修改(UPDATE)和数据删除(DELETE、TRUNCATE、DROP)操作... 目录一、子查询:嵌套在查询中的查询1.1 子查询的基本语法1.2 子查询的实战示例二、数据添加:INSE

Redis 命令详解与实战案例

《Redis命令详解与实战案例》本文详细介绍了Redis的基础知识、核心数据结构与命令、高级功能与命令、最佳实践与性能优化,以及实战应用场景,通过实战案例,展示了如何使用Redis构建高性能应用系统... 目录Redis 命令详解与实战案例一、Redis 基础介绍二、Redis 核心数据结构与命令1. 字符

Linux服务器数据盘移除并重新挂载的全过程

《Linux服务器数据盘移除并重新挂载的全过程》:本文主要介绍在Linux服务器上移除并重新挂载数据盘的整个过程,分为三大步:卸载文件系统、分离磁盘和重新挂载,每一步都有详细的步骤和注意事项,确保... 目录引言第一步:卸载文件系统第二步:分离磁盘第三步:重新挂载引言在 linux 服务器上移除并重新挂p