Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别

2024-05-03 05:48

本文主要是介绍Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.Spark性能调优:checkPoint的使用

https://blog.csdn.net/leen0304/article/details/78718346

 

概述

    checkpoint的意思就是建立检查点,类似于快照,例如在spark计算里面,计算流程DAG特别长,服务器需要将整个DAG计算完成得出结果,但是如果在这很长的计算流程中突然中间算出的数据丢失了,spark又会根据RDD的依赖关系从头到尾计算一遍,这样子就很费性能,当然我们可以将中间的计算结果通过cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)。

 

使用Checkpoint

    使用checkpoint 需要 先设置 checkpoint 的目录,例如如下代码:

val sparkConf = new SparkConfsparkConf.setAppName("JOINSkewedData").set("spark.sql.autoBroadcastJoinThreshold", "1048576") //1M broadcastJOIN//.set("spark.sql.autoBroadcastJoinThreshold", "104857600") //100M broadcastJOIN.set("spark.sql.shuffle.partitions", "3")if (args.length > 0 && args(0).equals("ide")) {sparkConf.setMaster("local[3]")}val spark = SparkSession.builder().config(sparkConf).getOrCreate()val sparkContext = spark.sparkContextsparkContext.setLogLevel("WARN")sparkContext.setCheckpointDir("file:///D:/checkpoint/")

不同环境的设置代码

有的时候需要本地调试,需要设置为windows 或者 linux 的本地目录

windows 

 sparkContext.setCheckpointDir("file:///D:/checkpoint/")

linux 

sparkContext.setCheckpointDir("file:///tmp/checkpoint")

hdfs

sparkContext.setCheckpointDir("hdfs://leen:8020/checkPointDir")

 

调用checkpoint 

使用 checkpoint 的时候,需要在建立 checkpoint 的 rdd 上进行函数调用即可

rdd.checkpoint

 

注意 :

  使用 checkpoint 的时候,建议先将 rdd.cache 一次,因为 checkpoint 是 transform 算子,

  执行的时候相当于走了两次流程,前面计算了一遍,然后checkpoint又会计算一次,所以一般我们先进行cache然后做checkpoint就会只走一次流程,checkpoint的时候就会从刚cache到内存中取数据写入hdfs中,如下:

rdd.cache()

rdd.checkpoint()

rdd.collect

 

 

Sparkstreaming 中的 checkpoint

   在streaming中使用checkpoint主要包含以下两点:设置checkpoint目录,初始化StreamingContext时调用getOrCreate方法,即当checkpoint目录没有数据时,则新建streamingContext实例,并且设置checkpoint目录,否则从checkpoint目录中读取相关配置和数据创建streamingcontext。

// Function to create and setup a new StreamingContext
def functionToCreateContext(): StreamingContext = {val ssc = new StreamingContext(...)   // new contextval lines = ssc.socketTextStream(...) // create DStreams...ssc.checkpoint(checkpointDirectory)   // set checkpoint directoryssc
}
// Get StreamingContext from checkpoint data or create a new one
val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)

 

 

Checkpoint 与 cache 区别

  checkpoint 与 cache 是不一样的,checkpoint 会切除前面算子的rdd 依赖, 而 cache 是将数据暂存在一个具体的位置。

 

rdd 的 checkpoint 实现

