Spark Streaming(二十七)DStream的转换、输出、缓存持久化、检查点

本文主要是介绍Spark Streaming(二十七)DStream的转换、输出、缓存持久化、检查点,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

定义

所谓DStream的转换其实就是对间隔时间内DStream数据流的RDD进行转换操作并返回去一个新的DStream

DStream转换

其实DStream转换语法跟RDD的转换语法非常类似,但DStream有它自己的一些特殊的语法,如updateStateByKey()、transform()、以及各种Window语法。

转换意思
map(func)将DStream上的每个RDD通过func函数操作并返回一个新的DStream
flatMap(func)和map类似,但是每个输入的元素可以映射成0个或者多个项
filter(func)过滤出DStream上符合要求的RDD并返回新的DStream
repartiton(numPartitions)对DStream上的RDD进行重新分区,提高并行度
union(otherDStream)将两个DStream合并到一起
count()返回DStream中RDD中元素的总个数,返回只包含一个Long类型的DStream
reduce(func)通过func函数聚合DStream中每个RDD中的每个元素,返回新的DStream,该新的DStream中只有一个元素,就是聚合以后的而结果
countByValue()计算DStream上RDD内元素出现的频次,并返回新的DStream[(K,Long)],K是RDD元素的类型,Long是元素出现的次数
reduceByKey(func,[numPartition])聚合DStream上(K,V)类型的RDD里元素的,根据K统计V的个数,返回新的DStream,新的DStream里的RDD元素类型也为(K,V),K为键,V为K对应值的个数
join(otherDStream,[numPartition])将两个类型为(K,V)和(K,W)的DStream进行join连接,返回一个类型为(K,(V,W))的新的DStream
cogroup(otherDStream,[numPartition])对两个(K,V)和(K,W)类型的DStream上调用该函数的时候,返回(K,(Seq [V],Seq [W]))元组的新DStream。
transform(func)通过转换函数,将DStream上的每个RDD转换成另一种RDD,这种函数的操作基本单位为RDD,所以这个函数中的操作语法就是RDD的操作语法。转换后返回新的DStream

UpdateStateByKey操作

updateStateByKey就是随着时间的流逝,在SparkStreaming中的可以对每一个Key通过checkPoint来维护state状态,通过更新函数对每一个Key的状态进行更新,在更新的时候,对于每一个批次的数据而言,SparkStreaming都会通过updateStateByKey这个函数更新已存在Key对应的State,但是如果通过更新函数对State更新以后返回的是None,那么此刻的Key对应的State就会被删除,State可以是任意类型的数据结构。

package com.lyz.streaming.transformationimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.{Seconds, StreamingContext}object UpdateStateByKey {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val streaming = new StreamingContext(conf, Seconds(5))/**** 在HDFS上存储state的检查点,必须设置,否则会报错,因为每次更新state的时候* 都需要去检查点获取上一次更新后的state*/streaming.checkpoint("hdfs://xxxxx:8020/spark/checkpoint")val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))/*** 定义更新函数体* value:就是新一批次的数据经过逻辑处理得到的结果* res:新一批次的数据进来之前的状态值*/def func(value: Seq[Int], res: Option[Int]): Some[Int] = {val sum: Int = value.sumval per: Int = res.getOrElse(0)Some(sum + per)}//调用更新函数,传入更新函数体val res: DStream[(String, Int)] = mapStream.updateStateByKey(func)res.foreachRDD(rdd => {rdd.collect().foreach(println(_))})streaming.start()streaming.awaitTermination()}
}

Transform转换操作

transform操作允许DStream上间隔时间内里的RDD转换成另个RDD。它的作用就是应用Dstream为公开的RDD操作函数。例如DStream上一个新的批次RDD与已存在的RDD进行join连接查询的API是未公开的,所以是不能这么使用的,但是利用transform函数就可以使用DStream未公开API

package com.lyz.streaming.transformationimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}object TransformTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val streaming = new StreamingContext(conf, Seconds(5))/**** 在HDFS上存储state的检查点,必须设置,否则会报错,因为每次更新state的时候* 都需要去检查点获取上一次更新后的state*/streaming.checkpoint("hdfs://xxxx:8020/spark/checkpoint")val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))//外部的RDDval rdd: RDD[(String, Int)] = streaming.sparkContext.makeRDD(Array(("aaaa", 1)))/*** 调用DStream的transform函数,将DStream里的RDD与外部RDD进行操作,返回新的DStream* transform强大的原因就是能够使用DStream为公开的一些RDD操作函数*/val res: DStream[(String, (Int, Int))] = mapStream.transform(rdd1 => {rdd1.join(rdd)})res.foreachRDD(rdd => {rdd.collect().foreach(println(_))})streaming.start()streaming.awaitTermination()}
}

