Spark SQL 的 AQE 机制

2023-12-14 21:40
文章标签 sql 机制 database spark aqe

本文主要是介绍Spark SQL 的 AQE 机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

原文

本文翻译自 Spark SQL AQE 机制的原始 JIRA 和官方设计文档 《New Adaptive Query Execution in Spark SQL》

背景

SPARK-9850 在 Spark 中提出了自适应执行的基本思想。

DAGScheduler中,添加了一个新的 API 来支持提交单个 Map Stage。

DAGScheduler请参考我的这篇博客——DAGScheduler 是什么?有什么作用?

目前 Spark SQL 中自适应执行的实现支持在运行时更改 Reducer 的数量。

如果一个阶段(Stage)需要从一个或多个阶段中获取 Shuffle 数据,则 Exchange Coordinator可以帮助它确定 Shuffle 后分区的数量。

在我们添加 Exchange 时,当前的实现增加了Exchange Coordinator

然而,有一些局限性。

首先,它可能会导致额外的 Shuffle,从而降低性能。

当它添加Exchange Coordinator时,我们可以从EnsureRequirements规则中看到这一点。

其次,在我们添加Exchange时添加Exchange Coordinator不是一个好主意,因为我们没有 Shuffle 后 阶段所有 Shuffle 依赖项的全局视图。

比如,对于单个阶段中 3 个表的 JOIN,应在三个Exchange中使用相同的Exchange Coordinator,但是目前会添加两个单独的Exchange Coordinator

第三,在当前的框架下,在自适应执行中灵活实现其他功能并不容易,例如更改执行计划和在运行时处理数据倾斜的 JOIN。

我们想介绍一种在 Spark SQL 中执行自适应执行的新方法,并解决其局限性。

这个想法在Adaptive Execution Enhancement in Spark SQL中进行了描述。

目标

我们的目标是实现一个灵活的框架来在 Spark SQL 中进行自适应执行,并支持在运行时更改 Reducer 的数量。

新的实现应该解决前面讨论的所有限制。

更改 JOIN 策略和处理数据倾斜的 JOIN 等其他功能将作为单独的规则实现,并在以后轻松插入。

规划

Spark 确定物理执行计划后,根据每个算子的定义生成一个 RDD 的 DAG。

关于 RDD 请参考我的博客——Spark RDD 论文详解(一)摘要和介绍

Spark 调度程序通过在 shuffle 边界处破坏 RDD 图来创建阶段(Stage)并提交阶段以供执行。

一旦确定了 SQL 执行计划,就无法在执行期间对其进行更新。

新的自适应执行的想法是基于 SQL 执行计划而不是 RDD 图来划分阶段。

我们将介绍称为 QueryStageQueryStageInput 的新节点。

在自适应执行模式下,一个执行计划被分成多个QueryStages

每个 QueryStage 都是在单个阶段中运行的子树。

QueryStageInputQueryStage 的叶节点,用来隐藏其子阶段。

它获取其子阶段的结果并将其作为 QueryStage 的输入。

QueryStage 通过收集 QueryStageInputs 了解其所有子阶段,因此它具有所有 shuffle 依赖项的全局视图。

我们添加了 QueryStageQueryStageInputs,从而在计划中查找 Exchange

下面是在一个阶段 中 JOIN 3 个表的示例。

在这里插入图片描述

我们将计划分为四个子树。

最后一个是具有三个 QueryStageInputsResult StageQueryStage4

QueryStageInput 是一个叶节点,但它指向一个子阶段,例如 QueryStageInput1 指向 QueryStage1

除了最后一个 QueryStageQueryStage 的子级始终是 ExchangeBroadcastExchangeExec

我们添加一个规则PlanQueryStage 来添加QueryStageQueryStageInput

仅当启用自适应执行时才会应用该规则。

在这里插入图片描述

执行和调度

我们从最后一个 QueryStage 开始执行。

在这个查询阶段执行计划之前,我们执行所有子阶段(Stage)并收集它们的输出统计信息。

线程池用于提交子阶段。

如果一个子阶段也有它的子阶段,它将首先提交自己的子阶段,这会递归地发生。

所以实际上 QueryStages 没有依赖关系将首先提交,其他 QueryStages 将等待其子阶段完成。

子阶段完成后,我们可以优化这个阶段的计划,根据子阶段的统计数据确定Reducer 的数量。

最后,我们为此查询阶段进行代码生成(CodeGen),并使用新计划更新 UI。

只要我们在 QueryStage 中优化计划时不添加任何 Exchange,就不会发生额外的 shuffle。

自动设置 Reducer 的数量

将使用三种配置来控制Reducer的数量。

spark.sql.adaptive.shuffle.targetPostShuffleInputSize用于控制任务Shuffle后的目标输入大小(以字节为单位)。

spark.sql.adaptive.minNumPostShufflePartitions用于控制自适应执行中使用的shuffle后最小的分区数,可用于控制最小并行度。

这两种配置在 Spark 中已经存在。

我们添加了一个新的配置:spark.sql.adaptive.maxNumPostShufflePartitions来控制Shuffle后分区的最大数量。

最终用户可以设置 Shuffle 后分区的最小和最大数量以及 Shuffle 后输入的目标大小。

在运行时,自适应执行会自动在 min 和 max 之间设置 reducer 的数量。

对于每个 QueryStage,我们使用以下方法在运行时自动设置分区的数量。

  1. 我们首先提交其所有子阶段,并收集 Map 输出统计信息。
  2. 我们创建一个exchange coordinator,将子阶段的 Map 输出统计信息传递给它,并调用estimatePartitionStartIndices方法来确定Shuffle后分区的数量。 (将来我们可能会删除类 ExchangeCoordinator,因为在更改之后只使用其中的一种方法)
  3. 每个子阶段获取相同的partitionStartIndex,并以此为基础构造一个新的ShuffledRowRDD。这些 ShuffledRowRDD 是当前阶段的输入 RDD。

