spark的reduceByKey和groupByKey比较

2024-04-26 02:18

本文主要是介绍spark的reduceByKey和groupByKey比较,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。


针对pair RDD这样的特殊形式,spark中定义了许多方便的操作,今天主要介绍一下reduceByKey和groupByKey,因为在接下来讲解《在spark中如何实现SQL中的group_concat功能?》时会用到这两个operations。


首先,看一看spark官网[1]是怎么解释的:

reduceByKey(func, numPartitions=None)

Merge the values for each key using an associative reduce function. This will also perform the merginglocally on each mapper before sending results to a reducer, similarly to a “combiner” in MapReduce. Output will be hash-partitioned with numPartitions partitions, or the default parallelism level if numPartitions is not specified.

也就是,reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义。

groupByKey(numPartitions=None)

Group the values for each key in the RDD into a single sequence. Hash-partitions the resulting RDD with numPartitions partitions. Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will provide much better performance.

也就是,groupByKey也是对每个key进行操作,但只生成一个sequence。需要特别注意“Note”中的话,它告诉我们:如果需要对sequence进行aggregation操作(注意,groupByKey本身不能自定义操作函数),那么,选择reduceByKey/aggregateByKey更好。这是因为groupByKey不能自定义函数,我们需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作。


为了更好的理解上面这段话,下面我们使用两种不同的方式去计算单词的个数[2]:

  1. val words = Array("one", "two", "two", "three", "three", "three")
  2. val wordPairsRDD = sc.parallelize(words).map(word => (word, 1))
  3. val wordCountsWithReduce = wordPairsRDD.reduceByKey(_ + _)
  4. val wordCountsWithGroup = wordPairsRDD.groupByKey().map(t => (t._1, t._2.sum))
上面得到的wordCountsWithReduce和wordCountsWithGroup是完全一样的,但是,它们的内部运算过程是不同的。


(1)当采用reduceByKeyt时,Spark可以在每个分区移动数据之前将待输出数据与一个共用的key结合。借助下图可以理解在reduceByKey里究竟发生了什么。 注意在数据对被搬移前同一机器上同样的key是怎样被组合的(reduceByKey中的lamdba函数)。然后lamdba函数在每个区上被再次调用来将所有值reduce成一个最终结果。整个过程如下:



(2)当采用groupByKey时,由于它不接收函数,spark只能先将所有的键值对(key-value pair)都移动,这样的后果是集群节点之间的开销很大,导致传输延时。整个过程如下:


因此,在对大数据进行复杂计算时,reduceByKey优于groupByKey。

另外,如果仅仅是group处理,那么以下函数应该优先于 groupByKey :
  (1)、combineByKey 组合数据,但是组合之后的数据类型与输入时值的类型不一样。
  (2)、foldByKey合并每一个 key 的所有值,在级联函数和“零值”中使用。


最后,对reduceByKey中的func做一些介绍:

如果是用Python写的spark,那么有一个库非常实用:operator[3],其中可以用的函数包括:大小比较函数,逻辑操作函数,数学运算函数,序列操作函数等等。这些函数可以直接通过“from operator import *”进行调用,直接把函数名作为参数传递给reduceByKey即可。如下:

  1. <span style="font-size:14px;">from operator import add
  2. rdd = sc.parallelize([("a", 1), ("b", 1), ("a", 1)])
  3. sorted(rdd.reduceByKey(add).collect())
  4. [('a', 2), ('b', 1)]</span>



参考:

[1] http://spark.apache.org/docs/latest/api/python/pyspark.html?highlight=reducebykey#pyspark.RDD.reduceByKey

[2] http://www.iteblog.com/archives/1357

[3] https://docs.python.org/2/library/operator.html

这篇关于spark的reduceByKey和groupByKey比较的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/936451

相关文章

百度/小米/滴滴/京东,中台架构比较

小米中台建设实践 01 小米的三大中台建设:业务+数据+技术 业务中台--从业务说起 在中台建设中,需要规范化的服务接口、一致整合化的数据、容器化的技术组件以及弹性的基础设施。并结合业务情况,判定是否真的需要中台。 小米参考了业界优秀的案例包括移动中台、数据中台、业务中台、技术中台等,再结合其业务发展历程及业务现状,整理了中台架构的核心方法论,一是企业如何共享服务,二是如何为业务提供便利。

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

关键字synchronized、volatile的比较

关键字volatile是线程同步的轻量级实现,所以volatile性能肯定比synchronized要好,并且volatile只能修饰于变量,而synchronized可以修饰方法,以及代码块。随着JDK新版本的发布,synchronized关键字的执行效率上得到很大提升,在开发中使用synchronized关键字的比率还是比较大的。多线程访问volatile不会发生阻塞,而synchronize

stl的sort和手写快排的运行效率哪个比较高?

STL的sort必然要比你自己写的快排要快,因为你自己手写一个这么复杂的sort,那就太闲了。STL的sort是尽量让复杂度维持在O(N log N)的,因此就有了各种的Hybrid sort algorithm。 题主你提到的先quicksort到一定深度之后就转为heapsort,这种是introsort。 每种STL实现使用的算法各有不同,GNU Standard C++ Lib

研究生生涯中一些比较重要的网址

Mali GPU相关: 1.http://malideveloper.arm.com/resources/sdks/opengl-es-sdk-for-linux/ 2.http://malideveloper.arm.com/resources/tools/arm-development-studio-5/ 3.https://www.khronos.org/opengles/sdk/do

性能测试工具 wrk,ab,locust,Jmeter 压测结果比较

前言 在开发服务端软件时,经常需要进行性能测试,一般我采用手写性能测试代码的方式进行测试,那有什么现成的好的性能测试工具吗? 性能测试工具 wrk,ab,locust,Jmeter 压测结果比较 详见: 性能测试工具 wrk,ab,locust,Jmeter 压测结果比较 Jmeter性能测试 入门

MongoDB学习—(6)MongoDB的find查询比较符

首先,先通过以下函数向BookList集合中插入10000条数据 function insertN(obj,n){var i=0;while(i<n){obj.insert({id:i,name:"bookNumber"+i,publishTime:i+2000})i++;}}var BookList=db.getCollection("BookList")调用函数,这样,BookList

超声波清洗机哪个品牌比较好一点的?清洁力强的超声波清洗机品牌

随着生活水平的不断提升和幸福感的增强,珠宝、饰品和眼镜等物品已成为许多家庭的常备之物。然而,这些贵重细小的物件易于积聚微尘与隐形细菌,长此以往可能悄悄影响家人的健康,毕竟细菌是肉眼难以察觉的隐患。超声波清洗机应运而生,它以高科技手段有效地解决了这一隐忧,深层清洁,守护家人免受微小污染物的潜在威胁。不过现在市面上超声波清洗机品牌挺多的,究竟有哪些品牌的超声波清洗机比较好一点呢?接下来就为大家带来四款

【JavaScript】版本号和日期时间的比较

JS使用 ‘>’ 运算符比较两个字符串大小时,会把字符串转换为ASCII码依次比较。 比较标准时间格式可以直接使用 ’ > ’ 比较; (2018-07-20格式)

俩个float数之间比较大小

需求:俩个标识金额的浮点数比较大小。 问题:相等无法成立。经过var_dump()打印,俩个浮点数数值 一样大。 解决:把标识金额的浮点数乘以100,抓换成整形,在做比较。即可使相等成立