Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look

本文主要是介绍Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1、以本地模式实战map和filter

2、以集群模式实战textFile和cache

3、对Job输出结果进行升和降序

4、union

5、groupByKey

6、join

7、reduce

8、lookup

 

 

1、以本地模式实战map和filter

以local的方式,运行spark-shell。

spark@SparkSingleNode:~$ cd /usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ pwd
/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin
spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell

 

 从集合中创建RDD,spark中主要提供了两种函数:parallelize和makeRDD,

 

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21

scala> val mappedRDD = rdd.map(2*_)
mappedRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:23

scala> mappedRDD.collect

得到

res0: Array[Int] = Array(2, 4, 6, 8, 10)

scala>

 

 

 

scala> val filteredRDD = mappedRDD.filter(_ > 4)
16/09/26 20:32:29 INFO storage.BlockManagerInfo: Removed broadcast_0_piece0 on localhost:40688 in memory (size: 1218.0 B, free: 534.5 MB)
16/09/26 20:32:30 INFO spark.ContextCleaner: Cleaned accumulator 1
filteredRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[2] at filter at <console>:25

scala> filteredRDD.collect

 

 

注意,一般,生产环境和正宗的写法是。

scala> val filteredRDDAgain = sc.parallelize(List(1,2,3,4,5)).map(2 * _).filter(_ > 4).collect

 

 

 

 

 

 

2、以集群模式实战textFile和cache

 启动hadoop集群

spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ jps
8457 Jps
spark@SparkSingleNode:/usr/local/hadoop/hadoop-2.6.0$ sbin/start-dfs.sh

 

启动spark集群

 

spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6$ sbin/start-all.sh

 

 spark@SparkSingleNode:/usr/local/spark/spark-1.5.2-bin-hadoop2.6/bin$ ./spark-shell --master spark://SparkSingleNode:7077 

 

 

读取该文件

scala> val rdd = sc.textFile("/README.md")

 使用count统计一下该文件的行数

scala> rdd.count

 

took 7.018386 s

res0: Long = 98

花了时间7.018386 s

 

通过观察RDD.scala源代码即可知道cache和persist的区别:

def persist(newLevel: StorageLevel): this.type = {
  if (storageLevel != StorageLevel.NONE && newLevel != storageLevel) {
    throw new UnsupportedOperationException( "Cannot change storage level of an RDD after it was already assigned a level")
  }
  sc.persistRDD(this)
  sc.cleaner.foreach(_.registerRDDForCleanup(this))
  storageLevel = newLevel
  this
}
/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)

/** Persist this RDD with the default storage level (`MEMORY_ONLY`). */
def cache(): this.type = persist()

可知:
1)RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;
2)可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;
3)cache或者persist并不是action;
附:cache和persist都可以用unpersist来取消

 

进行缓存

scala> rdd.cache
res1: rdd.type = MapPartitionsRDD[1] at textFile at <console>:21

执行count,使得缓存生效

scala> rdd.count

 

took 2.055063 s
res2: Long = 98

花了时间 2.055063 s

 

再执行,count

took 0.583177 s
res3: Long = 98

花了时间 0.583177 s

 

总结,我们直接基于cache缓存后的数据,计算所消耗时间大大减少。

 

 正在进行中的spark-shell

 

 

 

 接着,对上面的RDD,进行wordcount操作

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_)
wordcount: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[4] at reduceByKey at <console>:23

scala> wordcount.collect

 

 通过saveAsTextFile把数据保存起来

 

