Spark实战(五)spark streaming + flume(Python版)

2024-09-03 23:18

本文主要是介绍Spark实战(五)spark streaming + flume(Python版),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、flume安装

(一)概述

   Flume是一个分布式、可靠、和高可用的海量日志采集、聚合和传输的系统。Flume可以采集文件,socket数据包等各种形式源数据,又可以将采集到的数据输出到HDFS、hbase、hive、kafka等众多外部存储系统中,一般的采集需求,通过对flume的简单配置即可实现,Flume针对特殊场景也具备良好的自定义扩展能力,因此flume可以适用于大部分的日常数据采集场景

(二)运行机制

   1、 Flume分布式系统中最核心的角色是agent,flume采集系统就是由一个个agent所连接起来形成

   2、 每一个agent相当于一个数据传递员,内部有三个组件:

a)	Source:采集源,用于跟数据源对接,以获取数据
b)	Sink:下沉地,采集数据的传送目的,用于往下一级agent传递数据或者往最终存储系统传递数据
c)	Channel:angent内部的数据传输通道,用于从source将数据传递到sink

在这里插入图片描述

(三)Flume采集系统结构图

1、简单结构

   单个agent采集数据

在这里插入图片描述

2、复杂结构

   多级agent之间串联
在这里插入图片描述

(四)Flume的安装部署

   1、去apache官网上下载安装包,并解压tar -zxvf apache-flume-1.8.0-bin,并修改conf目录下flume-env.sh,在里面配置JAVA_HOME

   2、根据数据采集的需求配置采集方案,描述在配置文件中(文件名可任意自定义)
   3、指定采集方案配置文件,在相应的节点上启动flume agent

二、flume push方式

1、spark streaming程序

   首先是flume通过push方式将采集到的数据传递到spark程序上,这种方式基本不用。spark代码如下:

import pyspark
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtilsif __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount") \.master("local[2]") \.getOrCreate()sc = spark.sparkContextssc = StreamingContext(sc, 5)# hostname = sys.argv[1]# port = int(sys.argv[2])flumeStream = FlumeUtils.createStream(ssc, "localhost", 8888, pyspark.StorageLevel.MEMORY_AND_DISK_SER_2)line = flumeStream.map(lambda x: x[1])words = line.flatMap(lambda x: x.split(" "))datas = words.map(lambda x: (x, 1))result = datas.reduceByKey(lambda agg, obj: agg + obj)result.pprint()ssc.start()ssc.awaitTermination()

   注意:要指定并行度,如在本地运行设置setMaster(“local[2]”),相当于启动两个线程,一个给receiver,一个给computer。否则会出现如下问题

2019-01-09 19:36:16 INFO  ReceiverSupervisorImpl:54 - Called receiver 0 onStart
2019-01-09 19:36:16 INFO  ReceiverSupervisorImpl:54 - Waiting for receiver to be stopped
2019-01-09 19:36:20 INFO  JobScheduler:54 - Added jobs for time 1547033780000 ms
2019-01-09 19:36:25 INFO  JobScheduler:54 - Added jobs for time 1547033785000 ms
2019-01-09 19:36:30 INFO  JobScheduler:54 - Added jobs for time 1547033790000 ms
2019-01-09 19:36:35 INFO  JobScheduler:54 - Added jobs for time 1547033795000 ms
2019-01-09 19:36:40 INFO  JobScheduler:54 - Added jobs for time 1547033800000 ms

   如果是在集群中运行,必须要求集群中可用core数大于1

2、flume conf文件

<font size=4>&emsp; &emsp;在flume的conf目录下新建flume-push.conf内容如下</font></br>
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/log/flume
a1.sources.r1.fileHeader = true# Describe the sink
a1.sinks.k1.type = avro
#这是接收方
a1.sinks.k1.hostname = 192.168.62.131
a1.sinks.k1.port = 8888# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

   需要先将spark程序运行,使用以下命令:

