【SparkSQL】聊一聊 Join

2024-05-12 23:38
文章标签 join sparksql 聊一聊

本文主要是介绍【SparkSQL】聊一聊 Join,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

1. Join 背景介绍

Join 是数据库查询永远绕不开的话题,传统查询 SQL 技术总体可以分为简单操作(过滤操作 WHERE、排序操作 LIMIT 等),聚合操作 GROUPBY 等以及 JOIN 操作等。其中 Join 操作是其中最复杂、代价最大的操作类型,也是 OLAP 场景中使用相对较多的操作。因此很有必要聊聊这个话题。

另外,从业务层面来讲,用户在数仓建设的时候也会涉及 Join 使用的问题。通常情况下,数据仓库中的表一般会分为“低层次表”和“高层次表”。

所谓“低层次表”,就是数据源导入数仓之后直接生成的表,单表列值较少,一般可以明显归为维度表或者事实表,表和表之间大多存在外健依赖,所以查询起来会遇到大量 Join 运算,查询效率相对比较差。而“高层次表”是在”低层次表”的基础上加工转换而来,通常做法是使用 SQL 语句将需要 Join 的表预先进行合并形成“宽表”,在宽表上的查询因为不需要执行大量 Join 因而效率相对较高,很明显,宽表缺点是数据会有大量冗余,而且生成相对比较滞后,查询结果可能并不及时。

因此,为了获得实效性更高的查询结果,大多数场景还是需要进行复杂的 Join 操作。Join 操作之所以复杂,不仅仅因为通常情况下其时间空间复杂度高,更重要的是它有很多算法,在不同场景下需要选择特定算法才能获得最好的优化效果。关系型数据库也有关于 Join 的各种用法,姜承尧大神之前由浅入深地介绍过 MySQL Join 的各种算法以及调优方案。本文接下来会介绍 SparkSQL 所支持的几种常见的 Join 算法以及其适用场景。

2. Join 常见分类以及基本实现机制

当前 SparkSQL 支持三种 Join 算法

  1. Shuffle Hash Join
  2. Broadcast Hash Join
  3. Sort Merge Join

其中前两者归根到底都属于 Hash Join,只不过在 Hash Join 之前需要先 Shuffle 还是先 Broadcast。其实,这些算法并不是什么新鲜玩意,都是数据库几十年前的老古董了(参考),只不过换上了分布式的皮而已。不过话说回来,SparkSQL/Hive…等等,所有这些大数据技术哪一样不是来自于传统数据库技术,什么语法解析 AST、基于规则优化(CRO)、基于代价优化(CBO)、列存,都来自于传统数据库。就拿 Shuffle Hash JoinBroadcast Hash Join 来说,Hash Join 算法就来自于传统数据库,而 Shuffle 和 Broadcast 是大数据的皮,两者一结合就成了大数据的算法了。因此可以这样说,大数据的根就是传统数据库,传统数据库人才可以很快的转型到大数据。好吧,这些都是闲篇。

继续来看技术,既然 Hash Join 是“内核”,那就刨出来看看,看完把“皮”再分析一下。

2.1 Hash Join

先来看看这样一条SQL语句:select * from order,item where item.id = order.i_id,很简单一个 Join 节点,参与 Join 的两张表是 item 和 order,Join Key 分别是 item.id 以及 order.i_id。现在假设这个 Join 采用的是 Hash Join 算法,整个过程会经历三步:

  1. 确定 Build Table 以及 Probe Table:这个概念比较重要,Build Table 使用 Join Key 构建 Hash Table,而 Probe Table 使用 Join Key 进行探测,探测成功就可以 Join 在一起。通常情况下,小表会作为 Build Table,大表作为 Probe Table。此事例中 item 为Build Table,order 为 Probe Table。
  2. 构建 Hash Table:依次读取 Build Table(item)的数据,对于每一行数据根据 Join Key(item.id)进行 Hash,Hash 到对应的 Bucket,生成 Hash Table 中的一条记录。数据缓存在内存中,如果内存放不下需要 dump 到外存。
  3. 探测:再依次扫描 Probe Table(order)的数据,使用相同的 Hash 函数映射 Hash Table 中的记录,映射成功之后再检查 Join 条件(item.id = order.i_id),如果匹配成功就可以将两者 Join 在一起。
    Hash Join

基本流程可以参考上图,这里有两个小问题需要关注:

  1. Hash Join 性能如何?很显然,hash join 基本都只扫描两表一次,可以认为 O(a+b),较之最极端的笛卡尔集运算 O(a*b),不知甩了多少条街;
  2. 为什么 Build Table 选择小表?道理很简单,因为构建的 Hash Table 最好能全部加载在内存,效率最高;这也决定了 Hash Join 算法只适合至少一个小表的 Join 场景,对于两个大表的 Join 场景并不适用;

