Spark SQL 的 AQE 机制

2023-12-14 21:40
文章标签 sql 机制 database spark aqe

本文主要是介绍Spark SQL 的 AQE 机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文

本文翻译自 Spark SQL AQE 机制的原始 JIRA 和官方设计文档 《New Adaptive Query Execution in Spark SQL》

背景

SPARK-9850 在 Spark 中提出了自适应执行的基本思想。

DAGScheduler中,添加了一个新的 API 来支持提交单个 Map Stage。

DAGScheduler请参考我的这篇博客——DAGScheduler 是什么?有什么作用?

目前 Spark SQL 中自适应执行的实现支持在运行时更改 Reducer 的数量。

如果一个阶段(Stage)需要从一个或多个阶段中获取 Shuffle 数据,则 Exchange Coordinator可以帮助它确定 Shuffle 后分区的数量。

在我们添加 Exchange 时,当前的实现增加了Exchange Coordinator

然而,有一些局限性。

首先,它可能会导致额外的 Shuffle,从而降低性能。

当它添加Exchange Coordinator时,我们可以从EnsureRequirements规则中看到这一点。

其次,在我们添加Exchange时添加Exchange Coordinator不是一个好主意,因为我们没有 Shuffle 后 阶段所有 Shuffle 依赖项的全局视图。

比如,对于单个阶段中 3 个表的 JOIN,应在三个Exchange中使用相同的Exchange Coordinator,但是目前会添加两个单独的Exchange Coordinator

第三,在当前的框架下,在自适应执行中灵活实现其他功能并不容易,例如更改执行计划和在运行时处理数据倾斜的 JOIN。

我们想介绍一种在 Spark SQL 中执行自适应执行的新方法,并解决其局限性。

这个想法在Adaptive Execution Enhancement in Spark SQL中进行了描述。

目标

我们的目标是实现一个灵活的框架来在 Spark SQL 中进行自适应执行,并支持在运行时更改 Reducer 的数量。

新的实现应该解决前面讨论的所有限制。

更改 JOIN 策略和处理数据倾斜的 JOIN 等其他功能将作为单独的规则实现,并在以后轻松插入。

规划

Spark 确定物理执行计划后,根据每个算子的定义生成一个 RDD 的 DAG。

关于 RDD 请参考我的博客——Spark RDD 论文详解(一)摘要和介绍

Spark 调度程序通过在 shuffle 边界处破坏 RDD 图来创建阶段(Stage)并提交阶段以供执行。

一旦确定了 SQL 执行计划,就无法在执行期间对其进行更新。

新的自适应执行的想法是基于 SQL 执行计划而不是 RDD 图来划分阶段。

我们将介绍称为 QueryStageQueryStageInput 的新节点。

在自适应执行模式下,一个执行计划被分成多个QueryStages

每个 QueryStage 都是在单个阶段中运行的子树。

QueryStageInputQueryStage 的叶节点,用来隐藏其子阶段。

它获取其子阶段的结果并将其作为 QueryStage 的输入。

QueryStage 通过收集 QueryStageInputs 了解其所有子阶段,因此它具有所有 shuffle 依赖项的全局视图。

我们添加了 QueryStageQueryStageInputs,从而在计划中查找 Exchange

下面是在一个阶段 中 JOIN 3 个表的示例。

在这里插入图片描述

我们将计划分为四个子树。

最后一个是具有三个 QueryStageInputsResult StageQueryStage4

QueryStageInput 是一个叶节点,但它指向一个子阶段,例如 QueryStageInput1 指向 QueryStage1

除了最后一个 QueryStageQueryStage 的子级始终是 ExchangeBroadcastExchangeExec

我们添加一个规则PlanQueryStage 来添加QueryStageQueryStageInput

仅当启用自适应执行时才会应用该规则。

在这里插入图片描述

执行和调度

我们从最后一个 QueryStage 开始执行。

在这个查询阶段执行计划之前,我们执行所有子阶段(Stage)并收集它们的输出统计信息。

线程池用于提交子阶段。

如果一个子阶段也有它的子阶段,它将首先提交自己的子阶段,这会递归地发生。

所以实际上 QueryStages 没有依赖关系将首先提交,其他 QueryStages 将等待其子阶段完成。

子阶段完成后,我们可以优化这个阶段的计划,根据子阶段的统计数据确定Reducer 的数量。

最后,我们为此查询阶段进行代码生成(CodeGen),并使用新计划更新 UI。

只要我们在 QueryStage 中优化计划时不添加任何 Exchange,就不会发生额外的 shuffle。

自动设置 Reducer 的数量

将使用三种配置来控制Reducer的数量。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。

spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。

这两种配置在 Spark 中已经存在。

我们添加了一个新的配置:spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

最终用户可以设置 Shuffle 后分区的最小和最大数量以及 Shuffle 后输入的目标大小。

在运行时,自适应执行会自动在 min 和 max 之间设置 reducer 的数量。

对于每个 QueryStage,我们使用以下方法在运行时自动设置分区的数量。

  1. 我们首先提交其所有子阶段,并收集 Map 输出统计信息。
  2. 我们创建一个exchange coordinator,将子阶段的 Map 输出统计信息传递给它,并调用estimatePartitionStartIndices方法来确定Shuffle后分区的数量。 (将来我们可能会删除类 ExchangeCoordinator,因为在更改之后只使用其中的一种方法)
  3. 每个子阶段获取相同的partitionStartIndex,并以此为基础构造一个新的ShuffledRowRDD。这些 ShuffledRowRDD 是当前阶段的输入 RDD。

