Spark Streaming(三)—— 高级数据源Flume

2024-06-19 04:38

本文主要是介绍Spark Streaming(三)—— 高级数据源Flume,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 高级数据源Flume
    • 1. Push方式
    • 2. 基于Custom Sink的Pull模式

高级数据源Flume

Spark Streaming 是一个流式计算引擎,就需要对接外部数据源来对接、接收数据。每一个输入流DStream和一个Receiver对象相关联,这个Receiver从源中获取数据,并将数据存入内存中用于处理。Spark Streaming的基本数据源(文件流、RDD队列流、套接字流)上篇已经介绍过了,而Spark Streaming的高级数据流主要有Kafka,Flume,Kinesis,Twitter等。本文主要介绍Flume作为高级数据源的使用。

1. Push方式

Flume将数据推送给Spark Streaming。在这种方式下,Spark Streaming可以很方便的建立一个Receiver,起到一个Avro agent的作用。Flume可以将数据推送到该Receiver。

Flume配置文件:

#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1#具体定义source 采集该目录下的日志
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /root/training/logs#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100#具体定义sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = 192.168.15.131
a4.sinks.k1.port = 1234#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1

Spark Streaming Demo:
注意除了需要使用Flume的lib的jar包以外,还需要spark-streaming-flume_2.10-2.1.0.jar

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtilsobject MyFlumeStream {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("MyFlumeStream").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(3))//创建 flume event 从 flume中接收push来的数据 ---> 也是DStream//flume将数据push到了 ip 和 端口中  ip和端口在Flume配置文件中设置val flumeEventDstream = FlumeUtils.createStream(ssc, "192.168.15.131", 1234)val lineDStream = flumeEventDstream.map( e => {new String(e.event.getBody.array)})lineDStream.print()ssc.start()ssc.awaitTermination()}
}

测试:
1、启动Spark Streaming程序。
2、启动Flume。

bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

3、拷贝日志文件到/root/training/logs目录。
4、观察输出,采集到数据。

2. 基于Custom Sink的Pull模式

比第一种有更好的健壮性和容错性。生产使用这个方式。

不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume Sink。Flume将数据推送到Sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从Sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的Sink。

Flume 配置文件

a1.channels = c1
a1.sinks = k1
a1.sources = r1a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /root/training/logsa1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 192.168.15.131
a1.sinks.k1.port = 1234#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

Spark Streaming Full Demo:
注意除了需要使用Flume的lib的jar包以外,还需要
将Spark的jar包拷贝到Flume的lib目录下,spark-streaming-flume_2.10-2.1.0.jar也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath。

import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.spark.SparkConf
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.Seconds
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.storage.StorageLevelobject FlumeLogPull {def main(args: Array[String]): Unit = {System.setProperty("hadoop.home.dir", "G:\\bin\\hadoop-2.5.2")Logger.getLogger("org.apache.spark").setLevel(Level.ERROR)Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)val conf = new SparkConf().setAppName("FlumeLogPull").setMaster("local[2]")val ssc = new StreamingContext(conf,Seconds(1))val flumeEvent = FlumeUtils.createPollingStream(ssc, "192.168.15.131", 1234, StorageLevel.MEMORY_ONLY_SER)val lineDStream = flumeEvent.map( e => {new String(e.event.getBody.array)})lineDStream.print()ssc.start()ssc.awaitTermination()}
}

测试:
1、启动Flume。

bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console

2、启动Spark Streaming程序。
3、拷贝日志文件到/root/training/logs目录。
4、观察输出,采集到数据。

这篇关于Spark Streaming(三)—— 高级数据源Flume的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074030

相关文章

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

flume系列之:查看flume系统日志、查看统计flume日志类型、查看flume日志

遍历指定目录下多个文件查找指定内容 服务器系统日志会记录flume相关日志 cat /var/log/messages |grep -i oom 查找系统日志中关于flume的指定日志 import osdef search_string_in_files(directory, search_string):count = 0

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

Java基础回顾系列-第七天-高级编程之IO

Java基础回顾系列-第七天-高级编程之IO 文件操作字节流与字符流OutputStream字节输出流FileOutputStream InputStream字节输入流FileInputStream Writer字符输出流FileWriter Reader字符输入流字节流与字符流的区别转换流InputStreamReaderOutputStreamWriter 文件复制 字符编码内存操作流(

Java基础回顾系列-第五天-高级编程之API类库

Java基础回顾系列-第五天-高级编程之API类库 Java基础类库StringBufferStringBuilderStringCharSequence接口AutoCloseable接口RuntimeSystemCleaner对象克隆 数字操作类Math数学计算类Random随机数生成类BigInteger/BigDecimal大数字操作类 日期操作类DateSimpleDateForma

Mysql高级篇(中)——索引介绍

Mysql高级篇(中)——索引介绍 一、索引本质二、索引优缺点三、索引分类(1)按数据结构分类(2)按功能分类(3) 按存储引擎分类(4) 按存储方式分类(5) 按使用方式分类 四、 索引基本语法(1)创建索引(2)查看索引(3)删除索引(4)ALTER 关键字创建/删除索引 五、适合创建索引的情况思考题 六、不适合创建索引的情况 一、索引本质 索引本质 是 一种数据结构,它用

多数据源的事务处理总是打印很多无用的log日志

之前做了一个项目,需要用到多数据源以及事务处理,在使用事务处理,服务器总是打印很多关于事务处理的log日志(com.atomikos.logging.Slf4jLogger),但是我们根本不会用到这些log日志,反而使得查询一些有用的log日志变得困难。那要如何屏蔽这些log日志呢? 之前的项目是提高项目打印log日志的级别,后来觉得这样治标不治本。 现在有一个更好的方法: 我使用的是log

flume系列之:记录一次flume agent进程被异常oom kill -9的原因定位

flume系列之:记录一次flume agent进程被异常oom kill -9的原因定位 一、背景二、定位问题三、解决方法 一、背景 flume系列之:定位flume没有关闭某个时间点生成的tmp文件的原因,并制定解决方案在博主上面这篇文章的基础上,在机器内存、cpu资源、flume agent资源都足够的情况下,flume agent又出现了tmp文件无法关闭的情况 二、

linux高级学习10

24.9.7学习目录 一.线程1.线程API 一.线程 线程与进程的关系: 线程是轻量级进程,也有PCB,只是各自不同,创建线程使用的底层函数和进程一样,都是clone进程可以蜕变成线程线程是最小的执行单位,进程是最小的分配资源单位 1.线程API (1)查看线程号 #include <pthread.h>pthread_t pthread_self(); (2)

Mysql高级教程

1.安装部署 安装依赖性: [root@mysql-node10 ~]# dnf install cmake gcc-c++ openssl-develncurses-devel.x86_64 libtirpc-devel-1.3.3-8.el7_4.x86_64.rpm rpcgen.x86_64 下载并解压源码包 [root@mysql-node10 ~]# tar zx