深入理解groupByKey、reduceByKey

2024-08-27 12:48

本文主要是介绍深入理解groupByKey、reduceByKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

测试源码

下面来看看groupByKey和reduceByKey的区别:

    val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local")val sc = new SparkContext(conf)val words = Array("one", "two", "two", "three", "three", "three")val wordsRDD = sc.parallelize(words).map(word => (word, 1))val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect().foreach(println)val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect().foreach(println)

虽然两个函数都能得出正确的结果, 但reduceByKey函数更适合使用在大数据集上。 这是因为Spark知道它可以在每个分区移动数据之前将输出数据与一个共用的key结合。

借助下图可以理解在reduceByKey里发生了什么。 在数据对被搬移前,同一机器上同样的key是怎样被组合的( reduceByKey中的 lamdba 函数)。然后 lamdba 函数在每个分区上被再次调用来将所有值 reduce成最终结果。整个过程如下:


image

另一方面,当调用 groupByKey时,所有的键值对(key-value pair) 都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey。

为了确定将数据对移到哪个主机,Spark会对数据对的key调用一个分区算法。 当移动的数据量大于单台执行机器内存总量时Spark会把数据保存到磁盘上。 不过在保存时每次会处理一个key的数据,所以当单个 key 的键值对超过内存容量会存在内存溢出的异常。 这将会在之后发行的 Spark 版本中更加优雅地处理,这样的工作还可以继续完善。 尽管如此,仍应避免将数据保存到磁盘上,这会严重影响性能。


image

你可以想象一个非常大的数据集,在使用 reduceByKey 和 groupByKey 时他们的差别会被放大更多倍。

我们来看看两个函数的实现:

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)}
  /*** Note: As currently implemented, groupByKey must be able to hold all the key-value pairs for any* key in memory. If a key has too many values, it can result in an [[OutOfMemoryError]].*/def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {// groupByKey shouldn't use map side combine because map side combine does not// reduce the amount of data shuffled and requires all map side data be inserted// into a hash table, leading to more objects in the old gen.val createCombiner = (v: V) => CompactBuffer(v)val mergeValue = (buf: CompactBuffer[V], v: V) => buf += vval mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2val bufs = combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)bufs.asInstanceOf[RDD[(K, Iterable[V])]]}

注意mapSideCombine=false,partitioner是HashPartitioner,但是groupByKey对小数据量比较好,一个key对应的个数少于10个。

他们都调用了combineByKeyWithClassTag,我们再来看看combineByKeyWithClassTag的定义:

  def combineByKeyWithClassTag[C](createCombiner: V => C,mergeValue: (C, V) => C,mergeCombiners: (C, C) => C,partitioner: Partitioner,mapSideCombine: Boolean = true,serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)]

combineByKey函数主要接受了三个函数作为参数,分别为createCombiner、mergeValue、mergeCombiners。这三个函数足以说明它究竟做了什么。理解了这三个函数,就可以很好地理解combineByKey。

combineByKey是将RDD[(K,V)]combine为RDD[(K,C)],因此,首先需要提供一个函数,能够完成从V到C的combine,称之为combiner。如果V和C类型一致,则函数为V => V。倘若C是一个集合,例如Iterable[V],则createCombiner为V => Iterable[V]。

mergeValue则是将原RDD中Pair的Value合并为操作后的C类型数据。合并操作的实现决定了结果的运算方式。所以,mergeValue更像是声明了一种合并方式,它是由整个combine运算的结果来导向的。函数的输入为原RDD中Pair的V,输出为结果RDD中Pair的C。

最后的mergeCombiners则会根据每个Key所对应的多个C,进行归并。

例如:

var rdd1 = sc.makeRDD(Array(("A", 1), ("A", 2), ("B", 1), ("B", 2),("B",3),("B",4), ("C", 1)))rdd1.combineByKey((v: Int) => v + "_",(c: String, v: Int) => c + "@" + v,(c1: String, c2: String) => c1 + "$" + c2).collect.foreach(println)

result不确定欧,单机执行不会调用mergeCombiners:

(B,1_@2@3@4)
(A,1_@2)
(C,1_)

在集群情况下:

(B,2_@3@4$1_)
(A,1_@2)
(C,1_)
或者
(B,1_$2_@3@4)
(A,1_@2)
(C,1_)

mapSideCombine=false时,再体验一下运行结果。

