spark的shuffle hash join对Full Outer Join的支持

2024-03-28 14:20

本文主要是介绍spark的shuffle hash join对Full Outer Join的支持,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

变迁历史

Spark3.1.0版本之前,shuffle hash join(SHJ)支持除了full outer join之外的类型,此时对于一个full outer join的等值join的sql,会使用smj,smj缺点就是在表很大的时候会增加排序所需的cpu和io,而SHJ可以排序所需的CPU和IO,特别是表比较大的时候。

而从3.1.0版本开始, Shuffle-Hash Join (SHJ) 支持所有的 join 类型(SPARK-32399),同时支持相应的 codegen execution(SPARK-32421)。

3.1.0版本怎么支持Full Outer Join

具体处理方式

  1. 通过哈希关系查找处理流侧的行,并通过以下方式标记构建侧( build side)的匹配行:
    1. 对于具有唯一键的连接,使用 BitSet 记录构建侧的匹配行(key index表示每一行)。
    2. 对于具有非唯一键的连接,使用 HashSet[Long] 记录构建侧的匹配行(key index+value index表示每一行)。

key index定义为BytesToBytesMap中longArray的index key

(is defined as the index into key addressing array longArray in BytesToBytesMap.)

value index 定义为相同键的值的 iterator index

  1. 通过迭代哈希关系处理构建侧的行,并过滤已经查找过的构建侧行(在  ShuffledHashJoinExec.fullOuterJoin 中完成)。

可以看到full outer shuffled hash join将迭代 两次 build side(一次用于构建hash map,另一次用于输出不匹配行)和迭代一次 stream side  

而full outer sort merge join需要两边都迭代两次,并且对大表进行排序可能会更加消耗CPU和IO。因此,当 stream side比build side大得多时,full outer shuffled hash join比full outer sort merge join更有效。

相关源码

在org.apache.spark.sql.execution.joins.ShuffledHashJoinExec中:

 private def fullOuterJoin(streamIter: Iterator[InternalRow],hashedRelation: HashedRelation,numOutputRows: SQLMetric): Iterator[InternalRow] = {val joinKeys = streamSideKeyGenerator()val joinRow = new JoinedRowval (joinRowWithStream, joinRowWithBuild) = {buildSide match {case BuildLeft => (joinRow.withRight _, joinRow.withLeft _)case BuildRight => (joinRow.withLeft _, joinRow.withRight _)}}val buildNullRow = new GenericInternalRow(buildOutput.length)val streamNullRow = new GenericInternalRow(streamedOutput.length)lazy val streamNullJoinRowWithBuild = {buildSide match {case BuildLeft =>joinRow.withRight(streamNullRow)joinRow.withLeft _case BuildRight =>joinRow.withLeft(streamNullRow)joinRow.withRight _}}val iter = if (hashedRelation.keyIsUnique) {fullOuterJoinWithUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)} else {fullOuterJoinWithNonUniqueKey(streamIter, hashedRelation, joinKeys, joinRowWithStream,joinRowWithBuild, streamNullJoinRowWithBuild, buildNullRow, streamNullRow)}val resultProj = UnsafeProjection.create(output, output)iter.map { r =>numOutputRows += 1resultProj(r)}}

BHJ和SMJ在full outer join上的性能对比

例如下面的查询,与full outer sort merge join相比,full outer shuffled hash join节省了 30% 的wall clock time 。注意,当构建端(build side)很大时,SHJ 可能会导致 OOM,因为构建 hashmap 是内存密集型的。

"wall clock time"是指实际时间,也称为挂钟时间或墙上时间。这是指从一个事件的开始到结束所经过的实际时间,包括所有时间延迟和等待时间。

Wall clock time包括了计算机系统中的各种时间消耗,比如CPU执行时间、I/O操作等等。它可以用来评估一个程序或操作的整体执行时间,包括了所有可能的延迟和等待时间,反映了程序在实际环境中的整体执行效率。

与之相对的,CPU时间只计算CPU实际执行代码的时间,不包括等待I/O操作、线程切换等操作所消耗的时间。 Wall clock time更能真实地反映一个任务的总体耗时,因为它包含了所有可能的时间开销,而不仅仅是CPU的执行时间。

