Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

四.RDD行动算子

行动算子

  1. 所谓的行动算子,其实就是触发作业(Job)执行的方法
  2. 底层代码调用的是环境对象的runJob方法
  3. 底层代码中会创建ActiveJob,并提交执行。

1.reduce

➢ 函数签名
def reduce(f: (T, T) => T): T
➢ 函数说明
聚集 RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

    val rdd=sc.makeRDD(List(1,2,3,4))val result = rdd.reduce(_+_)println(result)

输出10

2.collect

➢ 函数签名
def collect(): Array[T]
➢ 函数说明
在驱动程序中,以数组 Array 的形式返回数据集的所有元素

	val rdd=sc.makeRDD(List(1,2,3,4))val ints: Array[Int] = rdd.collect()println(ints.mkString(","))

输出:1,2,3,4

3.count

➢ 函数签名
def count(): Long
➢ 函数说明
返回 RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()

输出:4

4.first

➢ 函数签名
def first(): T
➢ 函数说明
返回 RDD 中的第一个元素

	val rdd=sc.makeRDD(List(1,2,3,4))val first = rdd.first()println(first)

输出:1

5.take

➢ 函数签名
def take(num: Int): Array[T]
➢ 函数说明
返回一个由 RDD 的前 n 个元素组成的数组

	val rdd=sc.makeRDD(List(1,2,3,4))val ints1: Array[Int] = rdd.take(num=3)println(ints1.mkString(","))

输出:1,2,3

6.takeOrdered

➢ 函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
➢ 函数说明
返回该 RDD 排序后的前 n 个元素组成的数组

	val rdd1=sc.makeRDD(List(4,2,3,4))val ints2: Array[Int] = rdd.takeOrdered(num=3)println(ints2.mkString(","))

输出:1,2,3

7.aggrgate

➢ 函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
➢ 函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

	val rdd=sc.makeRDD(List(1,2,3,4))val result: Int = rdd.aggregate(zeroValue = 0)(_+_,_+_)println(result)

输出:10
区别:
aggregateByKey : 初始值只会参与分区内计算
aggregate : 初始值会参与分区内计算,并且和参与分区间计算

 val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40
// aggregateByKey : 初始值只会参与分区内计算
// aggregate : 初始值会参与分区内计算,并且和参与分区间计算
val result = rdd.aggregate(10)(_+_, _+_)println(result)

输出:40

8.fold

➢ 函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
➢ 函数说明
折叠操作,aggregate 的简化版操作

val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)(_+_)println(result)

输出:40

9.countByValue

	val rdd=sc.makeRDD(List(1,1,3,4))val intToLong: collection.Map[Int, Long] = rdd.countByValue()println(intToLong)

输出:Map(4 -> 1, 1 -> 2, 3 -> 1)

10.countByKey

➢ 函数签名
def countByKey(): Map[K, Long]
➢ 函数说明
统计每种 key 的个数

	val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3)))val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)

输出:Map(a -> 3)

11.save相关算子

➢ 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
➢ 函数说明
将数据保存到不同格式的文件中

 val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3) ))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")

11. foreach

➢ 函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
➢ 函数说明
分布式遍历 RDD 中的每一个元素,调用指定函数

	val rdd = sc.makeRDD(List(1,2,3,4))// foreach 其实是Driver端内存集合的循环遍历方法rdd.collect().foreach(println)println("aaaaaa------aaaaaaaa")// foreach 其实是Executor端内存数据打印rdd.foreach(println)

输出:
1
2
3
4
aaaaaa------aaaaaaaa
1
2
3
4
算子 : Operator(操作)
RDD的方法和Scala集合对象的方法不一样
集合对象的方法都是在同一个节点的内存中完成的。
RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
为了区分不同的处理效果,所以将RDD的方法称之为算子。
RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

  • Scala方法图示

在这里插入图片描述

  • RDD方法图示
    在这里插入图片描述

五.RDD依赖关系

1.血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
在这里插入图片描述
在这里插入图片描述eg:wordcount
在这里插入图片描述
代码展示

def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount").set("spark.testing.memory","2147480000")val sc = new SparkContext(sparConf)val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.toDebugString)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.toDebugString)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.toDebugString)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.toDebugString)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)sc.stop()}

输出:

(1) datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) ShuffledRDD[4] at reduceByKey at RDD_Depenency.scala:19 []
±(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(spark,1)
(Hello,2)
(Scala,1)

2.依赖关系

依赖关系其实就是两个相邻 RDD 之间的关系

val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.dependencies)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.dependencies)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.dependencies)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.dependencies)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)

