Spark的Streaming + Flume进行数据采集(flume主动推送或者Spark Stream主动拉取)

2024-06-14 13:58

本文主要是介绍Spark的Streaming + Flume进行数据采集(flume主动推送或者Spark Stream主动拉取),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark的Streaming + Flume进行数据采集(flume主动推送或者Spark Stream主动拉取)

1、针对国外的开源技术,还是学会看国外的英文说明来的直接,迅速,这里简单贴一下如何看:

2、进入到flume的conf目录,创建一个flume-spark-push.sh的文件:

[hadoop@slaver1 conf]$ vim flume-spark-push.sh

配置一下这个文件,flume使用avro的。

# example.conf: A single-node Flume configuration# Name the components on this agent
#定义这个agent中各组件的名字,给那三个组件sources,sinks,channels取个名字,是一个逻辑代号:
#a1是agent的代表。
a1.sources = r1
a1.channels = c1
a1.sinks = k1# Describe/configure the source 描述和配置source组件:r1
#类型, 从网络端口接收数据,在本机启动, 所以localhost, type=spoolDir采集目录源,目录里有就采
#type是类型,是采集源的具体实现,这里是接受网络端口的,netcat可以从一个网络端口接受数据的。netcat在linux里的程序就是nc,可以学习一下。
#bind绑定本机localhost。port端口号为44444。a1.sources.r1.type = exec
a1.sources.r1.bind = tail -f /home/hadoop/data_hadoop/spark-flume/wctotal.log
a1.sources.r1.shell = /bin/bash -c# Describe the sink 描述和配置sink组件:k1
#type,下沉类型,使用logger,将数据打印到屏幕上面。
#a1.sinks.k1.type = logger# Use a channel which buffers events in memory 描述和配置channel组件,此处使用是内存缓存的方式
#type类型是内存memory。
#下沉的时候是一批一批的, 下沉的时候是一个个eventChannel参数解释:
#capacity:默认该通道中最大的可以存储的event数量,1000是代表1000条数据。
#trasactionCapacity:每次最大可以从source中拿到或者送到sink中的event数量。
a1.channels.c1.type = memory
a1.channels.c1.capacity = 1000
a1.channels.c1.transactionCapacity = 100# define sink
a1.sinks.k1.type= avro
a1.sinks.k1.hostname = slaver1
a1.sinks.k1.port = 9999# Bind the source and sink to the channel 描述和配置source  channel   sink之间的连接关系
#将sources和sinks绑定到channel上面。
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1

3、然后去Spark的github查看项目demo:https://github.com/apache/spark

具体案例如:https://github.com/apache/spark/blob/v1.5.1/examples/src/main/scala/org/apache/spark/examples/streaming/FlumeEventCount.scala

代码如下所示:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._
import org.apache.spark.streaming.flume._
import org.apache.spark.util.IntParamval ssc = new StreamingContext(sc, Seconds(5))val stream = FlumeUtils.createStream(ssc, slaver1, 9999, StorageLevel.MEMORY_ONLY_SER_2)stream.count().map(cnt => "Received " + cnt + " flume events." ).print()ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

导入flume的包的时候出现问题,找不到包:import org.apache.spark.streaming.flume._

scala> import org.apache.spark.streaming.flume._
<console>:28: error: object flume is not a member of package org.apache.spark.streamingimport org.apache.spark.streaming.flume._

 由于没有搭建maven项目,在命令行需要导入jar包,这里先放置一下,稍后继续记笔记。

待续.......

posted @ 2018-04-24 10:53 别先生 阅读( ...) 评论( ...) 编辑 收藏

这篇关于Spark的Streaming + Flume进行数据采集(flume主动推送或者Spark Stream主动拉取)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1060598

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Oracle Expdp按条件导出指定表数据的方法实例

《OracleExpdp按条件导出指定表数据的方法实例》:本文主要介绍Oracle的expdp数据泵方式导出特定机构和时间范围的数据,并通过parfile文件进行条件限制和配置,文中通过代码介绍... 目录1.场景描述 2.方案分析3.实验验证 3.1 parfile文件3.2 expdp命令导出4.总结

更改docker默认数据目录的方法步骤

《更改docker默认数据目录的方法步骤》本文主要介绍了更改docker默认数据目录的方法步骤,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 目录1.查看docker是否存在并停止该服务2.挂载镜像并安装rsync便于备份3.取消挂载备份和迁

不删数据还能合并磁盘? 让电脑C盘D盘合并并保留数据的技巧

《不删数据还能合并磁盘?让电脑C盘D盘合并并保留数据的技巧》在Windows操作系统中,合并C盘和D盘是一个相对复杂的任务,尤其是当你不希望删除其中的数据时,幸运的是,有几种方法可以实现这一目标且在... 在电脑生产时,制造商常为C盘分配较小的磁盘空间,以确保软件在运行过程中不会出现磁盘空间不足的问题。但在

SpringBoot使用minio进行文件管理的流程步骤

《SpringBoot使用minio进行文件管理的流程步骤》MinIO是一个高性能的对象存储系统,兼容AmazonS3API,该软件设计用于处理非结构化数据,如图片、视频、日志文件以及备份数据等,本文... 目录一、拉取minio镜像二、创建配置文件和上传文件的目录三、启动容器四、浏览器登录 minio五、

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

Mybatis拦截器如何实现数据权限过滤

《Mybatis拦截器如何实现数据权限过滤》本文介绍了MyBatis拦截器的使用,通过实现Interceptor接口对SQL进行处理,实现数据权限过滤功能,通过在本地线程变量中存储数据权限相关信息,并... 目录背景基础知识MyBATis 拦截器介绍代码实战总结背景现在的项目负责人去年年底离职,导致前期规

Redis KEYS查询大批量数据替代方案

《RedisKEYS查询大批量数据替代方案》在使用Redis时,KEYS命令虽然简单直接,但其全表扫描的特性在处理大规模数据时会导致性能问题,甚至可能阻塞Redis服务,本文将介绍SCAN命令、有序... 目录前言KEYS命令问题背景替代方案1.使用 SCAN 命令2. 使用有序集合(Sorted Set)

python-nmap实现python利用nmap进行扫描分析

《python-nmap实现python利用nmap进行扫描分析》Nmap是一个非常用的网络/端口扫描工具,如果想将nmap集成进你的工具里,可以使用python-nmap这个python库,它提供了... 目录前言python-nmap的基本使用PortScanner扫描PortScannerAsync异

SpringBoot整合Canal+RabbitMQ监听数据变更详解

《SpringBoot整合Canal+RabbitMQ监听数据变更详解》在现代分布式系统中,实时获取数据库的变更信息是一个常见的需求,本文将介绍SpringBoot如何通过整合Canal和Rabbit... 目录需求步骤环境搭建整合SpringBoot与Canal实现客户端Canal整合RabbitMQSp