Spark - Resilient Distributed Datasets (RDDs)介绍

2023-11-02 09:51

本文主要是介绍Spark - Resilient Distributed Datasets (RDDs)介绍,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

RDD介绍

RDD构建

RDD分区数

RDD算子

Transformation算子 

Action算子

RDD持久化

RDD缓存

检查点

RDD共享变量

广播变量

累加器Accumulator


RDD介绍

Resilient Distributed DataSets,弹性分布式数据集,可以把RDD看作一种分布式集合。其RDD本身不存储数据,数据实际存储在内存或磁盘上。同时RDD中的数据不可更改,只能通过算子生成一个新的RDD实现对数据的修改。一个父RDD可以被多个下游RDD依赖,为避免父RDD重复计算,可对父RDD的计算结果缓存。RDD可定义分区规则,实现多线程并发处理RDD中的数据

RDD构建

spark-shell启动spark客户端 sh spark-shell --master local

本地集合,通过sc.parallelize转变为RDD

本地集合,通过sc.makeRDD转变为RDD

本地文件,通过sc.textFile转变为RDD

外部文件,通过sc.textFile转变为RDD

RDD分区数

RDD的分区数建议与CPU核数保持一致,为充分利用CPU性能,可设置为CPU核数的2~3倍,启动时,通过CPU核数确定线程并行度参数   

spark.default.parallelism = 指定的CPU核数(集群模式最小2)

等同于 --master local[指定的CPU核数]

  • 对于本地集合方式构建RDD
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9], 8)

       若指定分区数,采用指定的分区数;若未指定,采用spark.default.parallelism参数值

  • 对于本地/外部文件方式构建RDD
    rdd = sc.textFile(name="file:///export/data/workspace/parent/spark_core/data/", minPartitions=3)
    

    若指定了分区数 minPartitions,就采用指定的分区数;若没有指定分区数,

       defaultMinPartition = (spark.default.parallelism,2)

       RDD分区数 = max(本地file的分片数,defaultMinPartition)

       RDD分区数 =  max(hdfs文件的block数量,defaultMinPartition)

  • wholeTextFiles

        默认,一个目录有多少文件,就对应多少分区,为避免分区数过多,增加线程和IO负                     担,wholeTextFiles专门用于处理小文件,其尽可能减少分区数

rdd = sc.wholeTextFiles(path="file:///export/data/workspace/parent/spark_core/data")
  • 调整分区数       

repartition、coalesce、partitionBy 三个算子可以调整RDD分区数
# 增大分区
rdd = rdd.repartition(5).glom().collect()# 减小分区
rdd = rdd.repartition(5).glom().collect()
# repartition本质上是coalesce的一种当参数2为True的简写方案,# coalesce
# 参数1: 分区数
# 参数2:表示是否存在shuffle, 默认为false
# 参数2为False 只能减少分区,为True可以增大分区
rdd = rdd.coalesce(5).glom().collect()# partitionBy: 专门针对kv类型重分区的函数
# 默认: 根据key进行Hash取模划分操作 ,也可自定义分区规则rm = sc.parallelize([(1, 'c01'), (2, 'c02'), (3, 'c03'),(4, 'c04'), (5, 'c05'), (6, 'c06'),(7, 'c07'), (8, 'c08'), (9, 'c09'),(10, 'c10')], 3)rdd = rm.partitionBy(5).glom().collect()# 自定义分区规则rm1 = rm.partitionBy(2, lambda key: 0 if key <= 5 else 1).glom().collect()

RDD算子

RDD对象中提供的一系列具有特定动能的函数

Transformation算子 

定义RDD的计算规则,不会触发JOB执行,须通过Action算子触发执行

Action算子

产生一个Job任务,触发执行,运行这个Action算子所依赖的所有Transformation算子

RDD持久化

为避免RDD重复计算或RDD数据损坏,其RDD计算结果可以持久化,即RDD数据可以保存到内存或磁盘

RDD缓存

即RDD计算结果的临时缓存, 缓存数据可以保存到内存(executor内存空间),也可以保存到磁盘, 甚至支持将缓存数据保存到堆外内存(executor以外的内存空间)

由于临时存储, 可能存在数据丢失, 当缓存失效后, 可以基于原有依赖关系重新计算。因为缓存操作, 并不会将RDD之间的依赖关系给截断

