Spark AQE SkewedJoin 在字节跳动的实践和优化

2023-12-14 21:40

本文主要是介绍Spark AQE SkewedJoin 在字节跳动的实践和优化,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

动手点关注

a2223026a21e6404cc894c8994873661.gif

干货不迷路

1. 概述

本文将首先介绍 Spark AQE SkewedJoin 的基本原理以及字节跳动在使用 AQE SkewedJoin 的实践中遇到的一些问题;其次介绍针对遇到的问题所做的相关优化和功能增强,以及相关优化在字节跳动的收益;此外,我们还将分享 SkewedJoin 的使用经验。

2. 背景

首先对 Spark AQE SkewedJoin 做一个简单的介绍。Spark Adaptive Query Execution, 简称 Spark AQE,总体思想是动态优化和修改 stage 的物理执行计划。利用执行结束的上游 stage 的统计信息(主要是数据量和记录数),来优化下游 stage 的物理执行计划。

33f44c2833a63155b2cf13e74e8b82dc.png

Spark AQE 能够在 stage 提交执行之前,根据上游 stage 的所有 MapTask 的统计信息,计算得到下游每个 ReduceTask 的 shuffle 输入,因此 Spark AQE 能够自动发现发生数据倾斜的 Join,并且做出优化处理,该功能就是 Spark AQE SkewedJoin。

8c31ba0dc97b7e31668fb8bb1fa0b45b.png

例如 A 表 inner join B 表,并且 A 表中第 0 个 partition(A0)是一个倾斜的 partition,正常情况下,A0 会和 B 表的第 0 个 partition(B0)发生 join,由于此时 A0 倾斜,task 0 就会成为长尾 task。

SkewedJoin 在执行 A Join B 之前,通过上游 stage 的统计信息,发现 partition A0 明显超过平均值的数倍,即判断 A Join B 发生了数据倾斜,且倾斜分区为 partition A0。Spark AQE 会将 A0 的数据拆成 N 份,使用 N 个 task 去处理该 partition,每个 task 只读取若干个 MapTask 的 shuffle 输出文件,如下图所示,A0-0 只会读取 Stage0#MapTask0 中属于 A0 的数据。这 N 个 Task 然后都读取 B 表 partition 0 的数据做 join。这 N 个 task 执行的结果和 A 表的 A0 join B0 的结果是等价的。

70cc78807ef570b0d11f74c7e483cc5f.png

不难看出,在这样的处理中,B 表的 partition 0 会被读取 N 次,虽然这增加了一定的额外成本,但是通过 N 个任务处理倾斜数据带来的收益仍然大于这样的成本。

Spark 从3.0 版本开始支持了 AQE SkewedJoin 功能,但是我们在实践中发现了一些问题。

  • 不准确的统计数据可能导致 Spark 无法识别数据倾斜。

  • 切分不均匀导致优化处理效果不理想。

  • 不支持复杂场景例如同一个字段发生连续 join。

我将在【优化增强】中详述这些问题以及我们的优化和解决方案。

3. 优化增强

3.1 提高数据倾斜的识别能力

由 Spark AQE 处理数据倾斜的原理不难发现,Spark AQE 识别倾斜以及切分数据倾斜的功能依赖于上游 Stage 的统计数据,统计数据越准确,倾斜的识别能力和处理能力就越高,直观表现就是倾斜数据被拆分的非常平均,拆分后的数据大小几乎和中位数一致,将长尾Task的影响降到最低。

MapStage 执行结束之后,每一个 MapTask 会生成统计结果 MapStatus,并将其发送给 Driver。MapStatus维护了一个 Array[Long],记录了该 MapTask 中属于下游每一个 ReduceTask 的数据大小。当 Driver 收集到了所有的 MapTask 的MapStatu之后,就能够计算得到每一个 ReduceTask 的输入数据量,以及分属于每一个上游 MapTask 的数据大小。根据每一个 ReduceTask 的数据大小,Spark AQE 能够判断出数据倾斜,并根据上游 MapTask 的统计信息,合理切分 Reducetask,尽可能保证切分的均匀性。

