spark的shuffle hash join对Full Outer Join的支持

2024-03-28 14:20

本文主要是介绍spark的shuffle hash join对Full Outer Join的支持,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

变迁历史

Spark3.1.0版本之前,shuffle hash join(SHJ)支持除了full outer join之外的类型,此时对于一个full outer join的等值join的sql,会使用smj,smj缺点就是在表很大的时候会增加排序所需的cpu和io,而SHJ可以排序所需的CPU和IO,特别是表比较大的时候。

而从3.1.0版本开始, Shuffle-Hash Join (SHJ) 支持所有的 join 类型(SPARK-32399),同时支持相应的 codegen execution(SPARK-32421)。

3.1.0版本怎么支持Full Outer Join

具体处理方式

  1. 通过哈希关系查找处理流侧的行,并通过以下方式标记构建侧( build side)的匹配行:
    1. 对于具有唯一键的连接,使用 BitSet 记录构建侧的匹配行(key index表示每一行)。
    2. 对于具有非唯一键的连接,使用 HashSet[Long] 记录构建侧的匹配行(key index+value index表示每一行)。

key index定义为BytesToBytesMap中longArray的index key

(is defined as the index into key addressing array longArray in BytesToBytesMap.)

value index 定义为相同键的值的 iterator index

  1. 通过迭代哈希关系处理构建侧的行,并过滤已经查找过的构建侧行(在  ShuffledHashJoinExec.fullOuterJoin 中完成)。

可以看到full outer shuffled hash join将迭代 两次 build side(一次用于构建hash map,另一次用于输出不匹配行)和迭代一次 stream side  

而full outer sort merge join需要两边都迭代两次,并且对大表进行排序可能会更加消耗CPU和IO。因此,当 stream side比build side大得多时,full outer shuffled hash join比full outer sort merge join更有效。

相关源码

在org.apache.spark.sql.execution.joins.ShuffledHashJoinExec中:

 private def fullOuterJoin(streamIter: Iterator[InternalRow],hashedRelation: HashedRelation,numOutputRows: SQLMetric): Iterator[InternalRow] = {val joinKeys = streamSideKeyGenerator()val joinRow = new JoinedRowval (joinRowWithStream, joinRowWithBuild) = {buildSide match {case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)case BuildRight => (joinRow.withLeft _, joinRow.withRight _)}}val buildNullRow = new GenericInternalRow(buildOutput.length)val streamNullRow = new GenericInternalRow(streamedOutput.length)lazy val streamNullJoinRowWithBuild = {buildSide match {case BuildLeft =>joinRow.withRight(streamNullRow)joinRow.withLeft _case BuildRight =>joinRow.withLeft(streamNullRow)joinRow.withRight _}}val iter = if (hashedRelation.keyIsUnique) {fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)} else {fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)}val resultProj = UnsafeProjection.create(output, output)iter.map { r =>numOutputRows += 1resultProj(r)}}

BHJ和SMJ在full outer join上的性能对比

例如下面的查询,与full outer sort merge join相比,full outer shuffled hash join节省了 30% 的wall clock time 。注意,当构建端(build side)很大时,SHJ 可能会导致 OOM,因为构建 hashmap 是内存密集型的。

"wall clock time"是指实际时间,也称为挂钟时间或墙上时间。这是指从一个事件的开始到结束所经过的实际时间,包括所有时间延迟和等待时间。

Wall clock time包括了计算机系统中的各种时间消耗,比如CPU执行时间、I/O操作等等。它可以用来评估一个程序或操作的整体执行时间,包括了所有可能的延迟和等待时间,反映了程序在实际环境中的整体执行效率。

与之相对的,CPU时间只计算CPU实际执行代码的时间,不包括等待I/O操作、线程切换等操作所消耗的时间。 Wall clock time更能真实地反映一个任务的总体耗时,因为它包含了所有可能的时间开销,而不仅仅是CPU的执行时间。

def shuffleHashJoin(): Unit = {val N: Long = 4 << 22withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000") {codegenBenchmark("shuffle hash join", N) {val df1 = spark.range(N).selectExpr(s"cast(id as string) as k1")val df2 = spark.range(N / 10).selectExpr(s"cast(id * 10 as string) as k2")val df = df1.join(df2, col("k1") === col("k2"), "full_outer")df.noop()}}
}
Running benchmark: shuffle hash joinRunning case: shuffle hash join offStopped after 2 iterations, 16602 msRunning case: shuffle hash join onStopped after 5 iterations, 31911 msJava HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join off                              7900           8301         567          2.1         470.9       1.0X
shuffle hash join on                               6250           6382          95          2.7         372.5       1.3X

3.1.0版本之前为什么不支持full outer join

3.1.0之前的版本中,之所以SHJ不支持full outer join,主要是因为在执行shuffle hash join时,Spark会将两个输入数据集按照指定的连接键进行分区,然后在每个分区节点上的数据单独执行单机hash join算法。

在执行join操作时,Spark会将相同连接键的数据进行合并。如果使用hash join来实现全连接,那么可能会导致一些连接键只出现在一个数据集中而没有出现在另一个数据集中,这样就无法准确地将两个数据集进行合并。

例如,本来是table_a={id:1,id:2,id:3,id:4},table_b={id:1,id:2,id:3,id:4},则table_a和table_b中id=1/id=4的数据发到节点1,id=2的数据发到节点2,id=3的数据发到节点3,那样在各自节点上进行table_a full outer join table_b on table_a.id=table_b.id就会分别得到:

