groupByKey与reduceByKey

2023-12-27 16:38
文章标签 reducebykey groupbykey

本文主要是介绍groupByKey与reduceByKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

贴一段经典的代码:

    val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local")val sc = new SparkContext(conf)val words = Array("one", "two", "two", "three", "three", "three")val wordsRDD = sc.parallelize(words).map(word => (word, 1))val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect().foreach(println)val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect().foreach(println)

虽然两个函数都能得出一样正确的结果, 但reduceByKey函数更适合使用在大数据集上。 这是因为Spark知道它可以在每个分区移动数据之前将输出数据与一个共用的key结合。

 

借助下图可以理解在reduceByKey里发生了什么。 在数据对被搬移前,同一机器上同样的key是怎样被组合的( reduceByKey中的 lamdba 函数)。然后 lamdba 函数在每个分区上被再次调用来将所有值 reduce成最终结果。整个过程如下:

image

 

当调用 groupByKey时,所有的键值对都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey

 

image

你可以想象一个非常大的数据集,在使用 reduceByKey 和 groupByKey 时他们的差别会被放大更多倍。

 

官方文档(spark2.1.1):

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
     当对(K,V)对的数据集进行调用时,返回(K,Iterable <V>)对的数据集。

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.

     如果要分组以便在每个键上执行聚合(如总和或平均值),则使用reduceByKey或aggregateByKey将会产生更好的性能
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

     默认情况下,输出中的并行级别取决于父RDD的分区数。您可以传递一个可选的numTasks参数来设置不同数量的任务。

reduceByKey(func, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
     其中每个密钥的值使用给定的reduce函数func进行聚合,该函数必须是类型(V,V)=> V。与groupByKey类似,reduce任务的数量可以通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
     其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许不同于输入值类型的聚合值类型,同时避免不必要的分配。像groupByKey一样,reduce任务的数量可以通过可选的第二个参数进行配置。

 

这篇关于groupByKey与reduceByKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/543760

相关文章

深入理解groupByKey、reduceByKey

测试源码 下面来看看groupByKey和reduceByKey的区别: val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local")val sc = new SparkContext(conf)val words = Array("one", "two", "two", "three", "thr

Spark groupbykey和cogroup使用示例

groupByKey groupByKey([numTasks])是数据分组操作,在一个由(K,V)对组成的数据集上调用,返回一个(K,Seq[V])对的数据集。 val rdd0 = sc.parallelize(Array((1,1), (1,2) , (1,3) , (2,1) , (2,2) , (2,3)), 3) val rdd1 = rdd0.groupByKey() rdd1.co

spark 大型项目实战(四十三):算子调优之reduceByKey本地聚合介绍

下面给出一个图解: map端的task是不断的输出数据的,数据量可能是很大的。 但是,其实reduce端的task,并不是等到map端task将属于自己的那份数据全部写入磁盘文件之后,再去拉取的。map端写一点数据,reduce端task就会拉取一小部分数据,立即进行后面的聚合、算子函数的应用。 每次reduece能够拉取多少数据,就由buffer来决定。因为拉取过来的数据,都是先放在b

spark 大型项目实战(四十二):算子调优之reduceByKey本地聚合介绍

下面看一段简单的world count val lines = sc.textFile("hdfs://")val words = lines.flatMap(_.split(" "))val pairs = words.map((_, 1))val counts = pairs.reduceByKey(_ + _)counts.collect() reduceByKey,相较于普通

Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally

groupByKey def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] 该函数用于将RDD[K,V]中每个K对应

(转)groupByKey 和reduceByKey 的区别

【转载原文:https://blog.csdn.net/ZMC921/article/details/75098903】   版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/ZMC921/article/details/75098903 一、首先他们都是要经过shuffle的,g

spark的reduceByKey和groupByKey比较

在spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。 针对pair RDD这样的特殊形式,spark中定义了许多方便的操作,今天主要介绍一下reduceByKey和groupByKey,因为在接

Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)

Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)   声明:   大数据中,最重要的算子操作是:join  !!!       典型的transformation和action           val nums = sc.parallel

Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look

1、以本地模式实战map和filter 2、以集群模式实战textFile和cache 3、对Job输出结果进行升和降序 4、union 5、groupByKey 6、join 7、reduce 8、lookup     1、以本地模式实战map和filter 以local的方式,运行spark-shell。 spark@SparkSingleNo

spark【例子】同类合并、计算(主要使用groupByKey)

例子描述: 【同类合并、计算】 主要为两部分,将同类的数据分组归纳到一起,并将分组后的数据进行简单数学计算。  难点在于怎么去理解groupBy和groupByKey 原始数据  2010-05-04 12:50,10,10,10  2010-05-05 13:50,20,20,20  2010-05-06 14:50,30,30,30  2010-05-05 13:50,20,20,