本文主要是介绍spark,keyValue对RDDs,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
keyValue对RDDs
创建keyValue对RDDs:
使用map()函数,返回key/value对
例如,包含数行数据的RDD,每行数据的第一个单词作为keys,整行作为value
val rdd=sc.textFile("/home/hellospark.txt")
rdd.foreach(println)
val rdd2= rdd.map(line=>(lines.split(" ")(0),line))
rdd2.foreach(println) (hello,hello spark)
常见操作
手动构建
val rdd3=sc.parallelize(Array((1,2),(3,4),(3,6)))
rdd3.foreach(println) (1,2) (3,4) (3,6)
reduceByKey(func)
把相同key的value累加 (1,2)(3,10)
groupByKey 相同的key的values分组
{[1,2],[3,[4,6]]}
val rdd5=rdd3.groupByKey()
mapValues(func)函数作用于pairRDD的每个元素,key不变
rdd.mapValues(x=>x+1) (1,3) (3,5) (3.7)
flatMapValues(func)符号化使用,rdd.flatMapValues(x=>(x to 5)
keys() 仅返回keys
val rdd6 =rdd3.keys 1 3 3
values()
val rdd7 = rdd3.values 2 4 6
sortByKey()
val rdd8 = rdd3.sortByKey()
combineByKey()
createCombiner,mergeValue,mergeCombiner,partitioner
聚合函数,返回类型可与输入类型不一样
许多基于key的聚合函数都用到它,像groupByKey()
原理:
遍历分区中的元素,元素的key要么是之前见过的,要么不是
如果是分区新元素,会使用createCombiner()函数
如果是这个分区已经存在的key,就会使用mergeValue()函数
合计每个分区的结果的时候,使用mergeCombiners()函数
例子,求平均值
val scores=sc.parallelize(Array(("jake",80.0),("jake",90.0),("jake",85.0),("mike",85.0),("mike",92.0),("mike",90.0)))
scores.foreach(println) (jake,80.0) (jake,90.0) (jake,85.0) (mike,85)
val score2 = score.combineBykey(score=>(1,score), (c1:(Int,Double),newscore)=>(c1._1+1,c1._2+newscore)),
score指的是value值,分数 计数1 c1:科目数,累加之后分数,遍历时出现的新分数=> 科目数+1,分数加
(c1:(Int,Double),c2:(Int,Double)=>(c1._1+c2._1,c1._2+c2._2)))
科目数相加, 分数相加
(jake,(3,255.0))
(mike,(3,267.0))
val average = scores2.map(case(name,(num,score))=>(name,score/num)}
(mike,89)
(jake,85)
这篇关于spark,keyValue对RDDs的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!