Spark算子:转化算子、执行算子;累加器、广播变量

2024-03-08 09:20

本文主要是介绍Spark算子:转化算子、执行算子;累加器、广播变量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

目录

 一、转换算子

1、map

2、fliter

3、flatMap

4、Sample

5、Group

6、ReduceBykey

7、Union

8、Join

9、mapValus

10、sortBy

11、distinct

二、操作算子

三、累加器

四、广播变量


transformations转换算子:延迟执行--针对RDD的操作

Action操作算子:触发执行,转换算子是懒执行,需要一个action算子触发执行

 一、转换算子

1、map

    val conf = new SparkConf()conf.setMaster("local")conf.setAppName("map")val sc = new SparkContext(conf)//用parallelize构建rdd,不用读数据去创建rdd,后面是分区数val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6, 7, 8, 9), 2)//getNumPartitions获取当前rdd分区数println(s"rdd1:${rdd1.getNumPartitions}")/*mapPartitions: 将一个分区的数据传递给后面的函数,一次处理一个分区的数据,需要返回一个迭代器为什么是迭代器而不是集合,因为集合会将数据加载到内存中,如果一个分区数据量太大会导致内存溢出*/val rdd2: RDD[Int] = rdd1.mapPartitions((iter: Iterator[Int]) => {println("=" * 40)val iter1: Iterator[Int] = iter.map(i => i * 2)iter1})//分区分批输出rdd2.foreach(println(_))/*mapPartitionsWithIndex获取当前分区号和分区数据*/val rdd3: RDD[Int] = rdd1.mapPartitionsWithIndex((i: Int, iter: Iterator[Int]) => {println(s"当前的分区编号:${i}")val ints: Iterator[Int] = iter.map(i => i - 1)ints})rdd3.foreach(println(_))

2、fliter

        对数据进行过滤,函数返回treu保留数据,函数返回false过滤数据

3、flatMap

        将rdd中的数据一行变多行,函数返回值必须是一个序列

4、Sample

        抽样数据

val conf = new SparkConf()conf.setAppName("sample")conf.setMaster("local")
val sc = new SparkContext(conf)val studentRDD: RDD[String] = sc.textFile("data/students.txt")
//sample: 对数据进行抽样
//false表示数据写不写入文件中,0.1是抽样比例,抽样的数量存在误差
val sRdd: RDD[String] = studentRDD.sample(false, 0.1)sRdd.foreach(println(_))

5、Group

(1)GroupBY

        指定一个分组的字段进行分组,不需要一定是一个kv格式,返回的新的rdd的value里面包括所有的字段

(2)GroupBYkey

        rdd必须是一个kv格式,返回的新的rdd的迭代器中的数据只包含value, 后续在处理数据的时候方便一点

val rdd1: RDD[String] = sc.parallelize(List("1500100001,1000001,98", "1500100001,1000002,5", "1500100001, 1000003,137", "1500100001, 1000004,29", "1500100001, 1000005,85"))val rdd2: RDD[(String, Int)] =rdd1.map(str=>str.split(",")).map{case Array(id: String, _: String, sco: String) =>(id, sco.toInt)}rdd2.foreach(print(_))
//(1500100001,98)(1500100001,5)(1500100001,137)(1500100001,29)(1500100001,85)val rdd3: RDD[(String, Iterable[(String, Int)])] = rdd2.groupBy(str => str._1)rdd3.foreach(println(_))
//(1500100001,CompactBuffer((1500100001,98), (1500100001,5), (1500100001,137), (1500100001,29), (1500100001,85)))val rdd4: RDD[(String, Iterable[Int])] = rdd2.groupByKey()rdd4.foreach(println(_))
//(1500100001,CompactBuffer(98, 5, 137, 29, 85))

6、ReduceBykey

        在kv格式的数据中,对相同k的v做和运算,groupBYKey需要先分组,分组后v是一个集合,在对集合做求和运算

reduceBYKey是对相同的key的value进行聚合计算,加上一个聚合函数。

//x和y表示相同的key的两个value值,x+y表示对相同key的value做求和运算
val rdd5: RDD[(String, Int)] = rdd2.reduceByKey((x: Int, y: Int) => {val j: Int = x + yj})rdd5.foreach(println(_))
//(1500100001,354)

