spark,keyValue对RDDs

2024-09-01 17:32
文章标签 spark rdds keyvalue

本文主要是介绍spark,keyValue对RDDs,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

keyValue对RDDs

创建keyValue对RDDs:

使用map()函数,返回key/value对

例如,包含数行数据的RDD,每行数据的第一个单词作为keys,整行作为value

val rdd=sc.textFile("/home/hellospark.txt")

rdd.foreach(println)

val rdd2= rdd.map(line=>(lines.split(" ")(0),line))

rdd2.foreach(println)             (hello,hello spark)


常见操作

手动构建

val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))

rdd3.foreach(println)   (1,2) (3,4)  (3,6)

reduceByKey(func)

把相同key的value累加   (1,2)(3,10)

val rdd4=rdd3.reduceByKey((x,y)=>x+y)


groupByKey 相同的key的values分组

{[1,2],[3,[4,6]]}

val rdd5=rdd3.groupByKey()


mapValues(func)函数作用于pairRDD的每个元素,key不变

rdd.mapValues(x=>x+1)     (1,3) (3,5)  (3.7)

flatMapValues(func)符号化使用,rdd.flatMapValues(x=>(x to 5)   

keys() 仅返回keys    

val rdd6 =rdd3.keys    1 3 3

values()

val rdd7 = rdd3.values               2 4 6

sortByKey()

val rdd8 = rdd3.sortByKey()               



combineByKey()

createCombiner,mergeValue,mergeCombiner,partitioner

聚合函数,返回类型可与输入类型不一样

许多基于key的聚合函数都用到它,像groupByKey()

原理:

遍历分区中的元素,元素的key要么是之前见过的,要么不是

如果是分区新元素,会使用createCombiner()函数

如果是这个分区已经存在的key,就会使用mergeValue()函数

合计每个分区的结果的时候,使用mergeCombiners()函数

例子,求平均值

val scores=sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",85.0),("mike",85.0),("mike",92.0),("mike",90.0)))

scores.foreach(println)  (jake,80.0) (jake,90.0) (jake,85.0) (mike,85)

val score2 = score.combineBykey(score=>(1,score),           (c1:(Int,Double),newscore)=>(c1._1+1,c1._2+newscore)),      

score指的是value值,分数                               计数1             c1:科目数,累加之后分数,遍历时出现的新分数=> 科目数+1,分数加

(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2._1,c1._2+c2._2)))

     科目数相加,        分数相加

(jake,(3,255.0))

(mike,(3,267.0))

val average = scores2.map(case(name,(num,score))=>(name,score/num)}

(mike,89)

(jake,85)


这篇关于spark,keyValue对RDDs的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1127594

相关文章

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark

Spark—数据读取和保存

Spark—数据读取和保存

Spark源码分析之Spark Shell(上)

终于开始看Spark源码了,先从最常用的spark-shell脚本开始吧。不要觉得一个启动脚本有什么东东,其实里面还是有很多知识点的。另外,从启动脚本入手,是寻找代码入口最简单的方法,很多开源框架,其实都可以通过这种方式来寻找源码入口。 先来介绍一下Spark-shell是什么? Spark-shell是提供给用户即时交互的一个命令窗口,你可以在里面编写spark代码,然后根据你的命令立即进行

[大数据之Spark]——快速入门

本篇文档是介绍如何快速使用spark,首先将会介绍下spark在shell中的交互api,然后展示下如何使用java,scala,python等语言编写应用。可以查看 编程指南了解更多的内容。 为了良好的阅读下面的文档,最好是结合实际的练习。首先需要下载spark,然后安装hdfs,可以下载任意版本的hdfs。 Spark Shell 交互 基本操作 Spark Shell提供给用

周期性清除Spark Streaming流状态的方法

在Spark Streaming程序中,我们经常需要使用有状态的流来统计一些累积性的指标,比如各个商品的PV。简单的代码描述如下,使用mapWithState()算子: 现在的问题是,PV并不是一直累加的,而是每天归零,重新统计数据。要达到在凌晨0点清除状态的目的,有以下两种方法。 编写脚本重启Streaming程序 用crontab、Azkaban等在凌晨0点调度执行下面的Shell脚本