spark/bin/spark-submit  --driver-class-path /home/hadoop/spark/jars/*:/home/hadoop/jar/flume/* /tmp/pycharm_project_563/day5/FlumePushWordCount.py

   可能会出现以下问题

	Spark Streaming's Flume libraries not found in class path. Try one of the following.1. Include the Flume library and its dependencies with in thespark-submit command as$ bin/spark-submit --packages org.apache.spark:spark-streaming-flume:2.4.0 ...2. Download the JAR of the artifact from Maven Central http://search.maven.org/,Group Id = org.apache.spark, Artifact Id = spark-streaming-flume-assembly, Version = 2.4.0.Then, include the jar in the spark-submit command as$ bin/spark-submit --jars <spark-streaming-flume-assembly.jar> ...
Traceback (most recent call last):File "/tmp/pycharm_project_563/day5/FlumePushWordCount.py", line 12, in <module>flumeStream = FlumeUtils.createStream(ssc, "192.168.62.131", "8888")File "/home/hadoop/spark/python/pyspark/streaming/flume.py", line 67, in createStreamhelper = FlumeUtils._get_helper(ssc._sc)File "/home/hadoop/spark/python/pyspark/streaming/flume.py", line 130, in _get_helperreturn sc._jvm.org.apache.spark.streaming.flume.FlumeUtilsPythonHelper()
TypeError: 'JavaPackage' object is not callable
   需要去maven仓库下载spark-streaming-flume-assembly.jar,然后放到上面指定的jar目录中去。

   然后运行flume

bin/flume-ng agent -n a1 -c conf/ -f conf/flume-push.conf -Dflume.root.logger=WARN,console
   然后在/home/hadoop/log/flume目录下新建log文件,运行spark的日志中出现如下:

在这里插入图片描述

三、poll方式

1、spark streaming程序

   这种方式是有spark主动去flume拉取数据,代码如下:

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.flume import FlumeUtilsif __name__ == "__main__":spark = SparkSession\.builder\.appName("PythonWordCount") \.master("local[2]") \.getOrCreate()sc = spark.sparkContextssc = StreamingContext(sc, 5)addresses = [("localhost", 8888)]flumeStream = FlumeUtils.createPollingStream(ssc, addresses)line = flumeStream.map(lambda x: x[1])words = line.flatMap(lambda x: x.split(" "))datas = words.map(lambda x: (x, 1))result = datas.reduceByKey(lambda agg, obj: agg + obj)result.pprint()ssc.start()ssc.awaitTermination()

   如果是本地模式同样需要指定并行度,如果是在集群中运行,必须要求集群中可用core数大于1

2、flume conf文件

   在flume的conf目录下新建flume-poll.conf内容如下:

# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# source
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/hadoop/log/flume
a1.sources.r1.fileHeader = true# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname = localhost
a1.sinks.k1.port = 8888# Use a channel which buffers events in memory
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
   由于是poll方式,需要的flume
bin/flume-ng agent -n a1 -c conf/ -f conf/flume-poll.conf -Dflume.root.logger=WARN,console
   启动spark程序
spark/bin/spark-submit  --driver-class-path /home/hadoop/spark/jars/*:/home/hadoop/jar/flume/* /tmp/pycharm_project_563/day5/FlumePollWordCount.py 
   同样在/home/hadoop/log/flume目录下新建log文件,将原先生成的COMPLETED文件删除,rm flume/aaa.txt.COMPLETED ,运行spark的日志中出现如下:

在这里插入图片描述

这篇关于Spark实战(五)spark streaming + flume(Python版)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1134333

相关文章

一文带你搞懂Python中__init__.py到底是什么

《一文带你搞懂Python中__init__.py到底是什么》朋友们,今天我们来聊聊Python里一个低调却至关重要的文件——__init__.py,有些人可能听说过它是“包的标志”,也有人觉得它“没... 目录先搞懂 python 模块(module)Python 包(package)是啥?那么 __in

使用Python实现图像LBP特征提取的操作方法

《使用Python实现图像LBP特征提取的操作方法》LBP特征叫做局部二值模式,常用于纹理特征提取,并在纹理分类中具有较强的区分能力,本文给大家介绍了如何使用Python实现图像LBP特征提取的操作方... 目录一、LBP特征介绍二、LBP特征描述三、一些改进版本的LBP1.圆形LBP算子2.旋转不变的LB

Python中__init__方法使用的深度解析

《Python中__init__方法使用的深度解析》在Python的面向对象编程(OOP)体系中,__init__方法如同建造房屋时的奠基仪式——它定义了对象诞生时的初始状态,下面我们就来深入了解下_... 目录一、__init__的基因图谱二、初始化过程的魔法时刻继承链中的初始化顺序self参数的奥秘默认

Python实现特殊字符判断并去掉非字母和数字的特殊字符

《Python实现特殊字符判断并去掉非字母和数字的特殊字符》在Python中,可以通过多种方法来判断字符串中是否包含非字母、数字的特殊字符,并将这些特殊字符去掉,本文为大家整理了一些常用的,希望对大家... 目录1. 使用正则表达式判断字符串中是否包含特殊字符去掉字符串中的特殊字符2. 使用 str.isa

python中各种常见文件的读写操作与类型转换详细指南

《python中各种常见文件的读写操作与类型转换详细指南》这篇文章主要为大家详细介绍了python中各种常见文件(txt,xls,csv,sql,二进制文件)的读写操作与类型转换,感兴趣的小伙伴可以跟... 目录1.文件txt读写标准用法1.1写入文件1.2读取文件2. 二进制文件读取3. 大文件读取3.1

使用Python实现一个优雅的异步定时器

《使用Python实现一个优雅的异步定时器》在Python中实现定时器功能是一个常见需求,尤其是在需要周期性执行任务的场景下,本文给大家介绍了基于asyncio和threading模块,可扩展的异步定... 目录需求背景代码1. 单例事件循环的实现2. 事件循环的运行与关闭3. 定时器核心逻辑4. 启动与停

基于Python实现读取嵌套压缩包下文件的方法

《基于Python实现读取嵌套压缩包下文件的方法》工作中遇到的问题,需要用Python实现嵌套压缩包下文件读取,本文给大家介绍了详细的解决方法,并有相关的代码示例供大家参考,需要的朋友可以参考下... 目录思路完整代码代码优化思路打开外层zip压缩包并遍历文件:使用with zipfile.ZipFil

Python处理函数调用超时的四种方法

《Python处理函数调用超时的四种方法》在实际开发过程中,我们可能会遇到一些场景,需要对函数的执行时间进行限制,例如,当一个函数执行时间过长时,可能会导致程序卡顿、资源占用过高,因此,在某些情况下,... 目录前言func-timeout1. 安装 func-timeout2. 基本用法自定义进程subp

Python实现word文档内容智能提取以及合成

《Python实现word文档内容智能提取以及合成》这篇文章主要为大家详细介绍了如何使用Python实现从10个左右的docx文档中抽取内容,再调整语言风格后生成新的文档,感兴趣的小伙伴可以了解一下... 目录核心思路技术路径实现步骤阶段一:准备工作阶段二:内容提取 (python 脚本)阶段三:语言风格调

Python结合PyWebView库打造跨平台桌面应用

《Python结合PyWebView库打造跨平台桌面应用》随着Web技术的发展,将HTML/CSS/JavaScript与Python结合构建桌面应用成为可能,本文将系统讲解如何使用PyWebView... 目录一、技术原理与优势分析1.1 架构原理1.2 核心优势二、开发环境搭建2.1 安装依赖2.2 验