Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

四.RDD行动算子

行动算子

  1. 所谓的行动算子,其实就是触发作业(Job)执行的方法
  2. 底层代码调用的是环境对象的runJob方法
  3. 底层代码中会创建ActiveJob,并提交执行。

1.reduce

➢ 函数签名
def reduce(f: (T, T) => T): T
➢ 函数说明
聚集 RDD中的所有元素,先聚合分区内数据,再聚合分区间数据

    val rdd=sc.makeRDD(List(1,2,3,4))val result = rdd.reduce(_+_)println(result)

输出10

2.collect

➢ 函数签名
def collect(): Array[T]
➢ 函数说明
在驱动程序中,以数组 Array 的形式返回数据集的所有元素

	val rdd=sc.makeRDD(List(1,2,3,4))val ints: Array[Int] = rdd.collect()println(ints.mkString(","))

输出:1,2,3,4

3.count

➢ 函数签名
def count(): Long
➢ 函数说明
返回 RDD 中元素的个数

val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()

输出:4

4.first

➢ 函数签名
def first(): T
➢ 函数说明
返回 RDD 中的第一个元素

	val rdd=sc.makeRDD(List(1,2,3,4))val first = rdd.first()println(first)

输出:1

5.take

➢ 函数签名
def take(num: Int): Array[T]
➢ 函数说明
返回一个由 RDD 的前 n 个元素组成的数组

	val rdd=sc.makeRDD(List(1,2,3,4))val ints1: Array[Int] = rdd.take(num=3)println(ints1.mkString(","))

输出:1,2,3

6.takeOrdered

➢ 函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
➢ 函数说明
返回该 RDD 排序后的前 n 个元素组成的数组

	val rdd1=sc.makeRDD(List(4,2,3,4))val ints2: Array[Int] = rdd.takeOrdered(num=3)println(ints2.mkString(","))

输出:1,2,3

7.aggrgate

➢ 函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
➢ 函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合

	val rdd=sc.makeRDD(List(1,2,3,4))val result: Int = rdd.aggregate(zeroValue = 0)(_+_,_+_)println(result)

输出:10
区别:
aggregateByKey : 初始值只会参与分区内计算
aggregate : 初始值会参与分区内计算,并且和参与分区间计算

 val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40
// aggregateByKey : 初始值只会参与分区内计算
// aggregate : 初始值会参与分区内计算,并且和参与分区间计算
val result = rdd.aggregate(10)(_+_, _+_)println(result)

输出:40

8.fold

➢ 函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
➢ 函数说明
折叠操作,aggregate 的简化版操作

val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)(_+_)println(result)

输出:40

9.countByValue

	val rdd=sc.makeRDD(List(1,1,3,4))val intToLong: collection.Map[Int, Long] = rdd.countByValue()println(intToLong)

输出:Map(4 -> 1, 1 -> 2, 3 -> 1)

10.countByKey

➢ 函数签名
def countByKey(): Map[K, Long]
➢ 函数说明
统计每种 key 的个数

	val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3)))val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)

输出:Map(a -> 3)

11.save相关算子

➢ 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
➢ 函数说明
将数据保存到不同格式的文件中

 val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3) ))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")

11. foreach

➢ 函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
➢ 函数说明
分布式遍历 RDD 中的每一个元素,调用指定函数

	val rdd = sc.makeRDD(List(1,2,3,4))// foreach 其实是Driver端内存集合的循环遍历方法rdd.collect().foreach(println)println("aaaaaa------aaaaaaaa")// foreach 其实是Executor端内存数据打印rdd.foreach(println)

输出:
1
2
3
4
aaaaaa------aaaaaaaa
1
2
3
4
算子 : Operator(操作)
RDD的方法和Scala集合对象的方法不一样
集合对象的方法都是在同一个节点的内存中完成的。
RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
为了区分不同的处理效果,所以将RDD的方法称之为算子。
RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。

  • Scala方法图示

在这里插入图片描述

  • RDD方法图示
    在这里插入图片描述

五.RDD依赖关系

1.血缘关系

RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
在这里插入图片描述
在这里插入图片描述eg:wordcount
在这里插入图片描述
代码展示

def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount").set("spark.testing.memory","2147480000")val sc = new SparkContext(sparConf)val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.toDebugString)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.toDebugString)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.toDebugString)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.toDebugString)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)sc.stop()}

输出:

(1) datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) ShuffledRDD[4] at reduceByKey at RDD_Depenency.scala:19 []
±(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(spark,1)
(Hello,2)
(Scala,1)

2.依赖关系

依赖关系其实就是两个相邻 RDD 之间的关系

val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.dependencies)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.dependencies)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.dependencies)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.dependencies)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)