def shuffleHashJoin(): Unit = {val N: Long = 4 << 22withSQLConf(SQLConf.SHUFFLE_PARTITIONS.key -> "2",SQLConf.AUTO_BROADCASTJOIN_THRESHOLD.key -> "20000000") {codegenBenchmark("shuffle hash join", N) {val df1 = spark.range(N).selectExpr(s"cast(id as string) as k1")val df2 = spark.range(N / 10).selectExpr(s"cast(id * 10 as string) as k2")val df = df1.join(df2, col("k1") === col("k2"), "full_outer")df.noop()}}
}
Running benchmark: shuffle hash joinRunning case: shuffle hash join offStopped after 2 iterations, 16602 msRunning case: shuffle hash join onStopped after 5 iterations, 31911 msJava HotSpot(TM) 64-Bit Server VM 1.8.0_181-b13 on Mac OS X 10.15.4
Intel(R) Core(TM) i9-9980HK CPU @ 2.40GHz
shuffle hash join:                        Best Time(ms)   Avg Time(ms)   Stdev(ms)    Rate(M/s)   Per Row(ns)   Relative
------------------------------------------------------------------------------------------------------------------------
shuffle hash join off                              7900           8301         567          2.1         470.9       1.0X
shuffle hash join on                               6250           6382          95          2.7         372.5       1.3X

3.1.0版本之前为什么不支持full outer join

3.1.0之前的版本中,之所以SHJ不支持full outer join,主要是因为在执行shuffle hash join时,Spark会将两个输入数据集按照指定的连接键进行分区,然后在每个分区节点上的数据单独执行单机hash join算法。

在执行join操作时,Spark会将相同连接键的数据进行合并。如果使用hash join来实现全连接,那么可能会导致一些连接键只出现在一个数据集中而没有出现在另一个数据集中,这样就无法准确地将两个数据集进行合并。

例如,本来是table_a={id:1,id:2,id:3,id:4},table_b={id:1,id:2,id:3,id:4},则table_a和table_b中id=1/id=4的数据发到节点1,id=2的数据发到节点2,id=3的数据发到节点3,那样在各自节点上进行table_a full outer join table_b on table_a.id=table_b.id就会分别得到:

节点1:{1,1},{4,4}  -- 缺少了{id=2, null}{id=3, null},因为根本该节点上根本不知道有id=2,3

节点2:{2,2}  -- 缺少了{id=1, null}{id=3, null}{id=4, null},因为根本该节点上根本不知道有id=1,3,4

节点3:{3,3} -- 缺少了{id=1, null}{id=2, null}{id=4, null},因为根本该节点上根本不知道有id=1,2,4

Broadcast hash join支持full outer join吗?

目前还不支持。

从源码可以看到,支持full outer join的有BNLJ,SHJ,SMJ

Cartesian product join适合没有指定连接条件,支持等值和不等值 Join,只支持内连接(因为笛卡尔积没有啥实际意义,只有使用on语句形成内连接才有意义),下面代码就写死了inner

跟上面SHJ的例子一样,如果BHJ支持full outer join,则也会遇到以下情况,小表只能跟各个节点上拥有的个别id集合进行join,因而不能支持full outer join

大表table_a={id:1,id:2,id:3,id:4},广播的小表table_b={id:1,id:4},则table_b中id=1/id=4的数据发到各个节点上,其中节点1中table_a的数据是{id:1,id:4},节点2中table_a的数据是{id:2},节点3中table_a的数据是{id:3},那样在各自节点上进行table_a full outer join table_b on table_a.id=table_b.id就会分别得到:

节点1:{1,1},{4,4}  -- 缺少了{id=2, null}{id=3, null},因为根本该节点上根本不知道有id=2,3

节点2:{1,null},{2,null},{4,null}  -- 缺少了{id=3, null},因为根本该节点上根本不知道有id=3

节点3:{1,null},{3,null},{4,null}  -- 缺少了{id=2, null},因为根本该节点上根本不知道有id=2

