spark的reduceByKey

2023-10-24 15:59
文章标签 spark reducebykey

本文主要是介绍spark的reduceByKey,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

在这里插入图片描述
在进行Spark开发算法时,最有用的一个函数就是reduceByKey。

reduceByKey的作用对像是(key, value)形式的rdd,而reduce有减少、压缩之意,reduceByKey的作用就是对相同key的数据进行处理,最终每个key只保留一条记录。

保留一条记录通常有两种结果。一种是只保留我们希望的信息,比如每个key出现的次数。第二种是把value聚合在一起形成列表,这样后续可以对value做进一步的操作,比如排序。

常用方式举例

比如现在我们有数据goodsSale:RDD[(String, String)],两个字段分别是goodsid、单个订单中的销售额,现在我们需要统计每个goodsid的销售额。

我们只需要保留每个goodsid的累记销售额,可以使用如下语句来实现:


val goodsSaleSum = goodsSale.reduceByKey((x,y) => x+y)

熟悉之后你可以使用更简洁的方式:

val goodsSaleSum = goodsSale.reduceByKey(_+_)

reduceByKey会寻找相同key的数据,当找到这样的两条记录时会对其value(分别记为x,y)做(x,y) => x+y的处理,即只保留求和之后的数据作为value。反复执行这个操作直至每个key只留下一条记录。

现在假设goodsSaleSum还有一个字段类目id,即 RDD[(String, String, String)] 形式,三个字段分别是类目id、goodsid、总销量,现在我们要获得第个类目id下销量最高的一个商品。

上一步聚是保留value求和之后的数据,而这里其实我们只需要保留销量更高的那条记录。不过我们不能直接对RDD[(String, String, String)]类型的数据使用reduceByKey方法,因为这并不是一个(key, value)形式的数据,所以需要使用map方法转化一下类型。


val catGmvTopGoods = goodsSaleSum.map(x => (x._1, (x._2, x._3))).reduceByKey((x, y) => if (x._2.toDouble > y._2.toDouble) x else y).map(x => (x._1, x._2._1, x._2._2)

再进一步,假设现在我们有一个任务:推荐5个销售额最高的类目,并为每个类目推荐一个销售额最高的商品,而我们的数据就是上述RDD[(String, String, String)类型的goodsSaleSum。

这需要两步,一是计算每个类目的销售额,这和举的第一个例子一样。二是找出每个类目下销量最高的商品,这和第二个例子一样。实际上,我们可以只实用一个reduceByKey就达到上面的目的。


val catIdGmvTopGoods = goodsSaleSum.map(x => (x._1, (x._2, x._3, x._3))).reduceByKey((x, y) => if (x._2 > y._2) (x._1, x._2, x._3+y._3) else (y._1, y._2, x._3+y._3)).map( x => (x._1, x._2._1, x._2._2, x._2._3).sortBy(_._3, false).take(5)

由于我们需要计算每个类目的总销售额,同时需要保留商品的销售额,所以先使用map增加一个字段用来记录类目的总销售额。这样一来,我们就可以使用reduceByKey同时完成前两个例子的操作。

剩下的就是进行排序并获取前5条记录。

聚合方式举例

上述的三个例子都是只保留需要的信息,但有时我们需要将value聚合在一起进行排序操作,比如对每个类目下的商品按销售额进行排序。

假设我们的数据是 RDD[(String, String, String)],三个字段分别是类目id、goodsid、销售额。

若是使用sql,那我们直接用row_number函数就可以很简单的使用分类目排序这个任务。

但由于spark-sql占用的资源会比RDD多不少,在开发任务时并不建议使用spark-sql。

我们的方法是通过reduceByKey把商品聚合成一个List,然后对这个List进行排序,再使用flatMapValues摊平数据。

我们在使用reduceyByKey时会注意到,两个value聚合后的数据类型必须和之前一致。

所以在聚合商品时我们也需要保证这点,通常有两种方法,一是使用ListBuffer,即可变长度的List。二是使用String,以分隔符来区分商品和销售额。下面我们使用第一种方式完成这个任务。

val catIdGoodsIdSorted = goodsGmvSum.map(x => (x._1, ListBuffer(x._2, x._3.toDouble))).reduceByKey((x, y) => x++y).flatMapValues( x => x.toList.sortBy(_._2).reverse.zipWithIndex)

上述zipWithIndex给列表增加一个字段,用来记录元素的位置信息。而flatMapValues可以把List的每个元素单独拆成一条记录,详细的说明可以参考我写的另一篇文章Spark入门-常用函数汇总

小结

我在本文中介绍了reduceByKey的三种作用:

求和汇总

获得每个key下value最大的记录

聚合value形成一个List之后进行排序

这篇关于spark的reduceByKey的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/276258

相关文章

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark

Spark—数据读取和保存

Spark—数据读取和保存

Spark源码分析之Spark Shell(上)

终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。 先来介绍一下Spark-shell是什么? Spark-shell是提供给用户即时交互的一个命令窗口,你可以在里面编写spark代码,然后根据你的命令立即进行

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看 编程指南了解更多的内容。 为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。 Spark Shell 交互 基本操作 Spark Shell提供给用

周期性清除Spark Streaming流状态的方法

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子: 现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。 编写脚本重启Streaming程序 用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本