节点1:{1,1},{4,4}  -- 缺少了{id=2, null}{id=3, null},因为根本该节点上根本不知道有id=2,3

节点2:{2,2}  -- 缺少了{id=1, null}{id=3, null}{id=4, null},因为根本该节点上根本不知道有id=1,3,4

节点3:{3,3} -- 缺少了{id=1, null}{id=2, null}{id=4, null},因为根本该节点上根本不知道有id=1,2,4

Broadcast hash join支持full outer join吗?

目前还不支持。

从源码可以看到,支持full outer join的有BNLJ,SHJ,SMJ

Cartesian product join适合没有指定连接条件,支持等值和不等值 Join,只支持内连接(因为笛卡尔积没有啥实际意义,只有使用on语句形成内连接才有意义),下面代码就写死了inner

跟上面SHJ的例子一样,如果BHJ支持full outer join,则也会遇到以下情况,小表只能跟各个节点上拥有的个别id集合进行join,因而不能支持full outer join

大表table_a={id:1,id:2,id:3,id:4},广播的小表table_b={id:1,id:4},则table_b中id=1/id=4的数据发到各个节点上,其中节点1中table_a的数据是{id:1,id:4},节点2中table_a的数据是{id:2},节点3中table_a的数据是{id:3},那样在各自节点上进行table_a full outer join table_b on table_a.id=table_b.id就会分别得到:

节点1:{1,1},{4,4}  -- 缺少了{id=2, null}{id=3, null},因为根本该节点上根本不知道有id=2,3

节点2:{1,null},{2,null},{4,null}  -- 缺少了{id=3, null},因为根本该节点上根本不知道有id=3

节点3:{1,null},{3,null},{4,null}  -- 缺少了{id=2, null},因为根本该节点上根本不知道有id=2

这篇关于spark的shuffle hash join对Full Outer Join的支持的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/855853

相关文章

定价129元!支持双频 Wi-Fi 5的华为AX1路由器发布

《定价129元!支持双频Wi-Fi5的华为AX1路由器发布》华为上周推出了其最新的入门级Wi-Fi5路由器——华为路由AX1,建议零售价129元,这款路由器配置如何?详细请看下文介... 华为 Wi-Fi 5 路由 AX1 已正式开售,新品支持双频 1200 兆、配有四个千兆网口、提供可视化智能诊断功能,建

MySQL报错sql_mode=only_full_group_by的问题解决

《MySQL报错sql_mode=only_full_group_by的问题解决》本文主要介绍了MySQL报错sql_mode=only_full_group_by的问题解决,文中通过示例代码介绍的非... 目录报错信息DataGrip 报错还原Navicat 报错还原报错原因解决方案查看当前 sql mo

数据库使用之union、union all、各种join的用法区别解析

《数据库使用之union、unionall、各种join的用法区别解析》:本文主要介绍SQL中的Union和UnionAll的区别,包括去重与否以及使用时的注意事项,还详细解释了Join关键字,... 目录一、Union 和Union All1、区别:2、注意点:3、具体举例二、Join关键字的区别&php

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Redis的Hash类型及相关命令小结

《Redis的Hash类型及相关命令小结》edisHash是一种数据结构,用于存储字段和值的映射关系,本文就来介绍一下Redis的Hash类型及相关命令小结,具有一定的参考价值,感兴趣的可以了解一下... 目录HSETHGETHEXISTSHDELHKEYSHVALSHGETALLHMGETHLENHSET

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

hdu1043(八数码问题,广搜 + hash(实现状态压缩) )

利用康拓展开将一个排列映射成一个自然数,然后就变成了普通的广搜题。 #include<iostream>#include<algorithm>#include<string>#include<stack>#include<queue>#include<map>#include<stdio.h>#include<stdlib.h>#include<ctype.h>#inclu

康拓展开(hash算法中会用到)

康拓展开是一个全排列到一个自然数的双射(也就是某个全排列与某个自然数一一对应) 公式: X=a[n]*(n-1)!+a[n-1]*(n-2)!+...+a[i]*(i-1)!+...+a[1]*0! 其中,a[i]为整数,并且0<=a[i]<i,1<=i<=n。(a[i]在不同应用中的含义不同); 典型应用: 计算当前排列在所有由小到大全排列中的顺序,也就是说求当前排列是第

hdu1496(用hash思想统计数目)

作为一个刚学hash的孩子,感觉这道题目很不错,灵活的运用的数组的下标。 解题步骤:如果用常规方法解,那么时间复杂度为O(n^4),肯定会超时,然后参考了网上的解题方法,将等式分成两个部分,a*x1^2+b*x2^2和c*x3^2+d*x4^2, 各自作为数组的下标,如果两部分相加为0,则满足等式; 代码如下: #include<iostream>#include<algorithm

usaco 1.2 Milking Cows(类hash表)

第一种思路被卡了时间 到第二种思路的时候就觉得第一种思路太坑爹了 代码又长又臭还超时!! 第一种思路:我不知道为什么最后一组数据会被卡 超时超了0.2s左右 大概想法是 快排加一个遍历 先将开始时间按升序排好 然后开始遍历比较 1 若 下一个开始beg[i] 小于 tem_end 则说明本组数据与上组数据是在连续的一个区间 取max( ed[i],tem_end ) 2 反之 这个