SparkStreaming---DStream

2024-02-01 11:28
文章标签 dstream sparkstreaming

本文主要是介绍SparkStreaming---DStream,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1.DStream是什么
  • 2.DStream创建
    • 2.1 RDD队列
    • 2.2 自定义数据源
  • 3.DStream转换
    • 3.1 无状态转换
      • 3.1.1 Transformations
      • 3.1.2 join
    • 3.2 有状态转换操作
      • 3.2.1 UpdateStateByKey
      • 3.2.2 WindowOperations
  • 4.DStream输出

1.DStream是什么

参考博文SparkStreaming入门

2.DStream创建

2.1 RDD队列

可以通过使用 queueStream(queueOfRDDs)来创建 DStream,每一个推送到这个队列中的 RDD,都会作为一个 DStream 处理。

  def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContext//处理区间为3s一次val ssc = new StreamingContext(sparkConf, Seconds(3))//创建RDD队列val rddQueue = new mutable.Queue[RDD[Int]]()//创建QueueInputDStream//oneAtTime=false表示同时处理队列中所有的RDDval inputStream: InputDStream[Int] = ssc.queueStream(rddQueue, oneAtATime = false)//对DStream处理val mapStream: DStream[(Int, Int)] = inputStream.map((_, 1))val resDStream: DStream[(Int, Int)] = mapStream.reduceByKey(_ + _)resDStream.print()//启动SparkStreamingContextssc.start()//向RDD队列中放入RDDfor(i <- 1 to 5) {rddQueue+=ssc.sparkContext.makeRDD(List(1,2,3,4))Thread.sleep(2000)}//等待接收数据ssc.awaitTermination()}

在这里插入图片描述

2.2 自定义数据源

用户自定义数据源需要继承 Receiver,并实现 onStart、onStop 方法来自定义数据源采集。

class MyReceiver(host:String,port:Int) extends Receiver[String](StorageLevel.MEMORY_ONLY)
{//最初启动的时候,调用该方法,读数据并将数据发送给 Sparkoverride def onStart(): Unit = {new Thread("Receiver"){override def run(){receive()}}.start()}///读数据并将数据发送给 Sparkdef receive(): Unit = {//创建Socketval socket: Socket = new Socket(host,port)//创建变量用于接收端口穿过来的数据var input:String=null//创建BufferedReader用于读取端口传来的数据val reader: BufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream, StandardCharsets.UTF_8))//读取数据input=reader.readLine()//当 receiver 没有关闭并且输入数据不为空while(!isStopped() && input != null){//循环发送数据给 Sparkstore(input)input=reader.readLine()}//跳出循环则关闭资源reader.close()socket.close()//重启任务restart("restart")}override def onStop(): Unit = {}
}

使用自定义数据源采集数据

  def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContext//处理区间为3s一次val ssc = new StreamingContext(sparkConf, Seconds(3))//3.创建自定义 receiver 的 Streamingval lineStream = ssc.receiverStream(new MyReceiver("localhost", 9999))//4.处理接收的数据val words: DStream[String] = lineStream.flatMap(_.split(" "))val wordMap: DStream[(String, Int)] = words.map((_, 1))val res = wordMap.reduceByKey(_ + _)res.print()//启动SparkStreamingContextssc.start()//等待接收数据ssc.awaitTermination()}

在这里插入图片描述

3.DStream转换

在SparkStreaming中,各个DStream之间是相互独立的,互不干扰的。

DStream 上的操作与 RDD 的类似,分为 Transformations(转换)和 Output Operations(输出)两种,此外转换操作中还有一些比较特殊的原语,如:updateStateByKey()、transform()以及各种 Window 相关的原语。

3.1 无状态转换

无状态转化操作就是把简单的 RDD 转化操作应用到每个批次上,也就是转化 DStream 中的每一个 RDD。即一次对一个批次的RDD执行转换操作。

3.1.1 Transformations

在Spark Streaming中,transform操作是一个强大的功能,允许你将一个DStream转换成一个新的DStream。 它类似于Spark Core中的flatMap和map操作。你可以使用它来执行各种转换操作,例如过滤、分组、连接等。原RDD保持不变。

  def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContext//处理区间为3s一次val ssc = new StreamingContext(sparkConf, Seconds(3))//3.通过监控端口创建 DStream,读进来的数据为一行行val lines = ssc.socketTextStream("localhost", 9999)//4.使用transform来转换val resTransform = lines.transform(rdd => {val words = rdd.flatMap(_.split(" "))val wordMap = words.map((_, 1))val res = wordMap.reduceByKey(_ + _)res})//打印resTransform.print()//启动SparkStreamingContextssc.start()//等待接收数据ssc.awaitTermination()}

在这里插入图片描述

3.1.2 join

