groupByKey与reduceByKey

2023-12-27 16:38
文章标签 reducebykey groupbykey

本文主要是介绍groupByKey与reduceByKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

贴一段经典的代码:

    val conf = new SparkConf().setAppName("GroupAndReduce").setMaster("local")val sc = new SparkContext(conf)val words = Array("one", "two", "two", "three", "three", "three")val wordsRDD = sc.parallelize(words).map(word => (word, 1))val wordsCountWithReduce = wordsRDD.reduceByKey(_ + _).collect().foreach(println)val wordsCountWithGroup = wordsRDD.groupByKey().map(w => (w._1, w._2.sum)).collect().foreach(println)

虽然两个函数都能得出一样正确的结果, 但reduceByKey函数更适合使用在大数据集上。 这是因为Spark知道它可以在每个分区移动数据之前将输出数据与一个共用的key结合。

 

借助下图可以理解在reduceByKey里发生了什么。 在数据对被搬移前,同一机器上同样的key是怎样被组合的( reduceByKey中的 lamdba 函数)。然后 lamdba 函数在每个分区上被再次调用来将所有值 reduce成最终结果。整个过程如下:

image

 

当调用 groupByKey时,所有的键值对都会被移动,在网络上传输这些数据非常没必要,因此避免使用 GroupByKey

 

image

你可以想象一个非常大的数据集,在使用 reduceByKey 和 groupByKey 时他们的差别会被放大更多倍。

 

官方文档(spark2.1.1):

groupByKey([numTasks])

When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs. 
     当对(K,V)对的数据集进行调用时,返回(K,Iterable <V>)对的数据集。

Note: If you are grouping in order to perform an aggregation (such as a sum or average) over each key, using reduceByKey or aggregateByKey will yield much better performance.

     如果要分组以便在每个键上执行聚合(如总和或平均值),则使用reduceByKey或aggregateByKey将会产生更好的性能
Note: By default, the level of parallelism in the output depends on the number of partitions of the parent RDD. You can pass an optional numTasks argument to set a different number of tasks.

     默认情况下,输出中的并行级别取决于父RDD的分区数。您可以传递一个可选的numTasks参数来设置不同数量的任务。

reduceByKey(func, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V,V) => V. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
     其中每个密钥的值使用给定的reduce函数func进行聚合,该函数必须是类型(V,V)=> V。与groupByKey类似,reduce任务的数量可以通过可选的第二个参数进行配置。
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated using the given combine functions and a neutral "zero" value. Allows an aggregated value type that is different than the input value type, while avoiding unnecessary allocations. Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
     其中每个键的值使用给定的组合函数和中性“零”值进行聚合。允许不同于输入值类型的聚合值类型,同时避免不必要的分配。像groupByKey一样,reduce任务的数量可以通过可选的第二个参数进行配置。

 

这篇关于groupByKey与reduceByKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/543760

相关文章

Spark算子:RDD键值转换操作(3)–groupByKey、reduceByKey、reduceByKeyLocally

groupByKey def groupByKey(): RDD[(K, Iterable[V])] def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] 该函数用于将RDD[K,V]中每个K对应

(转)groupByKey 和reduceByKey 的区别

【转载原文:https://blog.csdn.net/ZMC921/article/details/75098903】   版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/ZMC921/article/details/75098903 一、首先他们都是要经过shuffle的,g

spark的reduceByKey和groupByKey比较

在spark中,我们知道一切的操作都是基于RDD的。在使用中,RDD有一种非常特殊也是非常实用的format——pair RDD,即RDD的每一行是(key, value)的格式。这种格式很像Python的字典类型,便于针对key进行一些处理。 针对pair RDD这样的特殊形式,spark中定义了许多方便的操作,今天主要介绍一下reduceByKey和groupByKey,因为在接

Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)

Spark RDD/Core 编程 API入门系列 之rdd案例(map、filter、flatMap、groupByKey、reduceByKey、join、cogroupy等)(四)   声明:   大数据中,最重要的算子操作是:join  !!!       典型的transformation和action           val nums = sc.parallel

Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look

1、以本地模式实战map和filter 2、以集群模式实战textFile和cache 3、对Job输出结果进行升和降序 4、union 5、groupByKey 6、join 7、reduce 8、lookup     1、以本地模式实战map和filter 以local的方式,运行spark-shell。 spark@SparkSingleNo

spark【例子】同类合并、计算(主要使用groupByKey)

例子描述: 【同类合并、计算】 主要为两部分,将同类的数据分组归纳到一起,并将分组后的数据进行简单数学计算。  难点在于怎么去理解groupBy和groupByKey 原始数据  2010-05-04 12:50,10,10,10  2010-05-05 13:50,20,20,20  2010-05-06 14:50,30,30,30  2010-05-05 13:50,20,20,

Spark API编程动手实战-04-以在Spark 1.2版本实现对union、groupByKey、join、reduce、lookup等操作实践

下面看下union的使用: 使用collect操作查看一下执行结果: 再看下groupByKey的使用: 执行结果: join操作就是一个笛卡尔积操作的过程,如下示例: 对rdd3和rdd4执行join操作: 使用collect查看执行结果: 可以看出join操作完全就是一个笛卡尔积的操作; reduce本身在

RDD算子——转换操作(Transformations )【map、flatMap、reduceByKey】

一、map map 算子 # spark-shellsc.parallelize(Seq(1, 2, 3)).map( num => num * 10).collect()# IDEA@Testdef mapTest(): Unit = {// 1. 创建RDDval rdd1 = sc.parallelize(Seq(1, 2, 3))// 2. 执行 map 操作val rdd2 =

spark的reduceByKey

在进行Spark开发算法时,最有用的一个函数就是reduceByKey。 reduceByKey的作用对像是(key, value)形式的rdd,而reduce有减少、压缩之意,reduceByKey的作用就是对相同key的数据进行处理,最终每个key只保留一条记录。 保留一条记录通常有两种结果。一种是只保留我们希望的信息,比如每个key出现的次数。第二种是把value聚合在一起形成列表,这

reduceByKey提示Cannot resolve overloaded method ‘reduceByKey‘

scala更新为2.13,spark更新为3.2.1后原本正常使用的代码提示Cannot resolve overloaded method 'reduceByKey' 但是依旧可以正常执行 稍微修改一下,看下数据类型 编译器在识别map时,自动把(_,1)识别为(String,1) 手动修改为(String,Int) 警告提示消失