Spark3 新特性之AQE

2023-12-14 21:40
文章标签 特性 spark3 aqe

本文主要是介绍Spark3 新特性之AQE,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

文章目录

    • Spark3 AQE
      • 一、 背景
      • 二、 Spark 为什么需要AQE? (Why)
      • 三、 AQE 到底是什么?(What)
      • 四、AQE怎么用?(How)
        • 4.1 自动分区合并
        • 4.2 自动数据倾斜处理
        • 4.3 Join 策略调整
      • 五、对比验证
        • 5.1 执行耗时
        • 5.2 自动分区合并
        • 5.3 自动数据倾斜处理
      • 六、结论

Spark3 AQE

一、 背景

Spark 2.x 在遇到有数据倾斜的任务时,需要人为地去优化任务,比较费时费力;如果任务在Reduce阶段,Reduce Task 数据分布参差不齐,会造成各个excutor节点资源利用率不均衡,影响任务的执行效率;Spark 3新特性AQE极大地优化了以上任务的执行效率。

二、 Spark 为什么需要AQE? (Why)

RBO(Rule Based Optimization,基于规则的优化),它往往基于一些规则和策略实现,如谓词下推、列剪枝,这些规则和策略来源于数据库领域已有的应用经验。RBO实际上算是一种经验主义。

经验主义的弊端就是对待相似的问题和场景都使用同一类套路。Spark 社区正是因为意识到了 RBO 的局限性,因此在 2.x 版本中推出了CBO(Cost Based Optimization,基于成本的优化)。

CBO 是基于数据表的统计信息(如表大小、数据列分布)来选择优化策略。CBO 支持的统计信息很丰富,比如数据表的行数、每列的基数(Cardinality)、空值数、最大值、最小值等。因为有统计数据做支持,所以 CBO 选择的优化策略往往优于 RBO 选择的优化规则。

但是,CBO 也有三个方面的不足:

  • 适用面太窄,CBO 仅支持注册到 Hive Metastore 的数据表,但在其他的应用场景中,数据源往往是存储在分布式文件系统的各类文件,如 Parquet、ORC、CSV 等。

  • 统计信息的搜集效率比较低。对于注册到 Hive Metastore 的数据表,开发者需要调用 ANALYZE TABLE COMPUTE STATISTICS 语句收集统计信息,而各类信息的收集会消耗大量时间。

  • 静态优化,RBO、CBO执行计划一旦制定完成,就会按照该计划坚定不移地执行;如果在运行时数据分布发生动态变化,先前制定的执行计划并不会跟着调整、适配。

基于CBO的执行计划
在这里插入图片描述

  • Spark parses the query and creates the Unresolved Logical Plan 创建Unresolved Logical Plan
    • Validates the syntax of the query. 验证语法
    • Doesn’t validate the semantics meaning column name existence, data types. 不验证语义、字段是否存在、数据类型
  • Analysis: Using the Catalyst, it converts the Unresolved Logical Plan to Resolved Logical Plan a.k.a Logical Plan. 转换为Logical Plan
    • The catalog contains the column names and data types, during this step, it validates the columns mentioned in a query with catalog.
  • Optimization: Converts Logical Plan into Optimized Logical Plan. 转换为 Optimized Logical Plan
  • Planner: Now it creates One or More Physical Plans from an optimized Logical plan. 创建一个或多个Physical Plans
  • Cost Model: In this phase, calculates the cost for each Physical plan and select the Best Physical Plan. CBO择优
  • RDD Generation: RDD’s are generated, this is the final phase of query optimization which generates RDD in Java bytecode.

三、 AQE 到底是什么?(What)

考虑到 RBO 和 CBO 的种种限制,Spark 在 3.0 版本推出了 AQE(Adaptive Query Execution,自适应查询执行)。用一句话来概括,AQE 是 Spark SQL 的一种动态优化机制,在运行时,每当 Shuffle Map 阶段执行完毕,AQE 都会结合这个阶段的统计信息,基于既定的规则动态地调整、修正尚未执行的逻辑计划和物理计划,来完成对原始查询语句的运行时优化