7、Union

        合并两个rdd, 不会对数据做去重, 两个rdd的类型要完全一致,在物理层面并没有合并,只是在逻辑层面合并了,合并的rdd是两个分区。

    val rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 6))val rdd2: RDD[Int] = sc.parallelize(List(3, 4, 5, 6, 7, 8, 9))val unionRDD: RDD[Int] = rdd1.union(rdd2)println(s"unionRDD:${unionRDD.getNumPartitions}") //2个分区unionRDD.foreach(println)

8、Join

val idNameRDD: RDD[(String, String)] = sc.parallelize(List(("000", "晓伟"),("001", "张三"),("002", "李四"),("003", "王五")))val idAgeRDD: RDD[(String, Int)] = sc.parallelize(List(("001", 23),("002", 24),("003", 25),("004", 23)))
//innerJoin: 两个表都有才能关联上val innerJoinRDD: RDD[(String, (String, Int))] = idNameRDD.join(idAgeRDD)innerJoinRDD.foreach(println)
结果:
(003,(王五,25))
(002,(李四,24))
(001,(张三,23))/*** leftOuterJoin: 以左表为基础,如果右表没有这个key,补NOne* Option: 可选择的值,有值或者没有值
*/val leftJoinRDD: RDD[(String, (String, Option[Int]))] = idNameRDD.leftOuterJoin(idAgeRDD)leftJoinRDD.foreach(println)
结果:
(003,(王五,Some(25)))
(000,(晓伟,None))
(002,(李四,Some(24)))
(001,(张三,Some(23)))//fullOuterJoin: 以两个表为基础,有一边有数据就会出来结果,列一边补Noneval fullJoinRDD: RDD[(String, (Option[String], Option[Int]))] = idNameRDD.fullOuterJoin(idAgeRDD)fullJoinRDD.foreach(println)
结果:
(003,(Some(王五),Some(25)))
(000,(Some(晓伟),None))
(004,(None,Some(23)))
(002,(Some(李四),Some(24)))
(001,(Some(张三),Some(23)))

9、mapValus

        key不变,对value做处理

val idAgeRDD: RDD[(String, Int)] = sc.parallelize(List(("001", 23),("002", 24),("003", 25),("004", 23)))val rdd: RDD[(String, Int)] = idAgeRDD.mapValues(age => age + 1)rdd.foreach(println)
结果:
(001,24)
(002,25)
(003,26)
(004,24)

10、sortBy

        指定一个字段进行排序

rdd.sortBy(kv => kv._2, ascending = false)
//ascending=false 表示降序,true表示升序

11、distinct

        对数据去重,会产生shuffle

al rdd1: RDD[Int] = sc.parallelize(List(1, 2, 3, 4, 5, 7, 87, 9, 4, 4, 3, 2, 4, 5, 6))val distinctRDD: RDD[Int] = rdd1.distinct()distinctRDD.foreach(print)//4163798752

二、操作算子

//count: 统计rdd的数据行数val count: Long = studentRDD.count()println(count)//sum: 对rdd中的数据求和,rdd中的数据类型必须是数字val sumAge: Double = studentRDD.map(line => line.split(",")(2).toInt)//对所有数据求和,只能是数字类型.sum()println(sumAge / count)//take: 取集合的前几个数, 返回一个数组,不能取太多, 会导致内存溢出val top: Array[String] = studentRDD.take(10)top.foreach(println)//collect: 将rdd转换成数组,如果rdd数据量比较大,会导致内存溢出val array: Array[String] = studentRDD.collect()array.foreach(println)/**
* foreach: 遍历rdd中的数据,也是一个action算子
* foreachPartition: 一次将一个分区的数据传递给后面的函数
*/studentRDD.foreach(println)studentRDD.foreachPartition((iter: Iterator[String]) => {iter.foreach(println)})//saveAsTextFile: 将数据保存在hdfs中HDFSUtil.deletePath("data/test")studentRDD.saveAsTextFile("data/test")

三、累加器

//算子的代码运行在Driver端var count = 0studentRDD.foreach(stu => {//算子内的代码运行在Executor端//在spark写代码的时候不能在算子内取修改算子外的一个普通变量,//就算修改了在算子外也不会生效count += 1println(count) //会打出依次输出数据的数量})println(count) //输出为0/*** 累加器*///1、定义累加器val countAcc: LongAccumulator = sc.longAccumulatorstudentRDD.foreach(stu => {//2、在算子内对累加器进行累加countAcc.add(1)})//3、在算子外获取累加的结果println(countAcc.value) //结果为数据的数量

