RDD的join和Dstream的join有什么区别?

2023-10-09 03:32
文章标签 区别 join rdd dstream

本文主要是介绍RDD的join和Dstream的join有什么区别?,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

有人在知识星球里问:

浪院长,RDD的join和Dstream的join有什么区别?

浪尖的回答:

DStream的join底层就是rdd的join。

下面,我们就带着疑问去验证以下,我们的想法。

2. DStream -> PairDStreamFunctions

Dstream这个类实际上支持的只是Spark Streaming的基础操作算子,比如: mapfilter 和window.PairDStreamFunctions 这个支持key-valued类型的流数据

,支持的操作算子,如,groupByKeyAndWindow,join。这些操作,在有key-value类型的流上是自动识别的。

对于dstream -> PairDStreamFunctions自动转换的过程大家肯定想到的是scala的隐式转换。具体代码在Dstream的object内部。

implicit def toPairDStreamFunctions[K, V](stream: DStream[(K, V)])
     (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null):
   PairDStreamFunctions[K, V] = {
   new PairDStreamFunctions[K, V](stream)
 }

假如,你对scala的隐式转换比较懵逼,请阅读下面文章。

Scala语法基础之隐式转换

3. PairDStreamFunctions的join

PairDStreamFunctions的join API总共有三种

/**
  * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
  * Hash partitioning is used to generate the RDDs with Spark's default number of partitions.
   *
   *  通过join this和other Dstream的rdd构建出一个新的DStream.
   *  Hash分区器,用来使用默认的分区数来产生RDDs。
  */
 def join[W: ClassTag](other: DStream[(K, W)]): DStream[(K, (V, W))] = ssc.withScope {
   join[W](other, defaultPartitioner())
 }

 /**
  * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
  * Hash partitioning is used to generate the RDDs with `numPartitions` partitions.
   *
   *  通过join this和other Dstream的rdd构建出一个新的DStream.
   *  Hash分区器,用来使用numPartitions分区数来产生RDDs。
  */
 def join[W: ClassTag](
     other: DStream[(K, W)],
     numPartitions: Int): DStream[(K, (V, W))] = ssc.withScope {
   join[W](other, defaultPartitioner(numPartitions))
 }

 /**
  * Return a new DStream by applying 'join' between RDDs of `this` DStream and `other` DStream.
  * The supplied org.apache.spark.Partitioner is used to control the partitioning of each RDD.
   * 通过join this和other Dstream的rdd构建出一个新的DStream.
   * 使用org.apache.spark.Partitioner来控制每个RDD的分区。
  */
 def join[W: ClassTag](
     other: DStream[(K, W)],
     partitioner: Partitioner
   ): DStream[(K, (V, W))] = ssc.withScope {
   self.transformWith(
     other,
     (rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)
   )
 }

上面所示代码中,第三个PairDStreamFunctions的join api 体现了join的操作,也即是函数:

(rdd1: RDD[(K, V)], rdd2: RDD[(K, W)]) => rdd1.join(rdd2, partitioner)

上面是两个RDD的join过程,并且指定了分区器。后面我们主要是关注该函数封装及调用。

其实,看过浪尖的Spark Streaming的视频的朋友或者度过浪尖关于Spark Streaming相关源码讲解的朋友应该有所了解的是。 这个生成RDD的函数应该是在 DStream的compute方法中在生成RDD的时候调用。假设你不了解也不要紧。 我们跟着代码轨迹前进,验证我们的想法。

DStream.transformWith

/**
  * Return a new DStream in which each RDD is generated by applying a function
  * on each RDD of 'this' DStream and 'other' DStream.
  */

 def transformWith[U: ClassTag, V: ClassTag](
     other: DStream[U], transformFunc: (RDD[T], RDD[U]) => RDD[V]
   ): DStream[V] = ssc.withScope {
   // because the DStream is reachable from the outer object here, and because
   // DStreams can't be serialized with closures, we can't proactively check
   // it for serializability and so we pass the optional false to SparkContext.clean
   val cleanedF = ssc.sparkContext.clean(transformFunc, false)
   transformWith(other, (rdd1: RDD[T], rdd2: RDD[U], time: Time) => cleanedF(rdd1, rdd2))
 }
 
   /**
    * Return a new DStream in which each RDD is generated by applying a function
    * on each RDD of 'this' DStream and 'other' DStream.
    */

   def transformWith[U: ClassTag, V: ClassTag](
       other: DStream[U], transformFunc: (RDD[T], RDD[U], Time) => RDD[V]
     ): DStream[V] = ssc.withScope {
     // because the DStream is reachable from the outer object here, and because
     // DStreams can't be serialized with closures, we can't proactively check
     // it for serializability and so we pass the optional false to SparkContext.clean
     val cleanedF = ssc.sparkContext.clean(transformFunc, false)
     val realTransformFunc = (rdds: Seq[RDD[_]], time: Time) => {
       assert(rdds.length == 2)
       val rdd1 = rdds(0).asInstanceOf[RDD[T]]
       val rdd2 = rdds(1).asInstanceOf[RDD[U]]
       cleanedF(rdd1, rdd2, time)
     }
     new TransformedDStream[V](Seq(this, other), realTransformFunc)
   }