上文说过,Hash Join 是传统数据库中的单机 Join 算法,在分布式环境下需要经过一定的分布式改造,说到底就是尽可能利用分布式计算资源进行并行化计算,提高总体效率。Hash Join 分布式改造一般有两种经典方案:

  1. Broadcast Hash Join:将其中一张小表广播分发到另一张大表所在的分区节点上,分别并发地与其上的分区记录进行 Hash Join。Broadcast 适用于小表很小,可以直接广播的场景。
  2. Shuffler Hash Join:一旦小表数据量较大,此时就不再适合进行广播分发。这种情况下,可以根据 Join Key 相同必然分区相同的原理,将两张表分别按照 Join Key 进行重新组织分区,这样就可以将 Join 分而治之,划分为很多小 Join,充分利用集群资源并行化。

2.1.1 Broadcast Hash Join

如下图所示,Broadcast Hash Join 可以分为两步:

  1. Broadcast 阶段:将小表广播分发到大表所在的所有主机。广播算法可以有很多,最简单的是先发给 Driver,Driver 再统一分发给所有 Executor;要不就是基于 Bittorrete 的 P2P 思路;
  2. Hash Join 阶段:在每个 Executor 上执行单机版 Hash Join,小表映射,大表试探;
    Broadcast Hash Join
    SparkSQL 规定 Broadcast Hash Join 执行的基本条件为被广播小表必须小于参数 spark.sql.autoBroadcastJoinThreshold,默认为10M。

2.1.2 Shuffle Hash Join

在大数据条件下如果一张表很小,执行 join 操作最优的选择无疑是Broadcast Hash Join,效率最高。但是一旦小表数据量增大,广播所需内存、带宽等资源必然就会太大,Broadcast Hash Join 就不再是最优方案。此时可以按照 Join Key 进行分区,根据 Key 相同必然分区相同的原理,就可以将大表 Join 分而治之,划分为很多小表的 Join,充分利用集群资源并行化。如下图所示,Shuffle Hash Join 也可以分为两步:

  1. Shuffle 阶段:分别将两个表按照 Join Key 进行分区,将相同 Join Key 的记录重分布到同一节点,两张表的数据会被重分布到集群中所有节点。这个过程称为 Shuffle;
  2. Hash Join 阶段:每个分区节点上的数据单独执行单机 Hash Join 算法。
    Shuffle Hash Join
    看到这里,可以初步总结出来如果两张小表 Join 可以直接使用单机版 Hash Join;如果一张大表 Join 一张极小表,可以选择 Broadcast Hash Join 算法;而如果是一张大表 Join 一张小表,则可以选择 Shuffle Hash Join 算法;那如果是两张大表进行 Join 呢?

2.2 Sort Merge Join

SparkSQL 对两张大表 Join 采用了全新的算法 Sort Merge Join,如下图所示,整个过程分为三个步骤:

  1. Shuffle 阶段:将两张大表根据 Join Key 进行重新分区,两张表数据会分布到整个集群,以便分布式并行处理;
  2. Sort 阶段:对单个分区节点的两表数据,分别进行排序;
  3. Merge 阶段:对排好序的两张分区表数据执行 Join 操作。
    Sort Merge Join
    Join 操作很简单,分别遍历两个有序序列,碰到相同 Join Key 就 Merge 输出,否则取更小一边,见下图示意:
    Sort Merge Join的 Join 过程

仔细分析的话会发现,Sort Merge Join 的代价并不比 Shuffle Hash Join 小,反而是多了很多。那为什么 SparkSQL 还会在两张大表的场景下选择使用 Sort Merge Join 算法呢?这和 Spark 的 Shuffle 实现有关,目前 Spark 的 Shuffle 实现都适用 Sort Merge Join 算法,因此在经过 Shuffle 之后 Partition 数据都是按照 Key 排序的。因此理论上可以认为数据经过 Shuffle 之后是不需要 Sort 的,可以直接 Merge。

经过上文的分析,可以明确每种 Join 算法都有自己的适用场景,数据仓库设计时最好避免大表与大表的 Join 查询,SparkSQL 也可以根据内存资源、带宽资源适量将参数 spark.sql.autoBroadcastJoinThreshold 调大,让更多 Join 实际执行为 Broadcast Hash Join

3. 总结

Join 操作是传统数据库中的一个高级特性,尤其对于当前 MySQL 数据库更是如此,原因很简单,MySQL 对 Join 的支持目前还比较有限,只支持 Nested Loop Join 算法,因此在 OLAP 场景下 MySQL 是很难吃的消的,不要去用 MySQL 去跑任何 OLAP 业务,结果真的很难看。不过好消息是 MySQL 在新版本要开始支持 Hash Join 了,这样也许在将来也可以用 MySQL 来处理一些小规模的 OLAP 业务。

和 MySQL 相比,PostgreSQL、SQLServer、Oracle 等这些数据库对 Join 支持更加全面一些,都支持 Hash Join 算法。由 PostgreSQL 作为内核构建的分布式系统 Greenplum 更是在数据仓库中占有一席之地,这和 PostgreSQL 对 Join 算法的支持其实有很大关系。