缓存的API都是LAZY的, 如果需要触发缓存操作, 必须后续跟上一个action算子, 一般建议使用count。 如果不添加action算子, 只有当后续遇到第一个action算子后, 才会触发缓存

  • 设置缓存

    rdd.cache(): 仅能将数据缓存到内存中
    rdd.persist(缓存的级别(位置)): 默认将数据缓存到内存中, 也支持自定义缓存位置

例如:

rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK).count()


   

  • 清理缓存:

    rdd.unpersist()

     默认情况下, Spark程序执行完成后, 缓存会自动失效, 自动删除

  • 使用场景

当某个RDD被使用多次的时候,建议缓存此RDD数据

当某个RDD计算结果复杂,并且使用不止一次,建议缓存此RDD数据

检查点

checkpoint将数据保存到HDFS上,借助HDFS的高容错、高可靠来实现数据最大程度上的安全,实现了RDD的容错和高可用。

构建checkpoint操作后, 会将RDD之间的依赖关系截断, 后续计算如果出现了问题, 直接从检查点的位置恢复数据

集群模式中, checkpoint的保存路径地址必须是HDFS, 如果是local模式, 可以支持本地路径, checkpoint数据不会自动删除, 必须通过手动方式将其删除掉

Checkpoint相关的API:
 设置检查点的保存数据位置 
        sc.setCheckpointDir('路径地址') 默认路径为HDFS
        
  对应RDD上开启检查点
        rdd.checkpoint()
        rdd.count()

Spark容错机制:首先会查看RDD是否被Cache,如果被Cache到内存或磁盘,直接获取,否则查看Checkpoint所指定的HDFS中是否缓存数据,如果都没有则直接从父RDD开始重新计算还原。

一般建议将两种持久化的方案一同作用于项目环境中, 先设置缓存, 然后在设置检查点, 最后统一触发执行(先将数据缓存到内存中, 然后将内存数据写入到磁盘结束, 在使用的时候, 程序会自动优先读取内存, 内存没有读取磁盘)

 # 先设置缓存rdd4.persist(storageLevel=StorageLevel.MEMORY_AND_DISK)# 接着设置checkpointrdd4.checkpoint()# 最后统一触发rdd4.count()

RDD共享变量

默认情况下,当Spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量,在每个任务上都生成一个副本。但是,有时候需要在多个任务之间共享变量,或者在任务(Task) 和 任务控制节点(Driver Program) 之间共享变量

广播变量

在Driver端定义一个共享的变量, 如果不使用广播变量。 各个线程在运行Task的时候, 都需要将这个变量拷贝到自己的线程中, 对网络传输, 内存的使用都是一种浪费, 而且影响效率
    
如果使用广播变量。 会将变量在每个executor上放置一份, 各个线程直接读取executor上的变量,不需要拉取到Task中。 减少副本的数量, 对内存和网络都降低了, 从而提升效率


    
 广播变量是只读的, 各个Task只能读取数据, 不能修改

 

累加器Accumulator

一般,使用RDD的 map() 函数或者用 filter() 传条件时,可以使用驱动器driver程序中定义的变量,但是集群中运行的每个任务都会得到这些变量的一份新的副本,更新这些副本的值也不会影响驱动器中的对应变量。若保持driver程序中变量的原子性,可以使用Accumulator

Spark提供的Accumulator,主要用于多个节点对一个变量进行共享性的操作。Accumulator只提供了累加的功能,即确提供了多个task对一个变量并行操作的功能。但是task只能对Accumulator进行累加操作,不能读取Accumulator的值,只有Driver程序可以读取Accumulator的值。创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名。 

Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

累加器只能在Driver端定义,在Executor端更新,不能在Executor端定义,不能在Executor端.value获取值。

累加器在遇到多次action操作的时候会出现重复累加求和的问题,解决办法是在调用累加器后的RDD上, 对其设置缓存操作

 # 定义一个公共变量 --> spark的累加器实现acc = sc.accumulator(0)# 2. 执行相关的操作# 2.1 初始化一份数据rdd_init = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8])# 2.2 对每一个元素累加一个指定变量的值的操作:# 需求: 对每一个元素进行 +10返回, 在执行过程中, 请统计共计对多少个数据进行了 +10 操作def fn1(e):acc.add(1)return e + 10rdd_map = rdd_init.map(fn1)# 设置缓存rdd_map.cache().count()# 触发一个Actionrdd_map.reduceByKey()# 触发另一个Actionrdd_filter = rdd_map.filter(lambda e : e > 5)# 2.3 打印结果print(rdd_filter.collect())

