Spark Streaming(六)—— 检查点

2024-06-19 04:38
文章标签 spark streaming 检查点

本文主要是介绍Spark Streaming(六)—— 检查点,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

  • 1. 一般会对两种类型的数据使用检查点
  • 2. 何时启用检查点
  • 3. 如何配置检查点
  • 4. 改写之前的WordCount程序

流数据处理程序通常都是全天候运行,因此必须对应用中逻辑无关的故障(例如,系统故障,JVM崩溃等)具有弹性。为了实现这一特性,Spark Streaming需要checkpoint足够的信息到容错存储系统,以便可以从故障中恢复。

1. 一般会对两种类型的数据使用检查点

  • 元数据检查点(Metadatacheckpointing)

    将定义流计算的信息保存到容错存储中(如HDFS)。这用于从运行Streaming程序的Driver程序的节点的故障中恢复。

    元数据包括以下几种:

    • 配置(Configuration) - 用于创建Streaming应用程序的配置信息。

    • DStream操作(DStream operations) - 定义Streaming应用程序的DStream操作集合。

    • 不完整的batch(Incomplete batches) - jobs还在队列中但尚未完成的batch。

  • 数据检查点(Datacheckpointing)

    将生成的RDD保存到可靠的存储层。对于一些需要将多个批次之间的数据进行组合的stateful变换操作,设置数据检查点是必需的。在这些转换操作中,当前生成的RDD依赖于先前批次的RDD,这导致依赖链的长度随时间而不断增加,由此也会导致基于血统机制的恢复时间无限增加。为了避免这种情况,stateful转换的中间RDD将定期设置检查点并保存到到可靠的存储层(例如HDFS)以切断依赖关系链。

总而言之,元数据检查点主要用于从Driver程序故障中恢复,而数据或RDD检查点在任何使用stateful转换时是必须要有的。

2. 何时启用检查点

对于具有以下任一要求的应用程序,必须启用检查点:

  1. 使用状态转换:如果在应用程序中使用updateStateByKey或reduceByKeyAndWindow(具有逆函数),则必须提供检查点目录以允许定期保存RDD检查点。
  2. 从运行应用程序的Driver程序的故障中恢复:元数据检查点用于使用进度信息进行恢复。

3. 如何配置检查点

可以通过在一些可容错、高可靠的文件系统(例如,HDFS,S3等)中设置保存检查点信息的目录来启用检查点。这是通过使用streamingContext.checkpoint(checkpointDirectory)完成的。设置检查点后,就可以使用上述的有状态转换操作。此外,如果要使应用程序从驱动程序故障中恢复,应该重写Streaming应用程序以使程序具有以下行为:

  1. 当程序第一次启动时,它将创建一个新的StreamingContext,设置好所有流数据源,然后调用start()方法。
  2. 当程序在失败后重新启动时,它将从checkpoint目录中的检查点数据重新创建一个StreamingContext。
    使用StreamingContext.getOrCreate可以简化此行为

4. 改写之前的WordCount程序

使得每次计算的结果和状态都保存到检查点目录下

package demoimport org.apache.spark.SparkConf
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.{Seconds, StreamingContext}object MyCheckpointNetworkWordCount {def main(args: Array[String]): Unit = {//在主程序中,创建一个Streaming Context对象//1、读取一个检查点的目录//2、如果该目录下已经存有之前的检查点信息,从已有的信息上创建这个Streaming Context对象//3、如果该目录下没有信息,创建一个新的Streaming Contextval context = StreamingContext.getOrCreate("hdfs://topnpl200:9000/spark_checkpoint",createStreamingContext)//启动任务context.start()context.awaitTermination()}//创建一个StreamingContext对象,并且设置检查点目录,执行WordCount程序(记录之前的状态信息)def createStreamingContext():StreamingContext = {val conf = new SparkConf().setAppName("MyCheckpointNetworkWordCount").setMaster("local[2]")//创建这个StreamingContext对象val ssc = new StreamingContext(conf,Seconds(3))//设置检查点目录ssc.checkpoint("hdfs://topnpl200:9000/spark_checkpoint")//创建一个DStream,执行WordCountval lines = ssc.socketTextStream("192.168.15.131",7788,StorageLevel.MEMORY_AND_DISK_SER)//分词操作val words = lines.flatMap(_.split(" "))//每个单词记一次数val wordPair = words.map(x=> (x,1))//执行单词计数//定义一个新的函数:把当前的值跟之前的结果进行一个累加val addFunc = (currValues:Seq[Int],preValueState:Option[Int]) => {//当前当前批次的值val currentCount = currValues.sum//得到已经累加的值。如果是第一次求和,之前没有数值,从0开始计数val preValueCount = preValueState.getOrElse(0)//进行累加,然后累加后结果,是Option[Int]Some(currentCount + preValueCount)}//要把新的单词个数跟之前的结果进行叠加(累计)val totalCount = wordPair.updateStateByKey[Int](addFunc)//输出结果totalCount.print()//返回这个对象ssc}
}

这篇关于Spark Streaming(六)—— 检查点的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1074033

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark

Spark—数据读取和保存

Spark—数据读取和保存

Spark源码分析之Spark Shell(上)

终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。 先来介绍一下Spark-shell是什么? Spark-shell是提供给用户即时交互的一个命令窗口,你可以在里面编写spark代码,然后根据你的命令立即进行

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看 编程指南了解更多的内容。 为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。 Spark Shell 交互 基本操作 Spark Shell提供给用