经过上面两个 TransformWith操作,最终生成了一个TransformedDStream。需要关注的是new TransformedDStream[V](Seq(this, other), realTransformFunc) 第一个参数是一个包含要进行join操作的两个流的Seq。

那么,TransformedDStream 的parents 就包含了两个流。我们可以看到其 compute 方法的第一行。

override def compute(validTime: Time): Option[RDD[U]] = {
//    针对每一个流,获取其当前时间的RDD。
   val parentRDDs = parents.map { parent => parent.getOrCompute(validTime).getOrElse(
     // Guard out against parent DStream that return None instead of Some(rdd) to avoid NPE
     throw new SparkException(s"Couldn't generate RDD from parent at time $validTime"))
   }

compute的第一行就是获取parent中每个流,当前有效时间的RDD。然后调用,前面步骤封装的函数进行join。

val transformedRDD = transformFunc(parentRDDs, validTime)

以上就是join的全部过程。也是,验证浪尖所说的,DStream的join底层就是RDD的join。

640?wx_fmt=jpeg

这篇关于RDD的join和Dstream的join有什么区别?的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/170049

相关文章

MyBatis中$与#的区别解析

《MyBatis中$与#的区别解析》文章浏览阅读314次,点赞4次,收藏6次。MyBatis使用#{}作为参数占位符时,会创建预处理语句(PreparedStatement),并将参数值作为预处理语句... 目录一、介绍二、sql注入风险实例一、介绍#(井号):MyBATis使用#{}作为参数占位符时,会

Android kotlin中 Channel 和 Flow 的区别和选择使用场景分析

《Androidkotlin中Channel和Flow的区别和选择使用场景分析》Kotlin协程中,Flow是冷数据流,按需触发,适合响应式数据处理;Channel是热数据流,持续发送,支持... 目录一、基本概念界定FlowChannel二、核心特性对比数据生产触发条件生产与消费的关系背压处理机制生命周期

Javaee多线程之进程和线程之间的区别和联系(最新整理)

《Javaee多线程之进程和线程之间的区别和联系(最新整理)》进程是资源分配单位,线程是调度执行单位,共享资源更高效,创建线程五种方式:继承Thread、Runnable接口、匿名类、lambda,r... 目录进程和线程进程线程进程和线程的区别创建线程的五种写法继承Thread,重写run实现Runnab

C++中NULL与nullptr的区别小结

《C++中NULL与nullptr的区别小结》本文介绍了C++编程中NULL与nullptr的区别,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编... 目录C++98空值——NULLC++11空值——nullptr区别对比示例 C++98空值——NUL

Conda与Python venv虚拟环境的区别与使用方法详解

《Conda与Pythonvenv虚拟环境的区别与使用方法详解》随着Python社区的成长,虚拟环境的概念和技术也在不断发展,:本文主要介绍Conda与Pythonvenv虚拟环境的区别与使用... 目录前言一、Conda 与 python venv 的核心区别1. Conda 的特点2. Python v

Go语言中make和new的区别及说明

《Go语言中make和new的区别及说明》:本文主要介绍Go语言中make和new的区别及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录1 概述2 new 函数2.1 功能2.2 语法2.3 初始化案例3 make 函数3.1 功能3.2 语法3.3 初始化

深度解析Spring Boot拦截器Interceptor与过滤器Filter的区别与实战指南

《深度解析SpringBoot拦截器Interceptor与过滤器Filter的区别与实战指南》本文深度解析SpringBoot中拦截器与过滤器的区别,涵盖执行顺序、依赖关系、异常处理等核心差异,并... 目录Spring Boot拦截器(Interceptor)与过滤器(Filter)深度解析:区别、实现

Before和BeforeClass的区别及说明

《Before和BeforeClass的区别及说明》:本文主要介绍Before和BeforeClass的区别及说明,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Before和BeforeClass的区别一个简单的例子当运行这个测试类时总结Before和Befor

SQL中JOIN操作的条件使用总结与实践

《SQL中JOIN操作的条件使用总结与实践》在SQL查询中,JOIN操作是多表关联的核心工具,本文将从原理,场景和最佳实践三个方面总结JOIN条件的使用规则,希望可以帮助开发者精准控制查询逻辑... 目录一、ON与WHERE的本质区别二、场景化条件使用规则三、最佳实践建议1.优先使用ON条件2.WHERE用

Mybatis Plus Join使用方法示例详解

《MybatisPlusJoin使用方法示例详解》:本文主要介绍MybatisPlusJoin使用方法示例详解,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,... 目录1、pom文件2、yaml配置文件3、分页插件4、示例代码:5、测试代码6、和PageHelper结合6