自适应查询执行AQE:在运行时加速SparkSQL

2023-12-14 21:40

本文主要是介绍自适应查询执行AQE:在运行时加速SparkSQL,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

演讲嘉宾简介:王道远,阿里巴巴技术专家

以下内容根据演讲视频以及PPT整理而成。

点击链接观看精彩回放:

https://developer.aliyun.com/live/43188

自适应查询执行AQE简介

关于自适应查询执行,在数据库领域早有充分研究。在Spark社区,最早在Spark 1.6版本就已经提出发展自适应执行(Adaptive Query Execution,下文简称AQE);到了Spark 2.x时代,Intel大数据团队进行了相应的原型开发和实践;到了Spark 3.0时代,Databricks和Intel一起为社区贡献了新的AQE。

什么是AQE呢?简单来说就是根据在运行时统计信息(runtime statistics)在查询执行的过程中进行动态(Dynamic)的查询优化。那么我们为什么需要AQE呢?在Spark 2.x时代,为了选择最佳执行计划,我们引入了CBO(Cost-based optimization),但是在一些场景下,效果非常不好,缺点明显,比如:

  • 统计信息过期或者缺失导致估计错误;

  • 收集统计信息代价较大(比如column histograms);;

  • 某些谓词使用自定义UDF导致无法预估;

  • 手动指定执行hint跟不上数据变化。

而在Spark 3.0时代,AQE完全基于精确的运行时统计信息进行优化,引入了一个基本的概念Query Stages,并且以Query Stage为粒度,进行运行时的优化,其工作原理如下所示:

整个AQE的工作原理以及流程为:

  1. 运行没有依赖的stage;

  2. 在一个stage完成时再依据新的统计信息优化剩余部分;

  3. 执行其他已经满足依赖的stage;

  4. 重复步骤(2)(3)直至所有stage执行完成。

    Spark 3.0中主要的AQE特性

Spark 3.0中主要的AQE特性包括:

  • 动态合并shuffle分区;

  • 动态转换join策略;

  • 动态优化join中的数据倾斜。

动态合并shuffle分区

Shuffle分区数量和大小对查询性能很关键。在Spark 3.0以前,Shuffle分区是一个固定值,存在着明显的缺点,如果分区过小会导致I/O低效、调度开销和任务启动开销,但是如果分区过大又会带来GC压力和溢写硬盘等问题。另一方面,在Spark 3.0之前,整个查询执行过程中使用统一的分区数,而在查询执行的不同阶段,数据规模会发生明显变化,如果保持统一的分区数,则大大降低了效率。基于以上,动态合并Shuffle分区是非常必要的。

AQE解决上面问题的具体做法是设置较大的初始分区数来满足整个查询执行过程中最大的分区数,并且在每个Query stage结束的时候按需自动合并分区,其具体的流程如下图所示:

具体来说,动态合并Shuffle分区的原理如下:

对于普通的Shuffle来说,没有自动合并的过程,每个MAP读取Shuffle后,会根据指定分区数进行分区,比如下图为5:

进行上图所示的分区后发现,REDUCE1和REDUCE5要处理的数据量明显高于其余三个REDUCE,而我们理想的情况下是每个REDUCE处理的数据量是相当的,所以AQE进行了动态合并分区,将相邻的小分区2,3,4进行合并,输出三个REDUCE,大大提高了后续的效率,如下图所示:

动态转换join策略

在Spark中,我们希望当Join的某一边可以完全放入内存时,Spark选择Broadcast Hash Join,但是实际上会出现预估可能不够准确,导致本来可以优化为BHJ的没有被优化的情况,原因也很多,比如;

  • 统计信息不够准确;

  • 子查询太复杂;

  • 黑盒的谓词,比如自定义UDF。

对于以上问题,AQE的解决方法就是使用运行时数据大小重新选择执行计划,其整个流程与原理如下图所示:

动态优化join中的数据倾斜

在Join中的数据倾斜会导致一系列的问题,比如性能下降、某一个task影响整个stage的运行等,处理数据量比较大的partitions时候还可能会出现溢写磁盘的情况。AQE针对上述问题使用运行时的统计信息自动优化查询执行,动态的发现倾斜数据的数量,并且把倾斜的分区分成更小的子分区来处理。其做法如下图所示:

具体来说其原理如下:
对于普通的sort merge join来说,没有倾斜优化,可能会造成某个Shuffle分区的数据数量明显高于其他分区,如下图中的PART.A0,这种情况会造成A0和B0的这个Join执行速度明显慢于其他的Join。

有了AQE之后,根据数据倾斜优化后的sort merge join,使用skew Shuffle reader,如下图所示将A0分成三个子分区,并将对应的B0复制三份,整个Join任务的运行效率大大提升。

上述的几个特性可以在Demo中查看https://docs.databricks.com/_static/notebooks/aqe-demo.html 。

TPC-DS性能测试

进行TPC-DS性能测试的集群配置如下图所示:

测试结果显示,2条Query获得了1.5倍的性能提升,37条Query获得了1.1倍的性能提升。