在内部,我们使用最大数量(max)作为初始 shuffle 分区数量。

假设 max 配置为 5,min 配置为 1。

map 阶段完成后,我们知道每个分区的大小为 70MB、30MB、20MB、10MB 和 50MB。

如果我们将每个 reducer 的目标数据大小设置为 64MB,我们可以在运行时使用 3 个 reducer。

第一个 reducer 处理分区 0 (70MB)。

第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。

第三个 reducer 处理分区 4 (50MB)。

Spark SQL UI

执行计划可能会在运行时发生变化,因此 SQL UI 也应该反映这些变化。

在自适应执行模式下,SQL UI 会在开头显示原始的执行计划。

当自适应执行开始时,每个 QueryStage 都会提交子阶段,并且可能会更改其中的执行计划。

我们将发布一个事件 SparkListenerSQLAdaptiveExecutionUpdate(executionId, physicalPlanDescription, sparkPlanInfo) 来更新 UI 上的执行计划。

优化执行计划和处理倾斜 JOIN

通过上面讨论的更改,我们可以在运行时轻松优化 QueryStage 中的执行计划,即当我们发现一个表大小小于广播阈值时,将 SortMergeJoin 更改为 BroadcastHashJoin

我们还可以在执行子阶段后检测 JOIN 中的倾斜分区并自动处理。

这些策略可以作为单独的规则添加到自适应执行中并单独启用。

这篇关于Spark SQL 的 AQE 机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493979

相关文章

MySQL双主搭建+keepalived高可用的实现

《MySQL双主搭建+keepalived高可用的实现》本文主要介绍了MySQL双主搭建+keepalived高可用的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,... 目录一、测试环境准备二、主从搭建1.创建复制用户2.创建复制关系3.开启复制,确认复制是否成功4.同

MyBatis 动态 SQL 优化之标签的实战与技巧(常见用法)

《MyBatis动态SQL优化之标签的实战与技巧(常见用法)》本文通过详细的示例和实际应用场景,介绍了如何有效利用这些标签来优化MyBatis配置,提升开发效率,确保SQL的高效执行和安全性,感... 目录动态SQL详解一、动态SQL的核心概念1.1 什么是动态SQL?1.2 动态SQL的优点1.3 动态S

Mysql表的简单操作(基本技能)

《Mysql表的简单操作(基本技能)》在数据库中,表的操作主要包括表的创建、查看、修改、删除等,了解如何操作这些表是数据库管理和开发的基本技能,本文给大家介绍Mysql表的简单操作,感兴趣的朋友一起看... 目录3.1 创建表 3.2 查看表结构3.3 修改表3.4 实践案例:修改表在数据库中,表的操作主要

java中反射(Reflection)机制举例详解

《java中反射(Reflection)机制举例详解》Java中的反射机制是指Java程序在运行期间可以获取到一个对象的全部信息,:本文主要介绍java中反射(Reflection)机制的相关资料... 目录一、什么是反射?二、反射的用途三、获取Class对象四、Class类型的对象使用场景1五、Class

mysql出现ERROR 2003 (HY000): Can‘t connect to MySQL server on ‘localhost‘ (10061)的解决方法

《mysql出现ERROR2003(HY000):Can‘tconnecttoMySQLserveron‘localhost‘(10061)的解决方法》本文主要介绍了mysql出现... 目录前言:第一步:第二步:第三步:总结:前言:当你想通过命令窗口想打开mysql时候发现提http://www.cpp

MySQL大表数据的分区与分库分表的实现

《MySQL大表数据的分区与分库分表的实现》数据库的分区和分库分表是两种常用的技术方案,本文主要介绍了MySQL大表数据的分区与分库分表的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有... 目录1. mysql大表数据的分区1.1 什么是分区?1.2 分区的类型1.3 分区的优点1.4 分

MySQL错误代码2058和2059的解决办法

《MySQL错误代码2058和2059的解决办法》:本文主要介绍MySQL错误代码2058和2059的解决办法,2058和2059的错误码核心都是你用的客户端工具和mysql版本的密码插件不匹配,... 目录1. 前置理解2.报错现象3.解决办法(敲重点!!!)1. php前置理解2058和2059的错误

Mysql删除几亿条数据表中的部分数据的方法实现

《Mysql删除几亿条数据表中的部分数据的方法实现》在MySQL中删除一个大表中的数据时,需要特别注意操作的性能和对系统的影响,本文主要介绍了Mysql删除几亿条数据表中的部分数据的方法实现,具有一定... 目录1、需求2、方案1. 使用 DELETE 语句分批删除2. 使用 INPLACE ALTER T

MySQL INSERT语句实现当记录不存在时插入的几种方法

《MySQLINSERT语句实现当记录不存在时插入的几种方法》MySQL的INSERT语句是用于向数据库表中插入新记录的关键命令,下面:本文主要介绍MySQLINSERT语句实现当记录不存在时... 目录使用 INSERT IGNORE使用 ON DUPLICATE KEY UPDATE使用 REPLACE

MySQL Workbench 安装教程(保姆级)

《MySQLWorkbench安装教程(保姆级)》MySQLWorkbench是一款强大的数据库设计和管理工具,本文主要介绍了MySQLWorkbench安装教程,文中通过图文介绍的非常详细,对大... 目录前言:详细步骤:一、检查安装的数据库版本二、在官网下载对应的mysql Workbench版本,要是