Spark学习笔记(详解,附代码实列和图解)----------RDD(一)基础和转换算子

2024-01-10 20:48

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(一)基础和转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

RDD : 弹性分布式数据集

一.简介

RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。

➢ 弹性
--------存储的弹性:内存与磁盘的自动切换;
--------容错的弹性:数据丢失可以自动恢复;
--------计算的弹性:计算出错重试机制;
--------分片的弹性:可根据需要重新分片。
➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD 封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现
➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的RDD 里面封装计算逻辑
➢ 可分区、并行计算

执行逻辑
在这里插入图片描述

二.基础编程

1.RDD创建

  1. 从集合(内存)中创建 RDD
    从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
    从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法
  2. 从外部存储(文件)创建 RDD
    由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
package org.xyl;
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDD{def main(args: Array[String]): Unit = {//prepare environmentval sparkConf=new SparkConf().setMaster("local[*]").setAppName("RDD")val sc=new SparkContext(sparkConf)//creat RDD from memoryval seq: Seq[Int] =Seq[Int](1,2,3,4)        val rdd:RDD[Int]=sc.parallelize(seq)val rdd1: RDD[Int] = sc.makeRDD(seq)//creat RDD from file(hdfs or local)val rdd3:RDD[String]=sc.textFile("datas/1.txt")    rdd.collect().foreach(println)sc.stop()    }
}
  1. 从其他 RDD 创建
    主要是通过一个 RDD 运算完后,再产生新的 RDD。
  2. 直接创建 RDD(new)
    使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。

2.RDD 并行度与分区

默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。这里的并行执行的任务数量,并不是指的切分任务的数量。

  • 读取内存数据和文件数据时分区方法不同。
    1.读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark 核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {(0 until numSlices).iterator.map { i =>val start = ((i * length) / numSlices).toIntval end = (((i + 1) * length) / numSlices).toInt(start, end)}

代码示例:

object RDD_Par {def main(args: Array[String]): Unit = {//prepare environmentval sparkConf=new SparkConf().setMaster("local[*]").setAppName("RDD_Par")val sc=new SparkContext(sparkConf)//makeRDD方法传递第二个参数表示分区的数量数, 不传递会使用默认值val rdd=sc.makeRDD(List(1,2,3,4),2) rdd.saveAsTextFile("output")//文件方式的分区采用的是hadoop的分区方式val rdd1=sc.textFile("datas/1.txt",2)rdd.saveAsTextFile("output1")sc.stop()}
}

三.RDD转换算子

  • RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型

1)value类型

1.1 map

def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。

    val rdd=sc.makeRDD(List(1,2,3,4))def mapFunction(num:Int)={num*2}val mapRDD: RDD[Int] = rdd.map(mapFunction)//等价与 rdd.map((num:INt)=>{num*2})//简写为rdd.map(_*2)mapRDD.collect().foreach(println)
//output: 2 4 6 8

rdd的并行计算

  • rdd的计算一个分区内的数据是一个一个执行逻辑,只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。分区内数据的执行是有序的。
  • 不同分区数据计算是无序的。
//一个分区
val rdd = sc.makeRDD(List(1,2,3,4),1)val mapRDD = rdd.map(num => {println(">>>>>>>>" + num)num})val mapRDD1 = mapRDD.map(num => {println("#######" + num)num})

输出:

>>>>>>>>1
#######1
>>>>>>>>2
#######2
>>>>>>>>3
#######3
>>>>>>>>4
#######4

将分区数设为2 sc.makeRDD(List(1,2,3,4),2)

>>>>>>>>1
>>>>>>>>3
#######1
#######3
>>>>>>>>2
#######2
>>>>>>>>4
#######4

1.2 mapPartitions

def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。

val rdd = sc.makeRDD(List(1,2,3,4),2)
val mpRDD: RDD[Int] = rdd.mapPartitions(iter => {println(">>>>>>>>>>")iter.map(_ * 2)})mpRDD.collect().foreach(println)

输出:
因为有两个分区,所以会输出两次>>>>>>>>>>


>>>>>>>>>>
21/01/14 11:28:04 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
>>>>>>>>>>
21/01/14 11:28:04 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 5889 bytes)
21/01/14 11:28:04 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/01/14 11:28:04 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 916 bytes result sent to driver
21/01/14 11:28:04 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 281 ms on localhost (executor driver) (1/2)
21/01/14 11:28:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 81 ms on localhost (executor driver) (2/2)
21/01/14 11:28:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool 
21/01/14 11:28:04 INFO DAGScheduler: ResultStage 0 (collect at RDD_OperateTransform.scala:45) finished in 0.365 s
2
4
6
8

map 和 mapPartitions 的区别?

➢ 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
➢ 功能的角度 Map
算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据
➢ 性能的角度 Map
算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算类似于批处理,所以性能较高。但是 mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。

1.3 mapPartitionsWithIndex

➢ 函数签名 def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T])=> Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。