Window窗口操作

SparkStreaming提供了基于窗口的操作,你可以在滑动窗口内对数据进行转换。这种窗口操作就是在比SparkStreaming批处理的时间间隔更长的时间间隔内整合多个批处理的结果。
Window窗口操作需要两个参数,一个参数是窗口的时长(最近的几个批次),另一个是窗口的滑动步长(多久执行一次计算)。需要特别注意的是这两个参数一定要是批处理间隔时间的整数倍。例如我们有一个以10秒为间隔的批处理,我们想要没十秒计算一次30秒内的数据,那么我们就可以把时长窗口设置成30秒,窗口的滑动步长为10秒。

package com.lyz.streaming.transformationimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}object WindowTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val streaming = new StreamingContext(conf, Seconds(5))val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))/*** 每十秒计算一次前三十秒的数据,其实也就是前6个批次的数据*/val res: DStream[(String, Int)] = mapStream.reduceByKeyAndWindow((w: Int, w1: Int) => w + w1, Seconds(30), Seconds(10))}
}

比较常用的Window窗口函数

函数解释
window(windowLength,slideInterval)创建滑动窗口,并返回一个新的DStream然后自定义函数处理这个滑动窗口的数据
reduceByKeyWindow(func,windowLength.slideInterval)对每个滑动窗口执行reduceByKey的操作
reduceByKeyAndWindow(func,invFunc,windowLength,slideInterval,[numParition])这是一个性能更好的reduceByKeyAndWindow函数,
countByWindow(windowLength,slideInterval)对每一个滑动窗口执行count操作,返回每个窗口里的元素个数
countByValueAndWindow(windowLength,slideInterval,[numPatition])在每个滑动窗口内计算reduceByValue操作,统计每个键对应的值在窗口内出现的频次

DStream的操作结果输出

操作DStream的输出结果可以推送到外部数据库中或者外的文件系统中。

函数解释
print()在Driver端打印DStream上的前十个元素
saveAsTextFiles(prefix,[suffix])保存DStream内容到文本文件中,每个批次产生的结果的名字为前缀为prefix,后缀为[.suffix],所以全称为"prefix-TIME_IN_MS[.suffix]"
saveAsObjectFiles(prefix,[suffix])保存DStream内容为Sequence文件,这文件是Java对象序列化后的,每个批次产生的结果的名字为前缀为prefix,后缀为[.suffix],所以全称为"prefix-TIME_IN_MS[.suffix]"
saveAsHadoopFiles(prefix,[suffix])保存DStream内容到Hadoop文件系统中,每个批次产生的结果的名字为前缀为prefix,后缀为[.suffix],所以全称为"prefix-TIME_IN_MS[.suffix]"
foreachRDD(func)循环DStream上的RDD,并且将函数应用到每个RDD上,并且这个函数可以将RDD推送到外部系统中,注意这个函数时在Driver端运行的。它内部的RDD的操作是在每个Woker分区上执行的

foreachRDD应用设计