四、广播变量

def main(args: Array[String]): Unit = {val conf = new SparkConf()conf.setAppName("bro")conf.setMaster("local")val sc = new SparkContext(conf)val studentRDD: RDD[String] = sc.textFile("data/students.txt")val scoreRDD: RDD[String] = sc.textFile("data/score.txt")val array: Array[String] = studentRDD.collect()val array2: Array[(String, String)] = array.map(stu => {val s: Array[String] = stu.split(",")(s(0), stu)})val map: Map[String, String] = array2.toMap  //普通变量val Rdd: RDD[(String, String)] = scoreRDD.map(stu => {val strings: Array[String] = stu.split(",")val str: String = map.getOrElse(strings(0), "默认值·") //在算子内使用普通变量(str, stu)})Rdd.foreach(println(_))}
}

 

val map: Map[String, String] = array2.toMap//将一个普通变量广播出去,通常是较大的变量val mapBro: Broadcast[Map[String, String]] = sc.broadcast(map)val Rdd: RDD[(String, String)] = scoreRDD.map(stu => {val strings: Array[String] = stu.split(",")//使用value方法获取广播变量中的值val values: Map[String, String] = mapBro.valueval str: String = values.getOrElse(strings(0), "默认值·")(str, stu)})Rdd.foreach(println(_))

这篇关于Spark算子:转化算子、执行算子;累加器、广播变量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/786648

相关文章

grom设置全局日志实现执行并打印sql语句

《grom设置全局日志实现执行并打印sql语句》本文主要介绍了grom设置全局日志实现执行并打印sql语句,包括设置日志级别、实现自定义Logger接口以及如何使用GORM的默认logger,通过这些... 目录gorm中的自定义日志gorm中日志的其他操作日志级别Debug自定义 Loggergorm中的

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

在MySQL执行UPDATE语句时遇到的错误1175的解决方案

《在MySQL执行UPDATE语句时遇到的错误1175的解决方案》MySQL安全更新模式(SafeUpdateMode)限制了UPDATE和DELETE操作,要求使用WHERE子句时必须基于主键或索引... mysql 中遇到的 Error Code: 1175 是由于启用了 安全更新模式(Safe Upd

Java文件与Base64之间的转化方式

《Java文件与Base64之间的转化方式》这篇文章介绍了如何使用Java将文件(如图片、视频)转换为Base64编码,以及如何将Base64编码转换回文件,通过提供具体的工具类实现,作者希望帮助读者... 目录Java文件与Base64之间的转化1、文件转Base64工具类2、Base64转文件工具类3、

如何将二进制文件流转化为MockMultipartFile文件

《如何将二进制文件流转化为MockMultipartFile文件》文章主要介绍了如何使用Spring框架中的MockMultipartFile类来模拟文件上传,并处理上传逻辑,包括获取二进制文件流、创... 目录一、名词解释及业务解释1.具体业务流程2.转换对象解释1. MockMultipartFile2

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed

浅析Rust多线程中如何安全的使用变量

《浅析Rust多线程中如何安全的使用变量》这篇文章主要为大家详细介绍了Rust如何在线程的闭包中安全的使用变量,包括共享变量和修改变量,文中的示例代码讲解详细,有需要的小伙伴可以参考下... 目录1. 向线程传递变量2. 多线程共享变量引用3. 多线程中修改变量4. 总结在Rust语言中,一个既引人入胜又可

详解如何在React中执行条件渲染

《详解如何在React中执行条件渲染》在现代Web开发中,React作为一种流行的JavaScript库,为开发者提供了一种高效构建用户界面的方式,条件渲染是React中的一个关键概念,本文将深入探讨... 目录引言什么是条件渲染?基础示例使用逻辑与运算符(&&)使用条件语句列表中的条件渲染总结引言在现代

Go语言实现将中文转化为拼音功能

《Go语言实现将中文转化为拼音功能》这篇文章主要为大家详细介绍了Go语言中如何实现将中文转化为拼音功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 有这么一个需求:新用户入职 创建一系列账号比较麻烦,打算通过接口传入姓名进行初始化。想把姓名转化成拼音。因为有些账号即需要中文也需要英

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont