Spark AQE 导致的 Driver OOM问题

2024-04-27 04:20
文章标签 问题 导致 driver spark oom aqe

本文主要是介绍Spark AQE 导致的 Driver OOM问题,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

背景

最近在做Spark 3.1 升级 Spark 3.5的过程中,遇到了一批SQL在运行的过程中 Driver OOM的情况,排查到是AQE开启导致的问题,再次分析记录一下,顺便了解一下Spark中指标的事件处理情况

结论

SQLAppStatusListener 类在内存中存放着 一个整个SQL查询链的所有stage以及stage的指标信息,在AQE中 一个job会被拆分成很多job,甚至几百上千的job,这个时候 stageMetrics的数据就会成百上倍的被存储在内存中,从而导致Driver OOM
解决方法:

  1. 关闭AQE spark.sql.adaptive.enabled false
  2. 合并对应的PR-SPARK-45439

分析

背景知识:对于一个完整链接的sql语句来说(比如说从 读取数据源,到 数据处理操作,再到插入hive表),这可以称其为一个最小的SQL执行单元,这最小的数据执行单元在Spark内部是可以跟踪的,也就是用executionId来进行跟踪的。
对于一个sql,举例来说 :

insert into  TableA select * from TableB;

在生成 物理计划的过程中会调用 QueryExecution.assertOptimized 方法,该方法会触发eagerlyExecuteCommands调用,最终会到SQLExecution.withNewExecutionId方法:

  def assertOptimized(): Unit = optimizedPlan...lazy val commandExecuted: LogicalPlan = mode match {case CommandExecutionMode.NON_ROOT => analyzed.mapChildren(eagerlyExecuteCommands)case CommandExecutionMode.ALL => eagerlyExecuteCommands(analyzed)case CommandExecutionMode.SKIP => analyzed}...lazy val optimizedPlan: LogicalPlan = {// We need to materialize the commandExecuted here because optimizedPlan is also tracked under// the optimizing phaseassertCommandExecuted()executePhase(QueryPlanningTracker.OPTIMIZATION) {// clone the plan to avoid sharing the plan instance between different stages like analyzing,// optimizing and planning.val plan =sparkSession.sessionState.optimizer.executeAndTrack(withCachedData.clone(), tracker)// We do not want optimized plans to be re-analyzed as literals that have been constant// folded and such can cause issues during analysis. While `clone` should maintain the// `analyzed` state of the LogicalPlan, we set the plan as analyzed here as well out of// paranoia.plan.setAnalyzed()plan}def assertCommandExecuted(): Unit = commandExecuted...private def eagerlyExecuteCommands(p: LogicalPlan) = p transformDown {case c: Command =>// Since Command execution will eagerly take place here,// and in most cases be the bulk of time and effort,// with the rest of processing of the root plan being just outputting command results,// for eagerly executed commands we mark this place as beginning of execution.tracker.setReadyForExecution()val qe = sparkSession.sessionState.executePlan(c, CommandExecutionMode.NON_ROOT)val name = commandExecutionName(c)val result = QueryExecution.withInternalError(s"Eagerly executed $name failed.") {SQLExecution.withNewExecutionId(qe, Some(name)) {qe.executedPlan.executeCollect()}}  

SQLExecution.withNewExecutionId主要的作用是设置当前计划的所属的executionId:

    val executionId = SQLExecution.nextExecutionIdsc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)