AQE 赖以优化的统计信息与 CBO 不同,这些统计信息并不是关于某张表或是哪个列,而是 Shuffle Map 阶段输出的中间文件。

基于AQE的执行计划
在这里插入图片描述

四、AQE怎么用?(How)

AQE三大特性:自动分区合并 、自动数据倾斜处理、Join 策略调整

4.1 自动分区合并

在 Shuffle 过后,Reduce Task 数据分布参差不齐,AQE 将自动合并过小的数据分区。

那么AQE是如何确定多小的分区需要合并,以及分区合并到多大时停止合并?

对于所有的数据分区,无论大小,AQE 按照分区编号从左到右进行扫描,边扫描边记录分区尺寸,当相邻分区的尺寸之和大于 “推荐尺寸”时,AQE 就把这些扫描过的分区进行合并。然后,继续向右扫描,并采用同样的算法,按照目标尺寸合并剩余分区,直到所有分区都处理完毕。

“推荐尺寸”是由spark.sql.adaptive.advisoryPartitionSizeInBytes设置

配置说明
spark.sql.adaptive.coalescePartitions.enabledWhen true and ‘spark.sql.adaptive.enabled’ is true, Spark will coalesce contiguous shuffle partitions according to the target size (specified by ‘spark.sql.adaptive.advisoryPartitionSizeInBytes’), to avoid too many small tasks.
开启分区自动合并,默认开启
spark.sql.adaptive.advisoryPartitionSizeInBytesThe advisory size in bytes of the shuffle partition during adaptive optimization (when spark.sql.adaptive.enabled is true). It takes effect when Spark coalesces small shuffle partitions or splits skewed shuffle partition.
分区合并后的推荐尺寸,默认为64M
spark.sql.adaptive.coalescePartitions.minPartitionSizeThe minimum size of shuffle partitions after coalescing. This is useful when the adaptively calculated target size is too small during partition coalescing.
可合并分区尺寸大小,默认为1M

假设推荐尺寸为100M,shuffle后每一个分区的大小为70M、30M、80M、90M、10M、20M

按照正常情况(顺序处理),会启动4个reduce task:

第一个处理:70M、30M

第二个处理:80M

第三个处理:90M

第四个处理:10M、20M

spark3.0版本按照上述情况合并之后,各分区数据还是出现了不均衡,从而导致后续计算出现小的数据倾斜

查看spark 3.2官网新增了 spark.sql.adaptive.coalescePartitions.minPartitionSize(合并分区后,最小分区尺寸),如果把该参数设置为和推荐尺寸一致,那是不是只会启动3个 reduce task,3个都处理100M的数据?(个人猜想,不是按照顺序合并,而是会先遍历分区大小,保证合并后的分区大小相近)

You do not need to set a proper shuffle partition number to fit your dataset. Spark can pick the proper shuffle partition number at runtime once you set a large enough initial number of shuffle partitions via spark.sql.adaptive.coalescePartitions.initialPartitionNum configuration. 官方建议:分区数不用设置,spark会自动设置合适的分区。

4.2 自动数据倾斜处理

AQE 自动拆分 Reduce 阶段过大的数据分区,降低单个Reduce Task 的工作负载。

AQE 如何判定数据分区是否倾斜呢?它又是怎么把大分区拆分成多个小分区的?

配置说明
spark.sql.adaptive.skewJoin.enabledWhen true and ‘spark.sql.adaptive.enabled’ is true, Spark dynamically handles skew in shuffled join (sort-merge and shuffled hash) by splitting (and replicating if needed) skewed partitions.
开启AQE自动数据倾斜处理
spark.sql.adaptive.skewJoin.skewedPartitionFactorA partition is considered as skewed if its size is larger than this factor multiplying the median partition size and also larger than ‘spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes’
判定数据分区是否倾斜,默认值为5
spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytesA partition is considered as skewed if its size in bytes is larger than this threshold and also larger than ‘spark.sql.adaptive.skewJoin.skewedPartitionFactor’ multiplying the median partition size. Ideally this config should be set larger than ‘spark.sql.adaptive.advisoryPartitionSizeInBytes’.
判定数据分区是否倾斜,默认值为256M

