本文主要是介绍《Spark快速大数据分析》——读书笔记(3),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
只看书是快,但是动手时会遇到种种问题,不可怠慢!
第3章 RDD编程
弹性分布式数据集(Resilient Distributed Dataset,RDD)其实就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用RDD操作进行求值。
3.1 RDD基础
Spark中RDD是一个不可变的分布式对象集合。每个RDD都被分为多个分区,分区运行在集群的不同节点上。RDD可以包含任意类型的对象。
创建RDD的两种方法:
- 读取一个外部数据集。如Python中lines=sc.textFile(“README.md”)
- 在驱动器程序里分发驱动器程序中的对象集合(如list或set) 。
创建后,RDD支持两种类型的操作:
- 转化操作——由一个RDD生成一个新的RDD。如pythonLines=lines.filter(lambda line: “Python” in line)
- 行动操作——对RDD计算出一个结果,并把结果返回到驱动器程序或存储。如pythonLines.first()
转化操作和行动操作的区别在于Spark计算RDD的方式不同。虽然可以在任何时候定义新的RDD,但Spark只会惰性计算这些RDD——即第一次在行动操作中用到时,才会计算。
最后,默认情况下,Spark的RDD会在你每次对他们进行行动操作时重新计算。如果想在多个行动操作中重用一个RDD,可以使用RDD.persist()让Spark把这个RDD缓存下来。
在实际操作中,persist()经常被用来把数据的一部分读取到内存中,并反复查询这部分数据。
Spark程序或shell会话的工作方式:
- 从外部数据创建出输入RDD
- 使用诸如filter()这样的转化操作对RDD进行转化,以及定义新的RDD。
- 告诉Spark对需要被重用的中间结果RDD执行persist()操作。
- 使用行动操作(例如count()和first()等)来触发一次并行计算,Spark会对计算进行优化后再执行。
cache()与使用默认存储级别调用persist()是一样的。
3.2 创建RDD
Spark提供了两种创建RDD的方式:读取外部数据集,以及在驱动器程序中对一个集合进行并行化。
创建RDD的最简单的方式是把程序中一个已有的集合传给SparkContext的parallelize()方法。但实际中用的不多,因为该方法需把整个数据集先放在一台机器的内存中。
Python中的parallelize方法
lines=sc.parallelize(["panda","i like pandas"])
更常用的方式是从外部存储中读取数据创建RDD。如SparkContext方法将文本文件读入为一个存储字符串的RDD。
Python中的textFile()方法
lines=sc.textFile("/path/to/README.md")
3.3 RDD操作
操作类型 | 操作结果 | 典型函数 | 特点 |
---|---|---|---|
转化操作 | 返回新的RDD | map()和filter() | 返回RDD |
行动操作 | 向驱动器程序返回结果或把结果写出 | count()和fist() | 返回其他类型 |
3.3.1 转化操作
RDD的转化操作是返回新RDD的操作(并不改变原RDD的内容)。许多转化操作都是针对各个元素的,即只操作RDD中的一个元素。
例:从log.txt中选出错误信息及union()函数的用法(python)
inputRDD=sc.textFile("log.txt")
errorsRDD=inputRDD.filter(lambda x: "error" in x)
warningsRDD=inputRDD.filter(lambda x:"warnning" in x)
badLinesRDD=errorsRDD.union(warningsRDD)
转化操作从已有的RDD中派生出新的RDD,**Spark会使用谱系图,来记录这些不同RDD之间的依赖关系。**Spark需要用这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。下图是上例中的谱系图。
3.3.2 行动操作
行动操作会把最终求得的结果返回到驱动器程序,或者写入外部存储系统中。由于行动操作需要生成实际的输出,他们会强制执行那些求值必须用到的RDD的转化操作。
例3-15:在Python中使用行动操作对错误进行计数
print "Input had "+badlinesRDD.count()+" concerning lines"
print "Here are 10 examples: "
for lines in badLines RDD.take(10):print line
该例中使用take()获取了少量元素,在本地处理。RDD还有一个collect()函数,可以获取整个RDD的数据。如果RDD规模较小,且想在本地处理时,可以使用它(只有当整个数据集能在单台机器的内存中放得下时,才能使用collect())。因此该方法不常用,通常将数据写到HDFS或Amazon S3这样的分布式存储系统。
注:每当调用一个新的行动操作时,整个RDD都会从头开始计算。为避免这种低效的行为,需将中间结果持久化。
3.3.3 惰性求值
惰性求值指我们对RDD调用转化操作时,操作不会立即执行。Spark会在内部记录下所要求执行的操作的相关信息。
我们不应该吧RDD看做存放着特定数据的数据集,而最好把每个RDD当做我们通过转化操作构建出来的、记录如何计算数据的指令列表。
另外把数据读取到RDD的操作同样是惰性的。如调用sc.textFile()时,数据并没有读取进来,而是在必要时才会读取。
Spark使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。在类似hadoop MapReduce的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少MapReduce的周期数。而在Spark中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此用户可以用更小的操作来组织他们的程序。(加黑部分不是很懂)
3.4 向Spark传递函数
Spark的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。
3.4.1 Python
在Python中,我们有三种方式来把函数传递给Spark。传递比较短的函数时,可以使用lambda表达式来传递。除了lambda表达式,我们也可以传递顶层函数或是定义的局部函数。
例3-18:在Python中传递函数
word=rdd.filter(lambda s: "error" in s)def containsError(s)return "error" in s
word=rdd.filter(containsError)
传递函数时需要注意,Python会把函数所在的对象也序列化传出去。当你传递的对象是某个对象的成员,或者包含了对某个对象中一个字段的引用时(例如self.field),Spark就会把整个对象发到工作节点上,这可能比想传递的东西大得多(见例3-19)。另外,如果传递的类里面包含Python不知道如何序列化传输的对象,也会导致程序的失败。(这里的序列化传输不懂)
例3-19:传递一个带字段引用的函数(别这么做!!)
class SearchFunctions(object):def __init__(self, query):self.query=querydef isMatch(self,s):return self.query in sdef getMatchesFunctionReference(self, rdd):# 问题:在“self.isMatch”中引用了整个selfreturn rdd.filter(self.isMatch)def getMatchesMemberReference(self, rdd):#问题:在"self.query"中引用了整个selfreturn rdd.filter(lambda x: self.query in x)
替代的方案是,吧所需的字段从对象中拿出来放在一个局部变量中,然后传递这个局部变量。
例3-20:传递不带字段引用的Python函数
class WordFunctions(object):---def getMatchesNoReference(self, rdd):#安全:只把需要的字段提取到局部变量中query=self.queryreturn rdd.filter(lambda x: query in x)
3.4.2 Scala
在Scala中,可以把定义的内联函数、方法的引用或静态方法传递给Spark,就像Scala的其他函数式API一样。
注:所传递的函数及其引用的数据需要时可序列化的(实现了Java的serializable接口),另外,和Python类似传递一个对象的方法或者字段时,会包含对整个对象的引用。
例3-21:Scala中的函数传递
class SearchFunctions(val query: String) {def isMatch(s:String):Boolean={s.contains(query)}def getMatchesFunctionReference(rdd: RDD[String]): RDD[String]={//问题:"isMatch"表示整个"this.ismatch",因此我们要传递整个"this"rdd.map(isMatch)}def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {// 问题:"query"表示"this.query",因此我们要传递整个"this" rdd.map(x => x.split(query))} def getMatchesNoReference(rdd: RDD[String]): RDD[String] = { // 安全:只把我们需要的字段拿出来放入局部变量中val query_ = this.queryrdd.map(x => x.split(query_)}
}
3.4.3 Java
在Java中,函数需要作为实现了Spark的org.apche.spark.api.java.function包中的任一函数接口的对象来传递。
可以把我们的函数类内联定义为匿名内部类(例3-22),也可以创建一个具名类(例3-23)
例3-22:在Java中使用匿名内部类进行函数传递
RDD<String> errors=lines.filter(new Function<String,Boolean>(){public Boolean call(String x){return x.contains("error");}
});
例3-23:在Java中使用具名类进行函数传递
class ContainsError implements Function<String ,Boolean>(){public Boolean call(String x){return x.contains("error");}
}
RDD<String> errors=lines.filter(new ContainsError());
使用顶级函数的另一个好处在于你可以给他们的构造函数添加参数,如例3-24所示。
例3-24:带参数的Java函数类
class Contains implements Function<String, Boolean>(){private String query;public Contains(String query){ this.query=query;}public boolean call(String x){return x.contains(query);}
}
RDD<String> errors=lines.filter(new Contains("error"));
也可以使用lambda表达式来简洁地实现函数接口,如例3-25所示。
例3-25:在Java中使用Java 8 地lambda表达式进行函数传递
RDD<String> errors=lines.filter(s->s.contains("error"));
(lambda表达式不懂!)
3.5 常见的转化操作和行动操作
3.5.1 基本RDD
1.针对各个元素的转化操作
最常用的是map()和filter()。
转化操作map()接受一个函数,把这个函数用于RDD中的每个元素,将函数的返回结果作为结果RDD中对应元素的值。输入类型和返“`
类型可gg
不同。
转化操作filter()则接收一个函数,并将RDD中满足该函数的元素放入新的RDD中返回。
例3-26:Python版计算RDD中各值的平方
nums=sc.parallelize([1,2,3,4])
squared=nums.map(lambda x: x*x).collect()
for num in squard:print "%i " % (num)
flatMap()对每个输入元素生成多个输出元素。和map()类似,提供给flatMap()的函数被分别应用到了输入RDD的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。我们得到的是一个包含各个迭代器可访问的所有元素的RDD。
例3-29:Python中的flatMap()将行数据切分为单词(并与map()进行对比)
注:Spark的配置不光要在环境变量中加上bin目录,还要对SPARK_LOCAL_IP配置为127.0.0.1才可以顺利的在命令行中使用pyspark
2. 伪集合操作
RDD虽不是严格意义上的集合,但也支持许多数学上的集合操作,比如合并和相交。这些操作都要求操作的RDD是相同数据类型的。
- distinct()去除重复元素。注意,distinct()操作开销很大,因为它需要将所有数据通过网络进行混洗(shuffle)。
- union(other)返回包含两个RDD中所有元素的RDD。注意这里并不会剔除重复数据。
- intersection(other)只返回两个RDD中都有的元素。注意这里会去除掉所有的重复元素,该操作的性能很差,因为需要通过网络混洗数据来发现共有的元素。
- subtract(other)和intersection一样需要数据混洗,性能差。
- cartesian(other)计算笛卡尔积,返回所有可能的(a,b)对。在我们希望考虑所有可能的组合的相似度时比较有用。注意RDD规模大时该操作开销巨大。
表3-2和表3-3总结了这些常见的RDD转化操作。
3. 行动操作
最常见的行动操作reduce()。它接受一个函数作为参数,该函数操作两个RDD的元素类型的数据并返回一个同样类型的新元素。
例3-32:Python中的reduce()——计算元素的总和
sum=rdd.reduce(lambda x,y:x+y)
#x指代的是返回值,y是对rdd中元素的遍历
fold()和reduce()类似,接受一个与reduce()接受的函数签名相同的函数,再加上一个“初始值”来作为每个分区第一次调用时的结果。你所提供的初始值应当是你提供的操作的单位元素;也就是说,使用你的函数对这个初始值进行多次计算不会改变结果(例如+对应的0,*对应的1,或拼接操作对应的空列表)。如下例:
val l = List(1,2,3,4)
l.reduce((x, y) => x + y)
val l = List(1,2,3,4)
l.fold(0)((x, y) => x + y)
这个计算其实 0 + 1 + 2 + 3 + 4,而reduce()的计算是:1 + 2 + 3 + 4,没有初始值,或者说rdd的第一个元素值是它的初始值。
fold()和reduce()都要求函数的返回值类型和所操作的RDD元素类型相同。如果需要返回不同类型的值,则需要使用map函数进行转化。
aggregate()函数则把我们从返回值类型和所操作的RDD类型可以不同。与fold()类似,需要期待返回类型的初始值。然后通过一个函数把RDD中的元素合并起来放入累加器,考虑到每个节点是在本地进行累加(考虑分布式的情况~),最终还需要提供第二个函数来讲累加器两两合并。
例3-35:Python中的aggregate()
sumCount=nums.aggregate((0,0),(lambda acc,value:(acc[0]+value,acc[1]+1),(lambda acc1,acc2:(acc1[0]+acc2[0],acc1[1]+acc2[1]))))
return sumCount[0]/float(sumCount[1])
RDD的一些行动操作会以普通集合或者值的形式将RDD的部分或全部数据返回驱动器程序中。
把数据返回驱动器程序最简单、最常见的操作是collect(),它返回整个RDD的内容。所以要求数据不会太大,能放入单台机器的内存中。
take(n)返回RDD中的n个元素,并且尝试只访问尽量少的分区,因此该操作会得到一个不均衡的集合。
top()从RDD中获取前几个元素。(需要为数据定义顺序)
takeSample(withReplacement, num, seed)函数可以让我们从数据中获取采样。
foreach()行动操作对RDD每个元素操作,而不需要把RDD发挥本地。
3.5.2 在不同RDD类型间转换
有些函数只能用于特定类型的RDD,比如mean()和variance()只能用在数值RDD上,而join()只能用在键值对RDD上。在Scala和Java中,这些函数没有定义在标准的RDD类中,所以要访问这些附加功能,必须要确保获得了正确的专用RDD类。
1. Scala
Scala中,将RDD转为由特定函数的RDD是由隐式转换来自动处理的。我们需要加上import org.apache.spark.SparkContext._来使用这些隐式转换。
2. Java
Java中有两个专门的类JavaDoubleRDD和JavaPairRDD,来处理特殊类型的RDD。
例3-28:用Java创建DoubleRDD
JavaDoubleRDD result=rdd.mapToDouble(new DoubleFunction<Integer>(){public double call(Integer x){return (double) x*x;}});System.out.println(result.mean());
3. Python
Python中所有的函数都实现在基本的RDD类中。
3.6 持久化(缓存)
为了避免多次计算同一个RDD,可以让Spark对数据进行持久化。当我们让Spark持久化存储一个RDD时,计算出RDD的节点会分别保存他们所求出的分区数据。
默认情况下,我们会把数据以序列化的形式缓存在JVM的堆空间中。
如果要缓存的数据太多,内存中放不下,Spark会自动利用最近最少使用的缓存策略把最老的分区从内存中移除。
RDD的unpersist()方法可以手动把持久化的RDD从缓存中移除。
这篇关于《Spark快速大数据分析》——读书笔记(3)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!