List(org.apache.spark.OneToOneDependency@bb9ab64)
/*************************
List(org.apache.spark.OneToOneDependency@3b05a99b)
/*************************
List(org.apache.spark.OneToOneDependency@889d9e8)
/*************************
21/01/29 15:58:01 INFO FileInputFormat: Total input paths to process : 1
List(org.apache.spark.ShuffleDependency@700f518a)
/*************************
(spark,1)
(Hello,2)
(Scala,1)

3.窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

在这里插入图片描述

4.宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle
源码展示:

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]]

在这里插入图片描述

5.阶段划分

一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了RDD 的转换过程和任务的阶段

在这里插入图片描述RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
即:遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。一个宽依赖就分一个stage,每个shuffle之前都是一个stage。
在这里插入图片描述

6.任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
在这里插入图片描述在spark中Task的类型分为2种:ShuffleMapTask和ResultTask;
DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。
在这里插入图片描述

这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592025

相关文章

MySQL 主从复制部署及验证(示例详解)

《MySQL主从复制部署及验证(示例详解)》本文介绍MySQL主从复制部署步骤及学校管理数据库创建脚本,包含表结构设计、示例数据插入和查询语句,用于验证主从同步功能,感兴趣的朋友一起看看吧... 目录mysql 主从复制部署指南部署步骤1.环境准备2. 主服务器配置3. 创建复制用户4. 获取主服务器状态5

一文详解如何使用Java获取PDF页面信息

《一文详解如何使用Java获取PDF页面信息》了解PDF页面属性是我们在处理文档、内容提取、打印设置或页面重组等任务时不可或缺的一环,下面我们就来看看如何使用Java语言获取这些信息吧... 目录引言一、安装和引入PDF处理库引入依赖二、获取 PDF 页数三、获取页面尺寸(宽高)四、获取页面旋转角度五、判断

Spring Boot中的路径变量示例详解

《SpringBoot中的路径变量示例详解》SpringBoot中PathVariable通过@PathVariable注解实现URL参数与方法参数绑定,支持多参数接收、类型转换、可选参数、默认值及... 目录一. 基本用法与参数映射1.路径定义2.参数绑定&nhttp://www.chinasem.cnbs

MySql基本查询之表的增删查改+聚合函数案例详解

《MySql基本查询之表的增删查改+聚合函数案例详解》本文详解SQL的CURD操作INSERT用于数据插入(单行/多行及冲突处理),SELECT实现数据检索(列选择、条件过滤、排序分页),UPDATE... 目录一、Create1.1 单行数据 + 全列插入1.2 多行数据 + 指定列插入1.3 插入否则更

Redis中Stream详解及应用小结

《Redis中Stream详解及应用小结》RedisStreams是Redis5.0引入的新功能,提供了一种类似于传统消息队列的机制,但具有更高的灵活性和可扩展性,本文给大家介绍Redis中Strea... 目录1. Redis Stream 概述2. Redis Stream 的基本操作2.1. XADD

Spring StateMachine实现状态机使用示例详解

《SpringStateMachine实现状态机使用示例详解》本文介绍SpringStateMachine实现状态机的步骤,包括依赖导入、枚举定义、状态转移规则配置、上下文管理及服务调用示例,重点解... 目录什么是状态机使用示例什么是状态机状态机是计算机科学中的​​核心建模工具​​,用于描述对象在其生命

Java JDK1.8 安装和环境配置教程详解

《JavaJDK1.8安装和环境配置教程详解》文章简要介绍了JDK1.8的安装流程,包括官网下载对应系统版本、安装时选择非系统盘路径、配置JAVA_HOME、CLASSPATH和Path环境变量,... 目录1.下载JDK2.安装JDK3.配置环境变量4.检验JDK官网下载地址:Java Downloads

使用Python删除Excel中的行列和单元格示例详解

《使用Python删除Excel中的行列和单元格示例详解》在处理Excel数据时,删除不需要的行、列或单元格是一项常见且必要的操作,本文将使用Python脚本实现对Excel表格的高效自动化处理,感兴... 目录开发环境准备使用 python 删除 Excphpel 表格中的行删除特定行删除空白行删除含指定

MySQL中的LENGTH()函数用法详解与实例分析

《MySQL中的LENGTH()函数用法详解与实例分析》MySQLLENGTH()函数用于计算字符串的字节长度,区别于CHAR_LENGTH()的字符长度,适用于多字节字符集(如UTF-8)的数据验证... 目录1. LENGTH()函数的基本语法2. LENGTH()函数的返回值2.1 示例1:计算字符串

Spring Boot spring-boot-maven-plugin 参数配置详解(最新推荐)

《SpringBootspring-boot-maven-plugin参数配置详解(最新推荐)》文章介绍了SpringBootMaven插件的5个核心目标(repackage、run、start... 目录一 spring-boot-maven-plugin 插件的5个Goals二 应用场景1 重新打包应用