This article introduces two Spark RDD operators, mapPartitions and mapPartitionsWithIndex, with a short Java example for each.
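1. mapPartitions
mapPartitions can be understood as follows: an RDD is divided into partitions, and the function you supply is applied once per partition rather than once per element. The function receives an iterator over all elements of one partition and returns an iterator over the results.
The following example converts integers to strings: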
package com.cb.spark.sparkrdd;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;

public class MapPartitionExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("MapPartition").setMaster("local");
        JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
        JavaRDD<Integer> rdd = javaSparkContext.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));

        // call() is invoked once per partition and receives all of that partition's elements.
        JavaRDD<String> mapPartitionRdd = rdd.mapPartitions(new FlatMapFunction<Iterator<Integer>, String>() {
            private static final long serialVersionUID = 1L;

            public Iterator<String> call(Iterator<Integer> iterator) throws Exception {
                ArrayList<String> results = new ArrayList<>();
                while (iterator.hasNext()) {
                    Integer i = iterator.next();
                    results.add("字符串" + i); // "字符串" means "string"
                }
                return results.iterator();
            }
        });

        mapPartitionRdd.foreach(x -> System.out.println(x));
        javaSparkContext.stop();
    }
}

Output:
字符串1
字符串2
字符串3
字符串4
字符串5
字符串6
字符串7
字符串8
字符串9
字符串10
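To make the "once per partition" behavior concrete, here is a minimal plain-Java sketch that models it without Spark. Everything in it is an assumption made for illustration: the class MapPartitionsModel, its mapPartitions helper, the calls counter, and the ASCII prefix "str" (used in place of the article's string literal) are hypothetical names, not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of per-partition mapping. This is NOT Spark code;
// partitions are represented as a list of lists.
class MapPartitionsModel {
    // Counts how often the per-partition function is invoked.
    static int calls = 0;

    // Applies f once to each partition's iterator and concatenates the results.
    static <T, R> List<R> mapPartitions(List<List<T>> partitions,
                                        Function<Iterator<T>, Iterator<R>> f) {
        List<R> out = new ArrayList<>();
        for (List<T> partition : partitions) {
            calls++;
            Iterator<R> results = f.apply(partition.iterator());
            while (results.hasNext()) {
                out.add(results.next());
            }
        }
        return out;
    }

    // Same shape as the call() method in the example above:
    // iterator in, iterator out.
    static Iterator<String> toStrings(Iterator<Integer> iterator) {
        // Any one-time setup placed here would run once per partition.
        List<String> results = new ArrayList<>();
        while (iterator.hasNext()) {
            results.add("str" + iterator.next());
        }
        return results.iterator();
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions =
                Arrays.asList(Arrays.asList(1, 2, 3), Arrays.asList(4, 5));
        List<String> out = mapPartitions(partitions, MapPartitionsModel::toStrings);
        System.out.println(out);                          // [str1, str2, str3, str4, str5]
        System.out.println("function calls: " + calls);   // function calls: 2
    }
}
```

Five elements are processed with only two function calls, one per partition. This is why mapPartitions is commonly chosen when each call needs some setup that would be wasteful to repeat for every element.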
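2. mapPartitionsWithIndex
This is also a map operation applied per partition, but the function passed to mapPartitionsWithIndex takes one extra argument: the index of the partition being processed. An example follows: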
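import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function2;

// The original snippet showed only main(); the class name is added here so the example compiles.
public class MapPartitionsWithIndexExample {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("Aggregate").setMaster("local");
        JavaSparkContext jsc = new JavaSparkContext(conf);
        // Distribute the six elements across 3 partitions.
        JavaRDD<Integer> rdd = jsc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6), 3);

        JavaRDD<String> mapPartitionsWithIndex = rdd.mapPartitionsWithIndex(
                new Function2<Integer, Iterator<Integer>, Iterator<String>>() {
                    private static final long serialVersionUID = 1L;

                    // index is the partition index; iterator covers that partition's elements.
                    public Iterator<String> call(Integer index, Iterator<Integer> iterator) throws Exception {
                        List<String> list = new ArrayList<>();
                        while (iterator.hasNext()) {
                            list.add("partition" + index + ":" + iterator.next());
                        }
                        return list.iterator();
                    }
                }, true); // second argument: preservesPartitioning

        mapPartitionsWithIndex.foreach(x -> System.out.println(x));
        jsc.stop();
    }
}

Output: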
partition0:1
partition0:2
partition1:3
partition1:4
partition2:5
partition2:6
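The output shows 1 and 2 in partition 0, 3 and 4 in partition 1, and 5 and 6 in partition 2. The plain-Java sketch below models why, under the assumption that parallelize cuts a local list into contiguous slices where slice i covers the positions from i*n/numSlices up to (but not including) (i+1)*n/numSlices. That formula reflects my understanding of Spark's behavior for ordinary collections and should be treated as an assumption; the class PartitionIndexModel and its methods slice and tagWithIndex are hypothetical names, not Spark API.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java model of slicing a list into partitions and tagging each
// element with its partition index. This is NOT Spark code.
class PartitionIndexModel {
    // Splits data into numSlices contiguous slices.
    // Slice i covers positions [i*n/numSlices, (i+1)*n/numSlices).
    static <T> List<List<T>> slice(List<T> data, int numSlices) {
        List<List<T>> slices = new ArrayList<>();
        int n = data.size();
        for (int i = 0; i < numSlices; i++) {
            int start = (int) ((long) i * n / numSlices);
            int end = (int) ((long) (i + 1) * n / numSlices);
            slices.add(new ArrayList<>(data.subList(start, end)));
        }
        return slices;
    }

    // Mirrors the call(index, iterator) body in the example above.
    static <T> List<String> tagWithIndex(List<List<T>> partitions) {
        List<String> out = new ArrayList<>();
        for (int index = 0; index < partitions.size(); index++) {
            for (T value : partitions.get(index)) {
                out.add("partition" + index + ":" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6);
        List<List<Integer>> partitions = slice(data, 3);
        System.out.println(partitions); // [[1, 2], [3, 4], [5, 6]]
        tagWithIndex(partitions).forEach(System.out::println);
    }
}
```

With six elements and three slices, each slice holds two consecutive elements, which matches the partition indices printed by the Spark example. When the element count is not divisible by the slice count, the slices produced by this model differ in size by at most one.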