(Repost) This article explains the difference between Spark's groupByKey and reduceByKey.
[Reposted from: https://blog.csdn.net/ZMC921/article/details/75098903]
Copyright notice: this is an original article by the blogger, released under the CC 4.0 BY-SA license. When reposting, include a link to the original source and this notice.
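1. Both operations normally go through a shuffle. groupByKey does not merge anything before the shuffle and ships every record as it is. reduceByKey first merges the values of each key on the map side, which reduces the amount of data written and transferred during the shuffle, so it is more efficient.
Example: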
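// Assumes the log4j 1.x API is on the classpath, as it is in Spark distributions of the original post's era.
import org.apache.log4j.{Level, Logger}
import org.apache.spark.{SparkConf, SparkContext}

object GroupyKeyAndReduceByKeyDemo {
  def main(args: Array[String]): Unit = {
    // Keep Spark's own logging quiet so the printed results are easy to spot.
    Logger.getLogger("org").setLevel(Level.WARN)
    val config = new SparkConf().setAppName("GroupyKeyAndReduceByKeyDemo").setMaster("local")
    val sc = new SparkContext(config)
    val arr = Array("val config", "val arr")
    // Split each line into words and pair every word with a count of 1.
    val socketDS = sc.parallelize(arr).flatMap(_.split(" ")).map((_, 1))

    // groupByKey: every (word, 1) pair is shuffled unchanged; the sum happens after the shuffle.
    socketDS.groupByKey().map(tuple => (tuple._1, tuple._2.sum)).foreach(x => {
      println(x._1 + " " + x._2)
    })
    println("----------------------")
    // reduceByKey: values are merged per key before the shuffle, so less data is transferred.
    socketDS.reduceByKey(_ + _).foreach(x => {
      println(x._1 + " " + x._2)
    })
    sc.stop()
  }
}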
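To make the saving in point 1 concrete, here is a minimal sketch in plain Scala (no Spark involved) that counts how many records would cross the shuffle boundary with and without a map-side merge. The object name, the helper names, and the two sample partitions are assumptions made up for this illustration; this is a toy model of the idea, not how Spark implements its shuffle.

```scala
object MapSideCombineSketch {
  // Each inner Seq stands for the (word, 1) records held by one map-side partition.
  type Partition = Seq[(String, Int)]

  // groupByKey-style: every record leaves its partition unchanged.
  def shuffledWithoutCombine(partitions: Seq[Partition]): Seq[(String, Int)] =
    partitions.flatten

  // reduceByKey-style: each partition first sums its own values per key,
  // so at most one record per distinct key leaves each partition.
  def shuffledWithCombine(partitions: Seq[Partition]): Seq[(String, Int)] =
    partitions.flatMap { part =>
      part.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }.toSeq
    }

  // Reduce side: merge whatever arrived into the final per-key totals.
  def reduceSide(shuffled: Seq[(String, Int)]): Map[String, Int] =
    shuffled.groupBy(_._1).map { case (k, kvs) => (k, kvs.map(_._2).sum) }

  def main(args: Array[String]): Unit = {
    // Hypothetical layout of six records over two partitions.
    val partitions: Seq[Partition] = Seq(
      Seq("val" -> 1, "config" -> 1, "val" -> 1),
      Seq("val" -> 1, "arr" -> 1, "arr" -> 1)
    )
    val plain = shuffledWithoutCombine(partitions)
    val combined = shuffledWithCombine(partitions)
    println(s"records shuffled without combine: ${plain.size}") // 6
    println(s"records shuffled with combine: ${combined.size}") // 4
    // Both routes end in the same totals; only the shuffled volume differs.
    println(reduceSide(plain) == reduceSide(combined)) // true
  }
}
```

The gap grows with the number of repeated keys per partition; if every key appears only once per partition, the map-side merge saves nothing.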
2. groupByKey has three overloads
In the source, groupByKey() simply calls groupByKey(defaultPartitioner(self)):
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with the existing partitioner/parallelism level. The ordering of elements
 * within each group is not guaranteed, and may even differ each time the resulting RDD is
 * evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 */
def groupByKey(): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(defaultPartitioner(self))
}
Likewise, groupByKey(numPartitions: Int) calls groupByKey(new HashPartitioner(numPartitions)):
/**
 * Group the values for each key in the RDD into a single sequence. Hash-partitions the
 * resulting RDD with into `numPartitions` partitions. The ordering of elements within
 * each group is not guaranteed, and may even differ each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
 */
def groupByKey(numPartitions: Int): RDD[(K, Iterable[V])] = self.withScope {
  groupByKey(new HashPartitioner(numPartitions))
}
Both of the overloads above delegate to groupByKey(partitioner: Partitioner):
/**
 * Group the values for each key in the RDD into a single sequence. Allows controlling the
 * partitioning of the resulting key-value pair RDD by passing a Partitioner.
 * The ordering of elements within each group is not guaranteed, and may even differ
 * each time the resulting RDD is evaluated.
 *
 * @note This operation may be very expensive. If you are grouping in order to perform an
 * aggregation (such as a sum or average) over each key, using `PairRDDFunctions.aggregateByKey`
 * or `PairRDDFunctions.reduceByKey` will provide much better performance.
 *
 * @note As currently implemented, groupByKey must be able to hold all the key-value pairs for any
 * key in memory. If a key has too many values, it can result in an `OutOfMemoryError`.
 */
def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
  // groupByKey shouldn't use map side combine because map side combine does not
  // reduce the amount of data shuffled and requires all map side data be inserted
  // into a hash table, leading to more objects in the old gen.
  val createCombiner = (v: V) => CompactBuffer(v)
  val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
  val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
  val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
    createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
  bufs.asInstanceOf[RDD[(K, Iterable[V])]]
}
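groupByKey(partitioner: Partitioner) in turn calls combineByKeyWithClassTag. So every groupByKey overload is ultimately built on combineByKeyWithClassTag; the overloads differ only in how the partitioner is chosen.
3. reduceByKey also has three overloads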
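The Scaladoc quoted above recommends aggregateByKey or reduceByKey when the grouping only serves an aggregation such as a sum or an average. A minimal sketch of a per-key average with aggregateByKey might look like the following; the object name, the helper names zero, addValue and mergeAcc, and the sample data are assumptions chosen for illustration, and a local Spark installation is assumed to be on the classpath.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object AverageByKeySketch {
  // Accumulator: (running sum, number of values seen). It stays two numbers
  // per key no matter how many values that key has.
  val zero: (Double, Long) = (0.0, 0L)

  // Fold one value into a partition-local accumulator.
  def addValue(acc: (Double, Long), v: Double): (Double, Long) =
    (acc._1 + v, acc._2 + 1L)

  // Merge two accumulators that were built in different partitions.
  def mergeAcc(x: (Double, Long), y: (Double, Long)): (Double, Long) =
    (x._1 + y._1, x._2 + y._2)

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("AverageByKeySketch").setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Hypothetical sample data: (key, measurement) pairs.
    val scores = sc.parallelize(Seq(("a", 1.0), ("a", 3.0), ("b", 10.0)))

    val averages = scores
      .aggregateByKey(zero)(addValue, mergeAcc)
      .mapValues { case (sum, count) => sum / count }

    averages.collect().foreach { case (k, avg) => println(s"$k $avg") }
    sc.stop()
  }
}
```

Unlike groupByKey followed by a manual average, this never has to hold all values of one key in memory at once, which is the risk the second @note warns about.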
/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with the existing partitioner/
 * parallelism level.
 */
def reduceByKey(func: (V, V) => V): RDD[(K, V)] = self.withScope {
  reduceByKey(defaultPartitioner(self), func)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce. Output will be hash-partitioned with numPartitions partitions.
 */
def reduceByKey(func: (V, V) => V, numPartitions: Int): RDD[(K, V)] = self.withScope {
  reduceByKey(new HashPartitioner(numPartitions), func)
}

/**
 * Merge the values for each key using an associative and commutative reduce function. This will
 * also perform the merging locally on each mapper before sending results to a reducer, similarly
 * to a "combiner" in MapReduce.
 */
def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
  combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
}
Looking at these three overloads, it is easy to see that the first two delegate to the last one, and the last one again calls combineByKeyWithClassTag.
### How groupByKey is implemented
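The roles of the three functions passed to combineByKeyWithClassTag can be sketched with a small plain-Scala model. This is a simplified illustration under stated assumptions, not Spark's implementation: the object CombineByKeySketch and its methods are hypothetical, partitions are plain Seqs, and a Vector stands in for Spark's CompactBuffer. The model always merges inside each partition first, so it shows what the functions compute rather than where Spark runs them.

```scala
object CombineByKeySketch {
  // Simplified model of combineByKey over in-memory "partitions".
  def combineByKey[K, V, C](
      partitions: Seq[Seq[(K, V)]],
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C): Map[K, C] = {
    // Within one partition: the first value of a key creates a combiner,
    // every later value of that key is folded in with mergeValue.
    val perPartition: Seq[Map[K, C]] = partitions.map { part =>
      part.foldLeft(Map.empty[K, C]) { case (acc, (k, v)) =>
        acc.updated(k, acc.get(k).map(c => mergeValue(c, v)).getOrElse(createCombiner(v)))
      }
    }
    // Across partitions: combiners of the same key are merged with mergeCombiners.
    perPartition.flatten.foldLeft(Map.empty[K, C]) { case (acc, (k, c)) =>
      acc.updated(k, acc.get(k).map(prev => mergeCombiners(prev, c)).getOrElse(c))
    }
  }

  // Mirrors reduceByKey: identity as createCombiner, func for both merge steps.
  def reduceByKey[K, V](partitions: Seq[Seq[(K, V)]])(func: (V, V) => V): Map[K, V] =
    combineByKey[K, V, V](partitions, (v: V) => v, func, func)

  // Mirrors groupByKey: the combiner is a growing buffer of values.
  def groupByKey[K, V](partitions: Seq[Seq[(K, V)]]): Map[K, Vector[V]] =
    combineByKey[K, V, Vector[V]](
      partitions,
      (v: V) => Vector(v),
      (buf: Vector[V], v: V) => buf :+ v,
      (a: Vector[V], b: Vector[V]) => a ++ b)
}
```

For example, `CombineByKeySketch.reduceByKey(Seq(Seq("a" -> 1, "a" -> 3), Seq("a" -> 4)))(_ + _)` yields `Map("a" -> 8)`. With reduceByKey the combiner has the same size as a single value, whereas the groupByKey combiner grows with every value it absorbs, which is why merging it early does not shrink the data.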
combineByKeyWithClassTag[CompactBuffer[V]](createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
### How reduceByKey is implemented
combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
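Comparing the two calls, groupByKey passes mapSideCombine = false, so nothing is merged on the map side, that is, before the shuffle. reduceByKey does not pass this argument at all.
Does that mean reduceByKey merges on the map side by default?
4. Next, take a close look at combineByKeyWithClassTag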
/**
 * :: Experimental ::
 * Generic function to combine the elements for each key using a custom set of aggregation
 * functions. Turns an RDD[(K, V)] into a result of type RDD[(K, C)], for a "combined type" C
 *
 * Users provide three functions:
 *
 *  - `createCombiner`, which turns a V into a C (e.g., creates a one-element list)
 *  - `mergeValue`, to merge a V into a C (e.g., adds it to the end of a list)
 *  - `mergeCombiners`, to combine two C's into a single one.
 *
 * In addition, users can control the partitioning of the output RDD, and whether to perform
 * map-side aggregation (if a mapper can produce multiple items with the same key).
 *
 * @note V and C can be different -- for example, one might group an RDD of type
 * (Int, Int) into an RDD of type (Int, Seq[Int]).
 */
@Experimental
def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
  if (keyClass.isArray) {
    if (mapSideCombine) {
      throw new SparkException("Cannot use map-side combining with array keys.")
    }
    if (partitioner.isInstanceOf[HashPartitioner]) {
      throw new SparkException("HashPartitioner cannot partition array keys.")
    }
  }
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    self.mapPartitions(iter => {
      val context = TaskContext.get()
      new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
    }, preservesPartitioning = true)
  } else {
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}
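The signature of combineByKeyWithClassTag declares mapSideCombine: Boolean = true. reduceByKey leaves the argument unset, so it merges on the map side by default, that is, before the shuffle. With the values of each key already merged, fewer records are spilled to disk and transferred during the shuffle, which is why reduceByKey is usually faster. The same source also shows that when the RDD is already partitioned by the requested partitioner, no shuffle is created at all and the values are combined inside each partition.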
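One way to check this conclusion on a running job is to inspect the dependency of the resulting RDD. The sketch below assumes a Spark version in which ShuffleDependency exposes a public mapSideCombine field (it is a developer API, so this may differ between versions); the object MapSideCombineCheck and the helper mapSideCombineOf are names invented for this illustration.

```scala
import org.apache.spark.{HashPartitioner, ShuffleDependency, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object MapSideCombineCheck {
  // Some(flag) if the RDD's first dependency is a shuffle, None if there is no shuffle.
  def mapSideCombineOf(rdd: RDD[_]): Option[Boolean] =
    rdd.dependencies.headOption.collect {
      case dep: ShuffleDependency[_, _, _] => dep.mapSideCombine
    }

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("MapSideCombineCheck").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val pairs = sc.parallelize(Seq("val config", "val arr")).flatMap(_.split(" ")).map((_, 1))

    // groupByKey passes mapSideCombine = false explicitly.
    println(mapSideCombineOf(pairs.groupByKey()))
    // reduceByKey relies on the default value of the parameter.
    println(mapSideCombineOf(pairs.reduceByKey(_ + _)))

    // An RDD already partitioned by the requested partitioner takes the
    // mapPartitions branch, so no shuffle dependency should be present.
    val partitioner = new HashPartitioner(2)
    val prePartitioned = pairs.partitionBy(partitioner)
    println(mapSideCombineOf(prePartitioned.reduceByKey(partitioner, (a: Int, b: Int) => a + b)))

    sc.stop()
  }
}
```

Going by the source quoted above, the three lines should report false for groupByKey, true for reduceByKey, and no shuffle dependency for the pre-partitioned case.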