Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别

2024-05-03 05:48

本文主要是介绍Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.Spark性能调优:checkPoint的使用

https://blog.csdn.net/leen0304/article/details/78718346

 

概述

    checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面,计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)。

 

使用Checkpoint

    使用checkpoint 需要 先设置 checkpoint 的目录,例如如下代码:

val sparkConf = new SparkConfsparkConf.setAppName("JOINSkewedData").set("spark.sql.autoBroadcastJoinThreshold", "1048576") //1M broadcastJOIN//.set("spark.sql.autoBroadcastJoinThreshold", "104857600") //100M broadcastJOIN.set("spark.sql.shuffle.partitions", "3")if (args.length > 0 && args(0).equals("ide")) {sparkConf.setMaster("local[3]")}val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sparkContext = spark.sparkContextsparkContext.setLogLevel("WARN")sparkContext.setCheckpointDir("file:///D:/checkpoint/")

不同环境的设置代码

有的时候需要本地调试,需要设置为windows 或者 linux 的本地目录

windows 

 sparkContext.setCheckpointDir("file:///D:/checkpoint/")

linux 

sparkContext.setCheckpointDir("file:///tmp/checkpoint")

hdfs

sparkContext.setCheckpointDir("hdfs://leen:8020/checkPointDir")

 

调用checkpoint 

使用 checkpoint 的时候,需要在建立 checkpoint 的 rdd 上进行函数调用即可

rdd.checkpoint

 

注意 :

  使用 checkpoint 的时候,建议先将 rdd.cache 一次,因为 checkpoint 是 transform 算子,

  执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()

rdd.checkpoint()

rdd.collect

 

 

Sparkstreaming 中的 checkpoint

   在streaming中使用checkpoint主要包含以下两点:设置checkpoint目录,初始化StreamingContext时调用getOrCreate方法,即当checkpoint目录没有数据时,则新建streamingContext实例,并且设置checkpoint目录,否则从checkpoint目录中读取相关配置和数据创建streamingcontext。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {val ssc = new StreamingContext(...)   // new contextval lines = ssc.socketTextStream(...) // create DStreams...ssc.checkpoint(checkpointDirectory)   // set checkpoint directoryssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

 

 

Checkpoint 与 cache 区别

  checkpoint 与 cache 是不一样的,checkpoint 会切除前面算子的rdd 依赖, 而 cache 是将数据暂存在一个具体的位置。

 

rdd 的 checkpoint 实现

/*** Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint* directory set with `SparkContext#setCheckpointDir` and all references to its parent* RDDs will be removed. This function must be called before any job has been* executed on this RDD. It is strongly recommended that this RDD is persisted in* memory, otherwise saving it on a file will require recomputation.*/def checkpoint(): Unit = RDDCheckpointData.synchronized {// NOTE: we use a global lock here due to complexities downstream with ensuring// children RDD partitions point to the correct parent partitions. In the future// we should revisit this consideration.if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}}

 

dataframe 的 checkpoint 实现

 /*** Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate* the logical plan of this Dataset, which is especially useful in iterative algorithms where the* plan may grow exponentially. It will be saved to files inside the checkpoint* directory set with `SparkContext#setCheckpointDir`.** @group basic* @since 2.1.0*/@Experimental@InterfaceStability.Evolvingdef checkpoint(): Dataset[T] = checkpoint(eager = true)

 

 

这篇关于Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/955958

相关文章

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Linux线程之线程的创建、属性、回收、退出、取消方式

《Linux线程之线程的创建、属性、回收、退出、取消方式》文章总结了线程管理核心知识:线程号唯一、创建方式、属性设置(如分离状态与栈大小)、回收机制(join/detach)、退出方法(返回/pthr... 目录1. 线程号2. 线程的创建3. 线程属性4. 线程的回收5. 线程的退出6. 线程的取消7.

MyBatis中$与#的区别解析

《MyBatis中$与#的区别解析》文章浏览阅读314次,点赞4次,收藏6次。MyBatis使用#{}作为参数占位符时,会创建预处理语句(PreparedStatement),并将参数值作为预处理语句... 目录一、介绍二、sql注入风险实例一、介绍#(井号):MyBATis使用#{}作为参数占位符时,会

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

golang程序打包成脚本部署到Linux系统方式

《golang程序打包成脚本部署到Linux系统方式》Golang程序通过本地编译(设置GOOS为linux生成无后缀二进制文件),上传至Linux服务器后赋权执行,使用nohup命令实现后台运行,完... 目录本地编译golang程序上传Golang二进制文件到linux服务器总结本地编译Golang程序

深入理解Go语言中二维切片的使用

《深入理解Go语言中二维切片的使用》本文深入讲解了Go语言中二维切片的概念与应用,用于表示矩阵、表格等二维数据结构,文中通过示例代码介绍的非常详细,需要的朋友们下面随着小编来一起学习学习吧... 目录引言二维切片的基本概念定义创建二维切片二维切片的操作访问元素修改元素遍历二维切片二维切片的动态调整追加行动态

Linux下删除乱码文件和目录的实现方式

《Linux下删除乱码文件和目录的实现方式》:本文主要介绍Linux下删除乱码文件和目录的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux下删除乱码文件和目录方法1方法2总结Linux下删除乱码文件和目录方法1使用ls -i命令找到文件或目录

prometheus如何使用pushgateway监控网路丢包

《prometheus如何使用pushgateway监控网路丢包》:本文主要介绍prometheus如何使用pushgateway监控网路丢包问题,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录监控网路丢包脚本数据图表总结监控网路丢包脚本[root@gtcq-gt-monitor-prome

Python通用唯一标识符模块uuid使用案例详解

《Python通用唯一标识符模块uuid使用案例详解》Pythonuuid模块用于生成128位全局唯一标识符,支持UUID1-5版本,适用于分布式系统、数据库主键等场景,需注意隐私、碰撞概率及存储优... 目录简介核心功能1. UUID版本2. UUID属性3. 命名空间使用场景1. 生成唯一标识符2. 数

Linux在线解压jar包的实现方式

《Linux在线解压jar包的实现方式》:本文主要介绍Linux在线解压jar包的实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录linux在线解压jar包解压 jar包的步骤总结Linux在线解压jar包在 Centos 中解压 jar 包可以使用 u