判定倾斜分区:大于 median partition size * spark.sql.adaptive.skewJoin.skewedPartitionFactor 且 大于 spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes

假设数据表 A 有 3 个分区,分区大小分别是80MB、100MB 和 512MB。这些分区按大小个排序后的中位数是 100MB,因为

skewedPartitionFactor 的默认值是 5 ,skewedPartitionThresholdInBytes默认值是256M,512M大于 100MB * 5 = 500MB且大于256M 的分区被判定为倾斜分区。

拆分倾斜分区:上述例子512M分区会被拆分 512/256 =2个分区

4.3 Join 策略调整

如果某张表在过滤之后,尺寸小于广播变量阈值,这张表参与的数据关联就会从 Shuffle Sort Merge Join 降级(Demote)为执行效率更高的 Broadcast Hash Join。

Join 策略调整指的就是 Spark SQL在运行时动态地将原本的 Shuffle Join 策略,调整为执行更加高效的 Broadcast Join。

具体来说,每当 DAG 中的 Map 阶段执行完毕,Spark SQL 就会结合 Shuffle 中间文件的统计信息,重新计算 Reduce 阶段数据表的存储大小。如果发现基表尺寸小于广播阈值, 那么 Spark SQL 就把下一阶段的 Shuffle Join 调整为 Broadcast Join。

Broadcast Join 以广播的方式将小表的全量数据分发到集群中所有的 Executors,大表的数据就可以与小表数据在Process local级别进行关联操作。本地性级别有 4 种:Process local < Node local < Rack local < Any。

配置说明
spark.sql.adaptive.autoBroadcastJoinThresholdConfigures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. The default value is same with spark.sql.autoBroadcastJoinThreshold. Note that, this config is used only in adaptive framework.
可广播的表尺寸阈值,默认10M

五、对比验证

spark3.2 开启AQE

http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5390/

spark3.2 关闭AQE

http://szzb-bg-uat-etl-16:18080/history/application_1665940579703_5812/

spark2.2 无AQE

http://szzb-bg-uat-etl-11:18080/history/application_1665940579703_9257/

5.1 执行耗时

使用资源: --driver-memory 6g --executor-memory 6g --executor-cores 6

耗时
spark3.2 开启AQE4.3 min
spark3.2 关闭AQE6.7 min
spark2.2 无AQE> 9.6min (OOM)
5.2 自动分区合并

在这里插入图片描述

5.3 自动数据倾斜处理

查看各stage执行时间

  • spark3.2 开启AQE 每个stage执行时间相差不大 (需要看每个stage的tasks )

  • spark2.2 无AQE 每个stage执行时间相差较大 (需要看每个stage的tasks )
    在这里插入图片描述

查看执行时间最长的stage数据分布

  • spark3.2 开启AQE Shuffle Read/Write 数据较均衡

  • spark2.2 无AQE Shuffle Read/Write 数据不均衡

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GB5NWCsF-1677203632031)(C:\Users\602946\AppData\Roaming\Typora\typora-user-images\image-20221019110500813.png)]

六、结论

spark3.2.2 开启AQE(默认开启),当Reduce Task 数据分布参差不齐时,能够自动合并过小的数据分区;且在 Reduce 阶段存在数据倾斜的情况下,能够拆分大分区;通过对比执行时间,AQE能极大的提升任务的执行效率。

这篇关于Spark3 新特性之AQE的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493982

相关文章

ActiveMQ—消息特性(延迟和定时消息投递)

ActiveMQ消息特性:延迟和定时消息投递(Delay and Schedule Message Delivery) 转自:http://blog.csdn.net/kimmking/article/details/8443872 有时候我们不希望消息马上被broker投递出去,而是想要消息60秒以后发给消费者,或者我们想让消息没隔一定时间投递一次,一共投递指定的次数。。。 类似

PostgreSQL核心功能特性与使用领域及场景分析

