####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析

本文主要是介绍####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

一、DStream和RDD的关系
    DSream 代表了一系列连续的RDD,DStream中每个RDD包含特定时间间隔的数据,如下图所示:
 
    
从上图可以看出, 一个DStream 对应了时间维度上的多个RDD。
DStream 作为Spark Stream的一个基本抽象,提供了高层的API来进行Spark Streaming 程序开发,先看一个简单的Spark Streaming的WordCount程序实例:
  1. object WordCount{
  2. def main(args:Array[String]):Unit={
  3. val sparkConf =newSparkConf().setMaster("local[4]").setAppName("WordCount")
  4. val ssc =newStreamingContext(sparkConf,Seconds(1))
  5. val lines = ssc.socketTextStream("localhost",9999)
  6. val words = lines.flatMap(_.split(" "))
  7. val wordCounts = words.map(=>(x,1)).reduceByKey(_+_)
  8. wordCounts.print()
  9. ssc.start()
  10. ssc.awaitTermination()
  11. }
  12. }
我们会发现对DStream的操作和RDD的操作惊人的相似, 通过对DStream的不断转换,形成依赖关系。所以的DStream操作最终会转换成底层的RDD的操作,上面的例子中
lines DStream转换成wods DSteam。 lines DStream的 flatMap操作会作用于其中每一个RDD去生成words DStream 中的RDD, 过程如下图所示:
 
下面从源码角度看一下 DStream和RDD的关系:
    DStream 中 有一个HashMap[Time,RDD[T]]类型的对象 generatedRDDs,其中Key为作业开始时间,RDD为该DStream对应的RDD,源码如下:
 
    
二、Dstream 的分类
    Dstream 主要分为三大类:
         1. Input DStream
         2.  Transformed DStream
         3. Output DStream
 
2.1 InputDStream 是DStream 最初诞生的地方,也是RDD最初诞生的地方,它是依据数据源创建的最初的DStream,如上面例子中的代码:
 
val lines = ssc . socketTextStream ( "localhost" , 9999 )
 
基于Socket数据源创建了 SocketInputDStream对象lines,下面从源码角度分析一下他是怎么生成RDD的,  SocketInputDStream生成RDD的方法在 它的父类ReceiverInputDSteam中:
 

 
ReceiverInputDSteam  的compute方法中调用了createBloackRDD方法基于Block信息创建了RDD :
 

可以看到  ReceiverInputDSteam 的 createBloackRDD 方法new了BlockRDD对象,BlockRDD 是继承自RDD。至此,最初的RDD创建完成。
 
2.2、  Transformed DStream 是由其他DStream 通过非Output算子装换而来的DStream
   例如例子中的lines通过flatMap算子转换生成了FlatMappedDStream:
     val words = lines.flatMap(_.split(" "))
   下面看一下flatMap的源码:
    
    
 
可以看到flatMap是DStream的方法,它创建了FlatMappeedDStream并返回,上面例子中words 就是 FlatMappeedDStream 对象,创建 FlatMappeedDStream对象时传入了 参数flatMapFunc,这里的flatMapFunc就是用户编写的业务逻辑,我们再进入FlatMappedDStream,查看其compute方法:
 

可以惊喜的看到 FlatMappedDStream的compute方法调用了parent的getOrCompute方法获取父DStream的RDD.通过对 父DStream的RDD的flatMap算子生成新的RDD,转换的业务逻辑通过flatMapFunc参数传递给flatMap算子。这样对DStream的操作都转换成了对RDD的操作,同时DSream的依赖关系也与RDD之间依赖关系同时建立了起来。
说明:这些RDD的创建是在Job动态生成时候发生的,Job生成最终会调用ForeachDStream的generateJob方法,源码如下
 

其中的parent.getOrCompute方法会依据DStream之间的依赖关系,导致一系列的链式调用,从而创建所有的RDD,并形成RDD之间的依赖关系。
 
