横扫Spark之 - 9个常见的行动算子

2024-02-10 04:04

本文主要是介绍横扫Spark之 - 9个常见的行动算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. collect()
      • 2. count()
      • 3. first()
      • 4. take()
      • 5. takeOrdered()
      • 6. countByKey()
      • 7. saveAS...()
      • 8. foreach()
      • 9. foreachPartition() ***

1. collect()

  收集RDD每个分区的数据以数组封装之后发给Driver
  如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G 这样设置

  @Testdef collect(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))val arr = rdd1.collect()println(arr.toList)}

结果:
在这里插入图片描述

2. count()

  返回RDD中元素的个数

@Test
def count(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))println(rdd1.count())
}

结果:
在这里插入图片描述

3. first()

  返回RDD中的第一个元素
  他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取

  @Testdef first(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个jobval i = rdd1.first()println(i)Thread.sleep(10000000)
}

结果:
在这里插入图片描述

4. take()

  返回RDD中前n个元素组成的数组
  take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取

@Test
def take(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)println(rdd1.take(3).toList)Thread.sleep(10000000)
}

结果:
在这里插入图片描述

5. takeOrdered()

  这个是取排序之后的前几个元素
  takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了

@Test
def takeOrdered(): Unit ={val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)val ints = rdd1.takeOrdered(3)println(ints.toList)Thread.sleep(1000000)
}

结果:
在这里插入图片描述

6. countByKey()

  统计每个key出现的次数,返回的结果是(key,次数)

@Test
def countByKey(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))val rdd2 = rdd1.countByKey()println(rdd2.toList)
}

结果:
在这里插入图片描述

7. saveAS…()

  saveAsTextFile(path)将数据保存成text文件,有几个task就保存几个文件
  saveAsSequenceFile(path)将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
  saveAsObjectFile(path)将数据序列化成对象保存到文件

@Test
def save(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.saveAsTextFile("output/text")  // 为啥保存出来8个文件因为有8个taskrdd1.saveAsObjectFile("output/ObjectFile")rdd1.saveAsSequenceFile("output/SequenceFile")
}

结果:
在这里插入图片描述

8. foreach()

  遍历RDD中的每个元素

@Test
def foreach(): Unit = {val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreach(println)
}

结果:
在这里插入图片描述

9. foreachPartition() ***

  对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
  foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率

@Test
def foreachPartition(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreachPartition(it=>{var connection:Connection = nullvar statement:PreparedStatement = nulltry{connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")statement = connection.prepareStatement("insert into wc values(?,?)")//  计数器var count = 0it.foreach(x=>{statement.setString(1,x._1)statement.setInt(2,x._2)// 添加到批中,一批一批的执行statement.addBatch()// 满1000条执行一批if(count % 1000 == 0){statement.executeBatch()// todo 执行完批后要记得clearBatch !!!!!statement.clearBatch()}count = count+1})// 最后不满1000条的也执行一次statement.executeBatch()}catch {case e:Exception => e.printStackTrace()}finally {if (connection != null) {connection.close()}if (statement != null) {statement.close()}}})
}

结果:
在这里插入图片描述

这篇关于横扫Spark之 - 9个常见的行动算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/696072

相关文章

SQL BETWEEN 的常见用法小结

《SQLBETWEEN的常见用法小结》BETWEEN操作符是SQL中非常有用的工具,它允许你快速选取某个范围内的值,本文给大家介绍SQLBETWEEN的常见用法,感兴趣的朋友一起看看吧... 在SQL中,BETWEEN是一个操作符,用于选取介于两个值之间的数据。它包含这两个边界值。BETWEEN操作符常用

python中各种常见文件的读写操作与类型转换详细指南

《python中各种常见文件的读写操作与类型转换详细指南》这篇文章主要为大家详细介绍了python中各种常见文件(txt,xls,csv,sql,二进制文件)的读写操作与类型转换,感兴趣的小伙伴可以跟... 目录1.文件txt读写标准用法1.1写入文件1.2读取文件2. 二进制文件读取3. 大文件读取3.1

C++中初始化二维数组的几种常见方法

《C++中初始化二维数组的几种常见方法》本文详细介绍了在C++中初始化二维数组的不同方式,包括静态初始化、循环、全部为零、部分初始化、std::array和std::vector,以及std::vec... 目录1. 静态初始化2. 使用循环初始化3. 全部初始化为零4. 部分初始化5. 使用 std::a

前端下载文件时如何后端返回的文件流一些常见方法

《前端下载文件时如何后端返回的文件流一些常见方法》:本文主要介绍前端下载文件时如何后端返回的文件流一些常见方法,包括使用Blob和URL.createObjectURL创建下载链接,以及处理带有C... 目录1. 使用 Blob 和 URL.createObjectURL 创建下载链接例子:使用 Blob

C++ vector的常见用法超详细讲解

《C++vector的常见用法超详细讲解》:本文主要介绍C++vector的常见用法,包括C++中vector容器的定义、初始化方法、访问元素、常用函数及其时间复杂度,通过代码介绍的非常详细,... 目录1、vector的定义2、vector常用初始化方法1、使编程用花括号直接赋值2、使用圆括号赋值3、ve

Pytest多环境切换的常见方法介绍

《Pytest多环境切换的常见方法介绍》Pytest作为自动化测试的主力框架,如何实现本地、测试、预发、生产环境的灵活切换,本文总结了通过pytest框架实现自由环境切换的几种方法,大家可以根据需要进... 目录1.pytest-base-url2.hooks函数3.yml和fixture结论你是否也遇到过

C/C++错误信息处理的常见方法及函数

《C/C++错误信息处理的常见方法及函数》C/C++是两种广泛使用的编程语言,特别是在系统编程、嵌入式开发以及高性能计算领域,:本文主要介绍C/C++错误信息处理的常见方法及函数,文中通过代码介绍... 目录前言1. errno 和 perror()示例:2. strerror()示例:3. perror(

Go标准库常见错误分析和解决办法

《Go标准库常见错误分析和解决办法》Go语言的标准库为开发者提供了丰富且高效的工具,涵盖了从网络编程到文件操作等各个方面,然而,标准库虽好,使用不当却可能适得其反,正所谓工欲善其事,必先利其器,本文将... 目录1. 使用了错误的time.Duration2. time.After导致的内存泄漏3. jsO

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

java常见报错及解决方案总结

《java常见报错及解决方案总结》:本文主要介绍Java编程中常见错误类型及示例,包括语法错误、空指针异常、数组下标越界、类型转换异常、文件未找到异常、除以零异常、非法线程操作异常、方法未定义异常... 目录1. 语法错误 (Syntax Errors)示例 1:解决方案:2. 空指针异常 (NullPoi