本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
四.RDD行动算子
行动算子
- 所谓的行动算子,其实就是触发作业(Job)执行的方法
- 底层代码调用的是环境对象的runJob方法
- 底层代码中会创建ActiveJob,并提交执行。
1.reduce
➢ 函数签名
def reduce(f: (T, T) => T): T
➢ 函数说明
聚集 RDD中的所有元素,先聚合分区内数据,再聚合分区间数据
val rdd=sc.makeRDD(List(1,2,3,4))val result = rdd.reduce(_+_)println(result)
输出10
2.collect
➢ 函数签名
def collect(): Array[T]
➢ 函数说明
在驱动程序中,以数组 Array 的形式返回数据集的所有元素
val rdd=sc.makeRDD(List(1,2,3,4))val ints: Array[Int] = rdd.collect()println(ints.mkString(","))
输出:1,2,3,4
3.count
➢ 函数签名
def count(): Long
➢ 函数说明
返回 RDD 中元素的个数
val rdd: RDD[Int] = sc.makeRDD(List(1,2,3,4))
// 返回 RDD 中元素的个数
val countResult: Long = rdd.count()
输出:4
4.first
➢ 函数签名
def first(): T
➢ 函数说明
返回 RDD 中的第一个元素
val rdd=sc.makeRDD(List(1,2,3,4))val first = rdd.first()println(first)
输出:1
5.take
➢ 函数签名
def take(num: Int): Array[T]
➢ 函数说明
返回一个由 RDD 的前 n 个元素组成的数组
val rdd=sc.makeRDD(List(1,2,3,4))val ints1: Array[Int] = rdd.take(num=3)println(ints1.mkString(","))
输出:1,2,3
6.takeOrdered
➢ 函数签名
def takeOrdered(num: Int)(implicit ord: Ordering[T]): Array[T]
➢ 函数说明
返回该 RDD 排序后的前 n 个元素组成的数组
val rdd1=sc.makeRDD(List(4,2,3,4))val ints2: Array[Int] = rdd.takeOrdered(num=3)println(ints2.mkString(","))
输出:1,2,3
7.aggrgate
➢ 函数签名
def aggregate[U: ClassTag](zeroValue: U)(seqOp: (U, T) => U, combOp: (U, U) => U): U
➢ 函数说明
分区的数据通过初始值和分区内的数据进行聚合,然后再和初始值进行分区间的数据聚合
val rdd=sc.makeRDD(List(1,2,3,4))val result: Int = rdd.aggregate(zeroValue = 0)(_+_,_+_)println(result)
输出:10
区别:
aggregateByKey : 初始值只会参与分区内计算
aggregate : 初始值会参与分区内计算,并且和参与分区间计算
val rdd = sc.makeRDD(List(1,2,3,4),2)//10 + 13 + 17 = 40
// aggregateByKey : 初始值只会参与分区内计算
// aggregate : 初始值会参与分区内计算,并且和参与分区间计算
val result = rdd.aggregate(10)(_+_, _+_)println(result)
输出:40
8.fold
➢ 函数签名
def fold(zeroValue: T)(op: (T, T) => T): T
➢ 函数说明
折叠操作,aggregate 的简化版操作
val rdd = sc.makeRDD(List(1,2,3,4),2)
val result = rdd.fold(10)(_+_)println(result)
输出:40
9.countByValue
val rdd=sc.makeRDD(List(1,1,3,4))val intToLong: collection.Map[Int, Long] = rdd.countByValue()println(intToLong)
输出:Map(4 -> 1, 1 -> 2, 3 -> 1)
10.countByKey
➢ 函数签名
def countByKey(): Map[K, Long]
➢ 函数说明
统计每种 key 的个数
val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3)))val stringToLong: collection.Map[String, Long] = rdd.countByKey()println(stringToLong)
输出:Map(a -> 3)
11.save相关算子
➢ 函数签名
def saveAsTextFile(path: String): Unit
def saveAsObjectFile(path: String): Unit
def saveAsSequenceFile(
path: String,
codec: Option[Class[_ <: CompressionCodec]] = None): Unit
➢ 函数说明
将数据保存到不同格式的文件中
val rdd = sc.makeRDD(List(("a", 1),("a", 2),("a", 3) ))
rdd.saveAsTextFile("output")
rdd.saveAsObjectFile("output1")
// saveAsSequenceFile方法要求数据的格式必须为K-V类型
rdd.saveAsSequenceFile("output2")
11. foreach
➢ 函数签名
def foreach(f: T => Unit): Unit = withScope {
val cleanF = sc.clean(f)
sc.runJob(this, (iter: Iterator[T]) => iter.foreach(cleanF))
}
➢ 函数说明
分布式遍历 RDD 中的每一个元素,调用指定函数
val rdd = sc.makeRDD(List(1,2,3,4))// foreach 其实是Driver端内存集合的循环遍历方法rdd.collect().foreach(println)println("aaaaaa------aaaaaaaa")// foreach 其实是Executor端内存数据打印rdd.foreach(println)
输出:
1
2
3
4
aaaaaa------aaaaaaaa
1
2
3
4
算子 : Operator(操作)
RDD的方法和Scala集合对象的方法不一样
集合对象的方法都是在同一个节点的内存中完成的。
RDD的方法可以将计算逻辑发送到Executor端(分布式节点)执行
为了区分不同的处理效果,所以将RDD的方法称之为算子。
RDD的方法外部的操作都是在Driver端执行的,而方法内部的逻辑代码是在Executor端执行。
- Scala方法图示
- RDD方法图示
五.RDD依赖关系
1.血缘关系
RDD 只支持粗粒度转换,即在大量记录上执行的单个操作。将创建 RDD 的一系列 Lineage(血统)记录下来,以便恢复丢失的分区。RDD 的 Lineage 会记录 RDD 的元数据信息和转换行为,当该 RDD 的部分分区数据丢失时,它可以根据这些信息来重新运算和恢复丢失的数据分区。
eg:wordcount
代码展示
def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount").set("spark.testing.memory","2147480000")val sc = new SparkContext(sparConf)val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.toDebugString)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.toDebugString)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.toDebugString)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.toDebugString)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)sc.stop()}
输出:
(1) datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(1) ShuffledRDD[4] at reduceByKey at RDD_Depenency.scala:19 []
±(1) MapPartitionsRDD[3] at map at RDD_Depenency.scala:16 []
| MapPartitionsRDD[2] at flatMap at RDD_Depenency.scala:13 []
| datas/word.txt MapPartitionsRDD[1] at textFile at RDD_Depenency.scala:10 []
| datas/word.txt HadoopRDD[0] at textFile at RDD_Depenency.scala:10 []
/*************************
(spark,1)
(Hello,2)
(Scala,1)
2.依赖关系
依赖关系其实就是两个相邻 RDD 之间的关系
val lines: RDD[String] = sc.textFile("datas/word.txt")println(lines.dependencies)println("/*************************")val words: RDD[String] = lines.flatMap(_.split(" "))println(words.dependencies)println("/*************************")val wordToOne = words.map(word=>(word,1))println(wordToOne.dependencies)println("/*************************")val wordToSum: RDD[(String, Int)] = wordToOne.reduceByKey(_+_)println(wordToSum.dependencies)println("/*************************")val array: Array[(String, Int)] = wordToSum.collect()array.foreach(println)
List(org.apache.spark.OneToOneDependency@bb9ab64)
/*************************
List(org.apache.spark.OneToOneDependency@3b05a99b)
/*************************
List(org.apache.spark.OneToOneDependency@889d9e8)
/*************************
21/01/29 15:58:01 INFO FileInputFormat: Total input paths to process : 1
List(org.apache.spark.ShuffleDependency@700f518a)
/*************************
(spark,1)
(Hello,2)
(Scala,1)
3.窄依赖
窄依赖表示每一个父(上游)RDD 的 Partition 最多被子(下游)RDD 的一个 Partition 使用,窄依赖我们形象的比喻为独生子女。
class OneToOneDependency[T](rdd: RDD[T]) extends NarrowDependency[T](rdd)
4.宽依赖
宽依赖表示同一个父(上游)RDD 的 Partition 被多个子(下游)RDD 的 Partition 依赖,会引起 Shuffle
源码展示:
class ShuffleDependency[K: ClassTag, V: ClassTag, C: ClassTag](@transient private val _rdd: RDD[_ <: Product2[K, V]],val partitioner: Partitioner,val serializer: Serializer = SparkEnv.get.serializer,val keyOrdering: Option[Ordering[K]] = None,val aggregator: Option[Aggregator[K, V, C]] = None,val mapSideCombine: Boolean = false)extends Dependency[Product2[K, V]]
5.阶段划分
一个Job会被拆分为多组Task,每组任务被称为一个Stage就像Map Stage, Reduce Stage
DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG 记录了RDD 的转换过程和任务的阶段
RDD之间的依赖关系将DAG图划分为不同的阶段,对于窄依赖,由于partition依赖关系的确定性,partition的转换处理就可以在同一个线程里完成,窄依赖就被spark划分到同一个stage中,而对于宽依赖,只能等父RDD shuffle处理完成后,下一个stage才能开始接下来的计算。
即:遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。一个宽依赖就分一个stage,每个shuffle之前都是一个stage。
6.任务划分
RDD 任务切分中间分为:Application、Job、Stage 和 Task
- Application:初始化一个 SparkContext 即生成一个 Application;
- Job:一个 Action 算子就会生成一个 Job;
- Stage:Stage 等于宽依赖(ShuffleDependency)的个数加 1;
- Task:一个 Stage 阶段中,最后一个 RDD 的分区个数就是 Task 的个数
注意:Application->Job->Stage->Task 每一层都是 1 对 n 的关系。
在spark中Task的类型分为2种:ShuffleMapTask和ResultTask;
DAG的最后一个阶段会为每个结果的partition生成一个ResultTask,即每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition的数量所决定的!而其余所有阶段都会生成ShuffleMapTask;之所以称之为ShuffleMapTask是因为它需要将自己的计算结果通过shuffle到下一个stage中。
这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(二)行动算子,依赖关系的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!