EXECUTION_ID_KEY的值会在JobStart的时候传递给Event,以便记录跟踪整个执行过程中的指标信息。
同时我们在方法中eagerlyExecuteCommands看到qe.executedPlan.executeCollect()这是具体的执行方法,针对于insert into 操作来说,物理计划就是
InsertIntoHadoopFsRelationCommand,这里的run方法最终会流转到DAGScheduler.submitJob方法:

    eventProcessLoop.post(JobSubmitted(jobId, rdd, func2, partitions.toArray, callSite, waiter,JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleJobSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理,这里有会涉及到指标信息的存储,这里我们截图出dump的内存占用情况:
在这里插入图片描述

可以看到 SQLAppStatusListener 的 LiveStageMetrics 占用很大,也就是 accumIdsToMetricType占用很大

那在AQE中是怎么回事呢?
我们知道再AQE中,任务会从source节点按照shuffle进行分割,从而形成单独的job,从而生成对应的shuffle指标,具体的分割以及执行代码在AdaptiveSparkPlanExec.getFinalPhysicalPlan中,如下:

      var result = createQueryStages(currentPhysicalPlan)val events = new LinkedBlockingQueue[StageMaterializationEvent]()val errors = new mutable.ArrayBuffer[Throwable]()var stagesToReplace = Seq.empty[QueryStageExec]while (!result.allChildStagesMaterialized) {currentPhysicalPlan = result.newPlanif (result.newStages.nonEmpty) {stagesToReplace = result.newStages ++ stagesToReplaceexecutionId.foreach(onUpdatePlan(_, result.newStages.map(_.plan)))// SPARK-33933: we should submit tasks of broadcast stages first, to avoid waiting// for tasks to be scheduled and leading to broadcast timeout.// This partial fix only guarantees the start of materialization for BroadcastQueryStage// is prior to others, but because the submission of collect job for broadcasting is// running in another thread, the issue is not completely resolved.val reorderedNewStages = result.newStages.sortWith {case (_: BroadcastQueryStageExec, _: BroadcastQueryStageExec) => falsecase (_: BroadcastQueryStageExec, _) => truecase _ => false}// Start materialization of all new stages and fail fast if any stages failed eagerlyreorderedNewStages.foreach { stage =>try {stage.materialize().onComplete { res =>if (res.isSuccess) {events.offer(StageSuccess(stage, res.get))} else {events.offer(StageFailure(stage, res.failed.get))}// explicitly clean up the resources in this stagestage.cleanupResources()}(AdaptiveSparkPlanExec.executionContext)

这里就是得看stage.materialize()这个方法,这两个stage只有两类:BroadcastQueryStageExec 和 ShuffleQueryStageExec
这两个物理计划稍微分析一下如下:

  • BroadcastQueryStageExec
    数据流如下:
    broadcast.submitBroadcastJob||\/
    promise.future||\/
    relationFuture||\/
    child.executeCollectIterator()
    其中 promise的设置在relationFuture方法中,而relationFuture 会被doPrepare调用,而submitBroadcastJob会调用executeQuery,从而调用doPrepare,executeCollectIterator()最终也会发送JobSubmitted事件,分析和上面的一样
  • ShuffleQueryStageExec
     shuffle.submitShuffleJob||\/sparkContext.submitMapStage(shuffleDependency)||\/dagScheduler.submitMapStage

submitMapStage会发送MapStageSubmitted事件:

    eventProcessLoop.post(MapStageSubmitted(jobId, dependency, callSite, waiter, JobArtifactSet.getActiveOrDefault(sc),Utils.cloneProperties(properties)))

最终会被DAGScheduler.handleMapStageSubmitted处理,其中会发送SparkListenerJobStart事件:

    listenerBus.post(SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos,Utils.cloneProperties(properties)))

该事件会被SQLAppStatusListener捕获,从而转到onJobStart处理:

  private val liveExecutions = new ConcurrentHashMap[Long, LiveExecutionData]()private val stageMetrics = new ConcurrentHashMap[Int, LiveStageMetrics]()...override def onJobStart(event: SparkListenerJobStart): Unit = {val executionIdString = event.properties.getProperty(SQLExecution.EXECUTION_ID_KEY)if (executionIdString == null) {// This is not a job created by SQLreturn}val executionId = executionIdString.toLongval jobId = event.jobIdval exec = Option(liveExecutions.get(executionId))

该方法会获取事件中的executionId,在AQE中,同一个执行单元的executionId是一样的,所以stageMetrics内存占用会越来越大。
而这里指标的更新是在AdaptiveSparkPlanExec.onUpdatePlan等方法中。

这样整个事件的数据流以及问题的产生原因就应该很清楚了。

其他

为啥AQE以后多个Job还是共享一个executionId呢?因为原则上来说,如果没有开启AQE之前,一个SQL执行单元的是属于同一个Job的,开启了AQE之后,因为AQE的原因,一个Job被拆成了了多个Job,但是从逻辑上来说,还是属于同一个SQL处理单元的所以还是得归属到一次执行中。

这篇关于Spark AQE 导致的 Driver OOM问题的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/939505

相关文章

springboot循环依赖问题案例代码及解决办法

《springboot循环依赖问题案例代码及解决办法》在SpringBoot中,如果两个或多个Bean之间存在循环依赖(即BeanA依赖BeanB,而BeanB又依赖BeanA),会导致Spring的... 目录1. 什么是循环依赖?2. 循环依赖的场景案例3. 解决循环依赖的常见方法方法 1:使用 @La

SpringBoot启动报错的11个高频问题排查与解决终极指南

《SpringBoot启动报错的11个高频问题排查与解决终极指南》这篇文章主要为大家详细介绍了SpringBoot启动报错的11个高频问题的排查与解决,文中的示例代码讲解详细,感兴趣的小伙伴可以了解一... 目录1. 依赖冲突:NoSuchMethodError 的终极解法2. Bean注入失败:No qu

MySQL新增字段后Java实体未更新的潜在问题与解决方案

《MySQL新增字段后Java实体未更新的潜在问题与解决方案》在Java+MySQL的开发中,我们通常使用ORM框架来映射数据库表与Java对象,但有时候,数据库表结构变更(如新增字段)后,开发人员可... 目录引言1. 问题背景:数据库与 Java 实体不同步1.1 常见场景1.2 示例代码2. 不同操作

如何解决mysql出现Incorrect string value for column ‘表项‘ at row 1错误问题

《如何解决mysql出现Incorrectstringvalueforcolumn‘表项‘atrow1错误问题》:本文主要介绍如何解决mysql出现Incorrectstringv... 目录mysql出现Incorrect string value for column ‘表项‘ at row 1错误报错

如何解决Spring MVC中响应乱码问题

《如何解决SpringMVC中响应乱码问题》:本文主要介绍如何解决SpringMVC中响应乱码问题,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring MVC最新响应中乱码解决方式以前的解决办法这是比较通用的一种方法总结Spring MVC最新响应中乱码解

pip无法安装osgeo失败的问题解决

《pip无法安装osgeo失败的问题解决》本文主要介绍了pip无法安装osgeo失败的问题解决,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面随着小编来一... 进入官方提供的扩展包下载网站寻找版本适配的whl文件注意:要选择cp(python版本)和你py

解决Java中基于GeoTools的Shapefile读取乱码的问题

《解决Java中基于GeoTools的Shapefile读取乱码的问题》本文主要讨论了在使用Java编程语言进行地理信息数据解析时遇到的Shapefile属性信息乱码问题,以及根据不同的编码设置进行属... 目录前言1、Shapefile属性字段编码的情况:一、Shp文件常见的字符集编码1、System编码

Spring MVC使用视图解析的问题解读

《SpringMVC使用视图解析的问题解读》:本文主要介绍SpringMVC使用视图解析的问题解读,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录Spring MVC使用视图解析1. 会使用视图解析的情况2. 不会使用视图解析的情况总结Spring MVC使用视图

Redis解决缓存击穿问题的两种方法

《Redis解决缓存击穿问题的两种方法》缓存击穿问题也叫热点Key问题,就是⼀个被高并发访问并且缓存重建业务较复杂的key突然失效了,无数的请求访问会在瞬间给数据库带来巨大的冲击,本文给大家介绍了Re... 目录引言解决办法互斥锁(强一致,性能差)逻辑过期(高可用,性能优)设计逻辑过期时间引言缓存击穿:给

Java程序运行时出现乱码问题的排查与解决方法

《Java程序运行时出现乱码问题的排查与解决方法》本文主要介绍了Java程序运行时出现乱码问题的排查与解决方法,包括检查Java源文件编码、检查编译时的编码设置、检查运行时的编码设置、检查命令提示符的... 目录一、检查 Java 源文件编码二、检查编译时的编码设置三、检查运行时的编码设置四、检查命令提示符