Spark算子:RDD基本转换操作(6)–zip、zipPartitions

2024-06-23 13:18

本文主要是介绍Spark算子:RDD基本转换操作(6)–zip、zipPartitions,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

zip

      def zip[U](other: RDD[U])(implicit arg0: ClassTag[U]): RDD[(T, U)]

       zip函数用于将两个RDD组合成Key/Value形式的RDD,这里默认两个RDD的partition数量以及元素数量都相同,否则会抛出异常。

scala> var rdd1 = sc.makeRDD(1 to 10,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at makeRDD at :21scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[2] at makeRDD at :21scala> rdd1.zip(rdd2).collect
res0: Array[(Int, String)] = Array((1,A), (2,B), (3,C), (4,D), (5,E))           scala> rdd2.zip(rdd1).collect
res1: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (D,4), (E,5))scala> var rdd3 = sc.makeRDD(Seq("A","B","C","D","E"),3)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at makeRDD at :21scala> rdd1.zip(rdd3).collect
java.lang.IllegalArgumentException: Can't zip RDDs with unequal numbers of partitions
//如果两个RDD分区数不同,则抛出异常

zipPartitions

      zipPartitions函数将多个RDD按照partition组合成为新的RDD,该函数需要组合的RDD具有相同的分区数,但对于每个分区内的元素数量没有要求。

      该函数有好几种实现,可分为三类:

      参数是一个RDD
            def zipPartitions[B, V](rdd2: RDD[B])(f: (Iterator[T], Iterator[B]) => Iterator[V])(implicit arg0: ClassTag[B], arg1:       ClassTag[V]): RDD[V]

            def zipPartitions[B, V](rdd2: RDD[B], preservesPartitioning: Boolean)(f: (Iterator[T], Iterator[B]) => Iterator[V])      (implicit arg0: ClassTag[B], arg1: ClassTag[V]): RDD[V]

      这两个区别就是参数preservesPartitioning,是否保留父RDD的partitioner分区信息

      映射方法f参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[22] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[23] at makeRDD at :21//rdd1两个分区中元素分布:
