sparkRDD操作

2024-06-04 09:18
文章标签 操作 sparkrdd

本文主要是介绍sparkRDD操作,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RDD介绍

标签(空格分隔): spark


hadoop,spark,kafka交流群:224209501

RDD 操作

1,RDD五大特点:

1,A list of partions
一系列的分片:比如64M一个分片,类似于hadoop的splits。
2,A function for computing each split
在每个分区上都有一个函数去迭代、执行、计算它。
3,A list dependancies on other RDDS
一系列的依赖:RDDa转换为RDDb,RDDb转换为RDDc;那么就是RDDc依赖于RDDb,RDDb依赖于RDDa。
4,Optionally,a Partioner for key-value RDDs(e.g。to say that the RDD is hash-partioned)
对于key-value的RDD可以指定一个Partioner,来告诉他如何进行分片;常用的有hash,range。
5,Optionally,a list of preferred location(s) to compute each split on (e.g.block locations for an HDFS file)
要运行的计算/执行最好在哪(几)个机器上运行。数据的本地性。
比如:hadoop,默认位置有三个,或者spark cache到内存是可能通过StorageLevel设置了多个副本,所以一个partion可能返回多个最佳位置。

2,RDDs的2 种类型的操作

RDDs 支持 2 种类型的操作:转换(transformations) 从已经存在的数据集中创建一个新的数据集;动作(actions) 在数据集上进行计算之后返回一个值到驱动程序。例如,map 是一个转换操作,它将每一个数据集元素传递给一个函数并且返回一个新的 RDD。另一方面,reduce 是一个动作,它使用相同的函数来聚合 RDD 的所有元素,并且将最终的结果返回到驱动程序(不过也有一个并行 reduceByKey 能返回一个分布式数据集)。

在 Spark 中,所有的转换(transformations)都是惰性(lazy)的,它们不会马上计算它们的结果。相反的,它们仅仅记录转换操作是应用到哪些基础数据集(例如一个文件)上的。转换仅仅在这个时候计算:当动作(action) 需要一个结果返回给驱动程序的时候。这个设计能够让 Spark 运行得更加高效。例如,我们可以实现:通过map 创建一个新数据集在 reduce 中使用,并且仅仅返回 reduce 的结果给 driver,而不是整个大的映射过的数据集。

默认情况下,每一个转换过的 RDD 会在每次执行动作(action)的时候重新计算一次。然而,你也可以使用persist (或 cache)方法持久化(persist)一个 RDD 到内存中。在这个情况下,Spark 会在集群上保存相关的元素,在你下次查询的时候会变得更快。在这里也同样支持持久化 RDD 到磁盘,或在多个节点间复制。

3,Transformations函数

