35_SparkStreaming一

2024-04-28 07:58
文章标签 35 sparkstreaming

本文主要是介绍35_SparkStreaming一,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

SparkStreaming

1 实时任务简介

Spark流是对于Spark核心API的拓展,从而支持对于实时数据流的可拓展,高吞吐量和容错性流处理。数据可以由多个源取得,例如:Kafka,Flume,Twitter,ZeroMQ,Kinesis或者TCP接口,同时可以使用由如map,reduce,join和window这样的高层接口描述的复杂算法进行处理。最终,处理过的数据可以被推送到文件系统,数据库和HDFS。

Spark Streaming

2 SparkStreaming程序入口
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
3 什么是DStream

离散数据流或者DStream是SS提供的基本抽象。其表现数据的连续流,这个输入数据流可以来自于源,也可以来自于转换输入流产生的已处理数据流。内部而言,一个DStream以一系列连续的RDDs所展现,这些RDD是Spark对于不变的,分布式数据集的抽象。一个DStream中的每个RDD都包含来自一定间隔的数据,如下图:

在DStream上使用的任何操作都会转换为针对底层RDD的操作。例如:之前那个将行的流转变为词流的例子中,flatMap操作应用于行DStream的每个RDD上 从而产生words DStream的RDD。如下图:

Spark Streaming

4 入门程序演示

pom.xml配置:

<properties><maven.compiler.source>1.8</maven.compiler.source><maven.compiler.target>1.8</maven.compiler.target><scala.version>2.11.8</scala.version><spark.version>2.2.1</spark.version><hadoop.version>2.7.5</hadoop.version><encoding>UTF-8</encoding></properties><dependencies><dependency><groupId>org.scala-lang</groupId><artifactId>scala-library</artifactId><version>${scala.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming_2.11</artifactId><version>${spark.version}</version></dependency><dependency><groupId>org.apache.spark</groupId><artifactId>spark-streaming-kafka-0-10_2.11</artifactId><version>${spark.version}</version></dependency></dependencies><build><pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><version>3.2.2</version></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.5.1</version></plugin></plugins></pluginManagement><plugins><plugin><groupId>net.alchim31.maven</groupId><artifactId>scala-maven-plugin</artifactId><executions><execution><id>scala-compile-first</id><phase>process-resources</phase><goals><goal>add-source</goal><goal>compile</goal></goals></execution><execution><id>scala-test-compile</id><phase>process-test-resources</phase><goals><goal>testCompile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><executions><execution><phase>compile</phase><goals><goal>compile</goal></goals></execution></executions></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><filters><filter><artifact>*:*</artifact><excludes><exclude>META-INF/*.SF</exclude><exclude>META-INF/*.DSA</exclude><exclude>META-INF/*.RSA</exclude></excludes></filter></filters></configuration></execution></executions></plugin></plugins></build>

scala版本

object WordCount {def main(args: Array[String]): Unit = {//步骤一:初始化程序入口val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))//步骤二:获取数据流val lines = ssc.socketTextStream("localhost", 9999)//步骤三:数据处理val words = lines.flatMap(_.split(" "))val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)//步骤四: 数据输出wordCounts.print()//步骤五:启动任务ssc.start()ssc.awaitTermination()ssc.stop()}}

java版本

/*** 单词统计*/
public class WordCount {public static void main(String[] args)  throws  Exception{//步骤一:初始化程序入口SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));//步骤二:获取数据源JavaReceiverInputDStream<String> lines = jssc.socketTextStream("10.148.15.10", 9999);//步骤三:数据处理JavaDStream<String> words = lines.flatMap(x -> Arrays.asList(x.split(" ")).iterator());JavaPairDStream<String, Integer> pairs = words.mapToPair(s -> new Tuple2<>(s, 1));JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey((i1, i2) -> i1 + i2);//步骤四:数据输出wordCounts.print();//步骤五:启动程序jssc.start();jssc.awaitTermination();jssc.stop();}
}
5 数据源
5.1 Socket数据源

见4.3 入门程序演示

5.2 HDFS数据源

注:如果HDFS使用的高可用模式,那么把集群的core-site.xml,hdfs-site.xml文件导入到项目的resources目录里面。

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MamV901O-1578366683388)(assets/1567305900019.png)]

