本文主要是介绍Spark AQE DataSkew 处理过程中考虑的一些Case,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Spark AQE 在 DataSkew 处理过程中,需要考虑一些边界条件,否则可能会引入一些额外的Shuffle。
EnsureRequirements
在开始今天的Topic之前,需要先回顾一下 EnsureRequirements, 熟悉的同学请跳过。
EnsureRequirements 是为了保证Spark 算子的数据输入要求,在算子之间引入Shuffle的核心工具。
outputPartitioning
算子的 Partitioning 表示该算子的数据在集群上是如何分布的。
- SparkPlan: SparkPlan 中默认数据分布是,UnknownPartitioning(0)
- 实际的 SparkPlan 如果没有特殊说明,其数据分布是子节点的数据分布,即
override def outputPartitioning: Partitioning = child.outputPartitioning
. - 对算子的OutputPartition 有特殊定义的有如下一些算子
- 通过参数直接 outputPartitioning, 比如 ShuffleExchangeExec,这个直接将shuffle的数据安装partitioning的格式给组织好
- 因为自定义了Shuffle 的数据组织形式,导致
child.outputPartitioning
不再有效的,比如:PartitionRecombinationExec
,CoalesceExec
- 数据参与
Aggregate
计算,此时数据分布一定要是ClusteredDistribution(Final Aggregate Expressions)
- HashJoin: 因为我们不关心BuildSide 数据分布,所以取的是 StreamSide 的 outputPartitioning
- ShuffledJoin
- InnerLike: 这个返回的是
PartitioningCollection(Seq(left.outputPartitioning, right.outputPartitioning))
这种特殊的数据分区 - LeftOuter | LeftExistence :
left.outputPartitioning
- RightOuter :
right.outputPartitioning
- InnerLike: 这个返回的是
- SMJ: 如果是SkewJoin, 返回 UnknownPartitioning;否则按照 ShuffledJoin 计算
Data Skew Case 1
Query
Test case 参数
Config Name | Config String | Config Value |
---|---|---|
ADAPTIVE_EXECUTION_ENABLED | spark.sql.adaptive.enabled | true |
AUTO_BROADCASTJOIN_THRESHOLD | spark.sql.autoBroadcastJoinThreshold | -1 |
SKEW_JOIN_SKEWED_PARTITION_THRESHOLD | spark.sql.adaptive.skewJoin.skewedPartitionThresholdInBytes | 100 |
ADVISORY_PARTITION_SIZE_IN_BYTES | spark.sql.adaptive.advisoryPartitionSizeInBytes | 100 |
ALLOW_ADDITIONAL_SHUFFLE | spark.sql.adaptive.allowAdditionalShuffle | false |
SELECT key1
FROM (select id % 3 as key1, id as value1 from range(1, 1000)
) as skewData1
JOIN (select id % 1 as key2, id as value2 from range(1, 1000)
) as skewData2
ON key1 = key2
GROUP BY key1
DAG分析
Data Exchange
在做Join之前,这里两张表都进行了数据Shuffle,我们在 TestSQLContext
中配置了 test suite 的 SHUFFLE_PARTITIONS = 5,所以Shuffle结果如下:
validMetrics = {$colon$colon@14794} "::" size = 20 = {MapOutputStatistics@14802} # 对应算子 Exchange hashpartitioning(key1#276L, 5), ENSURE_REQUIREMENTS, [id=#143]shuffleId = 0bytesByPartitionId = {long[5]@14804} [660, 0, 0, 660, 660]1 = {MapOutputStatistics@14803} # 对应算子 Exchange hashpartitioning(key2#282L, 5), ENSURE_REQUIREMENTS, [id=#154]shuffleId = 1bytesByPartitionId = {long[5]@14805} [720, 0, 0, 0, 0]
CustomShuffleReader coalesced
在AQE里,为了减少数据的Shuffle,AQE 的 CoalesceShufflePartitions Rule 支持对多个小的Map结果进行合并读。
测试UT中配置参数 ADVISORY_PARTITION_SIZE_IN_BYTES = 100
。在Rule的日志中显示: For shuffle(0, 1), advisory target size: 100, actual target size 100.
最终 Shuffle partitionSpecs:
partitionSpecs = {ArrayBuffer@14877} "ArrayBuffer" size = 40 = {CoalescedPartitionSpec@14883} "CoalescedPartitionSpec(0,1)"1 = {CoalescedPartitionSpec@14884} "CoalescedPartitionSpec(1,3)"2 = {CoalescedPartitionSpec@14885} "CoalescedPartitionSpec(3,4)"3 = {CoalescedPartitionSpec@14886} "CoalescedPartitionSpec(4,5)"
CoalescedPartitionSpec(0,1) 读取的数据为 660 + 720,为数据倾斜分区。
尝试应用 OptimizeSkewedJoin
- 通过签名的分析,发现该Query Data Skew
- 如果此时我们对 SMJ 应用 OptimizeSkewedJoin,SMJ 算子的
outputPartitioning = UnknownPartitioning
(因为Skew数据处理改变了数据分布)。再次进行 EnsureRequirements 检查时,不满足后面的FinalAggregate
要求的DistributionClusteredDistribution(Seq(keys=[key1#286L])) :: Nil
,所以要引入额外的 Shuffle.
(这里 PartialAggregate 的requiredChildDistributionExpressions = None
所以对应UnspecifiedDistribution
可以先忽略) - 如果没有引入进行Skew处理呢?此时SMJ 的 outputPartitioning 为
PartitioningCollection(Seq(hashpartitioning(key1#286L, 5), hashpartitioning(key2#288L, 5)))
满足后面的FinalAggregate
要求的DistributionClusteredDistribution(Seq(keys=[key1#286L])) :: Nil
(其中一个partition 马满足就够了),所以这里也没有额外的Shuffle - 通过上面判定,如果进行Data Skew 处理会引入额外Shuffle,所以,不能进行Data Skew 处理
允许引入额外Shuffle
在正常情况下,DAG 增加一个额外的Shuffle,是一个非常重的操作,但是当发生Skew 时,对应的Task可能非常的长尾,引入额外Shuffle,增加倾斜的数据的并行处理,有可能会更快。我们线上是允许在Skew Join的时候增加一个额外的Shuffle,即 spark.sql.adaptive.allowAdditionalShuffle = true
所以对应的DAG 如下
这篇关于Spark AQE DataSkew 处理过程中考虑的一些Case的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!