浪尖说spark的coalesce的利弊及原理

2023-10-09 02:32

本文主要是介绍浪尖说spark的coalesce的利弊及原理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

浪尖的粉丝应该很久没见浪尖发过spark源码解读的文章,今天浪尖在这里给大家分享一篇文章,帮助大家进一步理解rdd如何在spark中被计算的,同时解释一下coalesce降低分区的原理及使用问题。

主要是知识星球有人问到过coalesce方法的使用和原理的问题,并且参考阅读了网上关于coalesce方法的错误介绍,有了错误的理解,所以浪尖忙里偷闲给大家解释一下。

浪尖这里建议多看看spark源码上,spark源码我觉得是注释最全的一套源码了,而且整体代码逻辑比较清晰,就是scala高阶函数的使用会使得前期阅读的时候很头疼,但是不可否认spark是大家学习scala编程规范性的参考代码。

这里不得不吐槽一下:flink的代码写的很挫,注释又不好,感觉不太适合人们阅读学习。

1.  coalesce  函数start

对于Spark 算子使用,大家还是要经常翻看一下源码上的注释及理解一下spark 算子的源码实现逻辑,注释很多时候已经很清楚了讲了算子的应用场景及原理,比如本文要讲的关于coalesce函数的注释如下:

 /*** Return a new RDD that is reduced into `numPartitions` partitions.** This results in a narrow dependency, e.g. if you go from 1000 partitions* to 100 partitions, there will not be a shuffle, instead each of the 100* new partitions will claim 10 of the current partitions. If a larger number* of partitions is requested, it will stay at the current number of partitions.** However, if you're doing a drastic coalesce, e.g. to numPartitions = 1,* this may result in your computation taking place on fewer nodes than* you like (e.g. one node in the case of numPartitions = 1). To avoid this,* you can pass shuffle = true. This will add a shuffle step, but means the* current upstream partitions will be executed in parallel (per whatever* the current partitioning is).** @note With shuffle = true, you can actually coalesce to a larger number* of partitions. This is useful if you have a small number of partitions,* say 100, potentially with a few partitions being abnormally large. Calling* coalesce(1000, shuffle = true) will result in 1000 partitions with the* data distributed using a hash partitioner. The optional partition coalescer* passed in must be serializable.*/

注释的大致意思就是假设父rdd 1000分区,然后调用coalesce(100),实际上就是将父rdd的1000分区分成100组,每组10个,叫做partitionGroup,每个partitionGroup作为coalescedrdd的一个分区,在compute方法中迭代处理,以此来避免shuffle。

coalesce函数总共三个参数:分区数,是否进行shuffle(默认不shuffle),Coalesce分区器(用来决定哪些父rdd的分区组成一组,作为一个partitiongroup,也即是决定了coalescedrdd的分区情况)。

def coalesce(numPartitions: Int, shuffle: Boolean = false,partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T] = withScope {require(numPartitions > 0, s"Number of partitions ($numPartitions) must be positive.")if (shuffle) {/** Distributes elements evenly across output partitions, starting from a random partition. */val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)items.map { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributednew CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),new HashPartitioner(numPartitions)),numPartitions,partitionCoalescer).values} else {new CoalescedRDD(this, numPartitions, partitionCoalescer)}}

研究coalescedrdd源码之前,浪尖觉得应该要强调一下rdd的五大特性,大家要不理解rdd的五大特性的话,很难理解本文的内容。

吐槽一下:

其实,大家对于RDD五大特性都能背诵入流,但是要说真正理解,浪尖感觉很多人还是差点。

 *  - A list of partitions*  - A function for computing each split*  - A list of dependencies on other RDDs*  - Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)*  - Optionally, a list of preferred locations to compute each split on (e.g. block locations for*    an HDFS file)

也即:

每个RDD都有一系列的分区,每个rdd都有一系列的父rdd,也有一个针对rdd的当前分区的compute计算函数,可选的分区器和可选的本地性策略。

那么,提个问题: RDD和父RDD的分区关系是如何确定的?

这里又要强调五大特性了:

所有的RDD的分区数都是由getPartitions函数来确定分区,所有的RDD都是通过getDependencies()函数来确定依赖关系:窄依赖和宽依赖。而所有的rdd都是通过compute方法来计算rdd数据的。