3.3  Output DStream 是有其他DStream通过Output算子生成,它只存在于Output算子内部,并不会像Transformed Stream一样由算子返回, 他是触发Job执行的关键。
          那么什么是Output 算子呢? Output 算子是让DStream中的数据被推送的外部系统,像数据库,文件系统(HDFS,GFS等)的算子。因为Output 算子是将转换后的数据推送到外部系统被使用的操作,所以他触发了前面转换操作的真正执行(类似于RDD的action操作)。
          下面,我们看看有哪些Output算子:
 
Output Operation Meaning
print() Prints the first ten elements of every batch of data in a DStream on the driver node running the streaming application. This is useful for development and debugging. 
Python API This is called pprint() in the Python API.
saveAsTextFiles(prefix, [suffix]) Save this DStream's contents as text files. The file name at each batch interval is generated based onprefix and suffix"prefix-TIME_IN_MS[.suffix]".
saveAsObjectFiles(prefix, [suffix]) Save this DStream's contents as SequenceFiles of serialized Java objects. The file name at each batch interval is generated based on prefix and suffix"prefix-TIME_IN_MS[.suffix]"
Python API This is not available in the Python API.
saveAsHadoopFiles(prefix, [suffix]) Save this DStream's contents as Hadoop files. The file name at each batch interval is generated based on prefix and suffix"prefix-TIME_IN_MS[.suffix]"
Python API This is not available in the Python API.
foreachRDD(func) The most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database. Note that the function func is executed in the driver process running the streaming application, and will usually have RDD actions in it that will force the computation of the streaming RDDs.
下面,回到我们开头的例子:
wordCounts . print ()
其中pirnt算子就是Output算子,我们进入print的源码:
 

print()方法调用了print(10),其实是调用了另一个print方法:
print 方法中首先定义了一个函数foreachFunc,foreachFunc从rdd中出去num个元素打印出来。接下来print函数调用了foreachRDD,并将foreachFunc的处理逻辑作为参数传入。这里的foreachRDD也是一个Output算子(上面已经有说明),接下来看看 foreachRDD的源码。
 

 
可以看到foreachRDD中创建了一个ForeachDStream对象,这就是我们期待已久的Output DStream。这里需要注意一个关键点:
创建完ForeachRDD对象后,调用了该对象的register方法。register方法将当前对象注册给DStreamGraph。源码如下:
 

注册的过程就是将当前对象加入graph的输出流outputStream中:

这个过程很重要,在Job触发时候会用到outputStream。我们先在这里记住这个过程,下面的分析会用到这个内容。
至此,DStream到RDD过程已经解析完毕。
三 、由Dstream触发RDD的执行
    Spark Stream的Job执行过程我在另一篇博客有详细介绍,具体细节请参考 http://www.cnblogs.com/zhouyf/p/5503682.html
在生成Job的过程中会调用DStreamGraph的generate方法:
其中,就调用了outputStream的generateJob方法,这里的outputStream就上面有output算子注册给DStreamGraph的输出流。就是我们实例中ForeachDStream 。
 
ForeachDStream 的generateJob方法源码:
 
可以看到它将我们的业务逻辑封装成jobFunc传递给了最终生成的Job对象。
由上篇博客《 Spark streaming技术内幕 : Job动态生成原理与源码解析 我们知道在StreamContext启动会动态创建job,并且最终调用Job的run方法
Job的run方法由JobScheduler的submitJobSet触发 : 
其中jobExecutor对象是一个线程池,JobHandler实现了 Runnable接口,在JobHandler 的run方法中会调用传入的job对象的run方法。在这里Job的run方法开始在线程中执行,JobHandler的run方法源码如下:
 

其中的job就是封装了我们业务逻辑的Job对象,它的run方法会触发我们在foreachRDD方法中对RDD的操作(一般是action操作),到这里RDD的Action操作被触发,spark作业开始执行。
总结:
    1、在一个固定时间维度上,DStream和RDD是一一对应关系,可以将DStream看成是RDD在时间维度上封装。
    2、Dstream 主要分为三大类: Input DStream,Transformed DStream,Output DStream,其中Output Dstream 对开发者是透明的,存在于Output 算子内部。
    3、Spark Streaming应用程序最终会转化成对RDD操作的spark 程序,spark 程序由于执行了foreachRDD算子中的RDD操作被触发。