val rdd = sc.makeRDD(List(1,2,3,4), 2)// 【1,2】,【3,4】//输出1号分区的数val mpiRDD = rdd.mapPartitionsWithIndex((index, iter) => {if ( index == 1 ) {iter} else {Nil.iterator}})mpiRDD.collect().foreach(println)//output:3 4

1.4 flatMap

➢ 函数签名 def flatMap[U: ClassTag](
f: T => TraversableOnce[U]): RDD[U]
➢函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射

val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD: RDD[String] = rdd.flatMap(s => {s.split(" ")})flatRDD.collect().foreach(println)

输出:
Hello
Scala
Hello
Spark

val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))val flatRDD = rdd.flatMap(data => {data match {case list:List[_] => listcase dat => List(dat)}})flatRDD.collect().foreach(println)

输出:
1
2
3
4
5

1.5 glom

➢ 函数签名 def glom(): RDD[Array[T]]
➢ 函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变

val rdd=sc.makeRDD(List(1,2,3,4),2)
val glomRDD: RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>println(data.mkString(",")))

把1和2的分区作为一个数组,把3和4的分区作为另一个数组,分别打印两个数组
输出:
1,2
3,4

利用该方法求所有分区最大值求和
val rdd=sc.makeRDD(List(1,2,3,4),2)val glomRDD: RDD[Array[Int]] = rdd.glom()val maxRDD:RDD[Int]=glomRDD.map(array=>{array.max})println(maxRDD.collect().sum)

先求出第一个分区最大值2和第二个分区最大值4,然后求和输出6

1.6 groupBy

➢ 函数签名 def groupBy[K](
f: T => K)(implicit kt: ClassTag[K]): RDD[(K,Iterable[T])]
➢ 函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组

val rdd=sc.makeRDD(List(1,2,3,4),2)
def groupFunction(num:Int)={num%2
}
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)

输出:
(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))

1.7 filter

➢ 函数签名
def filter(f: T => Boolean): RDD[T]
➢ 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出 现数据倾斜。

val rdd=sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)
filterRDD.collect().foreach(println)

输出
1
3

1.8 sample

➢ 函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
➢ 函数说明
根据指定的规则从数据集中抽取数据

抽取数据不放回(伯努利算法)
伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
第一个参数:抽取的数据是否放回,false:不放回
第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
第三个参数:随机数种子
抽取数据放回(泊松算法)
第一个参数:抽取的数据是否放回,true:放回;false:不放回
第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
第三个参数:随机数种子

val rdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))//1.第一个参数表示抽取数据后是否将数据返回 true放回 false 丢弃//2.第二个参数表示//		如果抽取丢弃的话数据源中每条数据被抽取的概率//     如果抽取放回的话数据源中每条数据被抽取的可能次数//3.第三个参数表示抽取的随机种子,不传递的话使用的是当前系统println(rdd.sample(false,0.4,1).collect().mkString(","))

输出:3,4,6,7,9

1.9 distinct

➢ 函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
将数据集中重复的数据去重

	val rdd=sc.makeRDD(List(1,2,3,1,2,3))val rdd1=rdd.distinct()rdd1.collect().foreach(println)

输出:
1
2
3

1.10 coalesce

➢ 函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null) : RDD[T]
➢ 函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率 当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce方法,收缩合并分区,减少 分区的个数,减小任务调度成本

	val rdd=sc.makeRDD(List(1,2,3,4),4)val newrdd=rdd.coalesce(2)newrdd.saveAsTextFile("output")

分成两个分区

val rdd=sc.makeRDD(List(1,2,3,4,5,6),3)val newrdd=rdd.coalesce(2)newrdd.saveAsTextFile("output")

这样会出现第一个分区为1 2第二个分区为3 4 5 6.所以coalesce默认情况不会将分区数据打乱重新组合,可能出现上述数据倾斜。如果想要可以设置该方法的shuffle值为true

val newrdd=rdd.coalesce(2,shuffle = true)

1.11 repartition

➢ 函数签名
def repartition(
numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。

val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)

问题:coalesce 和 repartition 区别?

coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
所以如果想要实现扩大分区的效果,需要使用shuffle操作
spark提供了一个简化的操作
缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle

1.12 sortby

➢ 函数签名 def sortBy[K](
f: (T) => K, ascending: Boolean = true,
numPartitions: Int = this.partitions.length) (implicit ord:
Ordering[K], ctag: ClassTag[K]): RDD[T]
➢ 函数说明
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理 的结果进行排序,默认为升序排列。排序后新产生的RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程

// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
// sortBy默认情况下,不会改变分区。但是中间存在shuffle操作val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)))val newRDD=rdd.sortBy(t=>t._1)//按照的是字典序比较newRDD.collect().foreach(println)

输出:
(1,1)
(11,2)
(2,3)

	val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)))val newRDD=rdd.sortBy(t=>t._1.toInt)newRDD.collect().foreach(println)

输出:
(1,1)
(2,3)
(11,2)

2)双value类型

intersection
➢函数签名
def intersection(other: RDD[T]): RDD[T]
➢ 函数说明
对源RDD 和参数 RDD 求交集后返回一个新的 RDD

union
➢ 函数签名
def union(other: RDD[T]): RDD[T]
➢ 函数说明
对源 RDD 和参数 RDD求并集后返回一个新的 RDD

subtract
➢ 函数签名
def subtract(other: RDD[T]): RDD[T]
➢ 函数说明
以一个 RDD元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集

zip
➢ 函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
➢ 函数说明
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。

交集,并集,差集要求数据类型一致,拉链则不要求

	val rdd1=sc.makeRDD(List(1,2,3,4))val rdd2=sc.makeRDD(List(3,4,5,6))val rdd3=rdd1.intersection(rdd2)println(rdd3.collect().mkString(","))val rdd4=rdd1.union(rdd2)println(rdd4.collect().mkString(","))val rdd5=rdd1.subtract(rdd2)println(rdd5.collect().mkString(","))val rdd6=rdd1.zip(rdd2)println(rdd6.collect().mkString(","))

输出:
4,3
1,2,3,4,3,4,5,6
1,2
(1,3),(2,4),(3,5),(4,6)

3)Key–Value类型

3.1 partitionBy

➢ 函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
➢ 函数说明
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner

    val rdd=sc.makeRDD(List(1,2,3,4),2)val mapRDD=rdd.map((_,1))mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")

调用前应该是1和2在一个分区,3和4在一个分区,调用后如下:
在这里插入图片描述在这里插入图片描述

3.2 reduceBykey

➢ 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
➢ 函数说明
可以将数据按照相同的 Key 对 Value 进行聚合

reduceByKey分区内和分区间计算规则相同
reduceByKey两两聚合,这里因为key为b的只有一个,所以(“b”,4)不进行聚合。1先和2聚合得3,再和3聚合得6

 	val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x:Int, y:Int)=>{x+y})reduceRDD.collect().foreach(println)

输出:
(a,6)
(b,4)

3.3 groupByKey

➢ 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
➢ 函数说明
将数据源的数据根据 key 对 value 进行分组

// groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
//              元组中的第一个元素就是key,
//              元组中的第二个元素就是相同key的value的集合
val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()groupRDD.collect().foreach(println)
val rdd2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)rdd2.collect().foreach(println)

两者输出有所区别:
(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))

(a,CompactBuffer((a,1), (a,2), (a,3)))
(b,CompactBuffer((b,4)))

问题:groupByKey和reduceByKey区别?

  • 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
  • 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
    在这里插入图片描述在这里插入图片描述

3.4 aggregateByKey

函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(
seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
➢ 函数说明
将数据根据不同的规则进行分区内计算和分区间计算

实列:分区内相同key求最大值,分区间求和

// aggregateByKey存在函数柯里化,有两个参数列表// 第一个参数列表,需要传递一个参数,表示为初始值//       主要用于当碰见第一个key的时候,和value进行分区内计算// 第二个参数列表需要传递2个参数//      第一个参数表示分区内计算规则//      第二个参数表示分区间计算规则val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)rdd.aggregateByKey(zeroValue = 0)((x,y)=>math.max(x,y),(x,y)=>x+y).collect().foreach(println)

输出:
(a,6)

val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)rdd.aggregateByKey(zeroValue = 0)((x,y)=>math.max(x,y),(x,y)=>x+y).collect().foreach(println)

输出:
(b,8)
(a,8)
在这里插入图片描述

实列:获取相同key的数据平均值
aggregateByKey最终返回的结果应该和初始类型值保持一致:
在这里插入图片描述

val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)//这里传入的初始值(0,0)第一个表示相同key的值的和第二个表示相同key值次数//因为传入是tuple,所以返回就变成了tupleval newRDD: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(//分区内求和,次数累计(t, v) => {(t._1 + v, t._2 + 1)},//分区间求和,次数累计(t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})//统计,用总和除以次数求平均值val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, count) => {num / count}}resultRDD.collect().foreach(println)

输出:
(b,4)
(a,3)

3.5 combineByKey

➢ 函数签名
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
➢ 函数说明
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。