26ff24d71fd3a2d4ef1951bb808f029c.png

如下图描述,ReduceTask0 的 ShuffleRead(shuffle 过程中读取的数据量) 为 200,明显大于 ReduceTask1 和 ReduceTask2 的 100,发生了数据倾斜。我们可以将 ReduceTask0 拆成 2 份,ReduceTask0-0 读取 MapTask0 和 MapTask1 的数据,ReduceTask0-1 读取 MapTask2 和 MapTask3 的数据,拆分后的两个 task 的 ShuffleRead 均为 100。

我们可以看出,统计信息的大小的空间复杂度是 O(M*R),对于大任务而言,会占据大量的 Driver 内存,所以 Spark 原生做了限制,对于 MapTask,当下游 ReduceTask 个数大于某一阈值(spark.shuffle.minNumPartitionsToHighlyCompress,默认 2000),就会将MapStatus进行压缩,所有小于 spark.shuffle.accurateBlockThreshold(默认100M)的值都会被一个平均值所代替填充。

举个例子,下图是我们遇到的一个 SkewedJoin 没有生效的作业,从运行 metrics 来看,ShuffleRead 发生了很严重的倾斜,符合 SkewedJoin 生效的场景,但实际运行时并没有生效。

f84270edb923e521500c12ba4715b8f6.png

通过阅读日志,可以看到,Spark AQE 在运行时,获取的 join 两侧的 shuffle partitions 的中位数和最大值都是一样的,所以没有识别到任何的倾斜。这就是由于压缩后 MapStatus 的统计数据的不准确造成的。

2451bfae295f99f194b1cb40249896bf.png

我们在实践中,遇到很多大作业由于统计数据不准确,无法识别倾斜。而当我们尝试提高这一阈值之后,部分大作业由于 Driver 内存使用上涨而失败,为了解决这一问题,我们做了以下优化:

  1. Driver 收到详细的 MapStatus之后,先将数据用于更新每个 ReduceTask 的累计输入数据,然后将 MapStatus压缩,这样就不会占用太多内存。此时,虽然压缩后的 MapStatus无法让我们获得 ReduceTask 准确的上游分布,但是能够获得准确的 ReduceTask 的输入数据总大小,这样我们就能够准确的识别发生倾斜的 ReduceTask。

  1. 上述优化增加了一次 MapStatus 的解压操作,而 MapStatus 的解压是一个比较耗CPU的操作,对于大作业可能出现 Driver CPU 被打满,无法处理 Executor 心跳导致作业失败的情况。对此,我们使用缓存保证Driver端在消费 MapStatus 时,每个 MapStatus 只会被解压一次,大大降低了优化带来的 Overhead。

通过上述优化,我们成功在线上将默认阈值从 2000 调整为 5000,保证了线上 96.6% 的 Spark 作业能够准确的识别数据倾斜(如果存在)。

3.2 提高倾斜数据的切分均匀程度

由于 HighlyCompressMapStatus 用平均值填充所有低于 spark.shuffle.accurateBlockThreshold 的值,每个 ReduceTask 通过压缩后的 MapStatus 累加计算得到的总数据大小和数据分布,就和实际差距很大。

举个简单的例子:我们得到 ReduceTask0 的实际总数据是 1G,而中位数是 100M,因此我们的期望是将 ReduceTask0 拆成 10 份,每一份是 100M。此时上游的 MapStage 一共有 100 个 MapTask,除了 MapTask0 中属于 ReduceTask0 的数据是 100M,其他 99 个 MapStak 的数据都是 10M。当我们将所有的 MapStatus 压缩之后,AQE 获取的 ReduceTask0 的上游分布,就是 MapTask0 有 100M (因为大块数据所以被保留),其他 99 个 MapTask 的数据都是 1M(在压缩时使用平均值填充)。这时,Spark AQE 按照 100M 的期望值来切分,只会切分成两个 ReduceTask:ReduceTask0-0(读取MapTask0)和 ReduceTask0-1(读取剩下99个MapTask)。

