本文主要是介绍Spark SQL 的 AQE 机制,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
原文
本文翻译自 Spark SQL AQE 机制的原始 JIRA 和官方设计文档 《New Adaptive Query Execution in Spark SQL》
背景
SPARK-9850 在 Spark 中提出了自适应执行的基本思想。
在DAGScheduler
中,添加了一个新的 API 来支持提交单个 Map Stage。
DAGScheduler
请参考我的这篇博客——DAGScheduler 是什么?有什么作用?
目前 Spark SQL 中自适应执行的实现支持在运行时更改 Reducer 的数量。
如果一个阶段(Stage
)需要从一个或多个阶段中获取 Shuffle 数据,则 Exchange Coordinator
可以帮助它确定 Shuffle 后分区的数量。
在我们添加 Exchange
时,当前的实现增加了Exchange Coordinator
。
然而,有一些局限性。
首先,它可能会导致额外的 Shuffle,从而降低性能。
当它添加Exchange Coordinator
时,我们可以从EnsureRequirements
规则中看到这一点。
其次,在我们添加Exchange
时添加Exchange Coordinator
不是一个好主意,因为我们没有 Shuffle 后 阶段所有 Shuffle 依赖项的全局视图。
比如,对于单个阶段中 3 个表的 JOIN,应在三个Exchange
中使用相同的Exchange Coordinator
,但是目前会添加两个单独的Exchange Coordinator
。
第三,在当前的框架下,在自适应执行中灵活实现其他功能并不容易,例如更改执行计划和在运行时处理数据倾斜的 JOIN。
我们想介绍一种在 Spark SQL 中执行自适应执行的新方法,并解决其局限性。
这个想法在Adaptive Execution Enhancement in Spark SQL中进行了描述。
目标
我们的目标是实现一个灵活的框架来在 Spark SQL 中进行自适应执行,并支持在运行时更改 Reducer 的数量。
新的实现应该解决前面讨论的所有限制。
更改 JOIN 策略和处理数据倾斜的 JOIN 等其他功能将作为单独的规则实现,并在以后轻松插入。
规划
Spark 确定物理执行计划后,根据每个算子的定义生成一个 RDD 的 DAG。
关于 RDD 请参考我的博客——Spark RDD 论文详解(一)摘要和介绍
Spark 调度程序通过在 shuffle 边界处破坏 RDD 图来创建阶段(Stage)并提交阶段以供执行。
一旦确定了 SQL 执行计划,就无法在执行期间对其进行更新。
新的自适应执行的想法是基于 SQL 执行计划而不是 RDD 图来划分阶段。
我们将介绍称为 QueryStage
和 QueryStageInput
的新节点。
在自适应执行模式下,一个执行计划被分成多个QueryStages
。
每个 QueryStage
都是在单个阶段中运行的子树。
QueryStageInput
是 QueryStage
的叶节点,用来隐藏其子阶段。
它获取其子阶段的结果并将其作为 QueryStage
的输入。
QueryStage
通过收集 QueryStageInputs
了解其所有子阶段,因此它具有所有 shuffle 依赖项的全局视图。
我们添加了 QueryStage
和 QueryStageInputs
,从而在计划中查找 Exchange
。
下面是在一个阶段 中 JOIN
3 个表的示例。
我们将计划分为四个子树。
最后一个是具有三个 QueryStageInputs
的Result Stage
: QueryStage4
。
QueryStageInput
是一个叶节点,但它指向一个子阶段,例如 QueryStageInput1
指向 QueryStage1
。
除了最后一个 QueryStage
,QueryStage
的子级始终是 Exchange
或 BroadcastExchangeExec
。
我们添加一个规则PlanQueryStage
来添加QueryStage
和QueryStageInput
。
仅当启用自适应执行时才会应用该规则。
执行和调度
我们从最后一个 QueryStage
开始执行。
在这个查询阶段执行计划之前,我们执行所有子阶段(Stage)并收集它们的输出统计信息。
线程池用于提交子阶段。
如果一个子阶段也有它的子阶段,它将首先提交自己的子阶段,这会递归地发生。
所以实际上 QueryStages
没有依赖关系将首先提交,其他 QueryStages
将等待其子阶段完成。
子阶段完成后,我们可以优化这个阶段的计划,根据子阶段的统计数据确定Reducer
的数量。
最后,我们为此查询阶段进行代码生成(CodeGen),并使用新计划更新 UI。
只要我们在 QueryStage
中优化计划时不添加任何 Exchange
,就不会发生额外的 shuffle。
自动设置 Reducer 的数量
将使用三种配置来控制Reducer
的数量。
spark.sql.adaptive.shuffle.targetPostShuffleInputSize
用于控制任务Shuffle
后的目标输入大小(以字节为单位)。
spark.sql.adaptive.minNumPostShufflePartitions
用于控制自适应执行中使用的shuffle
后最小的分区数,可用于控制最小并行度。
这两种配置在 Spark 中已经存在。
我们添加了一个新的配置:spark.sql.adaptive.maxNumPostShufflePartitions
来控制Shuffle
后分区的最大数量。
最终用户可以设置 Shuffle 后分区的最小和最大数量以及 Shuffle 后输入的目标大小。
在运行时,自适应执行会自动在 min 和 max 之间设置 reducer 的数量。
对于每个 QueryStage
,我们使用以下方法在运行时自动设置分区的数量。
- 我们首先提交其所有子阶段,并收集 Map 输出统计信息。
- 我们创建一个
exchange coordinator
,将子阶段的 Map 输出统计信息传递给它,并调用estimatePartitionStartIndices
方法来确定Shuffle
后分区的数量。 (将来我们可能会删除类ExchangeCoordinator
,因为在更改之后只使用其中的一种方法) - 每个子阶段获取相同的
partitionStartIndex
,并以此为基础构造一个新的ShuffledRowRDD
。这些ShuffledRowRDD
是当前阶段的输入 RDD。
在内部,我们使用最大数量(max)作为初始 shuffle 分区数量。
假设 max 配置为 5,min 配置为 1。
map 阶段完成后,我们知道每个分区的大小为 70MB、30MB、20MB、10MB 和 50MB。
如果我们将每个 reducer 的目标数据大小设置为 64MB,我们可以在运行时使用 3 个 reducer。
第一个 reducer 处理分区 0 (70MB)。
第二个 reducer 处理 3 个连续的分区(分区 1 到 3,总共 60MB)。
第三个 reducer 处理分区 4 (50MB)。
Spark SQL UI
执行计划可能会在运行时发生变化,因此 SQL UI 也应该反映这些变化。
在自适应执行模式下,SQL UI 会在开头显示原始的执行计划。
当自适应执行开始时,每个 QueryStage
都会提交子阶段,并且可能会更改其中的执行计划。
我们将发布一个事件 SparkListenerSQLAdaptiveExecutionUpdate(executionId, physicalPlanDescription, sparkPlanInfo)
来更新 UI 上的执行计划。
优化执行计划和处理倾斜 JOIN
通过上面讨论的更改,我们可以在运行时轻松优化 QueryStage
中的执行计划,即当我们发现一个表大小小于广播阈值时,将 SortMergeJoin
更改为 BroadcastHashJoin
。
我们还可以在执行子阶段后检测 JOIN 中的倾斜分区并自动处理。
这些策略可以作为单独的规则添加到自适应执行中并单独启用。
这篇关于Spark SQL 的 AQE 机制的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!