在内部,我们使用最大数量(max)作为初始 shuffle 分区数量。

假设 max 配置为 5,min 配置为 1。

map 阶段完成后,我们知道每个分区的大小为 70MB、30MB、20MB、10MB 和 50MB。

如果我们将每个 reducer 的目标数据大小设置为 64MB,我们可以在运行时使用 3 个 reducer。

第一个 reducer 处理分区 0 (70MB)。

第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。

第三个 reducer 处理分区 4 (50MB)。

Spark SQL UI

执行计划可能会在运行时发生变化,因此 SQL UI 也应该反映这些变化。

在自适应执行模式下,SQL UI 会在开头显示原始的执行计划。

当自适应执行开始时,每个 QueryStage 都会提交子阶段,并且可能会更改其中的执行计划。

我们将发布一个事件 SparkListenerSQLAdaptiveExecutionUpdate(executionId, physicalPlanDescription, sparkPlanInfo) 来更新 UI 上的执行计划。

优化执行计划和处理倾斜 JOIN

通过上面讨论的更改,我们可以在运行时轻松优化 QueryStage 中的执行计划,即当我们发现一个表大小小于广播阈值时,将 SortMergeJoin 更改为 BroadcastHashJoin

我们还可以在执行子阶段后检测 JOIN 中的倾斜分区并自动处理。

这些策略可以作为单独的规则添加到自适应执行中并单独启用。

这篇关于Spark SQL 的 AQE 机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493979

相关文章

MySQL 中的 CAST 函数详解及常见用法

《MySQL中的CAST函数详解及常见用法》CAST函数是MySQL中用于数据类型转换的重要函数,它允许你将一个值从一种数据类型转换为另一种数据类型,本文给大家介绍MySQL中的CAST... 目录mysql 中的 CAST 函数详解一、基本语法二、支持的数据类型三、常见用法示例1. 字符串转数字2. 数字

Mysql实现范围分区表(新增、删除、重组、查看)

《Mysql实现范围分区表(新增、删除、重组、查看)》MySQL分区表的四种类型(范围、哈希、列表、键值),主要介绍了范围分区的创建、查询、添加、删除及重组织操作,具有一定的参考价值,感兴趣的可以了解... 目录一、mysql分区表分类二、范围分区(Range Partitioning1、新建分区表:2、分

MySQL 定时新增分区的实现示例

《MySQL定时新增分区的实现示例》本文主要介绍了通过存储过程和定时任务实现MySQL分区的自动创建,解决大数据量下手动维护的繁琐问题,具有一定的参考价值,感兴趣的可以了解一下... mysql创建好分区之后,有时候会需要自动创建分区。比如,一些表数据量非常大,有些数据是热点数据,按照日期分区MululbU

SQL Server配置管理器无法打开的四种解决方法

《SQLServer配置管理器无法打开的四种解决方法》本文总结了SQLServer配置管理器无法打开的四种解决方法,文中通过图文示例介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的... 目录方法一:桌面图标进入方法二:运行窗口进入检查版本号对照表php方法三:查找文件路径方法四:检查 S

MySQL 删除数据详解(最新整理)

《MySQL删除数据详解(最新整理)》:本文主要介绍MySQL删除数据的相关知识,本文通过实例代码给大家介绍的非常详细,对大家的学习或工作具有一定的参考借鉴价值,需要的朋友参考下吧... 目录一、前言二、mysql 中的三种删除方式1.DELETE语句✅ 基本语法: 示例:2.TRUNCATE语句✅ 基本语

MySQL中查找重复值的实现

《MySQL中查找重复值的实现》查找重复值是一项常见需求,比如在数据清理、数据分析、数据质量检查等场景下,我们常常需要找出表中某列或多列的重复值,具有一定的参考价值,感兴趣的可以了解一下... 目录技术背景实现步骤方法一:使用GROUP BY和HAVING子句方法二:仅返回重复值方法三:返回完整记录方法四:

从入门到精通MySQL联合查询

《从入门到精通MySQL联合查询》:本文主要介绍从入门到精通MySQL联合查询,本文通过实例代码给大家介绍的非常详细,需要的朋友可以参考下... 目录摘要1. 多表联合查询时mysql内部原理2. 内连接3. 外连接4. 自连接5. 子查询6. 合并查询7. 插入查询结果摘要前面我们学习了数据库设计时要满

Spring事务传播机制最佳实践

《Spring事务传播机制最佳实践》Spring的事务传播机制为我们提供了优雅的解决方案,本文将带您深入理解这一机制,掌握不同场景下的最佳实践,感兴趣的朋友一起看看吧... 目录1. 什么是事务传播行为2. Spring支持的七种事务传播行为2.1 REQUIRED(默认)2.2 SUPPORTS2

MySQL查询JSON数组字段包含特定字符串的方法

《MySQL查询JSON数组字段包含特定字符串的方法》在MySQL数据库中,当某个字段存储的是JSON数组,需要查询数组中包含特定字符串的记录时传统的LIKE语句无法直接使用,下面小编就为大家介绍两种... 目录问题背景解决方案对比1. 精确匹配方案(推荐)2. 模糊匹配方案参数化查询示例使用场景建议性能优

mysql表操作与查询功能详解

《mysql表操作与查询功能详解》本文系统讲解MySQL表操作与查询,涵盖创建、修改、复制表语法,基本查询结构及WHERE、GROUPBY等子句,本文结合实例代码给大家介绍的非常详细,感兴趣的朋友跟随... 目录01.表的操作1.1表操作概览1.2创建表1.3修改表1.4复制表02.基本查询操作2.1 SE