####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析

本文主要是介绍####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、DStream和RDD的关系
    DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据,如下图所示:
 
    
从上图可以看出, 一个DStream 对应了时间维度上的多个RDD。
DStream 作为Spark Stream的一个基本抽象,提供了高层的API来进行Spark Streaming 程序开发,先看一个简单的Spark Streaming的WordCount程序实例:
  1. object WordCount{
  2. def main(args:Array[String]):Unit={
  3. val sparkConf =newSparkConf().setMaster("local[4]").setAppName("WordCount")
  4. val ssc =newStreamingContext(sparkConf,Seconds(1))
  5. val lines = ssc.socketTextStream("localhost",9999)
  6. val words = lines.flatMap(_.split(" "))
  7. val wordCounts = words.map(=>(x,1)).reduceByKey(_+_)
  8. wordCounts.print()
  9. ssc.start()
  10. ssc.awaitTermination()
  11. }
  12. }
我们会发现对DStream的操作和RDD的操作惊人的相似, 通过对DStream的不断转换,形成依赖关系。所以的DStream操作最终会转换成底层的RDD的操作,上面的例子中
lines DStream转换成wods DSteam。 lines DStream的 flatMap操作会作用于其中每一个RDD去生成words DStream 中的RDD, 过程如下图所示:
 
下面从源码角度看一下 DStream和RDD的关系:
    DStream 中 有一个HashMap[Time,RDD[T]]类型的对象 generatedRDDs,其中Key为作业开始时间,RDD为该DStream对应的RDD,源码如下:
 
    
二、Dstream 的分类
    Dstream 主要分为三大类:
         1. Input DStream
         2.  Transformed DStream
         3. Output DStream
 
2.1 InputDStream 是DStream 最初诞生的地方,也是RDD最初诞生的地方,它是依据数据源创建的最初的DStream,如上面例子中的代码:
 
val lines = ssc . socketTextStream ( "localhost" , 9999 )
 
基于Socket数据源创建了 SocketInputDStream对象lines,下面从源码角度分析一下他是怎么生成RDD的,  SocketInputDStream生成RDD的方法在 它的父类ReceiverInputDSteam中:
 

 
ReceiverInputDSteam  的compute方法中调用了createBloackRDD方法基于Block信息创建了RDD :
 

可以看到  ReceiverInputDSteam 的 createBloackRDD 方法new了BlockRDD对象,BlockRDD 是继承自RDD。至此,最初的RDD创建完成。
 
2.2、  Transformed DStream 是由其他DStream 通过非Output算子装换而来的DStream
   例如例子中的lines通过flatMap算子转换生成了FlatMappedDStream:
     val words = lines.flatMap(_.split(" "))
   下面看一下flatMap的源码:
    
    
 
可以看到flatMap是DStream的方法,它创建了FlatMappeedDStream并返回,上面例子中words 就是 FlatMappeedDStream 对象,创建 FlatMappeedDStream对象时传入了 参数flatMapFunc,这里的flatMapFunc就是用户编写的业务逻辑,我们再进入FlatMappedDStream,查看其compute方法:
 

可以惊喜的看到 FlatMappedDStream的compute方法调用了parent的getOrCompute方法获取父DStream的RDD.通过对 父DStream的RDD的flatMap算子生成新的RDD,转换的业务逻辑通过flatMapFunc参数传递给flatMap算子。这样对DStream的操作都转换成了对RDD的操作,同时DSream的依赖关系也与RDD之间依赖关系同时建立了起来。
说明:这些RDD的创建是在Job动态生成时候发生的,Job生成最终会调用ForeachDStream的generateJob方法,源码如下
 

其中的parent.getOrCompute方法会依据DStream之间的依赖关系,导致一系列的链式调用,从而创建所有的RDD,并形成RDD之间的依赖关系。
 
3.3  Output DStream 是有其他DStream通过Output算子生成,它只存在于Output算子内部,并不会像Transformed Stream一样由算子返回, 他是触发Job执行的关键。
          那么什么是Output 算子呢? Output 算子是让DStream中的数据被推送的外部系统,像数据库,文件系统(HDFS,GFS等)的算子。因为Output 算子是将转换后的数据推送到外部系统被使用的操作,所以他触发了前面转换操作的真正执行(类似于RDD的action操作)。
          下面,我们看看有哪些Output算子:
 
Output Operation Meaning
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. 
Python API This is called pprint() in the Python API.
saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based onprefix and suffix"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix"prefix-TIME_IN_MS[.suffix]"
Python API This is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix"prefix-TIME_IN_MS[.suffix]"
Python API This is not available in the Python API.
foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
下面,回到我们开头的例子:
wordCounts . print ()
其中pirnt算子就是Output算子,我们进入print的源码:
 

print()方法调用了print(10),其实是调用了另一个print方法:
print 方法中首先定义了一个函数foreachFunc,foreachFunc从rdd中出去num个元素打印出来。接下来print函数调用了foreachRDD,并将foreachFunc的处理逻辑作为参数传入。这里的foreachRDD也是一个Output算子(上面已经有说明),接下来看看 foreachRDD的源码。
 

 
可以看到foreachRDD中创建了一个ForeachDStream对象,这就是我们期待已久的Output DStream。这里需要注意一个关键点:
创建完ForeachRDD对象后,调用了该对象的register方法。register方法将当前对象注册给DStreamGraph。源码如下:
 

注册的过程就是将当前对象加入graph的输出流outputStream中:

