Spark RDD序列化与血缘关系

2024-04-30 18:08

本文主要是介绍Spark RDD序列化与血缘关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RDD序列化

从计算的角度, 算子以外的代码都是在 Driver 端执行, 算子里面的代码都是在 Executor端执行。所以需要涉及到网络传输,且如果计算中涉及到算子以外的数据那么就要求这个数据进行序列化,因为没有序列化就代表无法进行网络传输也就无法将值传到其它Excutor端执行,就会发生错误。

(1)闭包检查
在scala的函数式编程中,函数内经常会用到函数外变量,这样就会形成闭包效果,因此,在分布式系统中,就会存在对象需要在Driver和Executor间传递,那么就需要传递的对象被序列化,因此在计算之前就会进行闭包检查,即检查使用到的对象是否进行了序列化。

(2)Kryo序列化框架
这个序列化框架不同于Java的序列化框架,相比于java的序列化框架,这个框架速度更快,生成的序列化文件更小,而且这个框架会忽略Java的一些序列化机制,例如Java中使用了transient关键字忽略对一些数据的序列化,而Kryo会忽略这个关键字。Spark 出于性能的考虑,Spark2.0 开始支持另外一种 Kryo 序列化机制。Kryo 速度是 Serializable 的 10 倍。当 RDD 在 Shuffle 数据的时候,简单数据类型、数组和字符串类型已经在 Spark 内部使用 Kryo 来序列化。

样例:

def main(args: Array[String]): Unit = {val conf: SparkConf = new SparkConf().setAppName("SparkCoreTest").setMaster("local[2]")val sc: SparkContext = new SparkContext(conf)val rdd: RDD[String] = sc.makeRDD(Array("hello world", "hello spark", "hive", "atguigu"))val search = new Search("hello")//函数传递,打印:ERROR Task not serializablesearch.getMatch1(rdd).collect().foreach(println)//若将query变为方法中的局部变量那么就不会报错search.getMatch2(rdd).collect().foreach(println)sc.stop()
}class Search(query:String) {def isMatch(s: String): Boolean = {s.contains(query)}// 函数序列化案例def getMatch1(rdd: RDD[String]): RDD[String] = {//rdd.filter(this.isMatch)rdd.filter(isMatch)}// 属性序列化案例def getMatch2(rdd: RDD[String]): RDD[String] = {val s = queryrdd.filter(x => x.contains(s))}
}

之前发生错误是因为涉及到search中的属性query,query在scala语法中代表Search的构造参数,而sparkRDD在执行前需要进行闭包检测。对类的构造参数进行闭包检测其实就相当于对这个类进行闭包检测,而Search类是没有进行序列化的,所以就会报Task not serializable错。

RDD血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
在这里插入图片描述

val rdd: RDD[String] = sc.makeRDD(List("hello world", "hello spark"))
println(rdd.toDebugString)	// 打印出当前RDD的血缘关系
println("*************************")val words: RDD[String] = rdd.flatMap((_: String).split(" "))
println(words.toDebugString)
println("*************************")val wordToOne: RDD[(String, Int)] = words.map((word: String) =>(word,1))
println(wordToOne.toDebugString)
println("*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey((_: Int)+(_: Int))
println(wordToSum.toDebugString)
println("*************************")val array: Array[(String, Int)] = wordToSum.collect()
array.foreach(println)

在这里插入图片描述

这篇关于Spark RDD序列化与血缘关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/949557

相关文章

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Java中JSON字符串反序列化(动态泛型)

《Java中JSON字符串反序列化(动态泛型)》文章讨论了在定时任务中使用反射调用目标对象时处理动态参数的问题,通过将方法参数存储为JSON字符串并进行反序列化,可以实现动态调用,然而,这种方式容易导... 需求:定时任务扫描,反射调用目标对象,但是,方法的传参不是固定的。方案一:将方法参数存成jsON字

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

Python---文件IO流及对象序列化

文章目录 前言一、pandas是什么?二、使用步骤 1.引入库2.读入数据总结 前言 前文模块中提到加密模块,本文将终点介绍加密模块和文件流。 一、文件流和IO流概述         在Python中,IO流是用于输入和输出数据的通道。它可以用于读取输入数据或将数据写入输出目标。IO流可以是标准输入/输出流(stdin和stdout),也可以是文件流,网络流等。

jquery 表单序列化

jQuery序列化表单的方法总结 现在这里贴出案例中静态的html网页内容: <!DOCTYPE html><html lang="zh"><head><meta charset="UTF-8"><title>Title</title><script src="../js/jquery-3.2.1.js"></script></head><body><form method="post"

Java反序列化漏洞-TemplatesImpl利用链分析

文章目录 一、前言二、正文1. 寻找利用链2. 构造POC2.1 生成字节码2.2 加载字节码1)getTransletInstance2)defineTransletClasses 2.3 创建实例 3. 完整POC 三、参考文章 一、前言 java.lang.ClassLoader#defineClass defineClass可以加载字节码,但由于defineClas

Spring之——整合Redis序列化方式StringRedisSerializer、FastJsonRedisSerializer和KryoRedisSerializer

当我们的数据存储到Redis的时候,我们的键(key)和值(value)都是通过Spring提供的Serializer序列化到数据库的。RedisTemplate默认使用的是JdkSerializationRedisSerializer,StringRedisTemplate默认使用的是StringRedisSerializer。 Spring Data JPA为我们提供了下面的Serializ

使用 `readResolve` 防止序列化破坏单例模式

单例模式是一种设计模式,其目的是确保一个类只有一个实例,并提供一个全局访问点。在 Java 中,我们常常通过私有化构造方法和提供静态访问方法来实现单例。然而,尽管这些手段可以有效防止类的实例化,反射和序列化依然能够破坏单例模式的唯一性。本文将重点讲解序列化如何破坏单例模式,以及如何通过 readResolve 方法来防止这种破坏。 1. 序列化和反序列化 序列化 是指将对象的状态转换为字节

RDD的map和flatMap

在 Apache Spark 中,map 和 flatMap 是 RDD(弹性分布式数据集)中最常用的转换操作之一。 map 假设你有一个包含整数的 RDD,你想要计算每个元素的平方。 from pyspark import SparkContextsc = SparkContext(appName="MapExample")# 创建一个包含整数的 RDDnumbers = sc.para

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e