mysql physical plan_Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

2024-03-21 06:59

本文主要是介绍mysql physical plan_Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

的版本号。 将右表的join keys放到HashSet里。然后遍历左表,查找左表的join key能否匹配。case class LeftSemiJoinHash(

leftKeys: Seq[Expression],

rightKeys: Seq[Expression],

left: SparkPlan,

right: SparkPlan) extends BinaryNode with HashJoin {

val buildSide = BuildRight //buildSide是以右表为基准

override def requiredChildDistribution =

ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

override def output = left.output

def execute() = {

buildPlan.execute().zipPartitions(streamedPlan.execute()) { (buildIter, streamIter) => //右表的物理计划运行后生成RDD,利用zipPartitions对Partition进行合并。然后用上述方法实现。

val hashSet = new java.util.HashSet[Row]()

var currentRow: Row = null

// Create a Hash set of buildKeys

while (buildIter.hasNext) {

currentRow = buildIter.next()

val rowKey = buildSideKeyGenerator(currentRow)

if(!rowKey.anyNull) {

val keyExists = hashSet.contains(rowKey)

if (!keyExists) {

hashSet.add(rowKey)

}

}

}

val joinKeys = streamSideKeyGenerator()

streamIter.filter(current => {

!joinKeys(current).anyNull && hashSet.contains(joinKeys.currentValue)

})

}

}

}

2.2、BroadcastHashJoin 名约: 广播HashJoin,呵呵。  是InnerHashJoin的实现。这里用到了concurrent并发里的future,异步的广播buildPlan的表运行后的的RDD。

假设接收到了广播后的表,那么就用streamedPlan来匹配这个广播的表。

实现是RDD的mapPartitions和HashJoin里的joinIterators最后生成join的结果。case class BroadcastHashJoin(

leftKeys: Seq[Expression],

rightKeys: Seq[Expression],

buildSide: BuildSide,

left: SparkPlan,

right: SparkPlan)(@transient sqlContext: SQLContext) extends BinaryNode with HashJoin {

override def otherCopyArgs = sqlContext :: Nil

override def outputPartitioning: Partitioning = left.outputPartitioning

override def requiredChildDistribution =

UnspecifiedDistribution :: UnspecifiedDistribution :: Nil

@transient

lazy val broadcastFuture = future { //利用SparkContext广播表

sqlContext.sparkContext.broadcast(buildPlan.executeCollect())

}

def execute() = {

val broadcastRelation = Await.result(broadcastFuture, 5.minute)

streamedPlan.execute().mapPartitions { streamedIter =>

joinIterators(broadcastRelation.value.iterator, streamedIter) //调用joinIterators对每一个分区map

}

}

}

2.3、ShuffleHashJoinShuffleHashJoin顾名思义就是须要shuffle数据,outputPartitioning是左孩子的的Partitioning。

会依据这个Partitioning进行shuffle。

然后利用SparkContext里的zipPartitions方法对每一个分区进行zip。

这里的requiredChildDistribution。的是ClusteredDistribution,这个会在HashPartitioning里面进行匹配。

关于这里面的分区这里不赘述,能够去org.apache.spark.sql.catalyst.plans.physical下的partitioning里面去查看。case class ShuffledHashJoin(

leftKeys: Seq[Expression],

rightKeys: Seq[Expression],

buildSide: BuildSide,

left: SparkPlan,

right: SparkPlan) extends BinaryNode with HashJoin {

override def outputPartitioning: Partitioning = left.outputPartitioning

override def requiredChildDistribution =

ClusteredDistribution(leftKeys) :: ClusteredDistribution(rightKeys) :: Nil

def execute() = {

buildPlan.execute().zipPartitions(streamedPlan.execute()) {

(buildIter, streamIter) => joinIterators(buildIter, streamIter)

}

}

}

未完待续 :)

原创文章,转载请注明:

转载自:OopsOutOfMemory盛利的Blog。作者: OopsOutOfMemory