这篇关于####好好好¥#####spark Streaming 技术内幕 : 从DSteam到RDD全过程解析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/967701

相关文章

linux hostname设置全过程

《linuxhostname设置全过程》:本文主要介绍linuxhostname设置全过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录查询hostname设置步骤其它相关点hostid/etc/hostsEDChina编程A工具license破解注意事项总结以RHE

PostgreSQL的扩展dict_int应用案例解析

《PostgreSQL的扩展dict_int应用案例解析》dict_int扩展为PostgreSQL提供了专业的整数文本处理能力,特别适合需要精确处理数字内容的搜索场景,本文给大家介绍PostgreS... 目录PostgreSQL的扩展dict_int一、扩展概述二、核心功能三、安装与启用四、字典配置方法

深度解析Java DTO(最新推荐)

《深度解析JavaDTO(最新推荐)》DTO(DataTransferObject)是一种用于在不同层(如Controller层、Service层)之间传输数据的对象设计模式,其核心目的是封装数据,... 目录一、什么是DTO?DTO的核心特点:二、为什么需要DTO?(对比Entity)三、实际应用场景解析

深度解析Java项目中包和包之间的联系

《深度解析Java项目中包和包之间的联系》文章浏览阅读850次,点赞13次,收藏8次。本文详细介绍了Java分层架构中的几个关键包:DTO、Controller、Service和Mapper。_jav... 目录前言一、各大包1.DTO1.1、DTO的核心用途1.2. DTO与实体类(Entity)的区别1

Java中的雪花算法Snowflake解析与实践技巧

《Java中的雪花算法Snowflake解析与实践技巧》本文解析了雪花算法的原理、Java实现及生产实践,涵盖ID结构、位运算技巧、时钟回拨处理、WorkerId分配等关键点,并探讨了百度UidGen... 目录一、雪花算法核心原理1.1 算法起源1.2 ID结构详解1.3 核心特性二、Java实现解析2.

使用Python绘制3D堆叠条形图全解析

《使用Python绘制3D堆叠条形图全解析》在数据可视化的工具箱里,3D图表总能带来眼前一亮的效果,本文就来和大家聊聊如何使用Python实现绘制3D堆叠条形图,感兴趣的小伙伴可以了解下... 目录为什么选择 3D 堆叠条形图代码实现:从数据到 3D 世界的搭建核心代码逐行解析细节优化应用场景:3D 堆叠图

深度解析Python装饰器常见用法与进阶技巧

《深度解析Python装饰器常见用法与进阶技巧》Python装饰器(Decorator)是提升代码可读性与复用性的强大工具,本文将深入解析Python装饰器的原理,常见用法,进阶技巧与最佳实践,希望可... 目录装饰器的基本原理函数装饰器的常见用法带参数的装饰器类装饰器与方法装饰器装饰器的嵌套与组合进阶技巧

解析C++11 static_assert及与Boost库的关联从入门到精通

《解析C++11static_assert及与Boost库的关联从入门到精通》static_assert是C++中强大的编译时验证工具,它能够在编译阶段拦截不符合预期的类型或值,增强代码的健壮性,通... 目录一、背景知识:传统断言方法的局限性1.1 assert宏1.2 #error指令1.3 第三方解决

全面解析MySQL索引长度限制问题与解决方案

《全面解析MySQL索引长度限制问题与解决方案》MySQL对索引长度设限是为了保持高效的数据检索性能,这个限制不是MySQL的缺陷,而是数据库设计中的权衡结果,下面我们就来看看如何解决这一问题吧... 目录引言:为什么会有索引键长度问题?一、问题根源深度解析mysql索引长度限制原理实际场景示例二、五大解决

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现