总体而言,传统数据库单机模式做 Join 的场景毕竟有限,也建议尽量减少使用 Join。然而大数据领域就完全不同,Join 是标配, OLAP 业务根本无法离开表与表之间的关联,对 Join 的支持成熟度一定程度上决定了系统的性能,夸张点说,“得 Join 者得天下”。本文只是试图带大家真正走进 Join 的世界,了解常用的几种 Join 算法以及各自的适用场景。

这篇关于【SparkSQL】聊一聊 Join的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/984067

相关文章

MySQL学习笔记-join语句类型

join从句的类型:内链接(inner) 全外连接(full outer) 左外连接(left outer) 右外连接(right outer) 交叉链接(cross) 连接条件:使用ON设定连接条件,也可以用WHERE代替 · ON:设定连接条件 · WHERE:进行结果集记录的过滤 一,内连接inner join:  内连接是返回左表及右表符合连接条件的记录,在MySQL中JO

多线程 | join方法

文章目录 1. 作用2. 用法3. 异常4. 源码为什么使用wait方法 5. 如何实现按照指定顺序执行线程6. 线程运行状态 1. 作用 在 Java 多线程中,join方法用于等待一个线程执行完毕。当一个线程调用另一个线程的join方法时,当前线程会进入等待状态,直到被调用的线程执行完毕。这使得开发者可以控制线程的执行顺序,确保某些关键线程在其他线程之前完成执行。 2. 用

SylixOS pthread_join退出

1 问题描述 在移植中间件过程中,在SylixOS下调用pthread_join时,如果线程在pthread_join等待之前结束,则线程返回无效线程错误值。在Linux下这种调用会正常返回。两种实现是有差别的,实现的原理分别如下。 2 函数实现机制 2.1 实现机制 在SylixOS下调用pthread_join时,如果线程在pthread_join等待之前结束,线程返回无效线程错误标志

多表连接的三种方式hash join,merge join,nested loop

多表之间的连接有三种方式:Nested Loops,Hash Join和 Sort Merge Join. 下面来介绍三种不同连接的不同:     一. NESTED LOOP: 对于被连接的数据子集较小的情况,嵌套循环连接是个较好的选择。在嵌套循环中,内表被外表驱动,外表返回的每一行都要在内表中检索找到与它匹配的行,因此整个查询返回的结果集不能太大(大于1 万不适合),要把返回

Apache-Flink深度解析-Temporal-Table-JOIN

在《JOIN LATERAL》中提到了Temporal Table JOIN,本篇就向大家详细介绍什么是Temporal Table JOIN。在ANSI-SQL 2011 中提出了Temporal 的概念,Oracle,SQLServer,DB2等大的数据库厂商也先后实现了这个标准。Temporal Table记录了历史上任何时间点所有的数据改动,Temporal Table的工作流程如下:

SparkSQL在字节跳动的应用实践和优化实战

来源:字节跳动白泉的分享 作者:大数据技术与架构整理 点击右侧关注,大数据开发领域最强公众号! 点击右侧关注,暴走大数据! By  大数据技术与架构 场景描述: 面对大量复杂的数据分析需求,提供一套稳定、高效、便捷的企业级查询分析服务具有重大意义。本次演讲介绍了字节跳动

SparkSQL内核解析-执行全过程概述

大数据技术与架构 点击右侧关注,大数据开发领域最强公众号! 暴走大数据 点击右侧关注,暴走大数据! 从SQL到RDD // 创建SparkSession类。从2.0开始逐步替代SparkContext称为Spark应用入口var spark = SparkSession.builder().appName("appName").master("local").getOrCreate()

【硬刚大数据之面试篇】2021年从零到大数据专家面试篇之SparkSQL篇

📢欢迎关注博客主页:https://blog.csdn.net/u013411339 📢欢迎点赞 👍 收藏 ⭐留言 📝 ,欢迎留言交流! 📢本文由【王知无】原创,首发于 CSDN博客! 📢本文首发CSDN论坛,未经过官方和本人允许,严禁转载! 本文是对《【硬刚大数据之学习路线篇】2021年从零到大数据专家的学习指南(全面升级版)》的面试部分补充。 硬刚大数据系列文章链接:

Flink重点难点:维表关联理论和Join实战

点击上方蓝色字体,选择“设为星标” 回复”面试“获取更多惊喜 在阅读本文之前,你应该阅读过的系列: 《Flink重点难点:时间、窗口和流Join》《Flink重点难点:网络流控和反压》 Flink官方文档中公开的信息 1 Join 的概念 在阅读之前请一定要先了解: 数据流操作的另一个常见需求是对两条数据流中的事件进行联结(connect)或Join。Flink DataStream API中

【硬刚Hadoop】HADOOP MAPREDUCE(11):Join应用

本文是对《【硬刚大数据之学习路线篇】从零到大数据专家的学习指南(全面升级版)》的Hadoop部分补充。 1 Reduce Join Map端的主要工作:为来自不同表或文件的key/value对,打标签以区别不同来源的记录。然后用连接字段作为key,其余部分和新加的标志作为value,最后进行输出。 Reduce端的主要工作:在Reduce端以连接字段作为key的分组已经完成,我们只需要在