PostgreSQL有什么优点? 开源和免费 PostgreSQL是一个开源的数据库管理系统,可以免费使用和修改。这降低了企业的成本,并为开发者提供了一个活跃的社区和丰富的资源。 高度兼容 PostgreSQL支持多种操作系统(如Linux、Windows、macOS等)和编程语言(如C、C++、Java、Python、Ruby等),并提供了多种接口(如JDBC、ODBC、ADO.NET等

详解Tomcat 7的七大新特性和新增功能(1)

http://developer.51cto.com/art/201009/228537.htm http://tomcat.apache.org/tomcat-7.0-doc/index.html  Apache发布首个Tomcat 7版本已经发布了有一段时间了,Tomcat 7引入了许多新功能,并对现有功能进行了增强。很多文章列出了Tomcat 7的新功能,但大多数并没有详细解释它们

如何掌握面向对象编程的四大特性、Lambda 表达式及 I/O 流:全面指南

这里写目录标题 OOP语言的四大特性lambda输入/输出流(I/O流) OOP语言的四大特性 面向对象编程(OOP)是一种编程范式,它通过使用“对象”来组织代码。OOP 的四大特性是封装、继承、多态和抽象。这些特性帮助程序员更好地管理复杂的代码,使程序更易于理解和维护。 类-》实体的抽象类型 实体(属性,行为) -》 ADT(abstract data type) 属性-》成

《C++标准库》读书笔记/第一天(C++新特性(1))

C++11新特性(1) 以auto完成类型自动推导 auto i=42; //以auto声明的变量,其类型会根据其初值被自动推倒出来,因此一定需要一个初始化操作; static auto a=0.19;//可以用额外限定符修饰 vector<string> v;  auto pos=v.begin();//如果类型很长或类型表达式复杂 auto很有用; auto l=[] (int

12C 新特性,MOVE DATAFILE 在线移动 包括system, 附带改名 NID ,cdb_data_files视图坏了

ALTER DATABASE MOVE DATAFILE  可以改名 可以move file,全部一个命令。 resue 可以重用,keep好像不生效!!! system照移动不误-------- SQL> select file_name, status, online_status from dba_data_files where tablespace_name='SYSTEM'

Cmake之3.0版本重要特性及用法实例(十三)

简介: CSDN博客专家、《Android系统多媒体进阶实战》一书作者 新书发布:《Android系统多媒体进阶实战》🚀 优质专栏: Audio工程师进阶系列【原创干货持续更新中……】🚀 优质专栏: 多媒体系统工程师系列【原创干货持续更新中……】🚀 优质视频课程:AAOS车载系统+AOSP14系统攻城狮入门视频实战课 🚀 人生格言: 人生从来没有捷径,只有行动才是治疗恐惧

Java8特性:分组、提取字段、去重、过滤、差集、交集

总结下自己使用过的特性 将对象集合根据某个字段分组 //根据id分组Map<String, List<Bean>> newMap = successCf.stream().collect(Collectors.groupingBy(b -> b.getId().trim())); 获取对象集合里面的某个字段的集合 List<Bean> list = new ArrayList<>

【JVM】JVM栈帧中的动态链接 与 Java的面向对象特性--多态

栈帧 每一次方法调用都会有一个对应的栈帧被压入栈(虚拟机栈)中,每一个方法调用结束后,都会有一个栈帧被弹出。 每个栈帧中包括:局部变量表、操作数栈、动态链接、方法返回地址。 JavaGuide:Java内存区域详解(重点) 动态链接 动态链接:指向运行时常量池中该栈帧所属方法的引用。 多态 多态允许不同类的对象对同一消息做出响应,但表现出不同的行为(即方法的多样性)。 多态

HCIA--实验十:路由的递归特性

递归路由的理解 一、实验内容 1.需求/要求: 使用4台路由器,在AR1和AR4上分别配置一个LOOPBACK接口,根据路由的递归特性,写一系列的静态路由实现让1.1.1.1和4.4.4.4的双向通信。 二、实验过程 1.拓扑图: 2.步骤: (下列命令行可以直接复制在ensp) 1.如拓扑图所示,配置各路由器的基本信息: 各接口的ip地址及子网掩码,给AR1和AR4分别配置