下面两张图是关于分区合并和Join策略的性能测试结果,可以看出AQE对于性能的提升还是非常明显的。


除了在TPC-DS的测试中AQE表现优秀,在实际生产环境中AQE对于性能的提升也非常优秀,比如某电商公司分享在某些典型的倾斜查询中使用了AQE之后获得了十几倍的性能提升,某互联网巨头使用了AQE之后发现在2个典型的查询中性能分别有了5倍和1.38倍的提升等等。

QA

Q1:Shuffle是如何对大量小文件进行优化的?
A1:AQE 支持的动态分区合并可以减少 shuffle 后的分区数,如果是 ETL 作业写动态分区表,建议手动添加distribute by partkey 等子句来减少输出文件数量。

Q2:AQE是否支持外部的Shuffle Service?
A2:支持,需要 shuffle service 提供基本的统计信息

Q3:如果join的两边的part都比较大,是不是都会拆分?还会broadcast 么?
A3:都比较大的话优化就没啥用了,需要从业务出发进行优化。

猜你喜欢

1、Spark 背后的商业公司收购的 Redash 是个啥?

2、马铁大神的 Apache Spark 十年回顾

3、基于Apache Iceberg打造T+0实时数仓

4、Presto on Spark:扩展 Presto 以支持大规模 ETL

这篇关于自适应查询执行AQE:在运行时加速SparkSQL的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493969

相关文章

Python使用国内镜像加速pip安装的方法讲解

《Python使用国内镜像加速pip安装的方法讲解》在Python开发中,pip是一个非常重要的工具,用于安装和管理Python的第三方库,然而,在国内使用pip安装依赖时,往往会因为网络问题而导致速... 目录一、pip 工具简介1. 什么是 pip?2. 什么是 -i 参数?二、国内镜像源的选择三、如何

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

SQL 中多表查询的常见连接方式详解

《SQL中多表查询的常见连接方式详解》本文介绍SQL中多表查询的常见连接方式,包括内连接(INNERJOIN)、左连接(LEFTJOIN)、右连接(RIGHTJOIN)、全外连接(FULLOUTER... 目录一、连接类型图表(ASCII 形式)二、前置代码(创建示例表)三、连接方式代码示例1. 内连接(I

在MySQL执行UPDATE语句时遇到的错误1175的解决方案

《在MySQL执行UPDATE语句时遇到的错误1175的解决方案》MySQL安全更新模式(SafeUpdateMode)限制了UPDATE和DELETE操作,要求使用WHERE子句时必须基于主键或索引... mysql 中遇到的 Error Code: 1175 是由于启用了 安全更新模式(Safe Upd

轻松上手MYSQL之JSON函数实现高效数据查询与操作

《轻松上手MYSQL之JSON函数实现高效数据查询与操作》:本文主要介绍轻松上手MYSQL之JSON函数实现高效数据查询与操作的相关资料,MySQL提供了多个JSON函数,用于处理和查询JSON数... 目录一、jsON_EXTRACT 提取指定数据二、JSON_UNQUOTE 取消双引号三、JSON_KE

查询SQL Server数据库服务器IP地址的多种有效方法

《查询SQLServer数据库服务器IP地址的多种有效方法》作为数据库管理员或开发人员,了解如何查询SQLServer数据库服务器的IP地址是一项重要技能,本文将介绍几种简单而有效的方法,帮助你轻松... 目录使用T-SQL查询方法1:使用系统函数方法2:使用系统视图使用SQL Server Configu

通过prometheus监控Tomcat运行状态的操作流程

《通过prometheus监控Tomcat运行状态的操作流程》文章介绍了如何安装和配置Tomcat,并使用Prometheus和TomcatExporter来监控Tomcat的运行状态,文章详细讲解了... 目录Tomcat安装配置以及prometheus监控Tomcat一. 安装并配置tomcat1、安装

MYSQL关联关系查询方式

《MYSQL关联关系查询方式》文章详细介绍了MySQL中如何使用内连接和左外连接进行表的关联查询,并展示了如何选择列和使用别名,文章还提供了一些关于查询优化的建议,并鼓励读者参考和支持脚本之家... 目录mysql关联关系查询关联关系查询这个查询做了以下几件事MySQL自关联查询总结MYSQL关联关系查询

Java实现Elasticsearch查询当前索引全部数据的完整代码

《Java实现Elasticsearch查询当前索引全部数据的完整代码》:本文主要介绍如何在Java中实现查询Elasticsearch索引中指定条件下的全部数据,通过设置滚动查询参数(scrol... 目录需求背景通常情况Java 实现查询 Elasticsearch 全部数据写在最后需求背景通常情况下

Spring Boot 整合 ShedLock 处理定时任务重复执行的问题小结

《SpringBoot整合ShedLock处理定时任务重复执行的问题小结》ShedLock是解决分布式系统中定时任务重复执行问题的Java库,通过在数据库中加锁,确保只有一个节点在指定时间执行... 目录前言什么是 ShedLock?ShedLock 的工作原理:定时任务重复执行China编程的问题使用 Shed