Spark Checkpoint读操作代码分析

2024-05-27 12:58

本文主要是介绍Spark Checkpoint读操作代码分析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

《Spark RDD缓存代码分析》
   《Spark Task序列化代码分析》
   《Spark分区器HashPartitioner和RangePartitioner代码详解》
   《Spark Checkpoint读操作代码分析》
   《Spark Checkpoint写操作代码分析》

  上次介绍了RDD的Checkpint写过程(《Spark Checkpoint写操作代码分析》),本文将介绍RDD如何读取已经Checkpint的数据。在RDD Checkpint完之后,Checkpint的信息(比如数据存放的目录)都由RDDCheckpointData去管理,所以当下次计算依赖了这个RDD的时候,首先是根据依赖关系判断出当前这个RDD是否被Checkpint了,主要是通过RDD的dependencies决定:

final def dependencies : Seq[Dependency[ _ ]] = {
   checkpointRDD.map(r = > List( new OneToOneDependency(r))).getOrElse {
     if (dependencies _ == null ) {
       dependencies _ = getDependencies
     }
     dependencies _
   }
}

  如果RDD被Checkpint了,那么checkpointRDD为Some(CheckpointRDD[T])了,所以依赖的RDD变成了CheckpointRDD。在计算数据的过程中会调用RDD的iterator方法:

final def iterator(split : Partition, context : TaskContext) : Iterator[T] = {
   if (storageLevel ! = StorageLevel.NONE) {
     < span class = "wp_keywordlink_affiliate" >< a href = "http://www.iteblog.com/archives/tag/spark" title = "" target = "_blank" data-original-title = "View all posts in Spark" > Spark < /a >< /span > Env.get.cacheManager.getOrCompute( this , split, context, storageLevel)
   } else {
     computeOrReadCheckpoint(split, context)
   }
}
private [spark] def computeOrReadCheckpoint(split : Partition, context : TaskContext) : Iterator[T] =
{
    if (isCheckpointed) firstParent[T].iterator(split, context) else compute(split, context)
}

  计算的过程中首先会判断RDD是否被Checkpint了,而RDD Checkpint写之后这个条件肯定是true的。而firstParent已经变成了CheckpointRDD,所以会调用CheckpointRDD的iterator方法, 该方法最终会调用ReliableCheckpointRDD的compute方法:

override def compute(split : Partition, context : TaskContext) : Iterator[T] = {
   val file = new Path(checkpointPath, ReliableCheckpointRDD.checkpointFileName(split.index))
   ReliableCheckpointRDD.readCheckpointFile(file, broadcastedConf, context)
}

  在compute方法中会通过ReliableCheckpointRDD的readCheckpointFile方法来从file路径里面读出已经Checkpint的数据,readCheckpointFile的实现如下:

def readCheckpointFile[T](
     path : Path,
     broadcastedConf : Broadcast[SerializableConfiguration],
     context : TaskContext) : Iterator[T] = {
   val env = < span class = "wp_keywordlink_affiliate" >< a href = "http://www.iteblog.com/archives/tag/spark" title = "" target = "_blank" data-original-title = "View all posts in Spark" > Spark < /a >< /span > Env.get
   val fs = path.getFileSystem(broadcastedConf.value.value)
   val bufferSize = env.conf.getInt( "spark.buffer.size" , 65536 )
   val fileInputStream = fs.open(path, bufferSize)
   val serializer = env.serializer.newInstance()
   val deserializeStream = serializer.deserializeStream(fileInputStream)
   // Register an on-task-completion callback to close the input stream.
   context.addTaskCompletionListener(context = > deserializeStream.close())
   deserializeStream.asIterator.asInstanceOf[Iterator[T]]
}

最后数据就回被全部读取出来,整个Checkpint读过程完成了。


 转载自过往记忆(http://www.iteblog.com/)

这篇关于Spark Checkpoint读操作代码分析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1007540

相关文章

java之Objects.nonNull用法代码解读

《java之Objects.nonNull用法代码解读》:本文主要介绍java之Objects.nonNull用法代码,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐... 目录Java之Objects.nonwww.chinasem.cnNull用法代码Objects.nonN

Spring事务中@Transactional注解不生效的原因分析与解决

《Spring事务中@Transactional注解不生效的原因分析与解决》在Spring框架中,@Transactional注解是管理数据库事务的核心方式,本文将深入分析事务自调用的底层原理,解释为... 目录1. 引言2. 事务自调用问题重现2.1 示例代码2.2 问题现象3. 为什么事务自调用会失效3

SpringBoot实现MD5加盐算法的示例代码

《SpringBoot实现MD5加盐算法的示例代码》加盐算法是一种用于增强密码安全性的技术,本文主要介绍了SpringBoot实现MD5加盐算法的示例代码,文中通过示例代码介绍的非常详细,对大家的学习... 目录一、什么是加盐算法二、如何实现加盐算法2.1 加盐算法代码实现2.2 注册页面中进行密码加盐2.

python+opencv处理颜色之将目标颜色转换实例代码

《python+opencv处理颜色之将目标颜色转换实例代码》OpenCV是一个的跨平台计算机视觉库,可以运行在Linux、Windows和MacOS操作系统上,:本文主要介绍python+ope... 目录下面是代码+ 效果 + 解释转HSV: 关于颜色总是要转HSV的掩膜再标注总结 目标:将红色的部分滤

找不到Anaconda prompt终端的原因分析及解决方案

《找不到Anacondaprompt终端的原因分析及解决方案》因为anaconda还没有初始化,在安装anaconda的过程中,有一行是否要添加anaconda到菜单目录中,由于没有勾选,导致没有菜... 目录问题原因问http://www.chinasem.cn题解决安装了 Anaconda 却找不到 An

Spring定时任务只执行一次的原因分析与解决方案

《Spring定时任务只执行一次的原因分析与解决方案》在使用Spring的@Scheduled定时任务时,你是否遇到过任务只执行一次,后续不再触发的情况?这种情况可能由多种原因导致,如未启用调度、线程... 目录1. 问题背景2. Spring定时任务的基本用法3. 为什么定时任务只执行一次?3.1 未启用

在C#中调用Python代码的两种实现方式

《在C#中调用Python代码的两种实现方式》:本文主要介绍在C#中调用Python代码的两种实现方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录C#调用python代码的方式1. 使用 Python.NET2. 使用外部进程调用 Python 脚本总结C#调

Java时间轮调度算法的代码实现

《Java时间轮调度算法的代码实现》时间轮是一种高效的定时调度算法,主要用于管理延时任务或周期性任务,它通过一个环形数组(时间轮)和指针来实现,将大量定时任务分摊到固定的时间槽中,极大地降低了时间复杂... 目录1、简述2、时间轮的原理3. 时间轮的实现步骤3.1 定义时间槽3.2 定义时间轮3.3 使用时

Python使用DrissionPage中ChromiumPage进行自动化网页操作

《Python使用DrissionPage中ChromiumPage进行自动化网页操作》DrissionPage作为一款轻量级且功能强大的浏览器自动化库,为开发者提供了丰富的功能支持,本文将使用Dri... 目录前言一、ChromiumPage基础操作1.初始化Drission 和 ChromiumPage

利用Go语言开发文件操作工具轻松处理所有文件

《利用Go语言开发文件操作工具轻松处理所有文件》在后端开发中,文件操作是一个非常常见但又容易出错的场景,本文小编要向大家介绍一个强大的Go语言文件操作工具库,它能帮你轻松处理各种文件操作场景... 目录为什么需要这个工具?核心功能详解1. 文件/目录存javascript在性检查2. 批量创建目录3. 文件