横扫Spark之 - 9个常见的行动算子

2024-02-10 04:04

本文主要是介绍横扫Spark之 - 9个常见的行动算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. collect()
      • 2. count()
      • 3. first()
      • 4. take()
      • 5. takeOrdered()
      • 6. countByKey()
      • 7. saveAS...()
      • 8. foreach()
      • 9. foreachPartition() ***

1. collect()

  收集RDD每个分区的数据以数组封装之后发给Driver
  如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G 这样设置

  @Testdef collect(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))val arr = rdd1.collect()println(arr.toList)}

结果:
在这里插入图片描述

2. count()

  返回RDD中元素的个数

@Test
def count(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))println(rdd1.count())
}

结果:
在这里插入图片描述

3. first()

  返回RDD中的第一个元素
  他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取

  @Testdef first(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个jobval i = rdd1.first()println(i)Thread.sleep(10000000)
}

结果:
在这里插入图片描述

4. take()

  返回RDD中前n个元素组成的数组
  take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取

@Test
def take(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)println(rdd1.take(3).toList)Thread.sleep(10000000)
}

结果:
在这里插入图片描述

5. takeOrdered()

  这个是取排序之后的前几个元素
  takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了

@Test
def takeOrdered(): Unit ={val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)val ints = rdd1.takeOrdered(3)println(ints.toList)Thread.sleep(1000000)
}

结果:
在这里插入图片描述

6. countByKey()

  统计每个key出现的次数,返回的结果是(key,次数)

@Test
def countByKey(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))val rdd2 = rdd1.countByKey()println(rdd2.toList)
}

结果:
在这里插入图片描述

7. saveAS…()

  saveAsTextFile(path)将数据保存成text文件,有几个task就保存几个文件
  saveAsSequenceFile(path)将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
  saveAsObjectFile(path)将数据序列化成对象保存到文件

@Test
def save(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.saveAsTextFile("output/text")  // 为啥保存出来8个文件因为有8个taskrdd1.saveAsObjectFile("output/ObjectFile")rdd1.saveAsSequenceFile("output/SequenceFile")
}

结果:
在这里插入图片描述

8. foreach()

  遍历RDD中的每个元素

@Test
def foreach(): Unit = {val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreach(println)
}

结果:
在这里插入图片描述

9. foreachPartition() ***

  对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
  foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率

@Test
def foreachPartition(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreachPartition(it=>{var connection:Connection = nullvar statement:PreparedStatement = nulltry{connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")statement = connection.prepareStatement("insert into wc values(?,?)")//  计数器var count = 0it.foreach(x=>{statement.setString(1,x._1)statement.setInt(2,x._2)// 添加到批中,一批一批的执行statement.addBatch()// 满1000条执行一批if(count % 1000 == 0){statement.executeBatch()// todo 执行完批后要记得clearBatch !!!!!statement.clearBatch()}count = count+1})// 最后不满1000条的也执行一次statement.executeBatch()}catch {case e:Exception => e.printStackTrace()}finally {if (connection != null) {connection.close()}if (statement != null) {statement.close()}}})
}

结果:
在这里插入图片描述

这篇关于横扫Spark之 - 9个常见的行动算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/696072

相关文章

java常见报错及解决方案总结

《java常见报错及解决方案总结》:本文主要介绍Java编程中常见错误类型及示例,包括语法错误、空指针异常、数组下标越界、类型转换异常、文件未找到异常、除以零异常、非法线程操作异常、方法未定义异常... 目录1. 语法错误 (Syntax Errors)示例 1:解决方案:2. 空指针异常 (NullPoi

C++常见容器获取头元素的方法大全

《C++常见容器获取头元素的方法大全》在C++编程中,容器是存储和管理数据集合的重要工具,不同的容器提供了不同的接口来访问和操作其中的元素,获取容器的头元素(即第一个元素)是常见的操作之一,本文将详细... 目录一、std::vector二、std::list三、std::deque四、std::forwa

MySQL常见的存储引擎和区别说明

《MySQL常见的存储引擎和区别说明》MySQL支持多种存储引擎,如InnoDB、MyISAM、MEMORY、Archive、CSV和Blackhole,每种引擎有其特点和适用场景,选择存储引擎时需根... 目录mysql常见的存储引擎和区别说明1. InnoDB2. MyISAM3. MEMORY4. A

前端bug调试的方法技巧及常见错误

《前端bug调试的方法技巧及常见错误》:本文主要介绍编程中常见的报错和Bug,以及调试的重要性,调试的基本流程是通过缩小范围来定位问题,并给出了推测法、删除代码法、console调试和debugg... 目录调试基本流程调试方法排查bug的两大技巧如何看控制台报错前端常见错误取值调用报错资源引入错误解析错误

通俗易懂的Java常见限流算法具体实现

《通俗易懂的Java常见限流算法具体实现》:本文主要介绍Java常见限流算法具体实现的相关资料,包括漏桶算法、令牌桶算法、Nginx限流和Redis+Lua限流的实现原理和具体步骤,并比较了它们的... 目录一、漏桶算法1.漏桶算法的思想和原理2.具体实现二、令牌桶算法1.令牌桶算法流程:2.具体实现2.1

C++初始化数组的几种常见方法(简单易懂)

《C++初始化数组的几种常见方法(简单易懂)》本文介绍了C++中数组的初始化方法,包括一维数组和二维数组的初始化,以及用new动态初始化数组,在C++11及以上版本中,还提供了使用std::array... 目录1、初始化一维数组1.1、使用列表初始化(推荐方式)1.2、初始化部分列表1.3、使用std::

SQL 中多表查询的常见连接方式详解

《SQL中多表查询的常见连接方式详解》本文介绍SQL中多表查询的常见连接方式,包括内连接(INNERJOIN)、左连接(LEFTJOIN)、右连接(RIGHTJOIN)、全外连接(FULLOUTER... 目录一、连接类型图表(ASCII 形式)二、前置代码(创建示例表)三、连接方式代码示例1. 内连接(I

Python安装时常见报错以及解决方案

《Python安装时常见报错以及解决方案》:本文主要介绍在安装Python、配置环境变量、使用pip以及运行Python脚本时常见的错误及其解决方案,文中介绍的非常详细,需要的朋友可以参考下... 目录一、安装 python 时常见报错及解决方案(一)安装包下载失败(二)权限不足二、配置环境变量时常见报错及

Go语言利用泛型封装常见的Map操作

《Go语言利用泛型封装常见的Map操作》Go语言在1.18版本中引入了泛型,这是Go语言发展的一个重要里程碑,它极大地增强了语言的表达能力和灵活性,本文将通过泛型实现封装常见的Map操作,感... 目录什么是泛型泛型解决了什么问题Go泛型基于泛型的常见Map操作代码合集总结什么是泛型泛型是一种编程范式,允

C#多线程编程中导致死锁的常见陷阱和避免方法

《C#多线程编程中导致死锁的常见陷阱和避免方法》在C#多线程编程中,死锁(Deadlock)是一种常见的、令人头疼的错误,死锁通常发生在多个线程试图获取多个资源的锁时,导致相互等待对方释放资源,最终形... 目录引言1. 什么是死锁?死锁的典型条件:2. 导致死锁的常见原因2.1 锁的顺序问题错误示例:不同