对于上述实例为了不使用初始值,可以使用combineByKey对第一个元素进行类型变换,例如将(“a”,1)=>(“a",(1,1))
在这里插入图片描述

val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(v=>(v,1),(t:(Int,Int), v) => {(t._1 + v, t._2 + 1)},(t1:(Int,Int), t2:(Int,Int)) => {(t1._1 + t2._1, t1._2 + t2._2)})val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, count) => {num / count}}resultRDD.collect().foreach(println)

同样输出:
(b,4)
(a,3)

3.6 foldByKey

➢ 函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
➢ 函数说明
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey

如果分区内和分区间计算规则相同可以使用

 val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)rdd.foldByKey(zeroValue = 0)(_+_).collect.foreach(println)

输出:
(b,12)
(a,9)

*思考一个问题:reduceByKey、foldByKey、aggregateByKey、*combineByKey 的区别?

  • reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
  • FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
  • AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
  • CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。

3.7 sortByKey

➢ 函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
➢ 函数说明
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的

val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)

3.8 join

	val rdd1=sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5),("c",6)))val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)joinRDD.collect().foreach(println)

输出:
(a,(1,4))
(b,(2,5))
(c,(3,6))

  • join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
  • 如果两个数据源中key没有匹配上,那么数据不会出现在结果中
  • 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。

eg:
(“a”,1),(“d”,2),(“c”,3)
(“a”,4),(“c”,5),(“e”,6)
结果是:
(a,(1,4))
(c,(3,5))
eg:
(“a”,1),(“a”,2),(“c”,3)
(“a”,5),(“c”,6),(“a”,4)
结果是:
(a,(1,5))
(a,(1,4))
(a,(2,5))
(a,(2,4))
(c,(3,6))

3.9 leftOuterJoin 和 rightOuterJoin

➢ 函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
➢ 函数说明
类似于 SQL 语句的左外连接

	val rdd1=sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5)//,("c",6)))val leftJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)leftJoinRDD.collect().foreach(println)sc.stop()

输出:
(a,(1,Some(4)))
(b,(2,Some(5)))
(c,(3,None))

	val rdd1=sc.makeRDD(List(("a",1),("b",2)//,("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5),("c",6)))val rightJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.rightOuterJoin(rdd2)rightJoinRDD.collect().foreach(println)sc.stop()

(a,(Some(1),4))
(b,(Some(2),5))
(c,(None,6))

3.10 cogroup

➢ 函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
➢ 函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD

	val rdd1=sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5)//,("c",6)))val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)cgRDD.collect().foreach(println)sc.stop()

输出:
(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer()))

这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(一)基础和转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592024

相关文章

JAVA中整型数组、字符串数组、整型数和字符串 的创建与转换的方法

《JAVA中整型数组、字符串数组、整型数和字符串的创建与转换的方法》本文介绍了Java中字符串、字符数组和整型数组的创建方法,以及它们之间的转换方法,还详细讲解了字符串中的一些常用方法,如index... 目录一、字符串、字符数组和整型数组的创建1、字符串的创建方法1.1 通过引用字符数组来创建字符串1.2

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Mysql 中的多表连接和连接类型详解

《Mysql中的多表连接和连接类型详解》这篇文章详细介绍了MySQL中的多表连接及其各种类型,包括内连接、左连接、右连接、全外连接、自连接和交叉连接,通过这些连接方式,可以将分散在不同表中的相关数据... 目录什么是多表连接?1. 内连接(INNER JOIN)2. 左连接(LEFT JOIN 或 LEFT

Java中ArrayList的8种浅拷贝方式示例代码

《Java中ArrayList的8种浅拷贝方式示例代码》:本文主要介绍Java中ArrayList的8种浅拷贝方式的相关资料,讲解了Java中ArrayList的浅拷贝概念,并详细分享了八种实现浅... 目录引言什么是浅拷贝?ArrayList 浅拷贝的重要性方法一:使用构造函数方法二:使用 addAll(

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux内核之内核裁剪详解

《Linux内核之内核裁剪详解》Linux内核裁剪是通过移除不必要的功能和模块,调整配置参数来优化内核,以满足特定需求,裁剪的方法包括使用配置选项、模块化设计和优化配置参数,图形裁剪工具如makeme... 目录简介一、 裁剪的原因二、裁剪的方法三、图形裁剪工具四、操作说明五、make menuconfig

JAVA利用顺序表实现“杨辉三角”的思路及代码示例

《JAVA利用顺序表实现“杨辉三角”的思路及代码示例》杨辉三角形是中国古代数学的杰出研究成果之一,是我国北宋数学家贾宪于1050年首先发现并使用的,:本文主要介绍JAVA利用顺序表实现杨辉三角的思... 目录一:“杨辉三角”题目链接二:题解代码:三:题解思路:总结一:“杨辉三角”题目链接题目链接:点击这里

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外