有许多函数比goupByKey好:

  1. 当你combine元素时,可以使用combineByKey,但是输入值类型和输出可能不一样
  2. foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。
    //使用combineByKey计算wordcountwordsRDD.map(word=>(word,1)).combineByKey((v: Int) => v,(c: Int, v: Int) => c+v,(c1: Int, c2: Int) => c1 + c2).collect.foreach(println)//使用foldByKey计算wordcountprintln("=======foldByKey=========")wordsRDD.map(word=>(word,1)).foldByKey(0)(_+_).foreach(println)//使用aggregateByKey计算wordcountprintln("=======aggregateByKey============")wordsRDD.map(word=>(word,1)).aggregateByKey(0)((u:Int,v)=>u+v,_+_).foreach(println)

foldByKey,aggregateByKey都是由combineByKey实现,并且mapSideCombine=true,因此可以使用这些函数替代goupByKey。

参考

Spark中的combineByKey

databricks gitbooks

在Spark中尽量少使用GroupByKey函数



文/jacksu在简书(简书作者)
原文链接:http://www.jianshu.com/p/0c6705724cff
著作权归作者所有,转载请联系作者获得授权,并标注“简书作者”。

这篇关于深入理解groupByKey、reduceByKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1111715

相关文章

深入解析Spring TransactionTemplate 高级用法(示例代码)

《深入解析SpringTransactionTemplate高级用法(示例代码)》TransactionTemplate是Spring框架中一个强大的工具,它允许开发者以编程方式控制事务,通过... 目录1. TransactionTemplate 的核心概念2. 核心接口和类3. TransactionT

深入理解Apache Airflow 调度器(最新推荐)

《深入理解ApacheAirflow调度器(最新推荐)》ApacheAirflow调度器是数据管道管理系统的关键组件,负责编排dag中任务的执行,通过理解调度器的角色和工作方式,正确配置调度器,并... 目录什么是Airflow 调度器?Airflow 调度器工作机制配置Airflow调度器调优及优化建议最

一文带你理解Python中import机制与importlib的妙用

《一文带你理解Python中import机制与importlib的妙用》在Python编程的世界里,import语句是开发者最常用的工具之一,它就像一把钥匙,打开了通往各种功能和库的大门,下面就跟随小... 目录一、python import机制概述1.1 import语句的基本用法1.2 模块缓存机制1.

深入理解C语言的void*

《深入理解C语言的void*》本文主要介绍了C语言的void*,包括它的任意性、编译器对void*的类型检查以及需要显式类型转换的规则,具有一定的参考价值,感兴趣的可以了解一下... 目录一、void* 的类型任意性二、编译器对 void* 的类型检查三、需要显式类型转换占用的字节四、总结一、void* 的

深入理解Redis大key的危害及解决方案

《深入理解Redis大key的危害及解决方案》本文主要介绍了深入理解Redis大key的危害及解决方案,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着... 目录一、背景二、什么是大key三、大key评价标准四、大key 产生的原因与场景五、大key影响与危

深入理解C++ 空类大小

《深入理解C++空类大小》本文主要介绍了C++空类大小,规定空类大小为1字节,主要是为了保证对象的唯一性和可区分性,满足数组元素地址连续的要求,下面就来了解一下... 目录1. 保证对象的唯一性和可区分性2. 满足数组元素地址连续的要求3. 与C++的对象模型和内存管理机制相适配查看类对象内存在C++中,规

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

认识、理解、分类——acm之搜索

普通搜索方法有两种:1、广度优先搜索;2、深度优先搜索; 更多搜索方法: 3、双向广度优先搜索; 4、启发式搜索(包括A*算法等); 搜索通常会用到的知识点:状态压缩(位压缩,利用hash思想压缩)。

深入探索协同过滤:从原理到推荐模块案例

文章目录 前言一、协同过滤1. 基于用户的协同过滤(UserCF)2. 基于物品的协同过滤(ItemCF)3. 相似度计算方法 二、相似度计算方法1. 欧氏距离2. 皮尔逊相关系数3. 杰卡德相似系数4. 余弦相似度 三、推荐模块案例1.基于文章的协同过滤推荐功能2.基于用户的协同过滤推荐功能 前言     在信息过载的时代,推荐系统成为连接用户与内容的桥梁。本文聚焦于

【生成模型系列(初级)】嵌入(Embedding)方程——自然语言处理的数学灵魂【通俗理解】

【通俗理解】嵌入(Embedding)方程——自然语言处理的数学灵魂 关键词提炼 #嵌入方程 #自然语言处理 #词向量 #机器学习 #神经网络 #向量空间模型 #Siri #Google翻译 #AlexNet 第一节:嵌入方程的类比与核心概念【尽可能通俗】 嵌入方程可以被看作是自然语言处理中的“翻译机”,它将文本中的单词或短语转换成计算机能够理解的数学形式,即向量。 正如翻译机将一种语言