scala> rdd1.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res17: Array[String] = Array(part_0|2, part_0|1, part_1|5, part_1|4, part_1|3)//rdd2两个分区中元素分布
scala> rdd2.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res18: Array[String] = Array(part_0|B, part_0|A, part_1|E, part_1|D, part_1|C)//rdd1和rdd2做zipPartition
scala> rdd1.zipPartitions(rdd2){|       (rdd1Iter,rdd2Iter) => {|         var result = List[String]()|         while(rdd1Iter.hasNext && rdd2Iter.hasNext) {|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next())|         }|         result.iterator|       }|     }.collect
res19: Array[String] = Array(2_B, 1_A, 5_E, 4_D, 3_C)


      参数是两个RDD
            def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C])(f: (Iterator[T], Iterator[B], Iterator[C]) => Iterator[V])(implicit       arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]

            def zipPartitions[B, C, V](rdd2: RDD[B], rdd3: RDD[C], preservesPartitioning: Boolean)(f: (Iterator[T],

      Iterator[B],Iterator[C]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[V]): RDD[V]


      用法同上面,只不过该函数参数为两个RDD,映射方法f输入参数为两个RDD的迭代器。

scala> var rdd1 = sc.makeRDD(1 to 5,2)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[27] at makeRDD at :21scala> var rdd2 = sc.makeRDD(Seq("A","B","C","D","E"),2)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[28] at makeRDD at :21scala> var rdd3 = sc.makeRDD(Seq("a","b","c","d","e"),2)
rdd3: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[29] at makeRDD at :21//rdd3中个分区元素分布
scala> rdd3.mapPartitionsWithIndex{|         (x,iter) => {|           var result = List[String]()|             while(iter.hasNext){|               result ::= ("part_" + x + "|" + iter.next())|             }|             result.iterator|            |         }|       }.collect
res21: Array[String] = Array(part_0|b, part_0|a, part_1|e, part_1|d, part_1|c)//三个RDD做zipPartitions
scala> var rdd4 = rdd1.zipPartitions(rdd2,rdd3){|       (rdd1Iter,rdd2Iter,rdd3Iter) => {|         var result = List[String]()|         while(rdd1Iter.hasNext && rdd2Iter.hasNext && rdd3Iter.hasNext) {|           result::=(rdd1Iter.next() + "_" + rdd2Iter.next() + "_" + rdd3Iter.next())|         }|         result.iterator|       }|     }
rdd4: org.apache.spark.rdd.RDD[String] = ZippedPartitionsRDD3[33] at zipPartitions at :27scala> rdd4.collect
res23: Array[String] = Array(2_B_b, 1_A_a, 5_E_e, 4_D_d, 3_C_c)

       参数是三个RDD
      def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D])(f: (Iterator[T], Iterator[B], Iterator[C],      Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3: ClassTag[V]): RDD[V]

      def zipPartitions[B, C, D, V](rdd2: RDD[B], rdd3: RDD[C], rdd4: RDD[D], preservesPartitioning: Boolean)(f: (Iterator[T],      Iterator[B], Iterator[C], Iterator[D]) => Iterator[V])(implicit arg0: ClassTag[B], arg1: ClassTag[C], arg2: ClassTag[D], arg3:      ClassTag[V]): RDD[V]

      用法同上面,只不过这里又多了个一个RDD而已。

      转载请注明:Spark算子:RDD基本转换操作(6)–zip、zipPartitions

这篇关于Spark算子:RDD基本转换操作(6)–zip、zipPartitions的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1087263

相关文章

MySQL中between and的基本用法、范围查询示例详解

《MySQL中betweenand的基本用法、范围查询示例详解》BETWEENAND操作符在MySQL中用于选择在两个值之间的数据,包括边界值,它支持数值和日期类型,示例展示了如何使用BETWEEN... 目录一、between and语法二、使用示例2.1、betwphpeen and数值查询2.2、be

Go异常处理、泛型和文件操作实例代码

《Go异常处理、泛型和文件操作实例代码》Go语言的异常处理机制与传统的面向对象语言(如Java、C#)所使用的try-catch结构有所不同,它采用了自己独特的设计理念和方法,:本文主要介绍Go异... 目录一:异常处理常见的异常处理向上抛中断程序恢复程序二:泛型泛型函数泛型结构体泛型切片泛型 map三:文

Python轻松实现Word到Markdown的转换

《Python轻松实现Word到Markdown的转换》在文档管理、内容发布等场景中,将Word转换为Markdown格式是常见需求,本文将介绍如何使用FreeSpire.DocforPython实现... 目录一、工具简介二、核心转换实现1. 基础单文件转换2. 批量转换Word文件三、工具特性分析优点局

使用C#实现将RTF转换为PDF

《使用C#实现将RTF转换为PDF》RTF(RichTextFormat)是一种通用的文档格式,允许用户在不同的文字处理软件中保存和交换格式化文本,下面我们就来看看如何使用C#实现将RTF转换为PDF... 目录Spire.Doc for .NET 简介安装 Spire.Doc代码示例处理异常总结RTF(R

MySQL基本表查询操作汇总之单表查询+多表操作大全

《MySQL基本表查询操作汇总之单表查询+多表操作大全》本文全面介绍了MySQL单表查询与多表操作的关键技术,包括基本语法、高级查询、表别名使用、多表连接及子查询等,并提供了丰富的实例,感兴趣的朋友跟... 目录一、单表查询整合(一)通用模版展示(二)举例说明(三)注意事项(四)Mapper简单举例简单查询

Nginx概念、架构、配置与虚拟主机实战操作指南

《Nginx概念、架构、配置与虚拟主机实战操作指南》Nginx是一个高性能的HTTP服务器、反向代理服务器、负载均衡器和IMAP/POP3/SMTP代理服务器,它支持高并发连接,资源占用低,功能全面且... 目录Nginx 深度解析:概念、架构、配置与虚拟主机实战一、Nginx 的概念二、Nginx 的特点

SpringBoot整合Apache Spark实现一个简单的数据分析功能

《SpringBoot整合ApacheSpark实现一个简单的数据分析功能》ApacheSpark是一个开源的大数据处理框架,它提供了丰富的功能和API,用于分布式数据处理、数据分析和机器学习等任务... 目录第一步、添加android依赖第二步、编写配置类第三步、编写控制类启动项目并测试总结ApacheS

MySQL 数据库进阶之SQL 数据操作与子查询操作大全

《MySQL数据库进阶之SQL数据操作与子查询操作大全》本文详细介绍了SQL中的子查询、数据添加(INSERT)、数据修改(UPDATE)和数据删除(DELETE、TRUNCATE、DROP)操作... 目录一、子查询:嵌套在查询中的查询1.1 子查询的基本语法1.2 子查询的实战示例二、数据添加:INSE

使用Python在PDF中绘制多种图形的操作示例

《使用Python在PDF中绘制多种图形的操作示例》在进行PDF自动化处理时,人们往往首先想到的是文本生成、图片嵌入或表格绘制等常规需求,然而在许多实际业务场景中,能够在PDF中灵活绘制图形同样至关重... 目录1. 环境准备2. 创建 PDF 文档与页面3. 在 PDF 中绘制不同类型的图形python

Java 操作 MinIO详细步骤

《Java操作MinIO详细步骤》本文详细介绍了如何使用Java操作MinIO,涵盖了从环境准备、核心API详解到实战场景的全过程,文章从基础的桶和对象操作开始,到大文件分片上传、预签名URL生成... 目录Java 操作 MinIO 全指南:从 API 详解到实战场景引言:为什么选择 MinIO?一、环境