Spark学习笔记(详解,附代码实列和图解)----------RDD(四)分区器

2024-01-10 20:48

本文主要是介绍Spark学习笔记(详解,附代码实列和图解)----------RDD(四)分区器,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

七.RDD分区器

Spark 目前支持 Hash 分区和 Range 分区,和用户自定义分区。Hash 分区为当前的默认分区。分区器直接决定了 RDD 中分区的个数、RDD 中每条数据经过 Shuffle 后进入哪个分区,进而决定了 Reduce 的个数。

➢ 只有 Key-Value 类型的 RDD 才有分区器,非 Key-Value 类型的 RDD 分区的值是 None
➢ 每个 RDD的分区 ID 范围:0 ~ (numPartitions - 1),决定这个值是属于那个分区的。

1.Hash 分区:(默认的分区就是HashPartition分区)

对于给定的 key,计算其 hashCode,并除以分区个数取余,并除以分区的个数取余,如果余数小于0,则用余数+分区的个数(否则加0),最后返回的值就是这个key所属的分区ID。

Rdd.partitionBy(new spark.HashPartitioner(2))//2为分区数

源码:

class HashPartitioner(partitions: Int) extends Partitioner {require(partitions >= 0, s"Number of partitions ($partitions) cannot be negative.")def numPartitions: Int = partitionsdef getPartition(key: Any): Int = key match {case null => 0//key为0则统统放入0号分区case _ => Utils.nonNegativeMod(key.hashCode, numPartitions)
//否则调用这个方法,根据key的hashcode和分区数,得到分区号}def nonNegativeMod(x: Int, mod: Int): Int = {val rawMod = x % modrawMod + (if (rawMod < 0) mod else 0)
}

2.Range 分区:

将一定范围内的数据映射到一个分区中,尽量保证每个分区数据均匀,而且分区间有序。但是分区内的元素是不能保证顺序的。

实现过程为:
第一步:先从整个RDD中抽取出样本数据,将样本数据排序,计算出每个分区的最大key值,形成一个Array[KEY]类型的数组变量rangeBounds;

第二步:判断key在rangeBounds中所处的范围,给出该key值在RDD中的分区id下标;该分区器要求RDD中的KEY类型必须是可以排序的

Rdd.partitionBy(new RangePartitioner(3,Rdd))//3为要分区的个数

3.自定义分区器

需要继承org.apache.spark.Partitioner类,实现如下:

def numPartitions:这个方法需要返回你想要创建分区的个数;
def getPartition:这个函数需要对输入的key做计算,然后返回该key的分区ID,范围一定是0到numPartitions-1;
equals():这个是Java标准的判断相等的函数,之所以要求用户实现这个函数是因为Spark内部会比较两个RDD的分区是否一样,默认equals已经去区分两个RDD了 可以不重写

class  MySparkPartition(numParts: Int) extends Partitioner {override def numPartitions: Int = numParts/*** 可以自定义分区算法* @param key* @return*/override def getPartition(key: Any): Int = {val domain = new java.net.URL(key.toString).getHost()val code = (domain.hashCode % numPartitions)if (code < 0) {code + numPartitions} else {code}}override def equals(other: Any): Boolean = other match {case mypartition: MySparkPartition =>mypartition.numPartitions == numPartitionscase _ =>false}override def hashCode: Int = numPartitions}

示例:

import org.apache.spark.rdd.RDD
import org.apache.spark.{HashPartitioner, Partitioner, SparkConf, SparkContext}
object RDD_PART {def main(args: Array[String]): Unit = {val sparConf = new SparkConf().setMaster("local").setAppName("WordCount").set("spark.testing.memory", "2147480000")val sc = new SparkContext(sparConf)val rdd = sc.makeRDD(List(("nba", "xxxxxxxxx"),("cba", "xxxxxxxxx"),("wnba", "xxxxxxxxx"),("nba", "xxxxxxxxx")), 2)val partRDD: RDD[(String, String)] = rdd.partitionBy(new MyPartitioner)partRDD.saveAsTextFile("output1")sc.stop()}/*** 自定义分区器* 1. 继承Partitioner* 2. 重写方法*/class MyPartitioner extends Partitioner {// 分区数量override def numPartitions: Int = 3// 根据数据的key值返回数据所在的分区索引(从0开始)override def getPartition(key: Any): Int = {key match {case "nba" => 0case "wnba" => 1case _ => 2}}}}

输出:
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

这篇关于Spark学习笔记(详解,附代码实列和图解)----------RDD(四)分区器的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/592027

相关文章

SpringCloud集成AlloyDB的示例代码

《SpringCloud集成AlloyDB的示例代码》AlloyDB是GoogleCloud提供的一种高度可扩展、强性能的关系型数据库服务,它兼容PostgreSQL,并提供了更快的查询性能... 目录1.AlloyDBjavascript是什么?AlloyDB 的工作原理2.搭建测试环境3.代码工程1.

Java调用Python代码的几种方法小结

《Java调用Python代码的几种方法小结》Python语言有丰富的系统管理、数据处理、统计类软件包,因此从java应用中调用Python代码的需求很常见、实用,本文介绍几种方法从java调用Pyt... 目录引言Java core使用ProcessBuilder使用Java脚本引擎总结引言python

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Mysql 中的多表连接和连接类型详解

《Mysql中的多表连接和连接类型详解》这篇文章详细介绍了MySQL中的多表连接及其各种类型,包括内连接、左连接、右连接、全外连接、自连接和交叉连接,通过这些连接方式,可以将分散在不同表中的相关数据... 目录什么是多表连接?1. 内连接(INNER JOIN)2. 左连接(LEFT JOIN 或 LEFT

Java中ArrayList的8种浅拷贝方式示例代码

《Java中ArrayList的8种浅拷贝方式示例代码》:本文主要介绍Java中ArrayList的8种浅拷贝方式的相关资料,讲解了Java中ArrayList的浅拷贝概念,并详细分享了八种实现浅... 目录引言什么是浅拷贝?ArrayList 浅拷贝的重要性方法一:使用构造函数方法二:使用 addAll(

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Linux内核之内核裁剪详解

《Linux内核之内核裁剪详解》Linux内核裁剪是通过移除不必要的功能和模块,调整配置参数来优化内核,以满足特定需求,裁剪的方法包括使用配置选项、模块化设计和优化配置参数,图形裁剪工具如makeme... 目录简介一、 裁剪的原因二、裁剪的方法三、图形裁剪工具四、操作说明五、make menuconfig

JAVA利用顺序表实现“杨辉三角”的思路及代码示例

《JAVA利用顺序表实现“杨辉三角”的思路及代码示例》杨辉三角形是中国古代数学的杰出研究成果之一,是我国北宋数学家贾宪于1050年首先发现并使用的,:本文主要介绍JAVA利用顺序表实现杨辉三角的思... 目录一:“杨辉三角”题目链接二:题解代码:三:题解思路:总结一:“杨辉三角”题目链接题目链接:点击这里

SpringBoot使用注解集成Redis缓存的示例代码

《SpringBoot使用注解集成Redis缓存的示例代码》:本文主要介绍在SpringBoot中使用注解集成Redis缓存的步骤,包括添加依赖、创建相关配置类、需要缓存数据的类(Tes... 目录一、创建 Caching 配置类二、创建需要缓存数据的类三、测试方法Spring Boot 熟悉后,集成一个外

详解Java中的敏感信息处理

《详解Java中的敏感信息处理》平时开发中常常会遇到像用户的手机号、姓名、身份证等敏感信息需要处理,这篇文章主要为大家整理了一些常用的方法,希望对大家有所帮助... 目录前后端传输AES 对称加密RSA 非对称加密混合加密数据库加密MD5 + Salt/SHA + SaltAES 加密平时开发中遇到像用户的