RDD依赖

窄依赖

父RDD与子RDD之间的分区一对一,即父RDD中,一个分区内的数据不能分割,只能由子RDD中的一个分区整个利用

宽依赖

父RDD中一个分区的数据被子RDD中的多个分区接收,中间必然存在Shuffle操作;

Shuffle是判断宽窄依赖的重要依据,同时也是划分stage的依据

这篇关于Spark - Resilient Distributed Datasets (RDDs)介绍的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/329796

相关文章

性能测试介绍

性能测试是一种测试方法,旨在评估系统、应用程序或组件在现实场景中的性能表现和可靠性。它通常用于衡量系统在不同负载条件下的响应时间、吞吐量、资源利用率、稳定性和可扩展性等关键指标。 为什么要进行性能测试 通过性能测试,可以确定系统是否能够满足预期的性能要求,找出性能瓶颈和潜在的问题,并进行优化和调整。 发现性能瓶颈:性能测试可以帮助发现系统的性能瓶颈,即系统在高负载或高并发情况下可能出现的问题

水位雨量在线监测系统概述及应用介绍

在当今社会,随着科技的飞速发展,各种智能监测系统已成为保障公共安全、促进资源管理和环境保护的重要工具。其中,水位雨量在线监测系统作为自然灾害预警、水资源管理及水利工程运行的关键技术,其重要性不言而喻。 一、水位雨量在线监测系统的基本原理 水位雨量在线监测系统主要由数据采集单元、数据传输网络、数据处理中心及用户终端四大部分构成,形成了一个完整的闭环系统。 数据采集单元:这是系统的“眼睛”,

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

图神经网络模型介绍(1)

我们将图神经网络分为基于谱域的模型和基于空域的模型,并按照发展顺序详解每个类别中的重要模型。 1.1基于谱域的图神经网络         谱域上的图卷积在图学习迈向深度学习的发展历程中起到了关键的作用。本节主要介绍三个具有代表性的谱域图神经网络:谱图卷积网络、切比雪夫网络和图卷积网络。 (1)谱图卷积网络 卷积定理:函数卷积的傅里叶变换是函数傅里叶变换的乘积,即F{f*g}

C++——stack、queue的实现及deque的介绍

目录 1.stack与queue的实现 1.1stack的实现  1.2 queue的实现 2.重温vector、list、stack、queue的介绍 2.1 STL标准库中stack和queue的底层结构  3.deque的简单介绍 3.1为什么选择deque作为stack和queue的底层默认容器  3.2 STL中对stack与queue的模拟实现 ①stack模拟实现

Mysql BLOB类型介绍

BLOB类型的字段用于存储二进制数据 在MySQL中,BLOB类型,包括:TinyBlob、Blob、MediumBlob、LongBlob,这几个类型之间的唯一区别是在存储的大小不同。 TinyBlob 最大 255 Blob 最大 65K MediumBlob 最大 16M LongBlob 最大 4G

FreeRTOS-基本介绍和移植STM32

FreeRTOS-基本介绍和STM32移植 一、裸机开发和操作系统开发介绍二、任务调度和任务状态介绍2.1 任务调度2.1.1 抢占式调度2.1.2 时间片调度 2.2 任务状态 三、FreeRTOS源码和移植STM323.1 FreeRTOS源码3.2 FreeRTOS移植STM323.2.1 代码移植3.2.2 时钟中断配置 一、裸机开发和操作系统开发介绍 裸机:前后台系

nginx介绍及常用功能

什么是nginx nginx跟Apache一样,是一个web服务器(网站服务器),通过HTTP协议提供各种网络服务。 Apache:重量级的,不支持高并发的服务器。在Apache上运行数以万计的并发访问,会导致服务器消耗大量内存。操作系统对其进行进程或线程间的切换也消耗了大量的CPU资源,导致HTTP请求的平均响应速度降低。这些都决定了Apache不可能成为高性能WEB服务器  nginx:

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

多路转接之select(fd_set介绍,参数详细介绍),实现非阻塞式网络通信

目录 多路转接之select 引入 介绍 fd_set 函数原型 nfds readfds / writefds / exceptfds readfds  总结  fd_set操作接口  timeout timevalue 结构体 传入值 返回值 代码 注意点 -- 调用函数 select的参数填充  获取新连接 注意点 -- 通信时的调用函数 添加新fd到