res4: Array[(String, Int)] = Array((package,1), (this,1), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),1), (Because,1), (Python,2), (cluster.,1), (its,1), ([run,1), (general,2), (have,1), (pre-built,1), (locally.,1), (locally,2), (changed,1), (sc.parallelize(1,1), (only,1), (several,1), (This,2), (basic,1), (Configuration,1), (learning,,1), (documentation,3), (YARN,,1), (graph,1), (Hive,2), (first,1), (["Specifying,1), ("yarn-client",1), (page](http://spark.apache.org/documentation.html),1), ([params]`.,1), (application,1), ([project,2), (prefer,1), (SparkPi,2), (<http://spark.apache.org/>,1), (engine,1), (version,1), (file,1), (documentation,,1), (MASTER,1), (example,3), (distribution.,1), (are,1), (params,1), (scala>,1), (DataFrames...
scala> wordcount.saveAsTextFile("/result")

只是,仅仅对每行,做了wordcount而已。

 

 

3、对Job输出结果进行升和降序

升序

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted")

 

同理,去下载,不多赘述。

变了

 

 

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortBy(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted")
<console>:23: error: type mismatch;
found : Boolean(true)
required: ((Int, String)) => ?
val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortBy(true).map(x => (x._2,x._1)).saveAsTextFile("/resultAscSorted")
^

scala>

 

 

 降序

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).reduceByKey(_+_).map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).saveAsTextFile("/resultDescSorted")

 

下载,同理

 此刻,成功对Job输出结果进行了排序。

 

4、union

union的使用

scala> val rdd1 = sc.parallelize(List(('a',1),('b',1)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[26] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(('c',1),('d',1)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[27] at parallelize at <console>:21

scala> rdd1 union rdd2
res6: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[28] at union at <console>:26

scala> val result = rdd1 union rdd2
result: org.apache.spark.rdd.RDD[(Char, Int)] = UnionRDD[29] at union at <console>:25

 

 

使用collect操作,查看一下执行结果

scala> result.collect

res7: Array[(Char, Int)] = Array((a,1), (b,1), (c,1), (d,1))

 

5、groupByKey

 

scala> val wordcount = rdd.flatMap(_.split(' ')).map((_,1)).groupByKey
wordcount: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[32] at groupByKey at <console>:23

scala> wordcount.collect

res8: Array[(String, Iterable[Int])] = Array((package,CompactBuffer(1)), (this,CompactBuffer(1)), (Version"](http://spark.apache.org/docs/latest/building-spark.html#specifying-the-hadoop-version),CompactBuffer(1)), (Because,CompactBuffer(1)), (Python,CompactBuffer(1, 1)), (cluster.,CompactBuffer(1)), (its,CompactBuffer(1)), ([run,CompactBuffer(1)), (general,CompactBuffer(1, 1)), (YARN,,CompactBuffer(1)), (have,CompactBuffer(1)), (pre-built,CompactBuffer(1)), (locally.,CompactBuffer(1)), (locally,CompactBuffer(1, 1)), (changed,CompactBuffer(1)), (sc.parallelize(1,CompactBuffer(1)), (only,CompactBuffer(1)), (several,CompactBuffer(1)), (learning,,CompactBuffer(1)), (basic,CompactBuffer(1)), (first,CompactBuffer(1)), (This,CompactBuffer(1, 1)), (documentation,CompactBuffer(1, 1, 1)), (Confi...
scala>

 

6、join

 概念知识,参考

http://www.cnblogs.com/goforward/p/4748128.html  

scala> val rdd1 = sc.parallelize(List(('a',1),('a',2),('b',3),('b',4)))
rdd1: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[33] at parallelize at <console>:21

scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[34] at parallelize at <console>:21

scala> rdd1 join rdd2
res9: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[37] at join at <console>:26

scala> val result = rdd1 join rdd2
result: org.apache.spark.rdd.RDD[(Char, (Int, Int))] = MapPartitionsRDD[40] at join at <console>:25

scala> result.collect

 

res10: Array[(Char, (Int, Int))] = Array((b,(3,7)), (b,(3,8)), (b,(4,7)), (b,(4,8)), (a,(1,5)), (a,(1,6)), (a,(2,5)), (a,(2,6)))

scala>

 

可见,join操作,完全是一个笛卡尔积的操作。

 

 

 

7、reduce

reduce本身啊,在RDD操作里,属于一个action类型的操作,会导致job作业的提交和执行。

 

scala> val rdd = sc.parallelize(List(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[41] at parallelize at <console>:21

scala> rdd.reduce(_+_)

res11: Int = 15

 

8、lookup

scala> val rdd2 = sc.parallelize(List(('a',5),('a',6),('b',7),('b',8)))
rdd2: org.apache.spark.rdd.RDD[(Char, Int)] = ParallelCollectionRDD[42] at parallelize at <console>:21

scala> rdd2.lookup('a')    //返回一个seq, (5, 6) 是把a对应的所有元素的value提出来组成一个seq

 

res12: Seq[Int] = WrappedArray(5, 6)

 

这篇关于Spark RDD/Core 编程 API入门系列之map、filter、textFile、cache、对Job输出结果进行升和降序、union、groupByKey、join、reduce、look的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/778694

相关文章

C语言中联合体union的使用

本文编辑整理自: http://bbs.chinaunix.net/forum.php?mod=viewthread&tid=179471 一、前言 “联合体”(union)与“结构体”(struct)有一些相似之处。但两者有本质上的不同。在结构体中,各成员有各自的内存空间, 一个结构变量的总长度是各成员长度之和。而在“联合”中,各成员共享一段内存空间, 一个联合变量

C++必修:模版的入门到实践

✨✨ 欢迎大家来到贝蒂大讲堂✨✨ 🎈🎈养成好习惯,先赞后看哦~🎈🎈 所属专栏:C++学习 贝蒂的主页:Betty’s blog 1. 泛型编程 首先让我们来思考一个问题,如何实现一个交换函数? void swap(int& x, int& y){int tmp = x;x = y;y = tmp;} 相信大家很快就能写出上面这段代码,但是如果要求这个交换函数支持字符型

零基础STM32单片机编程入门(一)初识STM32单片机

文章目录 一.概要二.单片机型号命名规则三.STM32F103系统架构四.STM32F103C8T6单片机启动流程五.STM32F103C8T6单片机主要外设资源六.编程过程中芯片数据手册的作用1.单片机外设资源情况2.STM32单片机内部框图3.STM32单片机管脚图4.STM32单片机每个管脚可配功能5.单片机功耗数据6.FALSH编程时间,擦写次数7.I/O高低电平电压表格8.外设接口

16.Spring前世今生与Spring编程思想

1.1.课程目标 1、通过对本章内容的学习,可以掌握Spring的基本架构及各子模块之间的依赖关系。 2、 了解Spring的发展历史,启发思维。 3、 对 Spring形成一个整体的认识,为之后的深入学习做铺垫。 4、 通过对本章内容的学习,可以了解Spring版本升级的规律,从而应用到自己的系统升级版本命名。 5、Spring编程思想总结。 1.2.内容定位 Spring使用经验

大语言模型(LLMs)能够进行推理和规划吗?

大语言模型(LLMs),基本上是经过强化训练的 n-gram 模型,它们在网络规模的语言语料库(实际上,可以说是我们文明的知识库)上进行了训练,展现出了一种超乎预期的语言行为,引发了我们的广泛关注。从训练和操作的角度来看,LLMs 可以被认为是一种巨大的、非真实的记忆库,相当于为我们所有人提供了一个外部的系统 1(见图 1)。然而,它们表面上的多功能性让许多研究者好奇,这些模型是否也能在通常需要系

ps基础入门

1.基础      1.1新建文件      1.2创建指定形状      1.4移动工具          1.41移动画布中的任意元素          1.42移动画布          1.43修改画布大小          1.44修改图像大小      1.5框选工具      1.6矩形工具      1.7图层          1.71图层颜色修改          1

通过高德api查询所有店铺地址信息

通过高德api查询所有店铺地址电话信息 需求:通过高德api查询所有店铺地址信息需求分析具体实现1、申请高德appkey2、下载types city 字典值3、具体代码调用 需求:通过高德api查询所有店铺地址信息 需求分析 查询现有高德api发现现有接口关键字搜索API服务地址: https://developer.amap.com/api/webservice/gui

C++入门01

1、.h和.cpp 源文件 (.cpp)源文件是C++程序的实际实现代码文件,其中包含了具体的函数和类的定义、实现以及其他相关的代码。主要特点如下:实现代码: 源文件中包含了函数、类的具体实现代码,用于实现程序的功能。编译单元: 源文件通常是一个编译单元,即单独编译的基本单位。每个源文件都会经过编译器的处理,生成对应的目标文件。包含头文件: 源文件可以通过#include指令引入头文件,以使

DDei在线设计器-API-DDeiSheet

DDeiSheet   DDeiSheet是代表一个页签,一个页签含有一个DDeiStage用于显示图形。   DDeiSheet实例包含了一个页签的所有数据,在获取后可以通过它访问其他内容。DDeiFile中的sheets属性记录了当前文件的页签列表。   一个DDeiFile实例至少包含一个DDeiSheet实例。   本篇最后提供的示例可以在DDei文档直接预览 属性 属性名说明数

Python应用开发——30天学习Streamlit Python包进行APP的构建(9)

st.area_chart 显示区域图。 这是围绕 st.altair_chart 的语法糖。主要区别在于该命令使用数据自身的列和指数来计算图表的 Altair 规格。因此,在许多 "只需绘制此图 "的情况下,该命令更易于使用,但可定制性较差。 如果 st.area_chart 无法正确猜测数据规格,请尝试使用 st.altair_chart 指定所需的图表。 Function signa