本文主要是介绍spark漫谈一,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
在IT的世界,技术种类繁杂,多如牛毛,每当我们遇到一种技术时,如何快速有效地学习并掌握技术,是每一个IT从业人员必须要面对的问题。在我看来,在学习任何一种技术时,有一个问题是值得我们先提问和思考的,那就是”为什么这种技术会出现?“。因为只有当我们认真严肃地去思考这个问题时,我们才会豁然开朗地发现,原来任何一种技术的出现,都有其特殊的需求和特定的背景。
以分布式计算框架spark为例,在其出现之前,世面上已经有分布式计算框架Mapreduce的存在。但是MapReduce存在自身无法克服的一些缺点,比如:
- 执行速度慢,原因主要来自两方面:
1)IO瓶颈:网络IO和磁盘IO。
2) shuffle瓶颈:数据需要写出到磁盘,而且具有排序的操作,并且内存的利用率低/不太灵活。
3)Task的执行是以进程的方式启动的,所以在处理小规模数据集的时候比较慢。 - MapReduce是一种基于数据集的工作模式。这种工作模式一般是从存储上加载数据集,然后操作数据集,最后写入物理存储设备。数据更多面临的是一次性处理,这种方式对数据领域两种常见的操作不是很高效。主要存在以下问题:
1)第一种是迭代式的算法。比如机器学习中ALS、凸优化梯度下降等。这些都需要基于数据集或者数据集的衍生数据反复查询反复操作。MapReduce不太合适这种模式,即使多MR串行处理,性能和时间也是一个问题。
2)另外一种是交互式数据挖掘,MR显然不擅长。
基于以上原因,我们需要一个效率非常快,且能够支持迭代计算和有效数据共享的模型,Spark 应运而生。
那spark又是怎么解决上述的一些问题的呢?
spark的大致理念包括:spark采用基于内存的计算方式,除了扩展了广泛使用的 MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。而实现这种理念,最初spark依靠的则是一种叫做RDD(Resilient Distributed Dataset,即弹性分布式数据集)的数据抽象。
那RDD又是什么呢?
RDD是Spark的基石,是实现Spark数据处理的核心抽象。RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。RDD可序列化,可以cache到内存中,省去了MapReduce大量的磁盘IO操作,有效了解决了Mapreduce执行速度慢的问题。RDD实质代表一个不可变、可分区、里面的元素可并行计算的集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。任何数据在Spark中都被表示为RDD,借助RDD,spark实现了以操作本地集合的方式来操作分布式数据集的抽象实现。
RDD有什么特征?
1.RDD逻辑上是分区的,每个分区的数据是抽象存在的。计算的时候会通过一个compute函数得到每个分区的数据,借助foreachPartition可以获得RDD中的分区。
2.RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD。
3.RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系,也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的,另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
4.如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来。该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。
5.虽然RDD的血缘关系天然地可以实现容错,当RDD的某个分区数据失败或丢失,可以通过血缘关系重建。但是对于长时间迭代型应用来说,随着迭代的进行,RDDs之间的血缘关系会越来越长,一旦在后续迭代过程中出错,则需要通过非常长的血缘关系去重建,势必影响性能。为此,RDD支持checkpoint将数据保存到持久化的存储中,这样就可以切断之前的血缘关系,因为checkpoint后的RDD不需要知道它的父RDDs了,它可以从checkpoint处拿到数据。
正是以上特征,RDD才实现了所谓的弹性。弹性主要表现在以下这些方面:
1.自动进行内存和磁盘数据存储的切换
Spark优先把数据放到内存中,如果内存放不下,就会放到磁盘里面,程序进行自动的存储切换。
2.基于血统的高效容错机制
在RDD进行转换和动作的时候,会形成RDD的Lineage依赖链,当某一个RDD失效的时候,可以通过重新计算上游的RDD来重新生成丢失的RDD数据。
3.Task如果失败会自动进行特定次数的重试
RDD的计算任务如果运行失败,会自动进行任务的重新计算,默认次数是4次。
4.Stage如果失败会自动进行特定次数的重试
如果Job的某个Stage阶段计算失败,框架也会自动进行任务的重新计算,默认次数也是4次。
5.Checkpoint和Persist可主动或被动触发
RDD可以通过Persist持久化将RDD缓存到内存或者磁盘,当再次用到该RDD时直接读取就行。也可以将RDD进行Checkpoint,Checkpoint会将数据存储在HDFS中,但是该RDD的所有父RDD依赖都会被移除。
6.数据调度弹性
Spark把这个JOB执行模型抽象为通用的有向无环图DAG,可以将多Stage的任务串联或并行执行,调度引擎自动处理Stage的失败以及Task的失败。
7.数据分片的高度弹性
可以根据业务的特征,动态调整数据分片的个数,提升整体的应用执行效率。
小结:RDD全称叫做弹性分布式数据集(Resilient Distributed Datasets),它是一种分布式的内存抽象,表示一个只读的记录分区的集合,它只能通过其他RDD转换而创建。为此,RDD支持丰富的转换操作(如map, join, filter, groupBy等),通过这种转换操作,新的RDD则包含了如何从其他RDDs衍生所必需的信息,所以说RDDs之间是有依赖关系的。基于RDDs之间的依赖,RDDs会形成一个有向无环图DAG,该DAG描述了整个流式计算的流程,实际执行的时候,RDD是通过血缘关系(Lineage)一气呵成的,即使出现数据分区丢失,也可以通过血缘关系重建分区。总结起来,基于RDD的流式计算任务可描述为:从稳定的物理存储(如分布式文件系统)中加载记录,记录被传入由一组确定性操作构成的DAG,然后写回稳定存储。另外RDD还可以将数据集缓存到内存中,使得在多个操作之间可以重用数据集,基于这个特点可以很方便地构建迭代型应用(图计算、机器学习等)或者交互式数据分析应用。可以说Spark最初也就是实现RDD的一个分布式系统,后面通过不断发展壮大成为现在较为完善的大数据生态系统。RDD之后,spark接着又实现了spark-sql、spark-streaming、structedstreaming、MLlib等。
spark-sql又是怎么回事?
如果学习过HIve的都知道,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性。同理,由于MapReduce这种计算模型执行效率比较慢,所以spark-sql的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!Spark SQL作为Spark用来处理结构化数据的一个模块,它提供了一个编程抽象叫做DataFrame并且作为分布式SQL查询引擎的作用。与RDD类似,DataFrame也是一个分布式数据容器。然而DataFrame更像传统数据库的二维表格,除了数据以外,还记录数据的结构信息,即schema。接着,从spark1.6之后,spark又增加了一种新的数据抽象DatSet。DataFrame只是知道字段,但是不知道字段的类型,所以在执行这些操作的时候是没办法在编译的时候检查是否类型失败的,比如你可以对一个String进行减法操作,在执行的时候才报错,而DataSet不仅仅知道字段,而且知道字段类型,所以有更严格的错误检查。这三者之间的显著区别在于,RDD让我们能够决定怎么做,而DataFrame和DataSet让我们决定做什么,控制的粒度一个比一个更细。
它们之间存在以下共性:
1.RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利。
2.三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过。
3.三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。
4.三者都有partition的概念。
5.三者有许多共同的函数,如filter,排序等。
6.在对DataFrame和Dataset进行操作许多操作都需要这个包(import spark.implicits._)进行支持。
7.DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型。
它们之间存在以下区别:
RDD:
1.RDD一般和spark mlib同时使用。
2.RDD不支持sparksql操作。
DataFrame:
1.与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,每一列的值无法直接访问。
2.DataFrame与Dataset一般不与spark mlib同时使用。
3.DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,
4.DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然。
Dataset:
Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同。DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段。而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息。
当然,它们之间还可以相互转换。如下:
RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换,这是Spark SQL的核心。
1.DataFrame/Dataset转RDD:
这个转换很简单,返回的是RDD[Row]
val rdd1=testDF.rdd
val rdd2=testDS.rdd
区别:
DataFrame转换后,返回的是弱类型RDD[Row],如下:
res11是DataFrame,转换后返回的是弱类型RDD[Row],属于弱类型。如果想获取其中的name字段,需要通过getString()方法,且通过正确的索引0才能实现。如果这里的索引写了1,则会报错,因为age字段是Int型的,想要获取age字段的数据,必须写成res13.map(_.getInt(1)).collect。
Dataset转换后,返回的是强类型,如下:
Res19是DataSet,转换后返回的是强类型RDD[People],属于强类型。如果想获取其中的name字段,可以通过name方法直接获取。
2.RDD转DataFrame:
import spark.implicits._
val testDF = rdd
.map {line=>(line._1,line._2)}
.toDF(“col1”,“col2”)
方式一:
方式二:
一般用元组把一行的数据写在一起,然后在toDF中指定字段名。
3.RDD转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = rdd
.map {line=>Coltest(line._1,line._2)}
.toDS
可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可。
4.Dataset转DataFrame:
这个也很简单,因为只是把case class封装成Row。
import spark.implicits._
val testDF = testDS.toDF
5.DataFrame转Dataset:
import spark.implicits._
case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型
val testDS = testDF.as[Coltest]
这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。
在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。还有,这里的spark不是包名,而是SparkSession。
聊完spark-sql,再来聊聊spark streaming。
前面提到的RDD和spark sql都只能满足离线计算,现实业务中,我们需要实时流式计算。为此,spark便推出了spark streaming。其实严格来讲spark streaming并非真正意义上流式计算。深入研究,便会发现,spark streaming采取是实际上是微批次策略,这是只是一种近似流式计算而已。后来出现的flink才是真正意义上的实时流式计算。
和Spark基于RDD的概念很相似,Spark Streaming使用离散化流(discretized stream)作为抽象表示,叫作DStream。DStream是随时间推移而收到的数据的序列。在内部,每个时间区间收到的数据都作为 RDD 存在,而 DStream 是由这些 RDD 所组成的序列(因此得名“离散化”),借助foreachRDD可以获得DStream内部的RDD。
DStream 可以从各种输入源创建,比如 Flume、Kafka 或者 HDFS。创建出来的DStream 支持两种操作,一种是转化操作(transformation),会生成一个新的DStream,另一种是输出操作(output operation),可以把数据写入外部系统中。DStream 提供了许多与 RDD 所支持的操作相类似的操作支持,还增加了与时间(基于Processing-Time)相关的新操作,比如滑动窗口,但是Spark Streaming不满足基于Event Time的实时处理需求。
接着再聊聊structed streaming。
Structured Streaming是Spark2.0版本提出的新的实时流框架(2.0和2.1是实验版本,从Spark2.2开始为稳定版本),相比于Spark Streaming,优点如下:
1.同样能支持多种数据源的输入和输出,Kafka、flume、Socket、Json。
2.基于Event-Time,相比于Spark Streaming的Processing-Time更精确,更符合业务场景。
Event time 事件时间: 就是数据真正发生的时间,比如用户浏览了一个页面可能会产生一条用户的该时间点的浏览日志。
Process time 处理时间: 则是这条日志数据真正到达计算框架中被处理的时间点,简单的说,就是你的Spark程序是什么时候读到这条日志的。
事件时间是嵌入在数据本身中的时间。对于许多应用程序,用户可能希望在此事件时间操作。例如,如果要获取IoT设备每分钟生成的事件数,则可能需要使用生成数据的时间(即数据中的事件时间),而不是Spark接收他们的时间。事件时间在此模型中非常自然地表示 - 来自设备的每个事件都是表中的一行,事件时间是该行中的一个列值。
3.支持spark2的dataframe/Dataset处理,而不是Dstream。
4.解决了Spark Streaming存在的代码升级,DAG图变化引起的任务失败,无法断点续传的问题。
5.Structured Streaming is a scalable and fault-tolerant stream processing engine built on the Spark SQL engine. You can express your streaming computation the same way you would express a batch computation on static data. The Spark SQL engine will take care of running it incrementally and continuously and updating the final result as streaming data continues to arrive. You can use the Dataset/DataFrame API in Scala, Java, Python or R to express streaming aggregations, event-time windows, stream-to-batch joins, etc. The computation is executed on the same optimized Spark SQL engine. Finally, the system ensures end-to-end exactly-once fault-tolerance guarantees through checkpointing and Write-Ahead Logs. In short, Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming.
Internally, by default, Structured Streaming queries are processed using a micro-batch processing engine, which processes data streams as a series of small batch jobs thereby achieving end-to-end latencies as low as 100 milliseconds and exactly-once fault-tolerance guarantees. However, since Spark 2.3, we have introduced a new low-latency processing mode called Continuous Processing, which can achieve end-to-end latencies as low as 1 millisecond with at-least-once guarantees. Without changing the Dataset/DataFrame operations in your queries, you will be able to choose the mode based on your application requirements.
Structured Streaming支持的Source:
1.File Source:从给定的目录读取数据,目前支持的格式有text,csv,json,parquet,容错。
2.Kafka Source:从kafka拉取数据。仅兼容kafka 0.10.0或者更高版本,容错。
3.Socket Source(for testing):从一个连接中读取UTF8编码的文本数据,不容错。
Structured Streaming Output Sinks:
StructuredStreaming目前支持的sink只有FileSink、KafkaSink、ConsoleSink、MemorySink和ForeachSink。
注意:不同的output sink支持的output mode范围不同。
Structured Streaming output modes:
1.Append mode(default):This is the default mode, where only the new rows added to the Result Table since the last trigger will be outputted to the sink. This is supported for only those queries where rows added to the Result Table is never going to change. Hence, this mode guarantees that each row will be output only once (assuming fault-tolerant sink). For example, queries with only select, where, map, flatMap, filter, join, etc. will support Append mode.
2.Complete mode:The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.
3.Update mode:(Available since Spark 2.1.1) Only the rows in the Result Table that were updated since the last trigger will be outputted to the sink. More information to be added in future releases.
这篇关于spark漫谈一的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!