TransformationMeaning
map(func)Return a new distributed dataset formed by passing each element of the source through a function func.
filter(func)Return a new dataset formed by selecting those elements of the source on which func returns true.
flatMap(func)Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
mapPartitions(func)Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator => Iterator when running on an RDD of type T.
mapPartitionsWithIndex(func)Similar to mapPartitions, but also provides func with an integer value representing the index of the partition, so func must be of type (Int, Iterator) => Iterator when running on an RDD of type T.
sample(withReplacement, fraction, seed)Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed.
union(otherDataset)Return a new dataset that contains the union of the elements in the source dataset and the argument.
intersection(otherDataset)Return a new RDD that contains the intersection of elements in the source dataset and the argument.
distinct([numTasks]))Return a new dataset that contains the distinct elements of the source dataset.
groupByKey([numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable) pairs. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance. Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.
reduceByKey(func, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral “zero” value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
sortByKey([ascending], [numTasks])When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in ascending or descending order, as specified in the boolean ascending argument.
join(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
cogroup(otherDataset, [numTasks])When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable, Iterable)) tuples. This operation is also called groupWith.
cartesian(otherDataset)When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
pipe(command, [envVars])Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the process’s stdin and lines output to its stdout are returned as an RDD of strings.
coalesce(numPartitions)Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.
repartition(numPartitions)Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them. This always shuffles all data over the network.
repartitionAndSortWithinPartitions(partitioner)Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys. This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.

action

4,Action函数

ActionMeaning
reduce(func)Aggregate the elements of the dataset using a function func (which takes two arguments and returns one). The function should be commutative and associative so that it can be computed correctly in parallel.
collect()Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
count()Return the number of elements in the dataset.
first()Return the first element of the dataset (similar to take(1)).
take(n)Return an array with the first n elements of the dataset.
takeSample(withReplacement, num, [seed])Return an array with a random sample of num elements of the dataset, with or without replacement, optionally pre-specifying a random number generator seed.
takeOrdered(n, [ordering])Return the first n elements of the RDD using either their natural order or a custom comparator.
saveAsTextFile(path)Write the elements of the dataset as a text file (or set of text files) in a given directory in the local filesystem, HDFS or any other Hadoop-supported file system. Spark will call toString on each element to convert it to a line of text in the file.
saveAsSequenceFile(path) (Java and Scala)Write the elements of the dataset as a Hadoop SequenceFile in a given path in the local filesystem, HDFS or any other Hadoop-supported file system. This is available on RDDs of key-value pairs that either implement Hadoop’s Writable interface. In Scala, it is also available on types that are implicitly convertible to Writable (Spark includes conversions for basic types like Int, Double, String, etc).
saveAsObjectFile(path) (Java and Scala)Write the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
countByKey()Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with the count of each key.
foreach(func)Run a function func on each element of the dataset. This is usually done for side effects such as updating an accumulator variable (see below) or interacting with external storage systems.

RDD 持久化

1,持久化介绍

Spark最重要的一个功能是它可以通过各种操作(operations)持久化(或者缓存)一个集合到内存中。当你持久化一个RDD的时候,每一个节点都将参与计算的所有分区数据存储到内存中,并且这些 数据可以被这个集合(以及这个集合衍生的其他集合)的动作(action)重复利用。这个能力使后续的动作速度更快(通常快10倍以上)。对应迭代算法和快速的交互使用来说,缓存是一个关键的工具。

你能通过persist()或者cache()方法持久化一个rdd。首先,在action中计算得到rdd;然后,将其保存在每个节点的内存中。Spark的缓存是一个容错的技术-如果RDD的任何一个分区丢失,它 可以通过原有的转换(transformations)操作自动的重复计算并且创建出这个分区。

此外,我们可以利用不同的存储级别存储每一个被持久化的RDD。例如,它允许我们持久化集合到磁盘上、将集合作为序列化的Java对象持久化到内存中、在节点间复制集合或者存储集合到 Tachyon中。我们可以通过传递一个StorageLevel对象给persist()方法设置这些存储级别。cache()方法使用了默认的存储级别—StorageLevel.MEMORY_ONLY。完整的存储级别介绍如下所示:

Storage LevelMeaning
MEMORY_ONLY将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,一些分区将不会被缓存,从而在每次需要这些分区时都需重新计算它们。这是系统默认的存储级别。
MEMORY_AND_DISK将RDD作为非序列化的Java对象存储在jvm中。如果RDD不适合存在内存中,将这些不适合存在内存中的分区存储在磁盘中,每次需要时读出它们。
MEMORY_ONLY_SER将RDD作为序列化的Java对象存储(每个分区一个byte数组)。这种方式比非序列化方式更节省空间,特别是用到快速的序列化工具时,但是会更耗费cpu资源—密集的读操作。
MEMORY_AND_DISK_SER和MEMORY_ONLY_SER类似,但不是在每次需要时重复计算这些不适合存储到内存中的分区,而是将这些分区存储到磁盘中。
DISK_ONLY仅仅将RDD分区存储到磁盘中
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc.和上面的存储级别类似,但是复制每个分区到集群的两个节点上面
OFF_HEAP (experimental)以序列化的格式存储RDD到Tachyon中。相对于MEMORY_ONLY_SER,OFF_HEAP减少了垃圾回收的花费,允许更小的执行者共享内存池。这使其在拥有大量内存的环境下或者多并发应用程序的环境中具有更强的吸引力。

NOTE:在python中,存储的对象都是通过Pickle库序列化了的,所以是否选择序列化等级并不重要。

Spark也会自动持久化一些shuffle操作(如reduceByKey)中的中间数据,即使用户没有调用persist方法。这样的好处是避免了在shuffle出错情况下,需要重复计算整个输入。如果用户计划重用计算过程中产生的RDD,我们仍然推荐用户调用persist方法。

2,使用时机

1,大量数据的输入的时候,包括从文件系统读取或者shuffle获取
2,计算比较耗时的RDD
3,链条比较长的
4,cheakpoint的时候
主要是避免缓存丢失要重新计算带来的性能的开销
checkpoint.png-92.5kB

3,如何选择存储级别

Spark的多个存储级别意味着在内存利用率和cpu利用效率间的不同权衡。我们推荐通过下面的过程选择一个合适的存储级别:

  • MEMORY_ONLY
    如果你的RDD适合默认的存储级别(MEMORY_ONLY),就选择默认的存储级别。因为这是cpu利用率最高的选项,会使RDD上的操作尽可能的快。
  • MEMORY_ONLY_SER
    如果不适合用默认的级别,选择MEMORY_ONLY_SER。选择一个更快的序列化库提高对象的空间使用率,但是仍能够相当快的访问。
  • 除非函数计算RDD的花费较大或者它们需要过滤大量的数据,不要将RDD存储到磁盘上,否则,重复计算一个分区就会和重磁盘上读取数据一样慢。
  • 如果你希望更快的错误恢复,可以利用重复(replicated)存储级别。所有的存储级别都可以通过重复计算丢失的数据来支持完整的容错,但是重复的数据能够使你在RDD上继续运行任务,而不需要重复计算丢失的数据。
    在拥有大量内存的环境中或者多应用程序的环境中,OFF_HEAP具有如下优势:
    1,它运行多个执行者共享Tachyon中相同的内存池
    2,它显著地减少垃圾回收的花费
    3,如果单个的执行者崩溃,缓存的数据不会丢失

4,删除数据

Spark自动的监控每个节点缓存的使用情况,利用最近最少使用原则删除老旧的数据。如果你想手动的删除RDD,可以使用RDD.unpersist()方法

窄依赖和宽依赖,

1,窄依赖:

一个窄依赖在一个stage
1,每一个parentRDD的RDDpartition只能被一个子RDDpartition使用
2,输入输出一对一的算子,且结果RDD的分区结构不变,主要是map、flatmap
3,输入输出一对一,淡结果RDD的分区结构发生变化,如union、coaesce
4,从输入中选择部分元素的原子,如filter、distinct、subtract、sample

2,宽依赖

1,子RDD的每个分区依赖于所有父RDD分区。
2,对于单个RDD基于key进行重组和reduce,如groupByKey,reduceByKey;
3,对于两个RDD基于key进行join和重组,如join
宽窄依赖.png-185.2kB
stage里面计算是并行的,因为互相独立
wordcountstage.png-187.4kB

resultTask和shuffleMapTask

A ShuffleMapTask divides the elements of an RDD into multiple buckets (based on a partitionerspecified in the ShuffleDependency).

A task that sends back the output to the driver application.

这篇关于sparkRDD操作的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!


原文地址:
本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.chinasem.cn/article/1029668

相关文章

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

Java使用Curator进行ZooKeeper操作的详细教程

《Java使用Curator进行ZooKeeper操作的详细教程》ApacheCurator是一个基于ZooKeeper的Java客户端库,它极大地简化了使用ZooKeeper的开发工作,在分布式系统... 目录1、简述2、核心功能2.1 CuratorFramework2.2 Recipes3、示例实践3

Java利用JSONPath操作JSON数据的技术指南

《Java利用JSONPath操作JSON数据的技术指南》JSONPath是一种强大的工具,用于查询和操作JSON数据,类似于SQL的语法,它为处理复杂的JSON数据结构提供了简单且高效... 目录1、简述2、什么是 jsONPath?3、Java 示例3.1 基本查询3.2 过滤查询3.3 递归搜索3.4

Python使用DrissionPage中ChromiumPage进行自动化网页操作

《Python使用DrissionPage中ChromiumPage进行自动化网页操作》DrissionPage作为一款轻量级且功能强大的浏览器自动化库,为开发者提供了丰富的功能支持,本文将使用Dri... 目录前言一、ChromiumPage基础操作1.初始化Drission 和 ChromiumPage

利用Go语言开发文件操作工具轻松处理所有文件

《利用Go语言开发文件操作工具轻松处理所有文件》在后端开发中,文件操作是一个非常常见但又容易出错的场景,本文小编要向大家介绍一个强大的Go语言文件操作工具库,它能帮你轻松处理各种文件操作场景... 目录为什么需要这个工具?核心功能详解1. 文件/目录存javascript在性检查2. 批量创建目录3. 文件

Redis中管道操作pipeline的实现

《Redis中管道操作pipeline的实现》RedisPipeline是一种优化客户端与服务器通信的技术,通过批量发送和接收命令减少网络往返次数,提高命令执行效率,本文就来介绍一下Redis中管道操... 目录什么是pipeline场景一:我要向Redis新增大批量的数据分批处理事务( MULTI/EXE

使用Python高效获取网络数据的操作指南

《使用Python高效获取网络数据的操作指南》网络爬虫是一种自动化程序,用于访问和提取网站上的数据,Python是进行网络爬虫开发的理想语言,拥有丰富的库和工具,使得编写和维护爬虫变得简单高效,本文将... 目录网络爬虫的基本概念常用库介绍安装库Requests和BeautifulSoup爬虫开发发送请求解

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.

JDK多版本共存并自由切换的操作指南(本文为JDK8和JDK17)

《JDK多版本共存并自由切换的操作指南(本文为JDK8和JDK17)》本文介绍了如何在Windows系统上配置多版本JDK(以JDK8和JDK17为例),并通过图文结合的方式给大家讲解了详细步骤,具有... 目录第一步 下载安装JDK第二步 配置环境变量第三步 切换JDK版本并验证可能遇到的问题前提:公司常