基于此,我们改进后的方法是利用精确的 ReduceTask 数据量来反推每个 MapperTask 对应的数据量,得到尽可能准确的数据分布。同样是刚才的例子,我们已知 ReduceTask0 的实际总数据是 1G,MapStatus 压缩的阈值是 100M,那么可以确定的是,MapTask0 关于 ReduceTask0 的数据 100M 是准确被保留的(因为大于等于阈值),而其他 99 个 MapTask 的数据都是不准确的。此时 AQE 就不会使用被压缩的数据,而是通过 1G 的总数据反推得到其他 99 个 MapTask 中属于 ReduceTask0 的数据是 10M,虽然同样是存在误差的平均值,但是相比压缩数据,通过准确的总量反推得到的平均值会更加准确。这个时候 Spark 按照 100M 的期望值来切分,就会切成 10 个 ReduceTask,符合我们的预期。

而在实际应用中,利用新方案,AQE SkewedJoin 切分倾斜数据更加平均,优化效果有明显的提升。

下图是某个倾斜处理效果不理想的作业,SkewedJoin 生效后,该 Stage ShuffleReadSize 的中位数和最大值分别为 4M 和 9.9G。

c07138f8beafa7f87c88c356c7470c88.png

经过我们的优化后,该 Stage 的 ShuffleReadSize 的中位数和最大值分别为 149M 和 1427M,倾斜分区的切分更加均匀,该 Stage 的运行时间也由原来的 2h 降为 20m。

22efface34ef59a19cf62e9227518a63.png

3.3 支持更多的场景

  • 场景1:JoinWithAggOrWin

以下图为例,Stage10 虽然只有一个 SortMergeJoin,但是 join 的一边并不是 Sort+Exchange 的组合,而是存在 Aggregate 算子或者 Window 算子,因此不属于社区实现的范围内。

0387bcbbd382c73b84f61c103a6a28d0.png

  • 场景2:MultipleSkewedJoin

在用户的业务逻辑中,经常出现这样一种场景:一张表的主键需要连续的 join 多张表,这种场景体现在 Spark 的具体执行上,就是连续的 join 存在于同一个 Stage 当中。如下图所示 Stage21 中存在连续的多个 SortMergeJoin,而这种场景也是社区的实现无法优化的。

2e43f1d61a2e396526ab2917acd758be.png

  • 场景3:JoinWithUnion

Stage 中有 Union 算子,且 Union 的 children 中有 SMJ。

82c3089924f7f15f9b1cd6c0a6f645e0.png

此外,我们还支持了 ShuffleHashJoin、 BucketJoin、MultipleJoinWithAggOrWin 等更多场景。

4. 字节的实践

上面介绍的 LAS 对 Spark AQE SkewedJoin 的优化功能在字节跳动内部已使用 1 年左右,截止 2022年8月,优化日均覆盖1.8万+ Spark 作业,优化命中作业平均性能提升 35% 左右,其中 30% 被优化的 Spark 作业所属于的场景是 LAS 自研支持的,大家可以通过火山引擎开通 LAS 服务并体验这些优化功能。

5. 用户指南

5.1 哪些场景 AQE SkewedJoin 不支持

AQE SkewedJoin 功能并不能处理所有发生数据倾斜的 Join,这是由它的实现逻辑所决定的。

第一,如果倾斜的分区的大部分数据来自于上游的同一个 Mapper,AQE SkewedJoin 无法处理,原因是 Spark 不支持 Reduce Task 只读取上游 Mapper 的一个 block 的部分数据。

第二,如果 Join 的发生倾斜的一侧存在 Agg 或者 Window 这类有指定 requiredChildDistribution 的算子,那么 SkewedJoin 优化无法处理,因为将分区切分会破坏 RDD 的 outputPartitioning,导致不再满足 requiredChildDistribution。