/*** HDFS 数据源*/
object WordCountForHDFSSource {def main(args: Array[String]): Unit = {//步骤一:初始化程序入口val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")val ssc = new StreamingContext(conf, Seconds(1))//步骤二:获取数据流val lines = ssc.textFileStream("/tmp");//步骤三:数据处理val words = lines.flatMap(_.split(" "))val pairs = words.map(word => (word, 1))val wordCounts = pairs.reduceByKey(_ + _)//步骤四: 数据输出wordCounts.print()//步骤五:启动任务ssc.start()ssc.awaitTermination()ssc.stop()}}
5.3 自定义数据源
/*** 自定义一个Receiver,这个Receiver从socket中接收数据* 接收过来的数据解析成以 \n 分隔开的text使用方式:nc -lk 9999*/
object CustomReceiver {def main(args: Array[String]) {// Create the context with a 1 second batch sizeval sparkConf = new SparkConf().setAppName("CustomReceiver").setMaster("local[2]")val sc = new SparkContext(sparkConf)val ssc = new StreamingContext(sc, Seconds(1))// 调用 receiverStream api,将自定义的Receiver传进去val lines = ssc.receiverStream(new CustomReceiver("10.148.15.10", 9999))val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)wordCounts.print()ssc.start()ssc.awaitTermination()ssc.stop(false)}
}class CustomReceiver(host: String, port: Int)extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) with Logging {def onStart() {// 启动一个线程,开始接收数据new Thread("Socket Receiver") {override def run() { receive() }}.start()}def onStop() {// There is nothing much to do as the thread calling receive()// is designed to stop by itself isStopped() returns false}/** Create a socket connection and receive data until receiver is stopped */private def receive() {var socket: Socket = nullvar userInput: String = nulltry {logInfo("Connecting to " + host + ":" + port)socket = new Socket(host, port)logInfo("Connected to " + host + ":" + port)val reader = new BufferedReader(new InputStreamReader(socket.getInputStream(), StandardCharsets.UTF_8))userInput = reader.readLine()while(!isStopped && userInput != null) {store(userInput)userInput = reader.readLine()}reader.close()socket.close()logInfo("Stopped receiving")restart("Trying to connect again")} catch {case e: java.net.ConnectException =>restart("Error connecting to " + host + ":" + port, e)case t: Throwable =>restart("Error receiving data", t)}}
}
5.4 Kafka数据源
见第三天

这篇关于35_SparkStreaming一的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/942708

相关文章

『功能项目』战士的平A特效【35】

我们打开上一篇34武器的切换实例的项目, 本章要做的事情是在战士的每次按A键时在指定位置生成一个平A特效 首先将之前下载的技能拖拽至场景中 完全解压缩后重命名为AEffect 拖拽至预制体文件夹 进入主角动画的战士动画层级 双击第一次攻击 选择Animation 创建事件 创建的动画事件帧放在攻击动画挥剑指定处 命名为PerpetualAtt

NoSQL数据库的35个应用场景

现在我们站在各个用例的角度上来考虑那种系统适合于这些用例。   你的意见是?   首先,我们要纵览各种数据模型。这些模型的分类方法来自于Emil Eifrem和NoSQL databases。   文档数据库   源起:受Lotus Notes启发。   数据模型:包含了key-value的文档集合   例子:CouchDB, MongoDB   优点:数据模型自然,编

实时数仓链路分享:kafka =SparkStreaming=kudu集成kerberos

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 本文档主要介绍在cdh集成kerberos情况下,sparkstreaming怎么消费kafka数据,并存储在kudu里面 假设kafka集成kerberos假设kudu集成kerberos假设用非root用户操作spark基

MySQL-35个DQL练手题(难)

第1题 取得每个部门最高薪水的人员名称 第一步:取得每个部门最高薪水 select max(sal) topsal, deptno from emp group by deptno; 第二步:将上面第一步的查询结果当做一张临时表t,进行表连接,条件是:t.deptno=e.deptno and t.maxsal=e.sal select e.ename, t.* from emp e

没有35类可以做特许经营加盟不!

前几天有个老客户找到普推知产商标老杨,没有35类可以做特许经营加盟不,在35类有个小类叫做“特许经营的商业管理”,但是35类这个主要指的为他人提供的,所以就是没有35类广告,照样就可以做特许经营加盟。 比如已经有43类餐饮行业的商标了,有44类美容店的商标,那也可以自营做加盟,35类的特许经营主要是指的是为他人的特许经营行为提供商业咨询、调查、管理等服务的主体,比如管理多个

「图」邻接矩阵|边集数组|邻接表 / LeetCode 35|33|81(C++)

概述 我们开始入门图论:图的存储。 图是一种高级数据结构:链表是一个节点由一条边指向下一个节点,二叉树是一个节点由两条边指向下两个节点,而图是由任意多个节点由任意多条边指向任意多个节点。 图由节点和边构成,边往往存在边权。边权在不同的问题中往往有不同含义,如在最短路径中表示边的长度,在AOE网中表示任务所需时间。 对于这种复杂的结构,如何存储在计算机的程序语言中呢? 我们先来介绍三种存储

代码随想录算法训练营第35天|背包问题基础、46. 携带研究材料(01背包二维解法)(01背包一维解法)(acm)、416. 分割等和子集

目录 0、背包问题基础01背包 46. 携带研究材料(01背包)1、题目描述2、思路3、code(二维解法)3-1、code(一维解法)4、复杂度分析 416. 分割等和子集1、题目描述2、思路3、code4、复杂度分析 0、背包问题基础 01背包 有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i],得到的价值是value[i] 。每件物品只能

【软件技巧】第35课,软件逆向安全工程师之汇编指令mov、ptr、xchg交换指令,每天5分钟学习逆向吧!

在x86汇编语言中,mov 指令用于将一个值从一个位置移动到另一个位置。这个值可以是立即数、寄存器中的值、内存中的值或者是一个指针。mov 指令是汇编语言中最常用的指令之一,因为它在数据传输和初始化操作中起着核心作用。 mov 指令的基本格式: mov 目标操作数, 源操作数 目标操作数:接收数据的操作数,通常是寄存器、内存地址或者是一个指针。源操作数:提供数据的操作数,可以是立即数、寄存器

【持续更新】Advanced Download Manager 14.0.35 Pro安卓ADM下载神器最新高级免费修改版

这个也算小有名气,名字和 idm 有点像。当程序从剪贴板中截取链接后,您可以将其复制并发送至ADM编辑器,或者使用“添加”按钮粘贴链接。 ▨ ADM 有以下特点: • 该应用支持同时下载最多三个文件 • 通过多线程技术(9个部分)加速下载过程 • 从安卓浏览器及剪贴板中拦截链接 • 后台下载文件,并在失败后自动恢复 • 支持图片、文档、压缩包及程序的加载 • 针对Lollipop和Ma

35~750kV 变电站无人值守与集中监控的智能运维模式

1、引言         电力行业的快速发展和电网规模的持续扩张,使得传统的变电站运维模式日渐不能满足现代电网对于安全性、可靠性和效率的更高要求。因此,向无人值守与集中监控过渡,已经逐渐成为了整个行业发展的主旋律。 2、关键技术支撑 2.1 自动化监控技术 为了实现对变电站的全面监控与管理,安装在变电站的高清摄像头、红外测温仪、烟雾探测器等设备扮演着至关重要的角色。它们可以实现对变电站