这个过程很重要,在Job触发时候会用到outputStream。我们先在这里记住这个过程,下面的分析会用到这个内容。
至此,DStream到RDD过程已经解析完毕。
三 、由Dstream触发RDD的执行
    Spark Stream的Job执行过程我在另一篇博客有详细介绍,具体细节请参考 http://www.cnblogs.com/zhouyf/p/5503682.html
在生成Job的过程中会调用DStreamGraph的generate方法:
其中,就调用了outputStream的generateJob方法,这里的outputStream就上面有output算子注册给DStreamGraph的输出流。就是我们实例中ForeachDStream 。
 
ForeachDStream 的generateJob方法源码:
 
可以看到它将我们的业务逻辑封装成jobFunc传递给了最终生成的Job对象。
由上篇博客《 Spark streaming技术内幕 : Job动态生成原理与源码解析 我们知道在StreamContext启动会动态创建job,并且最终调用Job的run方法
Job的run方法由JobScheduler的submitJobSet触发 : 
其中jobExecutor对象是一个线程池,JobHandler实现了 Runnable接口,在JobHandler 的run方法中会调用传入的job对象的run方法。在这里Job的run方法开始在线程中执行,JobHandler的run方法源码如下:
 

其中的job就是封装了我们业务逻辑的Job对象,它的run方法会触发我们在foreachRDD方法中对RDD的操作(一般是action操作),到这里RDD的Action操作被触发,spark作业开始执行。
总结:
    1、在一个固定时间维度上,DStream和RDD是一一对应关系,可以将DStream看成是RDD在时间维度上封装。
    2、Dstream 主要分为三大类: Input DStream,Transformed DStream,Output DStream,其中Output Dstream 对开发者是透明的,存在于Output 算子内部。
    3、Spark Streaming应用程序最终会转化成对RDD操作的spark 程序,spark 程序由于执行了foreachRDD算子中的RDD操作被触发。

这篇关于####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967701

相关文章

mac安装redis全过程

《mac安装redis全过程》文章内容主要介绍了如何从官网下载指定版本的Redis,以及如何在自定义目录下安装和启动Redis,还提到了如何修改Redis的密码和配置文件,以及使用RedisInsig... 目录MAC安装Redis安装启动redis 配置redis 常用命令总结mac安装redis官网下

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

在C#中合并和解析相对路径方式

《在C#中合并和解析相对路径方式》Path类提供了几个用于操作文件路径的静态方法,其中包括Combine方法和GetFullPath方法,Combine方法将两个路径合并在一起,但不会解析包含相对元素... 目录C#合并和解析相对路径System.IO.Path类幸运的是总结C#合并和解析相对路径对于 C

Jenkins中自动化部署Spring Boot项目的全过程

《Jenkins中自动化部署SpringBoot项目的全过程》:本文主要介绍如何使用Jenkins从Git仓库拉取SpringBoot项目并进行自动化部署,通过配置Jenkins任务,实现项目的... 目录准备工作启动 Jenkins配置 Jenkins创建及配置任务源码管理构建触发器构建构建后操作构建任务

Java解析JSON的六种方案

《Java解析JSON的六种方案》这篇文章介绍了6种JSON解析方案,包括Jackson、Gson、FastJSON、JsonPath、、手动解析,分别阐述了它们的功能特点、代码示例、高级功能、优缺点... 目录前言1. 使用 Jackson:业界标配功能特点代码示例高级功能优缺点2. 使用 Gson:轻量

Java如何接收并解析HL7协议数据

《Java如何接收并解析HL7协议数据》文章主要介绍了HL7协议及其在医疗行业中的应用,详细描述了如何配置环境、接收和解析数据,以及与前端进行交互的实现方法,文章还分享了使用7Edit工具进行调试的经... 目录一、前言二、正文1、环境配置2、数据接收:HL7Monitor3、数据解析:HL7Busines

Kibana的安装和配置全过程

《Kibana的安装和配置全过程》Kibana是一个开源的数据分析和可视化平台,它与Elasticsearch紧密集成,提供了一个直观的Web界面,使您可以快速地搜索、分析和可视化数据,在本文中,我们... 目录Kibana的安装和配置1.安装Java运行环境2.下载Kibana3.解压缩Kibana4.配

python解析HTML并提取span标签中的文本

《python解析HTML并提取span标签中的文本》在网页开发和数据抓取过程中,我们经常需要从HTML页面中提取信息,尤其是span元素中的文本,span标签是一个行内元素,通常用于包装一小段文本或... 目录一、安装相关依赖二、html 页面结构三、使用 BeautifulSoup javascript

若依部署Nginx和Tomcat全过程

《若依部署Nginx和Tomcat全过程》文章总结了两种部署方法:Nginx部署和Tomcat部署,Nginx部署包括打包、将dist文件拉到指定目录、配置nginx.conf等步骤,Tomcat部署... 目录Nginx部署后端部署Tomcat部署出现问题:点击刷新404总结Nginx部署第一步:打包

网页解析 lxml 库--实战

lxml库使用流程 lxml 是 Python 的第三方解析库,完全使用 Python 语言编写,它对 XPath表达式提供了良好的支 持,因此能够了高效地解析 HTML/XML 文档。本节讲解如何通过 lxml 库解析 HTML 文档。 pip install lxml lxm| 库提供了一个 etree 模块,该模块专门用来解析 HTML/XML 文档,下面来介绍一下 lxml 库