/*** Mark this RDD for checkpointing. It will be saved to a file inside the checkpoint* directory set with `SparkContext#setCheckpointDir` and all references to its parent* RDDs will be removed. This function must be called before any job has been* executed on this RDD. It is strongly recommended that this RDD is persisted in* memory, otherwise saving it on a file will require recomputation.*/def checkpoint(): Unit = RDDCheckpointData.synchronized {// NOTE: we use a global lock here due to complexities downstream with ensuring// children RDD partitions point to the correct parent partitions. In the future// we should revisit this consideration.if (context.checkpointDir.isEmpty) {throw new SparkException("Checkpoint directory has not been set in the SparkContext")} else if (checkpointData.isEmpty) {checkpointData = Some(new ReliableRDDCheckpointData(this))}}

 

dataframe 的 checkpoint 实现

 /*** Eagerly checkpoint a Dataset and return the new Dataset. Checkpointing can be used to truncate* the logical plan of this Dataset, which is especially useful in iterative algorithms where the* plan may grow exponentially. It will be saved to files inside the checkpoint* directory set with `SparkContext#setCheckpointDir`.** @group basic* @since 2.1.0*/@Experimental@InterfaceStability.Evolvingdef checkpoint(): Dataset[T] = checkpoint(eager = true)

 

 

这篇关于Spark_Spark 中 checkpoint 的正确使用方式 以及 与 cache区别的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/955958

相关文章

SpringBoot中@Value注入静态变量方式

《SpringBoot中@Value注入静态变量方式》SpringBoot中静态变量无法直接用@Value注入,需通过setter方法,@Value(${})从属性文件获取值,@Value(#{})用... 目录项目场景解决方案注解说明1、@Value("${}")使用示例2、@Value("#{}"php

SpringBoot分段处理List集合多线程批量插入数据方式

《SpringBoot分段处理List集合多线程批量插入数据方式》文章介绍如何处理大数据量List批量插入数据库的优化方案:通过拆分List并分配独立线程处理,结合Spring线程池与异步方法提升效率... 目录项目场景解决方案1.实体类2.Mapper3.spring容器注入线程池bejsan对象4.创建

Python使用FastAPI实现大文件分片上传与断点续传功能

《Python使用FastAPI实现大文件分片上传与断点续传功能》大文件直传常遇到超时、网络抖动失败、失败后只能重传的问题,分片上传+断点续传可以把大文件拆成若干小块逐个上传,并在中断后从已完成分片继... 目录一、接口设计二、服务端实现(FastAPI)2.1 运行环境2.2 目录结构建议2.3 serv

Spring Security简介、使用与最佳实践

《SpringSecurity简介、使用与最佳实践》SpringSecurity是一个能够为基于Spring的企业应用系统提供声明式的安全访问控制解决方案的安全框架,本文给大家介绍SpringSec... 目录一、如何理解 Spring Security?—— 核心思想二、如何在 Java 项目中使用?——

springboot中使用okhttp3的小结

《springboot中使用okhttp3的小结》OkHttp3是一个JavaHTTP客户端,可以处理各种请求类型,比如GET、POST、PUT等,并且支持高效的HTTP连接池、请求和响应缓存、以及异... 在 Spring Boot 项目中使用 OkHttp3 进行 HTTP 请求是一个高效且流行的方式。

Vue和React受控组件的区别小结

《Vue和React受控组件的区别小结》本文主要介绍了Vue和React受控组件的区别小结,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一起学... 目录背景React 的实现vue3 的实现写法一:直接修改事件参数写法二:通过ref引用 DOMVu

Java使用Javassist动态生成HelloWorld类

《Java使用Javassist动态生成HelloWorld类》Javassist是一个非常强大的字节码操作和定义库,它允许开发者在运行时创建新的类或者修改现有的类,本文将简单介绍如何使用Javass... 目录1. Javassist简介2. 环境准备3. 动态生成HelloWorld类3.1 创建CtC

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Java使用jar命令配置服务器端口的完整指南

《Java使用jar命令配置服务器端口的完整指南》本文将详细介绍如何使用java-jar命令启动应用,并重点讲解如何配置服务器端口,同时提供一个实用的Web工具来简化这一过程,希望对大家有所帮助... 目录1. Java Jar文件简介1.1 什么是Jar文件1.2 创建可执行Jar文件2. 使用java

C#使用Spire.Doc for .NET实现HTML转Word的高效方案

《C#使用Spire.Docfor.NET实现HTML转Word的高效方案》在Web开发中,HTML内容的生成与处理是高频需求,然而,当用户需要将HTML页面或动态生成的HTML字符串转换为Wor... 目录引言一、html转Word的典型场景与挑战二、用 Spire.Doc 实现 HTML 转 Word1