本文主要是介绍SparkRDD——转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
转换算子
一、单value型转换算子(只使用1个RDD):
1、map 将数据进行转换,数据量不会增加和减少
2、mapPartitions 以分区为单位将一个分区内的数据进行批处理操作,且可以执行过滤操作
3、mapPartitionsWithIndex 功能类似mapPartiutions算子,只是加入了每个分区的索引,可以选择性的对某些分区进行操作
4、flatMap 扁平化操作,即将集合嵌套类型的数据或者可转换为嵌套集合的类型的数据转换为非嵌套集合
5、glom 将元素合并为分区数量的数组,不需要传参数
6、groupBy 通过给定函数计算所得key进行分组,返回一个键值对,键为key的类型,值为Iterable,有shuffer
7、filter 将数据根据相应规则进行过滤,符合的保留,不符合的丢弃,过滤后分区不变。
8、sample 抽样算子
9、distinct 去重算子,将传入的数据集中相同的数据去除多余的保留一个
10、coalesce 缩减分区,默认不打乱分区,只是缩减分区,即将几个分区合并在一起
11、rePartition 扩大分区,扩大时必然会导致同一个分区的数据分到不同的分区,这就会发生shuffle
12、sortBy 排序算子,将分区数据进行排序,分区数不变,但会产生shuffle
二、双value型转换算子(使用两个RDD):
1、intersection 对两个RDD求交集然后将结果封装为一个RDD,两个RDD数据类型必须一致
2、subtract 对两个RDD求差集然后将结果封装为一个RDD,两个RDD数据类型必须一致
3、union 求两个RDD元素的并集,然后将结果封装为一个RDD,两个RDD数据类型必须一致
4、zip 将两个 RDD 中的元素以键值对的形式进行合并,不要求数据类型一致,但要求分区数一致且每个分区中元素数量一致
三、键值类型转换算子(数据源为键值对):
1、partitionBy 传入一个Partitioner进行重新分区,默认的分区器是HashPartitioner,也可以自定义分区器,存在shuffer
2、reduceByKey 将相同key的数据的value执行传入reduceByKey函数的操作,存在shuffer
3、groupByKey 根据key进行分组,分组后形成key,Iterator(value)格式的元组,存在shuffer
4、aggregateByKey 将分区内和分区间的计算分开,第一个参数列表传初始值,第二个参列表传递分区内和分区间计算规则,存在shuffer
5、foldByKey 作用和aggregate一样,当分区内函数和分区间函数相同时,spark提供了简化的算子foldByKey,存在shuffer
6、combineByKey 传入三个参数,第一个参数将value转换为指定数据类型,第二第三个参数表示分区内与分区间计算规则,存在shuffer
7、join 对两个不同的数据源进行自然连接,相同的key的value会连接在一起形成元组,存在shuffer
8、leftOuterJoin 左外连接,以左边的数据为主,若左边数据存在,右边数据不存在,那么形成左边数据+None的组合,存在shuffer
9、rightOuterJoin 右外连接,以右边的数据为主,若右边数据存在,左边数据不存在,那么形成右边数据+None的组合,存在shuffer
10、cogroup 首先在数据源内进行key的分组,再对两个数据源进行连接操作,存在shuffer
文章目录
- 转换算子
- 一、单value型转换算子(只使用1个RDD)
- 1、map
- 2、mapPartitions
- 3、mapPartitionsWithIndex
- 4、flatMap
- 5、glom
- 6、groupBy
- 7、filter
- 8、sample
- 9、distinct
- 10、coalesce
- 11、rePartition
- coalesce算子和rePartition算子比较
- 12、sortBy
- 二、双value型转换算子(使用两个RDD)
- 1、intersection
- 2、subtract
- 3、union
- 4、zip
- 三、键值类型转换算子(数据源为键值对)
- 1、partitionBy
- 2、reduceByKey
- 3、groupByKey
- reduceByKey与groupByKey比较
- 4、aggregateByKey
- 5、foldByKey
- 6、combineByKey
- reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别
- 7、join
- 8、leftOuterJoin
- 9、rightOuterJoin
- 10、cogroup
一、单value型转换算子(只使用1个RDD)
1、map
def map[U: ClassTag](f: T => U): RDD[U] = withScope
map算子将分区内的数据一个一个执行某个函数,类似串行操作,主要是将数据进行转换,数据不会增加和减少。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val mapRDD: RDD[Int] = rdd.map((_: Int) * 2)
2、mapPartitions
def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope
mapPartitions算子以分区为单位将一个分区内的数据进行批处理操作,传入一个迭代器,输出一个迭代器,且可以执行过滤操作,因此元素个数可能发生增加或减少。
map和mapPartitions二者性能比较:map类似串行,mapPartitions类似批处理,因此后者性能较高。但mapPartitions算子计算时将分区数据全部加载进内存,等处理完整个分区后才释放内存,因此在内存不足的情况下建议使用map。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mapRDD1: RDD[Int] = rdd.map((num: Int) => {println("map1 --- " + num)num}
)
val mapRDD2: RDD[Int] = mapRDD1.mapPartitions((it: Iterator[Int]) => {println("map2 --- " + it)it}
)
3、mapPartitionsWithIndex
def mapPartitionsWithIndex[U: ClassTag](f: (Int, Iterator[T]) => Iterator[U],preservesPartitioning: Boolean = false): RDD[U] = withScope
功能类似mapPartiutions算子,只是加入了每个分区的索引,可以选择性的对某些分区进行操作。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val mpRDD: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index: Int, it: Iterator[Int]) => {it.map((index, _: Int))}
)
mpRDD.collect().foreach(println)
4、flatMap
def flatMap[U: ClassTag](f: T => TraversableOnce[U]): RDD[U] = withScope
扁平化操作,即将集合嵌套类型的数据或者可转换为嵌套集合的类型的数据转换为非嵌套集合
val rdd: RDD[List[Int]] = sc.makeRDD(List(List(1, 2), List(3, 4)))
val flatRDD: RDD[Int] = rdd.flatMap((list: List[Int]) => list)
flatRDD.collect().foreach(println)
5、glom
def glom(): RDD[Array[T]] = withScope
将元素合并为分区数量的数组,不需要传参数
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val glomRDD: RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach((list: Array[Int]) => println(list.mkString(",")))
6、groupBy
def groupBy[K](f: T => K)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {groupBy[K](f, defaultPartitioner(this))
}
def groupBy[K](f: T => K,numPartitions: Int)(implicit kt: ClassTag[K]): RDD[(K, Iterable[T])] = withScope {groupBy(f, new HashPartitioner(numPartitions))
}
def groupBy[K](f: T => K, p: Partitioner)(implicit kt: ClassTag[K], ord: Ordering[K] = null): RDD[(K, Iterable[T])] = withScope {val cleanF = sc.clean(f)this.map(t => (cleanF(t), t)).groupByKey(p)
}
通过给定函数计算所得key进行分组,返回一个键值对,键为key的类型,值为Iterable,该算子会将数据打乱,即会进行shuffle。shuffle过程会进行分组,但分区数不变,即一个分区可能包含多个组,示例如下
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
//groupBy会将数据源中的每一个数据进行分组判断,根据返回的分组key进行分组
//相同的key的值的数据会放置在一个组中
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy((_: Int) % 3)
groupRDD.collect().foreach(println)
7、filter
def filter(f: T => Boolean): RDD[T] = withScope
将数据根据相应规则进行过滤,符合的保留,不符合的丢弃。过滤后分区不变,但分区内的数据可能不均衡,可能导致数据倾斜。注:filter是根据返回的boolean值进行过滤的。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val filterRDD: RDD[Int] = rdd.filter((_: Int) % 2 != 0)
filterRDD.collect().foreach(println)
8、sample
def sample(withReplacement: Boolean,fraction: Double,seed: Long = Utils.random.nextLong): RDD[T]
抽样算子,输入三个参数,第一个表示是否有放回的抽取,第二个表示每个数据被抽取的概率(0-1),第三个参数表示抽取数据时随机算法的种子(整数)。第三个参数省略时使用的是当前系统时间,当三个参数确定时,每次抽取的样本都是一样的。
val dataRDD = sparkContext.makeRDD(List(1,2,3,4),1)
// 抽取数据不放回(伯努利算法)
// 伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
// 具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不 要
// 第一个参数:抽取的数据是否放回,false:不放回
// 第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
// 第三个参数:随机数种子
val dataRDD1 = dataRDD.sample(false, 0.5)
// 抽取数据放回(泊松算法)
// 第一个参数:抽取的数据是否放回,true:放回;false:不放回
// 第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
// 第三个参数:随机数种子
val dataRDD2 = dataRDD.sample(true, 2)
使用场景:当发生数据倾斜时对倾斜数据区抽取数据分析是什么原因导致了数据倾斜。
9、distinct
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope
去重算子,将传入的数据集中相同的数据去除多余的保留一个。
scala内置也有一个distinct函数,其原理是通过HashSet去重。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 1, 2, 3, 4))
val distinctRDD: RDD[Int] = rdd.distinct(3)
distinctRDD.collect().foreach(println)
10、coalesce
def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] = withScope
重新分区,coalesce方法再默认情况下不会将分区的数据打乱重新组合,单纯的只是将分区合并,可能造成数据不均衡,如果想要数据均衡,需要设置shuffer参数为true,一般用于缩减分区的情况。
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 3)
val newRDD: RDD[Int] = rdd.coalesce(2, true)
newRDD.saveAsTextFile("OutPut/coalesce")
11、rePartition
def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {coalesce(numPartitions, shuffle = true)
}
扩大分区,扩大时必然会导致同一个分区的数据分到不同的分区,这就会发生shuffle,rePartition实质上就是调用coalesce并将shuffle参数设置为true
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6), 2)
val newRDD: RDD[Int] = rdd.repartition(3)
//等价于
//val newRDD: RDD[Int] = rdd.coalesce(3, true)
newRDD.saveAsTextFile("OutPut/coalesce")
coalesce算子和rePartition算子比较
使用Spark进行数据处理的过程中,常常会使用filter方法来对数据进行一些预处理,过滤掉一些不符合条件的数据。在使用该方法对数据进行频繁过滤或者是过滤掉的数据量过大的情况下就会造成大量小分区的生成。在Spark内部会对每一个分区分配一个task执行,如果task过多,那么每个task处理的数据量很小,就会造成线程频繁的在task之间切换,使得资源开销较大,且很多任务等待执行,并行度不高,这会造成集群工作效益低下。
为了解决这一个问题,常采用RDD中重分区的函数(coalesce函数或rePartition函数)来进行数据紧缩,减少分区数量,将小分区合并为大分区,从而提高效率。
这两个算子的参数表中都有一个shuffle参数,coalesce算子中默认值为为false表示不发生shuffle,rePartition中默认值是true,表示发生shuffle。而rePartition实质上就是调用coalesce并将shuffle参数设置为true。
这两个算子的使用场景如下:
假设RDD有N个分区,需要重新划分成M个分区:
-
N < M: 一般情况下N个分区有数据分布不均匀的状况,利用HashPartitioner函数将数据重新分区为M个,这时需要将shuffle设置为true。因为重分区前后相当于宽依赖,会发生shuffle过程,此时可以使用coalesce(shuffle=true),或者直接使用repartition()。
-
如果N > M并且N和M相差不多(假如N是1000,M是100): 那么就可以将N个分区中的若干个分区合并成一个新的分区,最终合并为M个分区,这是前后是窄依赖关系,可以使用coalesce(shuffle=false)。
-
如果 N> M并且两者相差悬殊: 这时如果将shuffle设置为false,父子RDD是窄依赖关系,他们同处在一个Stage中,就可能造成spark程序的并行度不够,从而影响性能,如果在M为1的时候,为了使coalesce之前的操作有更好的并行度,可以将shuffle设置为true。
coalesce算子默认不打乱分区,只是缩减分区,即将几个分区合并在一起,这种情况下可能会导致数据不均衡,产生数据倾斜,因此,为了防止发生数据倾斜可以将shuffle参数设置为true或直接使用rePartition。
如果想扩大分区,则一定要将shuffle设置成true,因为扩大时必然会导致同一个分区的数据分到不同的分区,这就会发生shuffle,因此:缩减分区用coalesce,扩大分区用rePartition。
12、sortBy
def sortBy[K](f: (T) => K,ascending: Boolean = true,numPartitions: Int = this.partitions.length)(implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope
排序算子,将分区数据进行排序,分区数不变,但会产生shuffle,第一个参数是设置排序对象,第二个参数设置排序方式:默认true为升序,false为降序。
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("1", 1), ("11", 2), ("2", 3)))
//根据key进行排序
val sortRDD: RDD[(String, Int)] = rdd.sortBy(_._1)
sortRDD.collect().foreach(println)
二、双value型转换算子(使用两个RDD)
1、intersection
def intersection(other: RDD[T]): RDD[T] = withScope
对两个rdd求交集然后将结果封装为一个rdd,交集需要保证数据类型一致
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
val rdd3: RDD[String] = sc.makeRDD(List("3", "4", "5", "6"), 4)
val newRDD: RDD[Int] = rdd1.intersection(rdd2)
//交集需要保持数据类型一致
//val newRDD2: RDD[Int] = rdd1.intersection(rdd3)
//println(newRDD2.collect().mkString(","))
println(newRDD.collect().mkString(","))
2、subtract
def subtract(other: RDD[T]): RDD[T] = withScope
对两个RDD求差集然后将结果封装为一个RDD,两个RDD数据类型必须一致
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 2)
val rdd3: RDD[String] = sc.makeRDD(List("3", "4", "5", "6"), 4)
//rdd1 - rdd2 = [1, 2]
val newRDD1: RDD[Int] = rdd1.subtract(rdd2)
//rdd2 - rdd1 = [5, 6]
val newRDD2: RDD[Int] = rdd2.subtract(rdd1)
//差集需要保持数据类型一致
//val newRDD3: RDD[Int] = rdd1.subtract(rdd3)
//println(newRDD3.collect().mkString(","))
println(newRDD1.collect().mkString(","))
println(newRDD2.collect().mkString(","))
3、union
def union(other: RDD[T]): RDD[T] = withScope
求两个RDD元素的并集,然后将结果封装为一个RDD,两个RDD数据类型必须一致
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4))
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6))
val rdd3: RDD[String] = sc.makeRDD(List("3", "4", "5", "6"), 4)
//求两个rdd元素的并集
val newRDD: RDD[Int] = rdd1.union(rdd2)
//并集需要保持数据类型一致
//val newRDD2 = rdd1.union(rdd3)
//println(newRDD2.collect().mkString(","))
println(newRDD.collect().mkString(","))
4、zip
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)] = withScope
将两个 RDD 中的元素以键值对的形式进行合并,不要求数据类型一致,但要求分区数一致且每个分区中元素数量一致
val rdd1: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
val rdd2: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6), 4)
val rdd3: RDD[Int] = sc.makeRDD(List(3, 4, 5, 6, 7, 8), 2)
val rdd4: RDD[Int] = sc.makeRDD(List(3,4,5,6), 2)
//Can't zip RDDs with unequal numbers of partitions: List(2, 4)
//两个数据源要求分区数量保持一致
//val zipRDD: RDD[(Int, Int)] = rdd1.zip(rdd2)
//Can only zip RDDs with same number of elements in each partition
//两个数据源要求对应分区中数据的数据量保持一致
//val zipRDD2: RDD[(Int, Int)] = rdd1.zip(rdd3)
//zipRDD2.collect().foreach(println)
val zipRDD3: RDD[(Int, Int)] = rdd1.zip(rdd4)
zipRDD3.collect().foreach(println)
三、键值类型转换算子(数据源为键值对)
1、partitionBy
def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope
该算子不属于RDD类,属于PairRDDFunctions类,两个类之间存在一个隐式转换函数,将RDD转换为PairRDDFunctions。
该算子的作用是传入一个Partitioner进行重新分区,默认的分区器是HashPartitioner。(new HashPartitioner)。
当重分区的分区器与当前RDD的分区器一样时不会进行分区,源码中是直接返回self。
传入的分区函数,可以使用spark自带的,也可以自定义分区函数
val rdd: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4), 2)
//partitionBy要求操作的数据是元组,所以使用前需要将数据转换为元组
val mapRDD: RDD[(Int, Int)] = rdd.map((_, 1))
//调用次方法时RDD发生隐式转换
//RDD => PairRDDFunctions
//需要传入一个分区函数,可以使用spark自带的,也可以自定义分区函数
val parRDD: RDD[(Int, Int)] = mapRDD.partitionBy(newHashPartitioner(2))
parRDD.saveAsTextFile("OutPut/par")
2、reduceByKey
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope
相同key的数据的value执行传入reduceByKey函数的操作,若一个key的数据只有一个,那么不会参与运算
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((_: Int) + (_: Int))
reduceRDD.collect().foreach(println)
reduceByKey底层实现原理
3、groupByKey
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {groupByKey(defaultPartitioner(self))
}
该算子将相同key的键值对分到同一个组中,返回一个元组,元组的第一个元素是key,第二个元素是value构成的集合。集合名称为CompactBuffer,是Spark独有的,不来自scala。groupBy和groupByKey的使用与区别如下
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)))
// groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
// 元组中的第一个元素就是key,
// 元组中的第二个元素就是相同key的value的集合
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()
groupRDD.collect().foreach(println)
println("--------------------------------")
val groupRDD1: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)
groupRDD1.collect().foreach(println)
groupByKey底层实现方式
reduceByKey与groupByKey比较
从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey 只能分组,不能聚合,所以在分组聚合的场合下,推荐使用 reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey 。
4、aggregateByKey
def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,combOp: (U, U) => U): RDD[(K, U)] = self.withScope
aggregateByKey 将分区内和分区间的计算分开,存在函数柯里化,有两个参数列表
第一个参数列表需要传入一个参数,表示初始化值
- 主要用于碰见第一个key的时候,和value进行分区计算
第二个参数表示需要传入2个参数
- 第一个参数表示分区内计算
- 第二个参数表示分区间计算
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("a", 3), ("b", 4)),2)
val aggregateRDD: RDD[(String, Int)] = rdd.aggregateByKey(0)((x: Int, y: Int) => math.max(x, y),(x: Int, y: Int) => x + y
)
aggregateRDD.collect().foreach(println)
aggregateByKey最终的返回数据结果应该和初始值的类型保持一致,如:
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
//aggregateByKey的返回值类型与传入的初始值类型有关
//实现求平均值,定义初始值为元组(0,0),第一个值保存和,第二个值保存key出现次数
val aggRDD: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))((t: (Int, Int), v: Int) => (t._1 + v, t._2 + 1),(t1: (Int, Int), t2: (Int, Int)) => (t1._1 + t2._1, t1._2 + t2._2)
)
//通过元组求得平均值
val resRDD: RDD[(String, Int)] = aggRDD.mapValues((t: (Int, Int)) => t._1 / t._2)
resRDD.collect().foreach(println)
5、foldByKey
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)] = self.withScope
和aggregateByKey功能相同,当分区内函数和分区间函数相同时,spark提供了简化的算子foldByKey
val rdd: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3), ("b", 4), ("b", 5), ("a", 6)), 2)
val foldRDD: RDD[(String, Int)] = rdd.foldByKey(0)((_: Int) + (_: Int))
foldRDD.collect().foreach(println)
6、combineByKey
def combineByKey[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope
作用其实和aggregate一样,只是combineByKey算子没有第一个参数列表不需要传入初始值,但会将输入的相同key的第一个value进行转换
val rdd = sc.makeRDD(List(("a", 1), ("a", 2), ("b", 3),("b", 4), ("b", 5),("a", 6)),2)
// combineByKey : 方法需要三个参数
// 第一个参数表示:将相同key的第一个数据进行结构的转换,实现操作
// 第二个参数表示:分区内的计算规则
// 第三个参数表示:分区间的计算规则
// 因为该算子处理时动态的将数据进行了转换,因此tuple的类型必须指定
val newRDD : RDD[(String, (Int, Int))] = rdd.combineByKey(v => (v, 1),( t:(Int, Int), v ) => { (t._1 + v, t._2 + 1)},(t1:(Int, Int), t2:(Int, Int)) => {(t1._1 + t2._1, t1._2 + t2._2)}
)
val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, cnt) => {num / cnt}
}
resultRDD.collect().foreach(println)
reduceByKey、foldByKey、aggregateByKey、combineByKey 的区别
reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同。
FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同 。
AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同 。
CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
以下是Spark源码中的四个算子底层调用方法,其实底层都是combineByKey:
//reduceByKey:
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)//FoldByKey:
combineByKeyWithClassTag[V]((v: V) => cleanedFunc(createZero(), v),cleanedFunc, cleanedFunc, partitioner)//AggregateByKey:
combineByKeyWithClassTag[U]((v: V) => cleanedSeqOp(createZero(), v),cleanedSeqOp, combOp, partitioner)//CombineByKey:
combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,partitioner, mapSideCombine, serializer)(null)
7、join
def join[W](other: RDD[(K, W)]): RDD[(K, (V, W))] = self.withScope
该算子会将两个不同数据源中相同的key的value聚合在一起,形成一个tuple。对于只在一个数据源上出现的key则不会配对,如果有多个相同的key的数据则会进行类似于笛卡尔积的运算。即将数据源1的key取配对数据源2的每一个key,相同则将两个value形成元组。示例如下:
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("c", 5), ("a", 6)))
val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)
joinRDD.collect().foreach(println)
8、leftOuterJoin
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))] = self.withScope
左外连接,以左边的数据为主,若左边数据存在,右边数据不存在,那么形成左边数据+None的组合
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("c", 5), ("a", 6)))
val leftJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)
leftJoinRDD.collect().foreach(println)
9、rightOuterJoin
def rightOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (Option[V], W))] = self.withScope
右外连接,以右边的数据为主,若右边数据存在,左边数据不存在,那么形成None+右边数据的组合
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("c", 5), ("a", 6)))
val rightJoinRDD: RDD[(String, (Option[Int], Int))] = rdd2.rightOuterJoin(rdd1)
rightJoinRDD.collect().foreach(println)
10、cogroup
先分组,对这些rdd分组,对每个rdd的数据将key相同的数据的value放入一个迭代器中,然后将相同key的数据连接在一起,最多放入三个rdd对象。
val rdd1: RDD[(String, Int)] = sc.makeRDD(List(("a", 1), ("b", 2), ("c", 3)))
val rdd2: RDD[(String, Int)] = sc.makeRDD(List(("a", 4), ("c", 5), ("a", 6)))
val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)
cgRDD.collect().foreach(println)
这篇关于SparkRDD——转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!