横扫Spark之 - 9个常见的行动算子

2024-02-10 04:04

本文主要是介绍横扫Spark之 - 9个常见的行动算子,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

水善利万物而不争,处众人之所恶,故几于道💦

文章目录

      • 1. collect()
      • 2. count()
      • 3. first()
      • 4. take()
      • 5. takeOrdered()
      • 6. countByKey()
      • 7. saveAS...()
      • 8. foreach()
      • 9. foreachPartition() ***

1. collect()

  收集RDD每个分区的数据以数组封装之后发给Driver
  如果RDD数据量比较大,Driver内存默认只有1G,可能出现内存溢出,工作中一般需要将Driver内存设置为5-10G。可以通过bin/spark-submit --driver-memory 10G 这样设置

  @Testdef collect(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))val arr = rdd1.collect()println(arr.toList)}

结果:
在这里插入图片描述

2. count()

  返回RDD中元素的个数

@Test
def count(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6))println(rdd1.count())
}

结果:
在这里插入图片描述

3. first()

  返回RDD中的第一个元素
  他会从多个分区取数据,如果0号分区取到了数据的话就只有一个job;如果0号分区没有取到数据,或者取到的数据不够,那就会再启动一个job去其他分区取

  @Testdef first(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),7)// 0号分区没有数据所以就会再启动一个job从后面的分区取,所以web页面看到有两个jobval i = rdd1.first()println(i)Thread.sleep(10000000)
}

结果:
在这里插入图片描述

4. take()

  返回RDD中前n个元素组成的数组
  take和first一样如果取到就一个job如果取不到或者没取够就再来一个job去取

@Test
def take(): Unit ={val rdd1 = sc.parallelize(List(1, 7, 3, 9, 42, 6),3)println(rdd1.take(3).toList)Thread.sleep(10000000)
}

结果:
在这里插入图片描述

5. takeOrdered()

  这个是取排序之后的前几个元素
  takeOrdered没有shuffle,因为只需要每个分区取前三然后拉到一起再取一次前三就完事了

@Test
def takeOrdered(): Unit ={val rdd1 = sc.parallelize(List(1, 7,98,3,7,86,23,54, 9, 42, 6),3)val ints = rdd1.takeOrdered(3)println(ints.toList)Thread.sleep(1000000)
}

结果:
在这里插入图片描述

6. countByKey()

  统计每个key出现的次数,返回的结果是(key,次数)

@Test
def countByKey(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))val rdd2 = rdd1.countByKey()println(rdd2.toList)
}

结果:
在这里插入图片描述

7. saveAS…()

  saveAsTextFile(path)将数据保存成text文件,有几个task就保存几个文件
  saveAsSequenceFile(path)将数据保存成Sequencefile文件【只有kv类型RDD有该操作,单值的没有】
  saveAsObjectFile(path)将数据序列化成对象保存到文件

@Test
def save(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.saveAsTextFile("output/text")  // 为啥保存出来8个文件因为有8个taskrdd1.saveAsObjectFile("output/ObjectFile")rdd1.saveAsSequenceFile("output/SequenceFile")
}

结果:
在这里插入图片描述

8. foreach()

  遍历RDD中的每个元素

@Test
def foreach(): Unit = {val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreach(println)
}

结果:
在这里插入图片描述

9. foreachPartition() ***

  对每个分区遍历,参数列表传入的函数是针对每个分区的操作,有多少个分区函数就执行多少次
  foreachPartition的使用场景是:一般用于将数据写入mysql/redis/hbase等位置,可以减少连接的创建、销毁次数,提高效率