List(org.apache.spark.OneToOneDependency@bb9ab64)
/*************************
List(org.apache.spark.OneToOneDependency@3b05a99b)
/*************************
List(org.apache.spark.OneToOneDependency@889d9e8)
/*************************
21/01/29 15:58:01 INFO FileInputFormat: Total input paths to process : 1
List(org.apache.spark.ShuffleDependency@700f518a)
/*************************
(spark,1)
(Hello,2)
(Scala,1)

3.窄依赖

窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。

class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)

在这里插入图片描述

4.宽依赖

宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle
源码展示:

class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]]

在这里插入图片描述

5.阶段划分

一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage

DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了RDD 的转换过程和任务的阶段

在这里插入图片描述RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
即:遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。一个宽依赖就分一个stage,每个shuffle之前都是一个stage。
在这里插入图片描述

6.任务划分

RDD 任务切分中间分为:Application、Job、Stage 和 Task

  • Application:初始化一个 SparkContext 即生成一个 Application;
  • Job:一个 Action 算子就会生成一个 Job;
  • Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
  • Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数

注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
在这里插入图片描述在spark中Task的类型分为2种:ShuffleMapTask和ResultTask;
DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。
在这里插入图片描述

这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592025

相关文章

PHP轻松处理千万行数据的方法详解

《PHP轻松处理千万行数据的方法详解》说到处理大数据集,PHP通常不是第一个想到的语言,但如果你曾经需要处理数百万行数据而不让服务器崩溃或内存耗尽,你就会知道PHP用对了工具有多强大,下面小编就... 目录问题的本质php 中的数据流处理:为什么必不可少生成器:内存高效的迭代方式流量控制:避免系统过载一次性

C#实现千万数据秒级导入的代码

《C#实现千万数据秒级导入的代码》在实际开发中excel导入很常见,现代社会中很容易遇到大数据处理业务,所以本文我就给大家分享一下千万数据秒级导入怎么实现,文中有详细的代码示例供大家参考,需要的朋友可... 目录前言一、数据存储二、处理逻辑优化前代码处理逻辑优化后的代码总结前言在实际开发中excel导入很

SpringBoot+RustFS 实现文件切片极速上传的实例代码

《SpringBoot+RustFS实现文件切片极速上传的实例代码》本文介绍利用SpringBoot和RustFS构建高性能文件切片上传系统,实现大文件秒传、断点续传和分片上传等功能,具有一定的参考... 目录一、为什么选择 RustFS + SpringBoot?二、环境准备与部署2.1 安装 RustF

MySQL的JDBC编程详解

《MySQL的JDBC编程详解》:本文主要介绍MySQL的JDBC编程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录前言一、前置知识1. 引入依赖2. 认识 url二、JDBC 操作流程1. JDBC 的写操作2. JDBC 的读操作总结前言本文介绍了mysq

Python实现Excel批量样式修改器(附完整代码)

《Python实现Excel批量样式修改器(附完整代码)》这篇文章主要为大家详细介绍了如何使用Python实现一个Excel批量样式修改器,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一... 目录前言功能特性核心功能界面特性系统要求安装说明使用指南基本操作流程高级功能技术实现核心技术栈关键函

javacv依赖太大导致jar包也大的解决办法

《javacv依赖太大导致jar包也大的解决办法》随着项目的复杂度和依赖关系的增加,打包后的JAR包可能会变得很大,:本文主要介绍javacv依赖太大导致jar包也大的解决办法,文中通过代码介绍的... 目录前言1.检查依赖2.更改依赖3.检查副依赖总结 前言最近在写项目时,用到了Javacv里的获取视频

Redis 的 SUBSCRIBE命令详解

《Redis的SUBSCRIBE命令详解》Redis的SUBSCRIBE命令用于订阅一个或多个频道,以便接收发送到这些频道的消息,本文给大家介绍Redis的SUBSCRIBE命令,感兴趣的朋友跟随... 目录基本语法工作原理示例消息格式相关命令python 示例Redis 的 SUBSCRIBE 命令用于订

使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解

《使用Python批量将.ncm格式的音频文件转换为.mp3格式的实战详解》本文详细介绍了如何使用Python通过ncmdump工具批量将.ncm音频转换为.mp3的步骤,包括安装、配置ffmpeg环... 目录1. 前言2. 安装 ncmdump3. 实现 .ncm 转 .mp34. 执行过程5. 执行结

Python中 try / except / else / finally 异常处理方法详解

《Python中try/except/else/finally异常处理方法详解》:本文主要介绍Python中try/except/else/finally异常处理方法的相关资料,涵... 目录1. 基本结构2. 各部分的作用tryexceptelsefinally3. 执行流程总结4. 常见用法(1)多个e

SpringBoot日志级别与日志分组详解

《SpringBoot日志级别与日志分组详解》文章介绍了日志级别(ALL至OFF)及其作用,说明SpringBoot默认日志级别为INFO,可通过application.properties调整全局或... 目录日志级别1、级别内容2、调整日志级别调整默认日志级别调整指定类的日志级别项目开发过程中,利用日志