这篇关于spark的shuffle hash join对Full Outer Join的支持的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/855853

相关文章

SpringBoot应用中出现的Full GC问题的场景与解决

《SpringBoot应用中出现的FullGC问题的场景与解决》这篇文章主要为大家详细介绍了SpringBoot应用中出现的FullGC问题的场景与解决方法,文中的示例代码讲解详细,感兴趣的小伙伴可... 目录Full GC的原理与触发条件原理触发条件对Spring Boot应用的影响示例代码优化建议结论F

MySQL高级查询之JOIN、子查询、窗口函数实际案例

《MySQL高级查询之JOIN、子查询、窗口函数实际案例》:本文主要介绍MySQL高级查询之JOIN、子查询、窗口函数实际案例的相关资料,JOIN用于多表关联查询,子查询用于数据筛选和过滤,窗口函... 目录前言1. JOIN(连接查询)1.1 内连接(INNER JOIN)1.2 左连接(LEFT JOI

SpringKafka消息发布之KafkaTemplate与事务支持功能

《SpringKafka消息发布之KafkaTemplate与事务支持功能》通过本文介绍的基本用法、序列化选项、事务支持、错误处理和性能优化技术,开发者可以构建高效可靠的Kafka消息发布系统,事务支... 目录引言一、KafkaTemplate基础二、消息序列化三、事务支持机制四、错误处理与重试五、性能优

java String.join()的使用小结

《javaString.join()的使用小结》String.join()是Java8引入的一个实用方法,用于将多个字符串按照指定分隔符连接成一个字符串,本文主要介绍了javaString.join... 目录1. 方法定义2. 基本用法2.1 拼接多个字符串2.2 拼接集合中的字符串3. 使用场景和示例3

一文教你解决Python不支持中文路径的问题

《一文教你解决Python不支持中文路径的问题》Python是一种广泛使用的高级编程语言,然而在处理包含中文字符的文件路径时,Python有时会表现出一些不友好的行为,下面小编就来为大家介绍一下具体的... 目录问题背景解决方案1. 设置正确的文件编码2. 使用pathlib模块3. 转换路径为Unicod

定价129元!支持双频 Wi-Fi 5的华为AX1路由器发布

《定价129元!支持双频Wi-Fi5的华为AX1路由器发布》华为上周推出了其最新的入门级Wi-Fi5路由器——华为路由AX1,建议零售价129元,这款路由器配置如何?详细请看下文介... 华为 Wi-Fi 5 路由 AX1 已正式开售,新品支持双频 1200 兆、配有四个千兆网口、提供可视化智能诊断功能,建

MySQL报错sql_mode=only_full_group_by的问题解决

《MySQL报错sql_mode=only_full_group_by的问题解决》本文主要介绍了MySQL报错sql_mode=only_full_group_by的问题解决,文中通过示例代码介绍的非... 目录报错信息DataGrip 报错还原Navicat 报错还原报错原因解决方案查看当前 sql mo

数据库使用之union、union all、各种join的用法区别解析

《数据库使用之union、unionall、各种join的用法区别解析》:本文主要介绍SQL中的Union和UnionAll的区别,包括去重与否以及使用时的注意事项,还详细解释了Join关键字,... 目录一、Union 和Union All1、区别:2、注意点:3、具体举例二、Join关键字的区别&php

SpringBoot操作spark处理hdfs文件的操作方法

《SpringBoot操作spark处理hdfs文件的操作方法》本文介绍了如何使用SpringBoot操作Spark处理HDFS文件,包括导入依赖、配置Spark信息、编写Controller和Ser... 目录SpringBoot操作spark处理hdfs文件1、导入依赖2、配置spark信息3、cont

Redis的Hash类型及相关命令小结

《Redis的Hash类型及相关命令小结》edisHash是一种数据结构,用于存储字段和值的映射关系,本文就来介绍一下Redis的Hash类型及相关命令小结,具有一定的参考价值,感兴趣的可以了解一下... 目录HSETHGETHEXISTSHDELHKEYSHVALSHGETALLHMGETHLENHSET