本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(一)基础和转换算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
RDD : 弹性分布式数据集
一.简介
RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是 Spark 中最基本的数据处理模型。代码中是一个抽象类,它代表一个弹性的、不可变、可分区、里面的元素可并行计算的集合。
➢ 弹性
--------存储的弹性:内存与磁盘的自动切换;
--------容错的弹性:数据丢失可以自动恢复;
--------计算的弹性:计算出错重试机制;
--------分片的弹性:可根据需要重新分片。
➢ 分布式:数据存储在大数据集群不同节点上
➢ 数据集:RDD 封装了计算逻辑,并不保存数据
➢ 数据抽象:RDD 是一个抽象类,需要子类具体实现
➢ 不可变:RDD 封装了计算逻辑,是不可以改变的,想要改变,只能产生新的 RDD,在新的RDD 里面封装计算逻辑
➢ 可分区、并行计算
执行逻辑
二.基础编程
1.RDD创建
- 从集合(内存)中创建 RDD
从集合中创建 RDD,Spark 主要提供了两个方法:parallelize 和 makeRDD
从底层代码实现来讲,makeRDD 方法其实就是 parallelize 方法 - 从外部存储(文件)创建 RDD
由外部存储系统的数据集创建 RDD 包括:本地的文件系统,所有 Hadoop 支持的数据集,比如 HDFS、HBase 等。
package org.xyl;
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}object RDD{def main(args: Array[String]): Unit = {//prepare environmentval sparkConf=new SparkConf().setMaster("local[*]").setAppName("RDD")val sc=new SparkContext(sparkConf)//creat RDD from memoryval seq: Seq[Int] =Seq[Int](1,2,3,4) val rdd:RDD[Int]=sc.parallelize(seq)val rdd1: RDD[Int] = sc.makeRDD(seq)//creat RDD from file(hdfs or local)val rdd3:RDD[String]=sc.textFile("datas/1.txt") rdd.collect().foreach(println)sc.stop() }
}
- 从其他 RDD 创建
主要是通过一个 RDD 运算完后,再产生新的 RDD。 - 直接创建 RDD(new)
使用 new 的方式直接构造 RDD,一般由 Spark 框架自身使用。
2.RDD 并行度与分区
默认情况下,Spark 可以将一个作业切分多个任务后,发送给 Executor 节点并行计算,而能 够并行计算的任务数量我们称之为并行度。这个数量可以在构建 RDD 时指定。这里的并行执行的任务数量,并不是指的切分任务的数量。
- 读取内存数据和文件数据时分区方法不同。
1.读取内存数据时,数据可以按照并行度的设定进行数据的分区操作,数据分区规则的Spark 核心源码如下:
def positions(length: Long, numSlices: Int): Iterator[(Int, Int)] = {(0 until numSlices).iterator.map { i =>val start = ((i * length) / numSlices).toIntval end = (((i + 1) * length) / numSlices).toInt(start, end)}
代码示例:
object RDD_Par {def main(args: Array[String]): Unit = {//prepare environmentval sparkConf=new SparkConf().setMaster("local[*]").setAppName("RDD_Par")val sc=new SparkContext(sparkConf)//makeRDD方法传递第二个参数表示分区的数量数, 不传递会使用默认值val rdd=sc.makeRDD(List(1,2,3,4),2) rdd.saveAsTextFile("output")//文件方式的分区采用的是hadoop的分区方式val rdd1=sc.textFile("datas/1.txt",2)rdd.saveAsTextFile("output1")sc.stop()}
}
三.RDD转换算子
- RDD 根据数据处理方式的不同将算子整体上分为 Value 类型、双 Value 类型和 Key-Value类型
1)value类型
1.1 map
def map[U: ClassTag](f: T => U): RDD[U]
将处理的数据逐条进行映射转换,这里的转换可以是类型的转换,也可以是值的转换。
val rdd=sc.makeRDD(List(1,2,3,4))def mapFunction(num:Int)={num*2}val mapRDD: RDD[Int] = rdd.map(mapFunction)//等价与 rdd.map((num:INt)=>{num*2})//简写为rdd.map(_*2)mapRDD.collect().foreach(println)
//output: 2 4 6 8
rdd的并行计算
- rdd的计算一个分区内的数据是一个一个执行逻辑,只有前面一个数据全部的逻辑执行完毕后,才会执行下一个数据。分区内数据的执行是有序的。
- 不同分区数据计算是无序的。
//一个分区
val rdd = sc.makeRDD(List(1,2,3,4),1)val mapRDD = rdd.map(num => {println(">>>>>>>>" + num)num})val mapRDD1 = mapRDD.map(num => {println("#######" + num)num})
输出:
>>>>>>>>1
#######1
>>>>>>>>2
#######2
>>>>>>>>3
#######3
>>>>>>>>4
#######4
将分区数设为2 sc.makeRDD(List(1,2,3,4),2)
>>>>>>>>1
>>>>>>>>3
#######1
#######3
>>>>>>>>2
#######2
>>>>>>>>4
#######4
1.2 mapPartitions
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据。
val rdd = sc.makeRDD(List(1,2,3,4),2)
val mpRDD: RDD[Int] = rdd.mapPartitions(iter => {println(">>>>>>>>>>")iter.map(_ * 2)})mpRDD.collect().foreach(println)
输出:
因为有两个分区,所以会输出两次>>>>>>>>>>
>>>>>>>>>>
21/01/14 11:28:04 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 989 bytes result sent to driver
>>>>>>>>>>
21/01/14 11:28:04 INFO TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, localhost, executor driver, partition 1, PROCESS_LOCAL, 5889 bytes)
21/01/14 11:28:04 INFO Executor: Running task 1.0 in stage 0.0 (TID 1)
21/01/14 11:28:04 INFO Executor: Finished task 1.0 in stage 0.0 (TID 1). 916 bytes result sent to driver
21/01/14 11:28:04 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 281 ms on localhost (executor driver) (1/2)
21/01/14 11:28:04 INFO TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 81 ms on localhost (executor driver) (2/2)
21/01/14 11:28:04 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
21/01/14 11:28:04 INFO DAGScheduler: ResultStage 0 (collect at RDD_OperateTransform.scala:45) finished in 0.365 s
2
4
6
8
map 和 mapPartitions 的区别?
➢ 数据处理角度
Map 算子是分区内一个数据一个数据的执行,类似于串行操作。而mapPartitions算子是以分区为单位进行批处理操作。
➢ 功能的角度 Map
算子主要目的将数据源中的数据进行转换和改变。但是不会减少或增多数据。MapPartitions 算子需要传递一个迭代器,返回一个迭代器,没有要求的元素的个数保持不变, 所以可以增加或减少数据
➢ 性能的角度 Map
算子因为类似于串行操作,所以性能比较低,而是 mapPartitions 算类似于批处理,所以性能较高。但是 mapPartitions算子会长时间占用内存,那么这样会导致内存可能不够用,出现内存溢出的错误。所以在内存有限的情况下,不推荐使用。使用 map 操作。
1.3 mapPartitionsWithIndex
➢ 函数签名 def mapPartitionsWithIndex[U: ClassTag](
f: (Int, Iterator[T])=> Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
➢ 函数说明
将待处理的数据以分区为单位发送到计算节点进行处理,这里的处理是指可以进行任意的处理,哪怕是过滤数据,在处理时同时可以获取当前分区索引。
val rdd = sc.makeRDD(List(1,2,3,4), 2)// 【1,2】,【3,4】//输出1号分区的数val mpiRDD = rdd.mapPartitionsWithIndex((index, iter) => {if ( index == 1 ) {iter} else {Nil.iterator}})mpiRDD.collect().foreach(println)//output:3 4
1.4 flatMap
➢ 函数签名 def flatMap[U: ClassTag](
f: T => TraversableOnce[U]): RDD[U]
➢函数说明
将处理的数据进行扁平化后再进行映射处理,所以算子也称之为扁平映射
val rdd: RDD[String] = sc.makeRDD(List("Hello Scala", "Hello Spark"))val flatRDD: RDD[String] = rdd.flatMap(s => {s.split(" ")})flatRDD.collect().foreach(println)
输出:
Hello
Scala
Hello
Spark
val rdd = sc.makeRDD(List(List(1,2),3,List(4,5)))val flatRDD = rdd.flatMap(data => {data match {case list:List[_] => listcase dat => List(dat)}})flatRDD.collect().foreach(println)
输出:
1
2
3
4
5
1.5 glom
➢ 函数签名 def glom(): RDD[Array[T]]
➢ 函数说明
将同一个分区的数据直接转换为相同类型的内存数组进行处理,分区不变
val rdd=sc.makeRDD(List(1,2,3,4),2)
val glomRDD: RDD[Array[Int]] = rdd.glom()
glomRDD.collect().foreach(data=>println(data.mkString(",")))
把1和2的分区作为一个数组,把3和4的分区作为另一个数组,分别打印两个数组
输出:
1,2
3,4
利用该方法求所有分区最大值求和
val rdd=sc.makeRDD(List(1,2,3,4),2)val glomRDD: RDD[Array[Int]] = rdd.glom()val maxRDD:RDD[Int]=glomRDD.map(array=>{array.max})println(maxRDD.collect().sum)
先求出第一个分区最大值2和第二个分区最大值4,然后求和输出6
1.6 groupBy
➢ 函数签名 def groupBy[K](
f: T => K)(implicit kt: ClassTag[K]): RDD[(K,Iterable[T])]
➢ 函数说明
将数据根据指定的规则进行分组, 分区默认不变,但是数据会被打乱重新组合,我们将这样 的操作称之为 shuffle。极限情况下,数据可能被分在同一个分区中 一个组的数据在一个分区中,但是并不是说一个分区中只有一个组
val rdd=sc.makeRDD(List(1,2,3,4),2)
def groupFunction(num:Int)={num%2
}
val groupRDD: RDD[(Int, Iterable[Int])] = rdd.groupBy(groupFunction)
groupRDD.collect().foreach(println)
输出:
(0,CompactBuffer(2, 4))
(1,CompactBuffer(1, 3))
1.7 filter
➢ 函数签名
def filter(f: T => Boolean): RDD[T]
➢ 函数说明
将数据根据指定的规则进行筛选过滤,符合规则的数据保留,不符合规则的数据丢弃。 当数据进行筛选过滤后,分区不变,但是分区内的数据可能不均衡,生产环境下,可能会出 现数据倾斜。
val rdd=sc.makeRDD(List(1,2,3,4))
val filterRDD: RDD[Int] = rdd.filter(num=>num%2!=0)
filterRDD.collect().foreach(println)
输出
1
3
1.8 sample
➢ 函数签名
def sample(
withReplacement: Boolean,
fraction: Double,
seed: Long = Utils.random.nextLong): RDD[T]
➢ 函数说明
根据指定的规则从数据集中抽取数据
抽取数据不放回(伯努利算法)
伯努利算法:又叫 0、1 分布。例如扔硬币,要么正面,要么反面。
具体实现:根据种子和随机算法算出一个数和第二个参数设置几率比较,小于第二个参数要,大于不要
第一个参数:抽取的数据是否放回,false:不放回
第二个参数:抽取的几率,范围在[0,1]之间,0:全不取;1:全取;
第三个参数:随机数种子
抽取数据放回(泊松算法)
第一个参数:抽取的数据是否放回,true:放回;false:不放回
第二个参数:重复数据的几率,范围大于等于 0.表示每一个元素被期望抽取到的次数
第三个参数:随机数种子
val rdd=sc.makeRDD(List(1,2,3,4,5,6,7,8,9,10))//1.第一个参数表示抽取数据后是否将数据返回 true放回 false 丢弃//2.第二个参数表示// 如果抽取丢弃的话数据源中每条数据被抽取的概率// 如果抽取放回的话数据源中每条数据被抽取的可能次数//3.第三个参数表示抽取的随机种子,不传递的话使用的是当前系统println(rdd.sample(false,0.4,1).collect().mkString(","))
输出:3,4,6,7,9
1.9 distinct
➢ 函数签名
def distinct()(implicit ord: Ordering[T] = null): RDD[T]
def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
将数据集中重复的数据去重
val rdd=sc.makeRDD(List(1,2,3,1,2,3))val rdd1=rdd.distinct()rdd1.collect().foreach(println)
输出:
1
2
3
1.10 coalesce
➢ 函数签名
def coalesce(numPartitions: Int, shuffle: Boolean = false,
partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
(implicit ord: Ordering[T] = null) : RDD[T]
➢ 函数说明
根据数据量缩减分区,用于大数据集过滤后,提高小数据集的执行效率 当 spark 程序中,存在过多的小任务的时候,可以通过 coalesce方法,收缩合并分区,减少 分区的个数,减小任务调度成本
val rdd=sc.makeRDD(List(1,2,3,4),4)val newrdd=rdd.coalesce(2)newrdd.saveAsTextFile("output")
val rdd=sc.makeRDD(List(1,2,3,4,5,6),3)val newrdd=rdd.coalesce(2)newrdd.saveAsTextFile("output")
这样会出现第一个分区为1 2第二个分区为3 4 5 6.所以coalesce默认情况不会将分区数据打乱重新组合,可能出现上述数据倾斜。如果想要可以设置该方法的shuffle值为true
val newrdd=rdd.coalesce(2,shuffle = true)
1.11 repartition
➢ 函数签名
def repartition(
numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T]
➢ 函数说明
该操作内部其实执行的是 coalesce 操作,参数 shuffle 的默认值为true。无论是将分区数多的 RDD 转换为分区数少的 RDD,还是将分区数少的 RDD 转换为分区数多的 RDD,repartition 操作都可以完成,因为无论如何都会经 shuffle 过程。
val dataRDD = sparkContext.makeRDD(List(1,2,3,4,1,2
),2)
val dataRDD1 = dataRDD.repartition(4)
问题:coalesce 和 repartition 区别?
coalesce算子可以扩大分区的,但是如果不进行shuffle操作,是没有意义,不起作用。
所以如果想要实现扩大分区的效果,需要使用shuffle操作
spark提供了一个简化的操作
缩减分区:coalesce,如果想要数据均衡,可以采用shuffle
扩大分区:repartition, 底层代码调用的就是coalesce,而且肯定采用shuffle
1.12 sortby
➢ 函数签名 def sortBy[K](
f: (T) => K, ascending: Boolean = true,
numPartitions: Int = this.partitions.length) (implicit ord:
Ordering[K], ctag: ClassTag[K]): RDD[T]
➢ 函数说明
该操作用于排序数据。在排序之前,可以将数据通过 f 函数进行处理,之后按照 f 函数处理 的结果进行排序,默认为升序排列。排序后新产生的RDD 的分区数与原 RDD 的分区数一致。中间存在 shuffle 的过程
// sortBy方法可以根据指定的规则对数据源中的数据进行排序,默认为升序,第二个参数可以改变排序的方式
// sortBy默认情况下,不会改变分区。但是中间存在shuffle操作val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)))val newRDD=rdd.sortBy(t=>t._1)//按照的是字典序比较newRDD.collect().foreach(println)
输出:
(1,1)
(11,2)
(2,3)
val rdd=sc.makeRDD(List(("1",1),("11",2),("2",3)))val newRDD=rdd.sortBy(t=>t._1.toInt)newRDD.collect().foreach(println)
输出:
(1,1)
(2,3)
(11,2)
2)双value类型
intersection
➢函数签名
def intersection(other: RDD[T]): RDD[T]
➢ 函数说明
对源RDD 和参数 RDD 求交集后返回一个新的 RDD
union
➢ 函数签名
def union(other: RDD[T]): RDD[T]
➢ 函数说明
对源 RDD 和参数 RDD求并集后返回一个新的 RDD
subtract
➢ 函数签名
def subtract(other: RDD[T]): RDD[T]
➢ 函数说明
以一个 RDD元素为主,去除两个 RDD 中重复元素,将其他元素保留下来。求差集
zip
➢ 函数签名
def zip[U: ClassTag](other: RDD[U]): RDD[(T, U)]
➢ 函数说明
将两个 RDD 中的元素,以键值对的形式进行合并。其中,键值对中的 Key 为第 1 个 RDD 中的元素,Value 为第 2 个 RDD 中的相同位置的元素。
交集,并集,差集要求数据类型一致,拉链则不要求
val rdd1=sc.makeRDD(List(1,2,3,4))val rdd2=sc.makeRDD(List(3,4,5,6))val rdd3=rdd1.intersection(rdd2)println(rdd3.collect().mkString(","))val rdd4=rdd1.union(rdd2)println(rdd4.collect().mkString(","))val rdd5=rdd1.subtract(rdd2)println(rdd5.collect().mkString(","))val rdd6=rdd1.zip(rdd2)println(rdd6.collect().mkString(","))
输出:
4,3
1,2,3,4,3,4,5,6
1,2
(1,3),(2,4),(3,5),(4,6)
3)Key–Value类型
3.1 partitionBy
➢ 函数签名
def partitionBy(partitioner: Partitioner): RDD[(K, V)]
➢ 函数说明
将数据按照指定 Partitioner 重新进行分区。Spark 默认的分区器是 HashPartitioner
val rdd=sc.makeRDD(List(1,2,3,4),2)val mapRDD=rdd.map((_,1))mapRDD.partitionBy(new HashPartitioner(2)).saveAsTextFile("output")
调用前应该是1和2在一个分区,3和4在一个分区,调用后如下:
3.2 reduceBykey
➢ 函数签名
def reduceByKey(func: (V, V) => V): RDD[(K, V)]
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)]
➢ 函数说明
可以将数据按照相同的 Key 对 Value 进行聚合
reduceByKey分区内和分区间计算规则相同
reduceByKey两两聚合,这里因为key为b的只有一个,所以(“b”,4)不进行聚合。1先和2聚合得3,再和3聚合得6
val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))val reduceRDD: RDD[(String, Int)] = rdd.reduceByKey((x:Int, y:Int)=>{x+y})reduceRDD.collect().foreach(println)
输出:
(a,6)
(b,4)
3.3 groupByKey
➢ 函数签名
def groupByKey(): RDD[(K, Iterable[V])]
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])]
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])]
➢ 函数说明
将数据源的数据根据 key 对 value 进行分组
// groupByKey : 将数据源中的数据,相同key的数据分在一个组中,形成一个对偶元组
// 元组中的第一个元素就是key,
// 元组中的第二个元素就是相同key的value的集合
val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3),("b",4)))
val groupRDD: RDD[(String, Iterable[Int])] = rdd.groupByKey()groupRDD.collect().foreach(println)
val rdd2: RDD[(String, Iterable[(String, Int)])] = rdd.groupBy(_._1)rdd2.collect().foreach(println)
两者输出有所区别:
(a,CompactBuffer(1, 2, 3))
(b,CompactBuffer(4))
(a,CompactBuffer((a,1), (a,2), (a,3)))
(b,CompactBuffer((b,4)))
问题:groupByKey和reduceByKey区别?
- 从 shuffle 的角度:reduceByKey 和 groupByKey 都存在 shuffle 的操作,但是 reduceByKey可以在 shuffle 前对分区内相同 key 的数据进行预聚合(combine)功能,这样会减少落盘的数据量,而 groupByKey 只是进行分组,不存在数据量减少的问题,reduceByKey 性能比较高。
- 从功能的角度:reduceByKey 其实包含分组和聚合的功能。GroupByKey只能分组,不能聚合,所以在分组聚合的场合下,推荐使reduceByKey,如果仅仅是分组而不需要聚合。那么还是只能使用 groupByKey
3.4 aggregateByKey
函数签名
def aggregateByKey[U: ClassTag](zeroValue: U)(
seqOp: (U, V) => U, combOp: (U, U) => U): RDD[(K, U)]
➢ 函数说明
将数据根据不同的规则进行分区内计算和分区间计算
实列:分区内相同key求最大值,分区间求和
// aggregateByKey存在函数柯里化,有两个参数列表// 第一个参数列表,需要传递一个参数,表示为初始值// 主要用于当碰见第一个key的时候,和value进行分区内计算// 第二个参数列表需要传递2个参数// 第一个参数表示分区内计算规则// 第二个参数表示分区间计算规则val rdd=sc.makeRDD(List(("a",1),("a",2),("a",3),("a",4)),2)rdd.aggregateByKey(zeroValue = 0)((x,y)=>math.max(x,y),(x,y)=>x+y).collect().foreach(println)
输出:
(a,6)
val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)rdd.aggregateByKey(zeroValue = 0)((x,y)=>math.max(x,y),(x,y)=>x+y).collect().foreach(println)
输出:
(b,8)
(a,8)
实列:获取相同key的数据平均值
aggregateByKey最终返回的结果应该和初始类型值保持一致:
val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)//这里传入的初始值(0,0)第一个表示相同key的值的和第二个表示相同key值次数//因为传入是tuple,所以返回就变成了tupleval newRDD: RDD[(String, (Int, Int))] = rdd.aggregateByKey((0, 0))(//分区内求和,次数累计(t, v) => {(t._1 + v, t._2 + 1)},//分区间求和,次数累计(t1, t2) => {(t1._1 + t2._1, t1._2 + t2._2)})//统计,用总和除以次数求平均值val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, count) => {num / count}}resultRDD.collect().foreach(println)
输出:
(b,4)
(a,3)
3.5 combineByKey
➢ 函数签名
def combineByKey[C](
createCombiner: V => C,
mergeValue: (C, V) => C,
mergeCombiners: (C, C) => C): RDD[(K, C)]
➢ 函数说明
最通用的对 key-value 型 rdd 进行聚集操作的聚集函数(aggregation function)。类似于aggregate(),combineByKey()允许用户返回值的类型与输入不一致。
对于上述实例为了不使用初始值,可以使用combineByKey对第一个元素进行类型变换,例如将(“a”,1)=>(“a",(1,1))
val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)val newRDD: RDD[(String, (Int, Int))] = rdd.combineByKey(v=>(v,1),(t:(Int,Int), v) => {(t._1 + v, t._2 + 1)},(t1:(Int,Int), t2:(Int,Int)) => {(t1._1 + t2._1, t1._2 + t2._2)})val resultRDD: RDD[(String, Int)] = newRDD.mapValues {case (num, count) => {num / count}}resultRDD.collect().foreach(println)
同样输出:
(b,4)
(a,3)
3.6 foldByKey
➢ 函数签名
def foldByKey(zeroValue: V)(func: (V, V) => V): RDD[(K, V)]
➢ 函数说明
当分区内计算规则和分区间计算规则相同时,aggregateByKey 就可以简化为 foldByKey
如果分区内和分区间计算规则相同可以使用
val rdd=sc.makeRDD(List(("a",1),("a",2),("b",3),("b",4),("b",5),("a",6)),2)rdd.foldByKey(zeroValue = 0)(_+_).collect.foreach(println)
输出:
(b,12)
(a,9)
*思考一个问题:reduceByKey、foldByKey、aggregateByKey、*combineByKey 的区别?
- reduceByKey: 相同 key 的第一个数据不进行任何计算,分区内和分区间计算规则相同
- FoldByKey: 相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则相同
- AggregateByKey:相同 key 的第一个数据和初始值进行分区内计算,分区内和分区间计算规则可以不相同
- CombineByKey:当计算时,发现数据结构不满足要求时,可以让第一个数据转换结构。分区内和分区间计算规则不相同。
3.7 sortByKey
➢ 函数签名
def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length) : RDD[(K, V)]
➢ 函数说明
在一个(K,V)的 RDD 上调用,K 必须实现 Ordered 接口(特质),返回一个按照 key 进行排序的
val dataRDD1 = sparkContext.makeRDD(List(("a",1),("b",2),("c",3)))
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(true)
val sortRDD1: RDD[(String, Int)] = dataRDD1.sortByKey(false)
3.8 join
val rdd1=sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5),("c",6)))val joinRDD: RDD[(String, (Int, Int))] = rdd1.join(rdd2)joinRDD.collect().foreach(println)
输出:
(a,(1,4))
(b,(2,5))
(c,(3,6))
- join : 两个不同数据源的数据,相同的key的value会连接在一起,形成元组
- 如果两个数据源中key没有匹配上,那么数据不会出现在结果中
- 如果两个数据源中key有多个相同的,会依次匹配,可能会出现笛卡尔乘积,数据量会几何性增长,会导致性能降低。
eg:
(“a”,1),(“d”,2),(“c”,3)
(“a”,4),(“c”,5),(“e”,6)
结果是:
(a,(1,4))
(c,(3,5))
eg:
(“a”,1),(“a”,2),(“c”,3)
(“a”,5),(“c”,6),(“a”,4)
结果是:
(a,(1,5))
(a,(1,4))
(a,(2,5))
(a,(2,4))
(c,(3,6))
3.9 leftOuterJoin 和 rightOuterJoin
➢ 函数签名
def leftOuterJoin[W](other: RDD[(K, W)]): RDD[(K, (V, Option[W]))]
➢ 函数说明
类似于 SQL 语句的左外连接
val rdd1=sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5)//,("c",6)))val leftJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.leftOuterJoin(rdd2)leftJoinRDD.collect().foreach(println)sc.stop()
输出:
(a,(1,Some(4)))
(b,(2,Some(5)))
(c,(3,None))
val rdd1=sc.makeRDD(List(("a",1),("b",2)//,("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5),("c",6)))val rightJoinRDD: RDD[(String, (Int, Option[Int]))] = rdd1.rightOuterJoin(rdd2)rightJoinRDD.collect().foreach(println)sc.stop()
(a,(Some(1),4))
(b,(Some(2),5))
(c,(None,6))
3.10 cogroup
➢ 函数签名
def cogroup[W](other: RDD[(K, W)]): RDD[(K, (Iterable[V], Iterable[W]))]
➢ 函数说明
在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable,Iterable))类型的 RDD
val rdd1=sc.makeRDD(List(("a",1),("b",2),("c",3)))val rdd2=sc.makeRDD(List(("a",4),("b",5)//,("c",6)))val cgRDD: RDD[(String, (Iterable[Int], Iterable[Int]))] = rdd1.cogroup(rdd2)cgRDD.collect().foreach(println)sc.stop()
输出:
(a,(CompactBuffer(1),CompactBuffer(4)))
(b,(CompactBuffer(2),CompactBuffer(5)))
(c,(CompactBuffer(3),CompactBuffer()))
这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(一)基础和转换算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!