mysql physical plan_Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

2024-03-21 06:59

本文主要是介绍mysql physical plan_Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

的版本号。 将右表的join keys放到HashSet里。然后遍历左表,查找左表的join key能否匹配。case class LeftSemiJoinHash(

leftKeys: Seq[Expression],

rightKeys: Seq[Expression],

left: SparkPlan,

right: SparkPlan) extends BinaryNode with HashJoin {

val buildSide = BuildRight //buildSide是以右表为基准

override def requiredChildDistribution =

ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

override def output = left.output

def execute() = {

buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => //右表的物理计划运行后生成RDD,利用zipPartitions对Partition进行合并。然后用上述方法实现。

val hashSet = new java.util.HashSet[Row]()

var currentRow: Row = null

// Create a Hash set of buildKeys

while (buildIter.hasNext) {

currentRow = buildIter.next()

val rowKey = buildSideKeyGenerator(currentRow)

if(!rowKey.anyNull) {

val keyExists = hashSet.contains(rowKey)

if (!keyExists) {

hashSet.add(rowKey)

}

}

}

val joinKeys = streamSideKeyGenerator()

streamIter.filter(current => {

!joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)

})

}

}

}

2.2、BroadcastHashJoin 名约: 广播HashJoin,呵呵。  是InnerHashJoin的实现。这里用到了concurrent并发里的future,异步的广播buildPlan的表运行后的的RDD。

假设接收到了广播后的表,那么就用streamedPlan来匹配这个广播的表。

实现是RDD的mapPartitions和HashJoin里的joinIterators最后生成join的结果。case class BroadcastHashJoin(

leftKeys: Seq[Expression],

rightKeys: Seq[Expression],

buildSide: BuildSide,

left: SparkPlan,

right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

override def otherCopyArgs = sqlContext :: Nil

override def outputPartitioning: Partitioning = left.outputPartitioning

override def requiredChildDistribution =

UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

@transient

lazy val broadcastFuture = future { //利用SparkContext广播表

sqlContext.sparkContext.broadcast(buildPlan.executeCollect())

}

def execute() = {

val broadcastRelation = Await.result(broadcastFuture, 5.minute)

streamedPlan.execute().mapPartitions { streamedIter =>

joinIterators(broadcastRelation.value.iterator, streamedIter) //调用joinIterators对每一个分区map

}

}

}

2.3、ShuffleHashJoinShuffleHashJoin顾名思义就是须要shuffle数据,outputPartitioning是左孩子的的Partitioning。

会依据这个Partitioning进行shuffle。

然后利用SparkContext里的zipPartitions方法对每一个分区进行zip。

这里的requiredChildDistribution。的是ClusteredDistribution,这个会在HashPartitioning里面进行匹配。

关于这里面的分区这里不赘述,能够去org.apache.spark.sql.catalyst.plans.physical下的partitioning里面去查看。case class ShuffledHashJoin(

leftKeys: Seq[Expression],

rightKeys: Seq[Expression],

buildSide: BuildSide,

left: SparkPlan,

right: SparkPlan) extends BinaryNode with HashJoin {

override def outputPartitioning: Partitioning = left.outputPartitioning

override def requiredChildDistribution =

ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

def execute() = {

buildPlan.execute().zipPartitions(streamedPlan.execute()) {

(buildIter, streamIter) => joinIterators(buildIter, streamIter)

}

}

}

未完待续 :)

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog。作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/38274621

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议。欢迎转载、转发和评论。可是请保留本文作者署名和文章链接。如若须要用于商业目的或者与授权方面的协商,请联系我。

logo_code.gif

Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

标签:ide   set   select   sof   sample   封装   element   rod   并发

本条技术文章来源于互联网,如果无意侵犯您的权益请点击此处反馈版权投诉

本文系统来源:http://www.cnblogs.com/cxchanpin/p/6869232.html

这篇关于mysql physical plan_Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/832039

相关文章

C++使用栈实现括号匹配的代码详解

《C++使用栈实现括号匹配的代码详解》在编程中,括号匹配是一个常见问题,尤其是在处理数学表达式、编译器解析等任务时,栈是一种非常适合处理此类问题的数据结构,能够精确地管理括号的匹配问题,本文将通过C+... 目录引言问题描述代码讲解代码解析栈的状态表示测试总结引言在编程中,括号匹配是一个常见问题,尤其是在

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

使用C++实现链表元素的反转

《使用C++实现链表元素的反转》反转链表是链表操作中一个经典的问题,也是面试中常见的考题,本文将从思路到实现一步步地讲解如何实现链表的反转,帮助初学者理解这一操作,我们将使用C++代码演示具体实现,同... 目录问题定义思路分析代码实现带头节点的链表代码讲解其他实现方式时间和空间复杂度分析总结问题定义给定

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2

python使用fastapi实现多语言国际化的操作指南

《python使用fastapi实现多语言国际化的操作指南》本文介绍了使用Python和FastAPI实现多语言国际化的操作指南,包括多语言架构技术栈、翻译管理、前端本地化、语言切换机制以及常见陷阱和... 目录多语言国际化实现指南项目多语言架构技术栈目录结构翻译工作流1. 翻译数据存储2. 翻译生成脚本

Springboot中分析SQL性能的两种方式详解

《Springboot中分析SQL性能的两种方式详解》文章介绍了SQL性能分析的两种方式:MyBatis-Plus性能分析插件和p6spy框架,MyBatis-Plus插件配置简单,适用于开发和测试环... 目录SQL性能分析的两种方式:功能介绍实现方式:实现步骤:SQL性能分析的两种方式:功能介绍记录

如何通过Python实现一个消息队列

《如何通过Python实现一个消息队列》这篇文章主要为大家详细介绍了如何通过Python实现一个简单的消息队列,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录如何通过 python 实现消息队列如何把 http 请求放在队列中执行1. 使用 queue.Queue 和 reque