第三,对于 Outer/Semi Join,AQE SkewedJoin 是无法处理非 Outer/Semi 侧的数据倾斜。比如,对于 LeftOuter Join,SkewedJoin 无法处理右侧的数据倾斜。

第四,AQE 无法处理倾斜的 BroadcastHashJoin。

5.2 AQE SkewedJoin 优化效果不明显时的措施

如果遇到了符合应用场景但是 SkewedJoin 没有生效或者倾斜处理效果不理想的情况,有以下调优手段:

  • 提高 spark.shuffle.minNumPartitionsToHighlyCompress,保证值大于等于 shuffle 并发(当开启 AQE 时,即为spark.sql.adaptive.coalescePartitions.initialPartitionNum)。

  • 调小 spark.shuffle.accurateBlockThreshold,比如 4M。但是需要注意的是,这会增加 Driver 的内存消耗,需要同步增加 Driver 的 cpu 和内存。

  • 降低 spark.sql.adaptive.skewJoin.skewedPartitionFactor,降低定义发生倾斜的阈值。

6. 总结

本文首先简单介绍了 Spark AQE 的基本思想以及 SkewedJoin 功能的原理,接着提出了我们在应用 SkewedJoin的过程中遇到的一些问题。针对这些问题,我们介绍了对 AQE SkewedJoin 做的优化和增强——提高统计的准确度;提高倾斜数据的切分均匀程度;支持了更多的场景。接着,本文介绍了 AQE SkewedJoin 在字节跳动的使用情况,包括日均优化覆盖作业和优化效果,其中30%被优化的 Spark 作业所属于的场景是字节自研支持的。最后分享了我们关于 AQE SkewedJoin 的用户指南:哪些场景 AQE SkewedJoin 不支持;当 AQE SkewedJoin 效果不明显时,可以采取哪些措施。

7. 附录A :本文涉及的关于 AQE SkewedJoin 优化的相关参数配置

参数配置名默认值参数意义
spark.shuffle.minNumPartitionsToHighlyCompress2000决定 Mapstatus 使用 HighlyCompressedMapStatus还是 CompressedMapStatus 的阈值,如果 huffle partition 大于该值,则使用 HighlyCompressedMapStatus。
spark.shuffle.accurateBlockThreshold100MHighlyCompressedMapStatus 中记录 shuffle blcok 准确大小的阈值,当 block 小于该值则用平均值代替。
spark.sql.adaptive.skewJoin.skewedPartitionFactor10如果一个 partition 大于该因子乘以分区大小的中位数,那么它就是倾斜的 partition。

8. 关于我们

火山引擎湖仓一体分析服务 LAS

支持构建开源Hadoop生态的企业级大数据分析系统,完全兼容开源,提供 Hadoop、Spark、Hive、Flink集成和管理,帮助用户轻松完成企业大数据平台的构建,降低运维门槛,快速形成大数据分析能力。点击阅读原文立即体验产品!

欢迎加入字节跳动数据平台官方群,进行数据技术交流、获取更多内容干货。

4fcbe6054f6b70250e787d382a39728e.png

点击阅读原文,了解火山引擎湖仓一体分析服务LAS

这篇关于Spark AQE SkewedJoin 在字节跳动的实践和优化的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493980

相关文章

Vue3 的 shallowRef 和 shallowReactive:优化性能

大家对 Vue3 的 ref 和 reactive 都很熟悉,那么对 shallowRef 和 shallowReactive 是否了解呢? 在编程和数据结构中,“shallow”(浅层)通常指对数据结构的最外层进行操作,而不递归地处理其内部或嵌套的数据。这种处理方式关注的是数据结构的第一层属性或元素,而忽略更深层次的嵌套内容。 1. 浅层与深层的对比 1.1 浅层(Shallow) 定义

基于MySQL Binlog的Elasticsearch数据同步实践