两个DStream流之间的 join 需要两个流的批次大小一致,这样才能做到同时触发计算。计算过程就是对当前批次的两个流中各自的 RDD 进行 join,与两个 RDD 的 join 效果相同。

  def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContext//处理区间为3s一次val ssc = new StreamingContext(sparkConf, Seconds(3))//3.通过监控端口创建 DStream,读进来的数据为一行行val line9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val line8888: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 8888)//4.将两个流转换为K,V类型val lines1: DStream[(String, Int)] = line8888.flatMap(_.split(" ")).map((_,1))val lines2: DStream[(String, Int)] = line9999.flatMap(_.split(" ")).map((_,11))//5.使用join来对两个DStream进行连接var resTransform: DStream[(String, (Int, Int))] =lines1.join(lines2)//打印resTransform.print()//启动SparkStreamingContextssc.start()//等待接收数据ssc.awaitTermination()}

在这里插入图片描述

3.2 有状态转换操作

在Spark Streaming中,有状态转换操作是指那些需要依赖之前批次数据或中间结果来计算当前批次数据的操作。这些操作在处理数据时,会在跨时间区间内跟踪数据的状态变化。在有状态转换的过程中,需要借助检查点(checkpoint)来完成转换。

3.2.1 UpdateStateByKey

UpdateStateByKey用于记录历史记录,有时,我们需要在 DStream 中跨批次维护状态(例如流计算中累加 wordcount)。针对这种情况,updateStateByKey()为我们提供了对一个状态变量的访问,用于键值对形式的 DStream。给定一个由(键,事件)对构成的 DStream,并传递一个指定如何根据新的事件更新每个键对应状态的函数,它可以构建出一个新的 DStream,其内部数据为(键,状态) 对。
updateStateByKey() 的结果会是一个新的 DStream,其内部的 RDD 序列是由每个时间区间对应的(键,状态)对组成的。

  def main(args: Array[String]): Unit = {//定义状态更新方法:参数 seq 为当前批次单词数量,state 为以往批次单词数量val updateFuc = (seq:Seq[Int],state:Option[Int])=>{//当前批次的单词数量val updateSeq: Int = seq.foldLeft(0)((x, y) => x + y)//以往批次的单词数量val updateState:Int = state.getOrElse(0)Some(updateSeq+updateState)}//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContext//处理区间为3s一次val ssc = new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint("./ck")val line9999: ReceiverInputDStream[String] = ssc.socketTextStream("localhost", 9999)val wordMap: DStream[(String, Int)] = line9999.flatMap(_.split(" ")).map((_,1))val res = wordMap.updateStateByKey[Int](updateFuc)//打印res.print()//启动SparkStreamingContextssc.start()//等待接收数据ssc.awaitTermination()}

3.2.2 WindowOperations

Window Operations 可以设置窗口的大小和滑动窗口的间隔来动态的获取当前 Steaming 的允许状态。所有基于窗口的操作都需要两个参数,分别为窗口时长以及滑动步长

注意:窗口时长和滑动步长这两者都必须为采集周期大小的整数倍。

实现需求:WordCount案例:3 秒一个批次,窗口 12 秒,滑步 6 秒。

  def main(args: Array[String]): Unit = {//1.初始化 Spark 配置信息val sparkConf = new SparkConf().setMaster("local[*]").setAppName("StreamWordCount")//2.初始化 SparkStreamingContext//处理区间为3s一次val ssc = new StreamingContext(sparkConf, Seconds(3))ssc.checkpoint("./ck")val lines = ssc.socketTextStream("localhost", 9999)val dataMap: DStream[(String, Int)] = lines.flatMap(_.split(" ")).map((_, 1))//窗口 12 秒,滑步 6 秒val wordCountByWindows = dataMap.reduceByKeyAndWindow((a: Int, b: Int) => {a + b}, Seconds(12), Seconds(6))wordCountByWindows.print()//启动SparkStreamingContextssc.start()//等待接收数据ssc.awaitTermination()}

reduceByKeyAndWindow(func, windowLength, slideInterval, [numTasks]):
当在一个(K,V)对的DStream上调用此函数,会返回一个新(K,V)对的DStream,此处通过对滑动窗口中批次数据使用
reduce 函数来整合每个 key 的 value 值。
参数解答:

windowLength 是滑动窗口的大小。
slideInterval 是滑动窗口的滑动间隔。这意味着每6秒,窗口会向前滑动一次。

关于 Window 的操作还有如下方法:

(1)window(windowLength, slideInterval): 基于对源 DStream 窗化的批次进行计算返回一个新的 Dstream;
(2)countByWindow(windowLength, slideInterval): 返回一个滑动窗口计数流中的元素个数;
(3)reduceByWindow(func, windowLength, slideInterval): 通过使用自定义函数整合滑动区间流元素来创建>一个新的单元素流;
(4)reduceByKeyAndWindow(func, invFunc, windowLength, slideInterval, [numTasks]): 这个函数是上述函数的变化版本,每个窗口的 reduce 值都是通过用前一个窗的 reduce 值来递增计算。通过 reduce 进入到滑动窗口数据并”反向 reduce”离开窗口的旧数据来实现这个操作。一个例子是随着窗口滑动对 keys 的“加”“减”计数。当窗口较大而滑动步数较小时,可以使用该函数来避免重复计算。

4.DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与 RDD 中的惰性求值类似,如果一个 DStream 及其派生出的 DStream 都没有被执行输出操作,那么这些 DStream 就都不会被求值。 如果 StreamingContext 中没有设定输出操作,整个 context 就都不会启动。
在这里插入图片描述

这篇关于SparkStreaming---DStream的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/667141

相关文章

实时数仓链路分享:kafka =SparkStreaming=kudu集成kerberos

点击上方蓝色字体,选择“设为星标” 回复”资源“获取更多资源 大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 本文档主要介绍在cdh集成kerberos情况下,sparkstreaming怎么消费kafka数据,并存储在kudu里面 假设kafka集成kerberos假设kudu集成kerberos假设用非root用户操作spark基

sparkstreaming的实时黑名单过滤太慢

官网推荐如下这种方法进行过滤,但是这种方法其实有很大弊端,left out join如果黑名单数据量很大就会很伤,其实真不好。 object TransformBlackList {def main(args: Array[String]): Unit = {//获取streamingContextval sc=new StreamingContext(new SparkConf().setAp

Spark学习笔记 --- SparkStreaming 中基本概念

StreamingContext StreamingContext 是Spark Streaming程序的入口点,正如SparkContext是Spark程序的入口点一样。

理解SparkStreaming的Checkpointing

streaming 应用程序必须 24 * 7 运行, 因此必须对应用逻辑无关的故障(例如, 系统故障, JVM 崩溃等)具有弹性. 为了可以这样做, Spark Streaming 需要 checkpoint 足够的信息到容错存储系统, 以便可以从故障中恢复.checkpoint 有两种类型的数据.   Metadata checkpointing - 将定义 streaming 计算的信息

【SparkStreaming】面试题

Spark Streaming 是 Apache Spark 提供的一个扩展模块,用于处理实时数据流。它使得可以使用 Spark 强大的批处理能力来处理连续的实时数据流。Spark Streaming 提供了高级别的抽象,如 DStream(Discretized Stream),它代表了连续的数据流,并且可以通过应用在其上的高阶操作来进行处理,类似于对静态数据集的操作(如 map、reduce、

DSTREAM系列产品差异分析及最新DSTREAM-HT

DSTREAM-HT是ARM公司开发的一款高速占用引脚少的仿真调试器,更快速更少的引脚:非常适合收集大量追踪数据,其中SoC引脚数排除了并行追踪。捕获多个高速串行追踪通道(HSSTP)以启用软件分析,配合arm DS-5可对所有的ARM内核芯片进行开发。   优势功能 快速的数据传输:高达12.5 Gbps的单通道线路速率(60 Gbps组合通道速率),可配置和捕获CoreSight和自

SparkStreaming编程-DStream输出

输出操作指定了对流数据经转化操作得到的数据所要执行的操作(例如把结果推入外部数据库或输出到屏幕上)。与RDD中的惰性求值类似,如果一个DStream及其派生出的DStream都没有被执行输出操作,那么这些DStream就都不会被求值。如果StreamingContext中没有设定输出操作,整个context就都不会启动。 常见的输出操作函数如下:print()、saveAsTextFil

Spark2.x 入门:RDD队列流(DStream)

在调试Spark Streaming应用程序的时候,我们可以使用streamingContext.queueStream(queueOfRDD)创建基于RDD队列的DStream。 下面是参考Spark官网的QueueStream程序设计的程序,每隔1秒创建一个RDD,Streaming每隔2秒就对数据进行处理。 新建一个TestRDDQueueStream.scala文件,在该文件中输入以下

SparkStreaming架构原理(详解)

Spark概述 SparkStreaming架构原理 Spark Streaming的架构主要由以下几个关键部分组成。 1.数据源接收器(Receiver) 执行流程开始于数据源接收阶段,其中接收器(Receiver)负责从外部数据源获取数据流。 接收器可以连接到诸如Kafka、Flume、Kinesis等数据源,或直接通过网络套接字接收数据。 接收器的主要功能是接收数据并

######好好好#######DStream 生成 RDD 实例详解

DStream 生成 RDD 实例详解 [酷玩 Spark] Spark Streaming 源码解析系列 ,返回目录请 猛戳这里 「腾讯·广点通」技术团队荣誉出品 本系列内容适用范围:* 2016.12.28 update, Spark 2.1 全系列 √ (2.1.0)* 2016.11.14 update, Spark 2.0 全系列 √ (2.0.0, 2.0.1, 2.0.