Spark AQE DataSkew 处理过程中考虑的一些Case

2023-12-14 21:40

本文主要是介绍Spark AQE DataSkew 处理过程中考虑的一些Case,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark AQE 在 DataSkew 处理过程中,需要考虑一些边界条件,否则可能会引入一些额外的Shuffle。

EnsureRequirements

在开始今天的Topic之前,需要先回顾一下 EnsureRequirements, 熟悉的同学请跳过。

EnsureRequirements 是为了保证Spark 算子的数据输入要求,在算子之间引入Shuffle的核心工具。

outputPartitioning

算子的 Partitioning 表示该算子的数据在集群上是如何分布的。

  • SparkPlan: SparkPlan 中默认数据分布是,UnknownPartitioning(0)
  • 实际的 SparkPlan 如果没有特殊说明,其数据分布是子节点的数据分布,即 override def outputPartitioning: Partitioning = child.outputPartitioning.
  • 对算子的OutputPartition 有特殊定义的有如下一些算子
    • 通过参数直接 outputPartitioning, 比如 ShuffleExchangeExec,这个直接将shuffle的数据安装partitioning的格式给组织好
    • 因为自定义了Shuffle 的数据组织形式,导致 child.outputPartitioning 不再有效的,比如: PartitionRecombinationExec, CoalesceExec
    • 数据参与 Aggregate 计算,此时数据分布一定要是 ClusteredDistribution(Final Aggregate Expressions)
    • HashJoin: 因为我们不关心BuildSide 数据分布,所以取的是 StreamSide 的 outputPartitioning
    • ShuffledJoin
      • InnerLike: 这个返回的是 PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning)) 这种特殊的数据分区
      • LeftOuter | LeftExistence : left.outputPartitioning
      • RightOuter : right.outputPartitioning
    • SMJ: 如果是SkewJoin, 返回 UnknownPartitioning;否则按照 ShuffledJoin 计算

Data Skew Case 1

Query

Test case 参数

Config NameConfig StringConfig Value
ADAPTIVE_EXECUTION_ENABLEDspark.sql.adaptive.enabledtrue
AUTO_BROADCASTJOIN_THRESHOLDspark.sql.autoBroadcastJoinThreshold-1
SKEW_JOIN_SKEWED_PARTITION_THRESHOLDspark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes100
ADVISORY_PARTITION_SIZE_IN_BYTESspark.sql.adaptive.advisoryPartitionSizeInBytes100
ALLOW_ADDITIONAL_SHUFFLEspark.sql.adaptive.allowAdditionalShufflefalse
SELECT key1 
FROM (select id % 3 as key1, id as value1 from range(1, 1000) 
) as skewData1 
JOIN (select id % 1 as key2, id as value2 from range(1, 1000) 
) as skewData2
ON key1 = key2
GROUP BY key1

DAG分析

在这里插入图片描述

Data Exchange

在做Join之前,这里两张表都进行了数据Shuffle,我们在 TestSQLContext 中配置了 test suite 的 SHUFFLE_PARTITIONS = 5,所以Shuffle结果如下:

validMetrics = {$colon$colon@14794} "::" size = 20 = {MapOutputStatistics@14802}    # 对应算子 Exchange hashpartitioning(key1#276L, 5), ENSURE_REQUIREMENTS, [id=#143]shuffleId = 0bytesByPartitionId = {long[5]@14804} [660, 0, 0, 660, 660]1 = {MapOutputStatistics@14803}    # 对应算子 Exchange hashpartitioning(key2#282L, 5), ENSURE_REQUIREMENTS, [id=#154]shuffleId = 1bytesByPartitionId = {long[5]@14805} [720, 0, 0, 0, 0]

CustomShuffleReader coalesced

在AQE里,为了减少数据的Shuffle,AQE 的 CoalesceShufflePartitions Rule 支持对多个小的Map结果进行合并读。
测试UT中配置参数 ADVISORY_PARTITION_SIZE_IN_BYTES = 100。在Rule的日志中显示: For shuffle(0, 1), advisory target size: 100, actual target size 100.
最终 Shuffle partitionSpecs:

partitionSpecs = {ArrayBuffer@14877} "ArrayBuffer" size = 40 = {CoalescedPartitionSpec@14883} "CoalescedPartitionSpec(0,1)"1 = {CoalescedPartitionSpec@14884} "CoalescedPartitionSpec(1,3)"2 = {CoalescedPartitionSpec@14885} "CoalescedPartitionSpec(3,4)"3 = {CoalescedPartitionSpec@14886} "CoalescedPartitionSpec(4,5)"

CoalescedPartitionSpec(0,1) 读取的数据为 660 + 720,为数据倾斜分区。