foreachRDD几首一个函数,通过这个函数我可以把RDD保存到外部系统中,既然要保存到外部系统就需要与外部系统建立连接,我们都知道RDD的数据是散落在各个woker上的,处理RDD的函数也是在各个worker上进行的,我们需要将数据保存到外部数据,那么我们就必须在每个worker上都存在一个连接,因为连接是不能被序列化的,所以这个连接肯定是不能由driver发送到worker上的,而是在worker上创建出来。那么怎么样才能在worker上创建连接呢?那就是在rdd.foreach代码块里创建连接,而不是在stream.foreachRDD代码块里创建。

 res.foreachRDD(rdd => {rdd.foreach(r => {//1、创建连接//2、保存数据//3、关闭连接})})

我们都知道遍历RDD的时候其实就是在遍历每一条记录,如上代码所示我们为每一条记录都创建了连接,显然这样是非常影响系统性能的,那么我们怎么办呢?既然数据是在各个worker的分区上的,那么我们可以为每个分区创建一个连接。

dstream.foreachRDD { rdd =>rdd.foreachPartition { partitionOfRecords =>val connection = createNewConnection()partitionOfRecords.foreach(record => connection.send(record))connection.close()}
}

DataFrame和SQL操作DStream

在处理DStream数据流的时候,我们可以利用DataFrame和SQL操作DStream。利用DataFrame和SQL处理DStream的前提是必须利用初始化Streaming的SparkConf来创建SparkSession。例子如下

package com.lyz.streaming.transformationimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}object DateFrameAndSqlTest {def main(args: Array[String]): Unit = {val conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")val streaming = new StreamingContext(conf, Seconds(5))/**** 在HDFS上存储state的检查点,必须设置,否则会报错,因为每次更新state的时候* 都需要去检查点获取上一次更新后的state*/streaming.checkpoint("hdfs://192.168.101.187:8020/spark/checkpoint")val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))mapStream.foreachRDD(rdd => {//利用SparkConf来初始化SparkSession。val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()//导入隐式转换来将RDDimport sparkSession.implicits._//将RDD转换成DFval df: DataFrame = rdd.toDF("word", "count")df.createOrReplaceTempView("compute")val computeDF: DataFrame = sparkSession.sql("select * from compute")computeDF.show()})streaming.start()streaming.awaitTermination()}
}

DStream缓存和持久化

与RDD缓存和持久化类似,我们在程序中可以把程序中将要被多次计算的DStream缓存到内存中,也就是对相同数据进行多次计算。调用DStream的persist方法来实现DStream的缓存。对于窗口函数的操作这些底层已经对数据进行缓存了,因此窗口函数生成的DStream已经保存在了内存中,开发人员无需调用persist方法。

对于通过网络传输的的数据流,默认的持久化是将数据复制到两个节点上进行备份实现容错

注意:与RDD不同,DStream的默认持久化级别是将数据持久化保存在内存中。

DStream的检查点

由于流式处理程序需要全天不间断的运行,运行期间有很大几率会出现程序故障,为了应用程序的故障容错性,Streaming设置了检查点功能,将足够的信息保存到检查点中,以便对系统故障进行恢复。Streaming可以对两种数据进行checkPoint。

  • 元数据检查点:将定义流式计算信息保存到检查点上(HDFS目录),用于恢复应用程序的驱动程序的故障。其中应用程序的元数据信息包含创建流式应用的配置、定义流式应用程序的操作集、部分未完成的批次
  • 数据检查点:将生成的RDD保存到检查点上 。在应用程序中跨多个批次的RDD的具有有状态的转换中,这种检查点必须设置,例如窗口操作和有状态更新。因为在多个批次组合生成一个RDD的时候,结果RDD会依赖先前批次的RDD,这样如果批次都的话,就会形成很多的依赖关系,设置RDD检查点就是为了切断依赖关系。

合适设置检查点

有状态操作的转换,如果在应用程序中只用了例如updateStateByKey侯喆是reduceByKeyAndWindow,就必须要设置RDD的检查点。

恢复故障的应用驱动程序 ,应用程序的元数据检查点是保存应用程序当前进度信息的,所以驱动程序故障恢复就是恢复当前应用程序执行的进度。

如何配置检查点

  • 配置数据检查点,在执行DStream有状态的操作的时候,必须设置检查点,设置检查点只需要调用StreamingContext.checkPoint(“hdfs://hadoop001:8020/sparkStreaming/checkPoint”)并传入HDFS上的目录,这样就可以将上次执行的结果RDD保存到该目录下,供下个批次与该数据进行整合。
  • 应用元数据信息检查点。在应用程序第一次启动的时候,会创建SparkContext,并调用start和awaitTermination方法开始流失数据的处理,当应用程序启动失败的时候,驱动程序会从检查点上恢复SparkContext。
package com.lyz.streaming.checkpointimport org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.{DStream, InputDStream}object MetaStoreCheckPointTest {def main(args: Array[String]): Unit = {//设置检查点目录val checkPointDir = "hdfs://192.168.101.187:8020/spark/checkpoint"//从检查点上得到StreamingContext,如果检查点上没有StreamingContext那么就创建一个新的StreamingContext。val streaming: StreamingContext = StreamingContext.getOrCreate(checkPointDir, () => createContext(checkPointDir))streaming.start()streaming.awaitTermination()}def createContext(checkPointDir: String): StreamingContext = {//创建SparkConfval conf = new SparkConf().setMaster("local[2]").setAppName("Streamingtest").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")//创建StreamingContextval streaming = new StreamingContext(conf, Seconds(5))/**** 在HDFS上存储state的检查点,必须设置,否则会报错,因为每次更新state的时候* 都需要去检查点获取上一次更新后的state*/streaming.checkpoint(checkPointDir)val stream: InputDStream[(LongWritable, Text)] = streaming.fileStream[LongWritable, Text, TextInputFormat]("hdfs://192.168.101.187:8020/spark/data")val mapStream: DStream[(String, Int)] = stream.map(x => (x._2.toString, 1))val res: DStream[(String, Int)] = mapStream.reduceByKeyAndWindow((a: Int, b: Int) => a + b, Seconds(30), Seconds(10))res.foreachRDD(_.foreach(println(_)))streaming}
}

这篇关于Spark Streaming(二十七)DStream的转换、输出、缓存持久化、检查点的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/753071

相关文章

缓存雪崩问题

缓存雪崩是缓存中大量key失效后当高并发到来时导致大量请求到数据库,瞬间耗尽数据库资源,导致数据库无法使用。 解决方案: 1、使用锁进行控制 2、对同一类型信息的key设置不同的过期时间 3、缓存预热 1. 什么是缓存雪崩 缓存雪崩是指在短时间内,大量缓存数据同时失效,导致所有请求直接涌向数据库,瞬间增加数据库的负载压力,可能导致数据库性能下降甚至崩溃。这种情况往往发生在缓存中大量 k

顺序表之创建,判满,插入,输出

文章目录 🍊自我介绍🍊创建一个空的顺序表,为结构体在堆区分配空间🍊插入数据🍊输出数据🍊判断顺序表是否满了,满了返回值1,否则返回0🍊main函数 你的点赞评论就是对博主最大的鼓励 当然喜欢的小伙伴可以:点赞+关注+评论+收藏(一键四连)哦~ 🍊自我介绍   Hello,大家好,我是小珑也要变强(也是小珑),我是易编程·终身成长社群的一名“创始团队·嘉宾”

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出

AI(文生语音)-TTS 技术线路探索学习:从拼接式参数化方法到Tacotron端到端输出 在数字化时代,文本到语音(Text-to-Speech, TTS)技术已成为人机交互的关键桥梁,无论是为视障人士提供辅助阅读,还是为智能助手注入声音的灵魂,TTS 技术都扮演着至关重要的角色。从最初的拼接式方法到参数化技术,再到现今的深度学习解决方案,TTS 技术经历了一段长足的进步。这篇文章将带您穿越时

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

PDF 软件如何帮助您编辑、转换和保护文件。

如何找到最好的 PDF 编辑器。 无论您是在为您的企业寻找更高效的 PDF 解决方案,还是尝试组织和编辑主文档,PDF 编辑器都可以在一个地方提供您需要的所有工具。市面上有很多 PDF 编辑器 — 在决定哪个最适合您时,请考虑这些因素。 1. 确定您的 PDF 文档软件需求。 不同的 PDF 文档软件程序可以具有不同的功能,因此在决定哪个是最适合您的 PDF 软件之前,请花点时间评估您的

Redis中使用布隆过滤器解决缓存穿透问题

一、缓存穿透(失效)问题 缓存穿透是指查询一个一定不存在的数据,由于缓存中没有命中,会去数据库中查询,而数据库中也没有该数据,并且每次查询都不会命中缓存,从而每次请求都直接打到了数据库上,这会给数据库带来巨大压力。 二、布隆过滤器原理 布隆过滤器(Bloom Filter)是一种空间效率很高的随机数据结构,它利用多个不同的哈希函数将一个元素映射到一个位数组中的多个位置,并将这些位置的值置

C# double[] 和Matlab数组MWArray[]转换

C# double[] 转换成MWArray[], 直接赋值就行             MWNumericArray[] ma = new MWNumericArray[4];             double[] dT = new double[] { 0 };             double[] dT1 = new double[] { 0,2 };

如何将一个文件里不包含某个字符的行输出到另一个文件?

第一种: grep -v 'string' filename > newfilenamegrep -v 'string' filename >> newfilename 第二种: sed -n '/string/!'p filename > newfilenamesed -n '/string/!'p filename >> newfilename

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录

Detectorn2预训练模型复现:数据准备、训练命令、日志分析与输出目录 在深度学习项目中,目标检测是一项重要的任务。本文将详细介绍如何使用Detectron2进行目标检测模型的复现训练,涵盖训练数据准备、训练命令、训练日志分析、训练指标以及训练输出目录的各个文件及其作用。特别地,我们将演示在训练过程中出现中断后,如何使用 resume 功能继续训练,并将我们复现的模型与Model Zoo中的

防止缓存击穿、缓存穿透和缓存雪崩

使用Redis缓存防止缓存击穿、缓存穿透和缓存雪崩 在高并发系统中,缓存击穿、缓存穿透和缓存雪崩是三种常见的缓存问题。本文将介绍如何使用Redis、分布式锁和布隆过滤器有效解决这些问题,并且会通过Java代码详细说明实现的思路和原因。 1. 背景 缓存穿透:指的是大量请求缓存中不存在且数据库中也不存在的数据,导致大量请求直接打到数据库上,形成数据库压力。 缓存击穿:指的是某个热点数据在