Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

四.RDD行动算子

行动算子

  1. 所谓的行动算子,其实就是触发作业(Job)执行的方法
  2. 底层代码调用的是环境对象的runJob方法
  3. 底层代码中会创建ActiveJob,并提交执行。

1.reduce

➢ 函数签名
def reduce(f: (T, T) => T): T
➢ 函数说明
聚集 RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

    val rdd=sc.makeRDD(List(1,2,3,4))val result = rdd.reduce(_+_)println(result)

输出10

2.collect

➢ 函数签名
def collect(): Array[T]
➢ 函数说明
在驱动程序中,以数组 Array 的形式返回数据集的所有元素

	val rdd=sc.makeRDD(List(1,2,3,4))val ints: Array[Int] = rdd.collect()println(ints.mkString(","))

输出:1,2,3,4

3.count

➢ 函数签名
def count(): Long
➢ 函数说明
返回 RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()

输出:4

4.first

➢ 函数签名
def first(): T
➢ 函数说明
返回 RDD 中的第一个元素

	val rdd=sc.makeRDD(List(1,2,3,4))val first = rdd.first()println(first)

输出:1

5.take

➢ 函数签名
def take(num: Int): Array[T]
➢ 函数说明
返回一个由 RDD 的前 n 个元素组成的数组

	val rdd=sc.makeRDD(List(1,2,3,4))val ints1: Array[Int] = rdd.take(num=3)println(ints1.mkString(","))

输出:1,2,3

6.takeOrdered

➢ 函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
➢ 函数说明
返回该 RDD 排序后的前 n 个元素组成的数组

	val rdd1=sc.makeRDD(List(4,2,3,4))val ints2: Array[Int] = rdd.takeOrdered(num=3)println(ints2.mkString(","))

输出:1,2,3

7.aggrgate

➢ 函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
➢ 函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

	val rdd=sc.makeRDD(List(1,2,3,4))val result: Int = rdd.aggregate(zeroValue = 0)(_+_,_+_)println(result)

输出:10
区别:
aggregateByKey : 初始值只会参与分区内计算
aggregate : 初始值会参与分区内计算,并且和参与分区间计算

 val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40
// aggregateByKey : 初始值只会参与分区内计算
// aggregate : 初始值会参与分区内计算,并且和参与分区间计算
val result = rdd.aggregate(10)(_+_, _+_)println(result)

输出:40

8.fold

➢ 函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
➢ 函数说明
折叠操作,aggregate 的简化版操作

val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)(_+_)println(result)

输出:40

9.countByValue

	val rdd=sc.makeRDD(List(1,1,3,4))val intToLong: collection.Map[Int, Long] = rdd.countByValue()println(intToLong)

输出:Map(4 -> 1, 1 -> 2, 3 -> 1)

10.countByKey

➢ 函数签名
def countByKey(): Map[K, Long]
➢ 函数说明
统计每种 key 的个数

	val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3)))val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)

输出:Map(a -> 3)

11.save相关算子

➢ 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
➢ 函数说明
将数据保存到不同格式的文件中

 val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3) ))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")

11. foreach

➢ 函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
➢ 函数说明
分布式遍历 RDD 中的每一个元素,调用指定函数

	val rdd = sc.makeRDD(List(1,2,3,4))// foreach 其实是Driver端内存集合的循环遍历方法rdd.collect().foreach(println)println("aaaaaa------aaaaaaaa")// foreach 其实是Executor端内存数据打印rdd.foreach(println)

输出:
1
2
3
4
aaaaaa------aaaaaaaa
1
2
3
4
算子 : Operator(操作)
RDD的方法和Scala集合对象的方法不一样
集合对象的方法都是在同一个节点的内存中完成的。
RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
为了区分不同的处理效果,所以将RDD的方法称之为算子。
RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

  • Scala方法图示

在这里插入图片描述

  • RDD方法图示
    在这里插入图片描述

五.RDD依赖关系

1.血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
在这里插入图片描述
在这里插入图片描述eg:wordcount
在这里插入图片描述
代码展示

def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount").set("spark.testing.memory","2147480000")val sc = new SparkContext(sparConf)val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.toDebugString)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.toDebugString)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.toDebugString)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.toDebugString)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)sc.stop()}

输出:

(1) datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) ShuffledRDD[4] at reduceByKey at RDD_Depenency.scala:19 []
±(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(spark,1)
(Hello,2)
(Scala,1)

2.依赖关系

依赖关系其实就是两个相邻 RDD 之间的关系

val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.dependencies)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.dependencies)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.dependencies)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.dependencies)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)

List(org.apache.spark.OneToOneDependency@bb9ab64)
/*************************
List(org.apache.spark.OneToOneDependency@3b05a99b)
/*************************
List(org.apache.spark.OneToOneDependency@889d9e8)
/*************************
21/01/29 15:58:01 INFO FileInputFormat: Total input paths to process : 1
List(org.apache.spark.ShuffleDependency@700f518a)
/*************************
(spark,1)
(Hello,2)
(Scala,1)

3.窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

在这里插入图片描述

4.宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle
源码展示:

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]]

在这里插入图片描述

5.阶段划分

一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了RDD 的转换过程和任务的阶段

在这里插入图片描述RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
即:遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。一个宽依赖就分一个stage,每个shuffle之前都是一个stage。
在这里插入图片描述

6.任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
在这里插入图片描述在spark中Task的类型分为2种:ShuffleMapTask和ResultTask;
DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。
在这里插入图片描述

这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592025

相关文章

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Mysql 中的多表连接和连接类型详解

《Mysql中的多表连接和连接类型详解》这篇文章详细介绍了MySQL中的多表连接及其各种类型,包括内连接、左连接、右连接、全外连接、自连接和交叉连接,通过这些连接方式,可以将分散在不同表中的相关数据... 目录什么是多表连接?1. 内连接(INNER JOIN)2. 左连接(LEFT JOIN 或 LEFT

Java中ArrayList的8种浅拷贝方式示例代码

《Java中ArrayList的8种浅拷贝方式示例代码》:本文主要介绍Java中ArrayList的8种浅拷贝方式的相关资料,讲解了Java中ArrayList的浅拷贝概念,并详细分享了八种实现浅... 目录引言什么是浅拷贝?ArrayList 浅拷贝的重要性方法一:使用构造函数方法二:使用 addAll(

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux内核之内核裁剪详解

《Linux内核之内核裁剪详解》Linux内核裁剪是通过移除不必要的功能和模块,调整配置参数来优化内核,以满足特定需求,裁剪的方法包括使用配置选项、模块化设计和优化配置参数,图形裁剪工具如makeme... 目录简介一、 裁剪的原因二、裁剪的方法三、图形裁剪工具四、操作说明五、make menuconfig

JAVA利用顺序表实现“杨辉三角”的思路及代码示例

《JAVA利用顺序表实现“杨辉三角”的思路及代码示例》杨辉三角形是中国古代数学的杰出研究成果之一,是我国北宋数学家贾宪于1050年首先发现并使用的,:本文主要介绍JAVA利用顺序表实现杨辉三角的思... 目录一:“杨辉三角”题目链接二:题解代码:三:题解思路:总结一:“杨辉三角”题目链接题目链接:点击这里

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外

详解Java中的敏感信息处理

《详解Java中的敏感信息处理》平时开发中常常会遇到像用户的手机号、姓名、身份证等敏感信息需要处理,这篇文章主要为大家整理了一些常用的方法,希望对大家有所帮助... 目录前后端传输AES 对称加密RSA 非对称加密混合加密数据库加密MD5 + Salt/SHA + SaltAES 加密平时开发中遇到像用户的