尝试应用 OptimizeSkewedJoin

  • 通过签名的分析,发现该Query Data Skew
  • 如果此时我们对 SMJ 应用 OptimizeSkewedJoin,SMJ 算子的 outputPartitioning = UnknownPartitioning (因为Skew数据处理改变了数据分布)。再次进行 EnsureRequirements 检查时,不满足后面的 FinalAggregate 要求的Distribution ClusteredDistribution(Seq(keys=[key1#286L])) :: Nil ,所以要引入额外的 Shuffle.
    (这里 PartialAggregate 的 requiredChildDistributionExpressions = None 所以对应 UnspecifiedDistribution 可以先忽略)
  • 如果没有引入进行Skew处理呢?此时SMJ 的 outputPartitioning 为PartitioningCollection(Seq(hashpartitioning(key1#286L, 5), hashpartitioning(key2#288L, 5))) 满足后面的 FinalAggregate 要求的Distribution ClusteredDistribution(Seq(keys=[key1#286L])) :: Nil (其中一个partition 马满足就够了),所以这里也没有额外的Shuffle
  • 通过上面判定,如果进行Data Skew 处理会引入额外Shuffle,所以,不能进行Data Skew 处理

允许引入额外Shuffle

在正常情况下,DAG 增加一个额外的Shuffle,是一个非常重的操作,但是当发生Skew 时,对应的Task可能非常的长尾,引入额外Shuffle,增加倾斜的数据的并行处理,有可能会更快。我们线上是允许在Skew Join的时候增加一个额外的Shuffle,即 spark.sql.adaptive.allowAdditionalShuffle = true 所以对应的DAG 如下
在这里插入图片描述

这篇关于Spark AQE DataSkew 处理过程中考虑的一些Case的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/493973

相关文章

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering)

Spark MLlib模型训练—聚类算法 PIC(Power Iteration Clustering) Power Iteration Clustering (PIC) 是一种基于图的聚类算法,用于在大规模数据集上进行高效的社区检测。PIC 算法的核心思想是通过迭代图的幂运算来发现数据中的潜在簇。该算法适用于处理大规模图数据,特别是在社交网络分析、推荐系统和生物信息学等领域具有广泛应用。Spa

分布式系统的主要考虑

异构性:分布式系统由于基于不同的网路、操作系统、计算机硬件和编程语言来构造,必须要考虑一种通用的网络通讯协议来屏蔽异构系统之间的禅意。一般交由中间件来处理这些差异。缺乏全球时钟:在程序需要协作时,它们通过交换消息来协调它们的动作。紧密的协调经常依赖于对程序动作发生时间的共识,但是,实际上网络上计算机同步时钟的准确性受到极大的限制,即没有一个正确时间的全局概念。这是通过网络发送消息作为唯一的通信方式

Python 中考虑 concurrent.futures 实现真正的并行计算

Python 中考虑 concurrent.futures 实现真正的并行计算 思考,如何将代码所要执行的计算任务划分成多个独立的部分并在各自的核心上面平行地运行。 Python 的全局解释器锁(global interpreter lock,GIL)导致没办法用线程来实现真正的并行​,所以先把这种方案排除掉。另一种常见的方案,是把那些对性能要求比较高的(performance-critica

王立平--switch case

@Override public void onClick(View v) {   switch (v.getId()) { 1. case R.id.btn_addPic: break; 2. case R.id.btn_reflectPic: break; default: break; } } 如果黑色字体的break你忘记了写。。。 那么程序就会从进入swit

【spark 读写数据】数据源的读写操作

通用的 Load/Save 函数 在最简单的方式下,默认的数据源(parquet 除非另外配置通过spark.sql.sources.default)将会用于所有的操作。 Parquet 是一个列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL 支持对 Parquet 文件的读写还可以自动的保存源数据的模式 val usersDF = spark.read.load("e

Python中 Switch/Case 实现

学习Python过程中,发现没有switch-case,过去写C习惯用Switch/Case语句,官方文档说通过if-elif实现。所以不妨自己来实现Switch/Case功能。 方法一 通过字典实现 def foo(var):return {'a': 1,'b': 2,'c': 3,}.get(var,'error') #'error'为默认返回值,可自设置 方法二 通过匿名函数

case when 与 decode 用法

case when 在不同条件需要有不同返回值的情况下使用非常方便,可以在给变量赋值时使用,也可以在select查询语句中使用。 case搜索语句格式: case  when 条件1 then 返回值1  when 条件2 then 返回值2  ...  else 返回值N  end; case when使用示例代码: select empno,ename,job,cas

Spark数据介绍

从趋势上看,DataFrame 和 Dataset 更加流行。 示例场景 数据仓库和 BI 工具集成: 如果你需要处理存储在数据仓库中的结构化数据,并且希望与 BI 工具集成,那么 DataFrame 和 Dataset 是首选。 机器学习流水线: 在构建机器学习流水线时,使用 DataFrame 和 Dataset 可以更好地管理数据流,并且可以方便地与 MLlib 集成。 实时数据处理:

Mac搭建华为云平台Hadoop+spark步骤

1、安装终端和文件传输软件 下载、安装、配置 详戳数据平台搭建文件夹 Transmit 用于文件传输 iTerm2    用于终端 2、连接与登录 mac 使用iTerm2快捷登录远程服务器 Mac Transmit连接 (密码不可复制,手动输入) 3、安装jdk 4、修改主机名 Linux系统下如何修改主机名 4、安装配置hadoop

Spark-在集群上运行Spark

Spark-在集群上运行Spark