coalesce函数的shuffle的我们这里就暂时不介绍了,只介绍不进行shuffle操作的功能,也即是:

new CoalescedRDD(this, numPartitions, partitionCoalescer)

2. getPartitions 分区分组

默认coalesce函数的partitionCoalescer为空,所以你要想自己实现父RDD分区分组策略也是可以的。对于CoalescedRDD,默认指定分区器为空,那么看一下其getPartitions函数,会使用默认的分区器DefaultPartitionCoalescer。

override def getPartitions: Array[Partition] = {val pc = partitionCoalescer.getOrElse(new DefaultPartitionCoalescer())pc.coalesce(maxPartitions, prev).zipWithIndex.map {case (pg, i) =>val ids = pg.partitions.map(_.index).toArrayCoalescedRDDPartition(i, prev, ids, pg.prefLoc)}}

可以看看DefaultPartitionCoalescer分区器的coalesce方法,实际上就是将父RDD的分区分组缩减为指定的分区数,该函数返回的就是Array[PartitionGroup],每个PartitionGroup代表一组父RDD分区,也代表一个CoalescedRDD的分区。

 /*** Runs the packing algorithm and returns an array of PartitionGroups that if possible are* load balanced and grouped by locality** @return array of partition groups*/def coalesce(maxPartitions: Int, prev: RDD[_]): Array[PartitionGroup] = {val partitionLocs = new PartitionLocations(prev)// setup the groups (bins)setupGroups(math.min(prev.partitions.length, maxPartitions), partitionLocs)// assign partitions (balls) to each group (bins)throwBalls(maxPartitions, prev, balanceSlack, partitionLocs)getPartitions}

一个PartitionGroup实际上就是按照一定的规则组合的父RDD的partition数组,可以看一下该类。

/*** ::DeveloperApi::* A group of `Partition`s* @param prefLoc preferred location for the partition group*/
@DeveloperApi
class PartitionGroup(val prefLoc: Option[String] = None) {val partitions = mutable.ArrayBuffer[Partition]()def numPartitions: Int = partitions.size
}

3. getDependencies 血缘

上面说了,CoalescedRDD的getPartitions()方法,也就是完成了父RDD的分区到当前RDD分区的映射关系。这个映射关系的使用实际上就是通过getDependencies方法来调用的。具体如下:

 override def getDependencies: Seq[Dependency[_]] = {Seq(new NarrowDependency(prev) {def getParents(id: Int): Seq[Int] =partitions(id).asInstanceOf[CoalescedRDDPartition].parentsIndices})}

partitions数组是在RDD类里实现的,其实调用了getPartitions函数。

/*** Get the array of partitions of this RDD, taking into account whether the* RDD is checkpointed or not.*/final def partitions: Array[Partition] = {checkpointRDD.map(_.partitions).getOrElse {if (partitions_ == null) {partitions_ = getPartitionspartitions_.zipWithIndex.foreach { case (partition, index) =>require(partition.index == index,s"partitions($index).partition == ${partition.index}, but it should equal $index")}}partitions_}}

再说回窄依赖 NarrowDependency,其实他的getParents方法就是通过当前分区的id获取一个coalescedRDDPartition,也即一个父RDD分区数组。该数组是通过CoalescedRDD的getPartitions中实现的对父RDD分区分组得到的。

4. compute 计算分区

compute五大特性之一,针对分区的计算函数,对于CoalescedRDD,那么其计算函数的实现如下:

 override def compute(partition: Partition, context: TaskContext): Iterator[T] = {partition.asInstanceOf[CoalescedRDDPartition].parents.iterator.flatMap { parentPartition =>firstParent[T].iterator(parentPartition, context)}}

观察上述方法就会发现,是针对CoalescedRDDPartition的计算,这个其实是就是针对一个PartitionsGroup进行计算,也即使一个父RDD的分组。在getPartitions方法里生成的哦。

到这里就很明显了,coalescedrdd的compute方法虽然是针对Coalescedrdd的一个分区计算,实际上是计算的父RDD的一组RDD分区,降低了父RDD 的并行度哦,所以大家使用要慎重哦。

该使用shuffle决不能手软

5. shuffle模式 开篇

对于支持shuffle的Coalesce函数,我们可以看到其实是外层包括了一个shuffleRDD,同时CoalescedRDD传入的分区数和构建的父shuffleRDD一样,就实现了一对一分区转化,以此来实现shuffle功能的,针对shuffleRDD我们星球里分析分享。

if (shuffle) {/** Distributes elements evenly across output partitions, starting from a random partition. */val distributePartition = (index: Int, items: Iterator[T]) => {var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)items.map { t =>// Note that the hash code of the key will just be the key itself. The HashPartitioner// will mod it with the number of total partitions.position = position + 1(position, t)}} : Iterator[(Int, T)]// include a shuffle step so that our upstream tasks are still distributednew CoalescedRDD(new ShuffledRDD[Int, T, T](mapPartitionsWithIndexInternal(distributePartition, isOrderSensitive = true),new HashPartitioner(numPartitions)),numPartitions,partitionCoalescer).values

欢迎大家加入浪尖知识星球哦~

这篇关于浪尖说spark的coalesce的利弊及原理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/169721

相关文章

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

hdu4407(容斥原理)

题意:给一串数字1,2,......n,两个操作:1、修改第k个数字,2、查询区间[l,r]中与n互质的数之和。 解题思路:咱一看,像线段树,但是如果用线段树做,那么每个区间一定要记录所有的素因子,这样会超内存。然后我就做不来了。后来看了题解,原来是用容斥原理来做的。还记得这道题目吗?求区间[1,r]中与p互质的数的个数,如果不会的话就先去做那题吧。现在这题是求区间[l,r]中与n互质的数的和

hdu4407容斥原理

题意: 有一个元素为 1~n 的数列{An},有2种操作(1000次): 1、求某段区间 [a,b] 中与 p 互质的数的和。 2、将数列中某个位置元素的值改变。 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.Inpu

hdu4059容斥原理

求1-n中与n互质的数的4次方之和 import java.io.BufferedInputStream;import java.io.BufferedReader;import java.io.IOException;import java.io.InputStream;import java.io.InputStreamReader;import java.io.PrintWrit

寻迹模块TCRT5000的应用原理和功能实现(基于STM32)

目录 概述 1 认识TCRT5000 1.1 模块介绍 1.2 电气特性 2 系统应用 2.1 系统架构 2.2 STM32Cube创建工程 3 功能实现 3.1 代码实现 3.2 源代码文件 4 功能测试 4.1 检测黑线状态 4.2 未检测黑线状态 概述 本文主要介绍TCRT5000模块的使用原理,包括该模块的硬件实现方式,电路实现原理,还使用STM32类

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

TL-Tomcat中长连接的底层源码原理实现

长连接:浏览器告诉tomcat不要将请求关掉。  如果不是长连接,tomcat响应后会告诉浏览器把这个连接关掉。    tomcat中有一个缓冲区  如果发送大批量数据后 又不处理  那么会堆积缓冲区 后面的请求会越来越慢。

PHP原理之内存管理中难懂的几个点

PHP的内存管理, 分为俩大部分, 第一部分是PHP自身的内存管理, 这部分主要的内容就是引用计数, 写时复制, 等等面向应用的层面的管理. 而第二部分就是今天我要介绍的, zend_alloc中描写的关于PHP自身的内存管理, 包括它是如何管理可用内存, 如何分配内存等. 另外, 为什么要写这个呢, 因为之前并没有任何资料来介绍PHP内存管理中使用的策略, 数据结构, 或者算法. 而在我们

Smarty模板执行原理

为了实现程序的业务逻辑和内容表现页面的分离从而提高开发速度,php 引入了模板引擎的概念,php 模板引擎里面最流行的可以说是smarty了,smarty因其功能强大而且速度快而被广大php web开发者所认可。本文将记录一下smarty模板引擎的工作执行原理,算是加深一下理解。 其实所有的模板引擎的工作原理是差不多的,无非就是在php程序里面用正则匹配将模板里面的标签替换为php代码从而将两者

Restful API 原理以及实现

先说说API 再说啥是RESRFUL API之前,咱先说说啥是API吧。API大家应该都知道吧,简称接口嘛。随着现在移动互联网的火爆,手机软件,也就是APP几乎快爆棚了。几乎任何一个网站或者应用都会出一款iOS或者Android APP,相比网页版的体验,APP确实各方面性能要好很多。 那么现在问题来了。比如QQ空间网站,如果我想获取一个用户发的说说列表。 QQ空间网站里面需要这个功能。