本文链接地址:http://blog.csdn.net/oopsoom/article/details/38274621

注:本文基于署名-非商业性使用-禁止演绎 2.5 中国大陆(CC BY-NC-ND 2.5 CN)协议。欢迎转载、转发和评论。可是请保留本文作者署名和文章链接。如若须要用于商业目的或者与授权方面的协商,请联系我。

logo_code.gif

Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现

标签:ide   set   select   sof   sample   封装   element   rod   并发

本条技术文章来源于互联网,如果无意侵犯您的权益请点击此处反馈版权投诉

本文系统来源:http://www.cnblogs.com/cxchanpin/p/6869232.html

这篇关于mysql physical plan_Spark SQL 源代码分析之Physical Plan 到 RDD的详细实现的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/832039

相关文章

SQL中的外键约束

外键约束用于表示两张表中的指标连接关系。外键约束的作用主要有以下三点: 1.确保子表中的某个字段(外键)只能引用父表中的有效记录2.主表中的列被删除时,子表中的关联列也会被删除3.主表中的列更新时,子表中的关联元素也会被更新 子表中的元素指向主表 以下是一个外键约束的实例展示

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

如何去写一手好SQL

MySQL性能 最大数据量 抛开数据量和并发数,谈性能都是耍流氓。MySQL没有限制单表最大记录数,它取决于操作系统对文件大小的限制。 《阿里巴巴Java开发手册》提出单表行数超过500万行或者单表容量超过2GB,才推荐分库分表。性能由综合因素决定,抛开业务复杂度,影响程度依次是硬件配置、MySQL配置、数据表设计、索引优化。500万这个值仅供参考,并非铁律。 博主曾经操作过超过4亿行数据

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

MySQL数据库宕机,启动不起来,教你一招搞定!

作者介绍:老苏,10余年DBA工作运维经验,擅长Oracle、MySQL、PG、Mongodb数据库运维(如安装迁移,性能优化、故障应急处理等)公众号:老苏畅谈运维欢迎关注本人公众号,更多精彩与您分享。 MySQL数据库宕机,数据页损坏问题,启动不起来,该如何排查和解决,本文将为你说明具体的排查过程。 查看MySQL error日志 查看 MySQL error日志,排查哪个表(表空间

【C++】_list常用方法解析及模拟实现

相信自己的力量,只要对自己始终保持信心,尽自己最大努力去完成任何事,就算事情最终结果是失败了,努力了也不留遗憾。💓💓💓 目录   ✨说在前面 🍋知识点一:什么是list? •🌰1.list的定义 •🌰2.list的基本特性 •🌰3.常用接口介绍 🍋知识点二:list常用接口 •🌰1.默认成员函数 🔥构造函数(⭐) 🔥析构函数 •🌰2.list对象

【Prometheus】PromQL向量匹配实现不同标签的向量数据进行运算

✨✨ 欢迎大家来到景天科技苑✨✨ 🎈🎈 养成好习惯,先赞后看哦~🎈🎈 🏆 作者简介:景天科技苑 🏆《头衔》:大厂架构师,华为云开发者社区专家博主,阿里云开发者社区专家博主,CSDN全栈领域优质创作者,掘金优秀博主,51CTO博客专家等。 🏆《博客》:Python全栈,前后端开发,小程序开发,人工智能,js逆向,App逆向,网络系统安全,数据分析,Django,fastapi

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

Android实现任意版本设置默认的锁屏壁纸和桌面壁纸(两张壁纸可不一致)

客户有些需求需要设置默认壁纸和锁屏壁纸  在默认情况下 这两个壁纸是相同的  如果需要默认的锁屏壁纸和桌面壁纸不一样 需要额外修改 Android13实现 替换默认桌面壁纸: 将图片文件替换frameworks/base/core/res/res/drawable-nodpi/default_wallpaper.*  (注意不能是bmp格式) 替换默认锁屏壁纸: 将图片资源放入vendo