一、为什么要做 随着马蜂窝的逐渐发展,我们的业务数据越来越多,单纯使用 MySQL 已经不能满足我们的数据查询需求,例如对于商品、订单等数据的多维度检索。 使用 Elasticsearch 存储业务数据可以很好的解决我们业务中的搜索需求。而数据进行异构存储后,随之而来的就是数据同步的问题。 二、现有方法及问题 对于数据同步,我们目前的解决方案是建立数据中间表。把需要检索的业务数据,统一放到一张M

HDFS—存储优化(纠删码)

纠删码原理 HDFS 默认情况下,一个文件有3个副本,这样提高了数据的可靠性,但也带来了2倍的冗余开销。 Hadoop3.x 引入了纠删码,采用计算的方式,可以节省约50%左右的存储空间。 此种方式节约了空间,但是会增加 cpu 的计算。 纠删码策略是给具体一个路径设置。所有往此路径下存储的文件,都会执行此策略。 默认只开启对 RS-6-3-1024k

字节面试 | 如何测试RocketMQ、RocketMQ?

字节面试:RocketMQ是怎么测试的呢? 答: 首先保证消息的消费正确、设计逆向用例,在验证消息内容为空等情况时的消费正确性; 推送大批量MQ,通过Admin控制台查看MQ消费的情况,是否出现消费假死、TPS是否正常等等问题。(上述都是临场发挥,但是RocketMQ真正的测试点,还真的需要探讨) 01 先了解RocketMQ 作为测试也是要简单了解RocketMQ。简单来说,就是一个分

使用opencv优化图片(画面变清晰)

文章目录 需求影响照片清晰度的因素 实现降噪测试代码 锐化空间锐化Unsharp Masking频率域锐化对比测试 对比度增强常用算法对比测试 需求 对图像进行优化,使其看起来更清晰,同时保持尺寸不变,通常涉及到图像处理技术如锐化、降噪、对比度增强等 影响照片清晰度的因素 影响照片清晰度的因素有很多,主要可以从以下几个方面来分析 1. 拍摄设备 相机传感器:相机传

MySQL高性能优化规范

前言:      笔者最近上班途中突然想丰富下自己的数据库优化技能。于是在查阅了多篇文章后,总结出了这篇! 数据库命令规范 所有数据库对象名称必须使用小写字母并用下划线分割 所有数据库对象名称禁止使用mysql保留关键字(如果表名中包含关键字查询时,需要将其用单引号括起来) 数据库对象的命名要能做到见名识意,并且最后不要超过32个字符 临时库表必须以tmp_为前缀并以日期为后缀,备份

系统架构师考试学习笔记第三篇——架构设计高级知识(20)通信系统架构设计理论与实践

本章知识考点:         第20课时主要学习通信系统架构设计的理论和工作中的实践。根据新版考试大纲,本课时知识点会涉及案例分析题(25分),而在历年考试中,案例题对该部分内容的考查并不多,虽在综合知识选择题目中经常考查,但分值也不高。本课时内容侧重于对知识点的记忆和理解,按照以往的出题规律,通信系统架构设计基础知识点多来源于教材内的基础网络设备、网络架构和教材外最新时事热点技术。本课时知识

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

从状态管理到性能优化:全面解析 Android Compose

文章目录 引言一、Android Compose基本概念1.1 什么是Android Compose?1.2 Compose的优势1.3 如何在项目中使用Compose 二、Compose中的状态管理2.1 状态管理的重要性2.2 Compose中的状态和数据流2.3 使用State和MutableState处理状态2.4 通过ViewModel进行状态管理 三、Compose中的列表和滚动

Prometheus与Grafana在DevOps中的应用与最佳实践

Prometheus 与 Grafana 在 DevOps 中的应用与最佳实践 随着 DevOps 文化和实践的普及,监控和可视化工具已成为 DevOps 工具链中不可或缺的部分。Prometheus 和 Grafana 是其中最受欢迎的开源监控解决方案之一,它们的结合能够为系统和应用程序提供全面的监控、告警和可视化展示。本篇文章将详细探讨 Prometheus 和 Grafana 在 DevO