@Test
def foreachPartition(): Unit ={val rdd1 = sc.parallelize(List("aa" -> 1, "bb" -> 5, "aa" -> 7, "cc" -> 9, "aa" -> 100))rdd1.foreachPartition(it=>{var connection:Connection = nullvar statement:PreparedStatement = nulltry{connection = DriverManager.getConnection("jdbc:mysql://localhost:3306/test","root","123456")statement = connection.prepareStatement("insert into wc values(?,?)")//  计数器var count = 0it.foreach(x=>{statement.setString(1,x._1)statement.setInt(2,x._2)// 添加到批中,一批一批的执行statement.addBatch()// 满1000条执行一批if(count % 1000 == 0){statement.executeBatch()// todo 执行完批后要记得clearBatch !!!!!statement.clearBatch()}count = count+1})// 最后不满1000条的也执行一次statement.executeBatch()}catch {case e:Exception => e.printStackTrace()}finally {if (connection != null) {connection.close()}if (statement != null) {statement.close()}}})
}

结果:
在这里插入图片描述

这篇关于横扫Spark之 - 9个常见的行动算子的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/696072

相关文章

Spring常见错误之Web嵌套对象校验失效解决办法

《Spring常见错误之Web嵌套对象校验失效解决办法》:本文主要介绍Spring常见错误之Web嵌套对象校验失效解决的相关资料,通过在Phone对象上添加@Valid注解,问题得以解决,需要的朋... 目录问题复现案例解析问题修正总结  问题复现当开发一个学籍管理系统时,我们会提供了一个 API 接口去

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

C语言线程池的常见实现方式详解

《C语言线程池的常见实现方式详解》本文介绍了如何使用C语言实现一个基本的线程池,线程池的实现包括工作线程、任务队列、任务调度、线程池的初始化、任务添加、销毁等步骤,感兴趣的朋友跟随小编一起看看吧... 目录1. 线程池的基本结构2. 线程池的实现步骤3. 线程池的核心数据结构4. 线程池的详细实现4.1 初

JVM 常见异常及内存诊断

栈内存溢出 栈内存大小设置:-Xss size 默认除了window以外的所有操作系统默认情况大小为 1MB,window 的默认大小依赖于虚拟机内存。 栈帧过多导致栈内存溢出 下述示例代码,由于递归深度没有限制且没有设置出口,每次方法的调用都会产生一个栈帧导致了创建的栈帧过多,而导致内存溢出(StackOverflowError)。 示例代码: 运行结果: 栈帧过大导致栈内存

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

模拟实现vector中的常见接口

insert void insert(iterator pos, const T& x){if (_finish == _endofstorage){int n = pos - _start;size_t newcapacity = capacity() == 0 ? 2 : capacity() * 2;reserve(newcapacity);pos = _start + n;//防止迭代

【Kubernetes】常见面试题汇总(三)

目录 9.简述 Kubernetes 的缺点或当前的不足之处? 10.简述 Kubernetes 相关基础概念? 9.简述 Kubernetes 的缺点或当前的不足之处? Kubernetes 当前存在的缺点(不足)如下: ① 安装过程和配置相对困难复杂; ② 管理服务相对繁琐; ③ 运行和编译需要很多时间; ④ 它比其他替代品更昂贵; ⑤ 对于简单的应用程序来说,可能不

【附答案】C/C++ 最常见50道面试题

文章目录 面试题 1:深入探讨变量的声明与定义的区别面试题 2:编写比较“零值”的`if`语句面试题 3:深入理解`sizeof`与`strlen`的差异面试题 4:解析C与C++中`static`关键字的不同用途面试题 5:比较C语言的`malloc`与C++的`new`面试题 6:实现一个“标准”的`MIN`宏面试题 7:指针是否可以是`volatile`面试题 8:探讨`a`和`&a`

常见的服务器

常见的Web服务器 1、Tomcat:Tomcat和Java结合得最好,是Oracle官方推荐的JSP服务器。Tomcat是开源的Web服务器,经过长时间的发展,性能、稳定性等方面都非常优秀。 2、Jetty:另一个优秀的Web服务器。Jetty有个更大的优点是,Jetty可作为一个嵌入式服务器,即:如果在应用中加入Jetty的JAR文件,应用可在代码中对外提供Web服务。 3、Resin:

【Kubernetes】常见面试题汇总(一)

目录 1.简述 etcd 及其特点? 2.简述 etcd 适应的场景? 3.简述什么是Kubernetes? 4.简述 Kubernetes和 Docker的关系? 1.简述 etcd 及其特点? (1)etcd 是Core0s 团队发起的开源项目,是一个管理配置信息和服务发现(service discovery)的项目,它的目标是构建一个高可用的分布式键值(keyvalue)数据