SparkRDD之mapPartitions和mapPartitionsWithIndex

2024-09-01 08:38

本文主要是介绍SparkRDD之mapPartitions和mapPartitionsWithIndex,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1.mapPartitions

mapPartition可以这么理解,先对RDD进行partition,再把每个partition进行map函数。

下面的例子,将整数转为字符串:

package com.cb.spark.sparkrdd;import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;public class MapPartitionExample {public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("MapPartition").setMaster("local");JavaSparkContext javaSparkContext = new JavaSparkContext(conf);JavaRDD<Integer> rdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));JavaRDD<String> mapPartitionRdd = rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {private static final long serialVersionUID = 1L;public Iterator<String> call(Iterator<Integer> iterator) throws Exception {ArrayList<String> results = new ArrayList<>();while (iterator.hasNext()) {Integer i = iterator.next();results.add("字符串" + i);}return results.iterator();}});mapPartitionRdd.foreach(x -> System.out.println(x));javaSparkContext.stop();}
}//输出如下:
字符串1
字符串2
字符串3
字符串4
字符串5
字符串6
字符串7
字符串8
字符串9
字符串10

2.mapPartitionsWithIndex

也是按照分区进行的map操作,不过mapPartitionsWithIndex传入的参数多了一个表示分区索引的值,示例如下:

public static void main(String[] args) {SparkConf conf = new SparkConf().setAppName("Aggregate").setMaster("local");JavaSparkContext jsc = new JavaSparkContext(conf);JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);JavaRDD<String> mapPartitionsWithIndex = rdd.mapPartitionsWithIndex(new Function2<Integer, Iterator<Integer>, Iterator<String>>() {private static final long serialVersionUID = 1L;public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {List<String> list = new ArrayList<>();while (iterator.hasNext()) {list.add("partition" + index + ":" + iterator.next());}return list.iterator();}}, true);mapPartitionsWithIndex.foreach(x->System.out.println(x));jsc.stop();#输出如下:
partition0:1
partition0:2
partition1:3
partition1:4
partition2:5
partition2:6

 

这篇关于SparkRDD之mapPartitions和mapPartitionsWithIndex的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1126459

相关文章

SparkRDD转DataSet/DataFrame的一个深坑

大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! By  大数据技术与架构 场景描述:本文是根据读者反馈的一个问题总结而成的。 关键词:Saprk RDD 原需求:希望在map函数中将每一

SparkRDD之filter、filterByRange

1.filter:使用一个布尔函数为RDD的每个数据项计算,并将函数返回true的项放入生成的RDD中。 package com.cb.spark.sparkrdd;import java.util.Arrays;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apac

SparkRDD之distinct和first

distinct:对RDD中的元素进行去重。 first:返回RDD中第一个元素。 package com.cb.spark.sparkrdd;import java.util.Arrays;import org.apache.spark.SparkConf;import org.apache.spark.api.java.JavaRDD;import org.apache.spark.a

Spark算子:RDD基本转换操作(5)–mapPartitions/mapPartitionsWithIndex

mapPartitions def mapPartitions[U](f: (Iterator[T]) => Iterator[U], preservesPartitioning: Boolean = false)(implicit arg0: ClassTag[U]): RDD[U]      该函数和map函数类似,只不过映射函数的参数由RDD中的每一个元素变成了RDD中每一个分区的迭代

spark中mapPartitions双重循环或两次遍历(duplicate)

在spark当中通常需要对mapPartitions内部进行计算,这样可以在不进行网络传输的情况下,对数据进行局部计算 而mapPartitions中的迭代器为Iterator scala中的Iterator只能进行一次迭代,使用过后就消失了,所以在mapPartitions中既不能两次遍历 如:一次mapPartitions求最大最小值 val it = Iterator(20, 4

sparkRDD操作

RDD介绍 标签(空格分隔): spark hadoop,spark,kafka交流群:224209501 RDD 操作 1,RDD五大特点: 1,A list of partions 一系列的分片:比如64M一个分片,类似于hadoop的splits。 2,A function for computing each split 在每个分区上都有一个函数去迭代、执行、计算它。

sparkRDD中key-value类型类型数据的三种方法对比

reduceBykey   |    groupByKey    |   sortByKey        的区别! reduceBykey操作——reduceBykey在源码中reduce之前使用预先聚合的combine操作   groupByKey——直接进行shuffle的操作 sortByKey ——根据key进行排序的操作   总结: reduc

SparkRDD——行动算子

一、行动算子定义 spark的算子可以分为trans action算子 以及 action算子 ,即变换/转换 算子。如果执行一个RDD算子并不触发作业的提交,仅仅只是记录作业中间处理过程,那么这就是trans action算子 ,相反如果执行这个 RDD 时会触发 Spark Context 提交 Job 作业,那么它就是 action算子及行动算子。 总结来说就是在Spark中,转换算子并不

SparkRDD——转换算子

转换算子 一、单value型转换算子(只使用1个RDD):1、map 将数据进行转换,数据量不会增加和减少2、mapPartitions 以分区为单位将一个分区内的数据进行批处理操作,且可以执行过滤操作3、mapPartitionsWithIndex 功能类似mapPartiutions算子,只是加入了每个分区的索引,可以

SparkRDD之——RDD概述

目录 1、什么是RDD ①弹性: ②分布式 ③数据集 ④数据抽象 ⑤不可变 2、RDD特征 ①分区列表 ②分区计算函数 ③依赖于其他RDD ④(Key,Value)数据类型的RDD分区器(可选特征) ⑤首选位置(可选特征) 3、执行原理 4、RDD的依赖 ①窄依赖 ②宽依赖 4、创建RDD ①在内存中创建 ②读取文件创建 5、spark分区方式 ①读取数据