How Spark's DAGScheduler converts a stage into a TaskSet

2024-03-31 13:58

This article walks through how Spark's DAGScheduler converts a stage into a TaskSet, and is intended as a reference for developers digging into this part of the scheduler.


More resources

  • github: https://github.com/opensourceteams/spark-scala-maven
  • csdn (video index, watch online): https://blog.csdn.net/thinktothings/article/details/84726769

YouTube video walkthrough

  • How the DAGScheduler converts a stage into a TaskSet (YouTube video):

Bilibili video walkthrough

  • How the DAGScheduler converts a stage into a TaskSet (Bilibili video): https://www.bilibili.com/video/av37442139/?p=18

DAGScheduler: converting a stage into a TaskSet

  • Get the partitions that still need to be computed (the partition information):
    val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
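
To make "missing partitions" concrete: a ShuffleMapStage counts a partition as missing while no map output has been registered for it yet, and a ResultStage counts it as missing while the active job has not yet received its result. Below is only a minimal sketch of that idea; SimpleStage, SimpleShuffleMapStage and SimpleResultStage are illustrative stand-ins, not Spark's real scheduler classes.

    // Illustrative sketch only: simplified stand-ins, not the real Spark classes.
    trait SimpleStage { def findMissingPartitions(): Seq[Int] }

    // A shuffle-map stage still "misses" every partition with no map output registered.
    class SimpleShuffleMapStage(outputLocs: Array[List[String]]) extends SimpleStage {
      def findMissingPartitions(): Seq[Int] =
        outputLocs.indices.filter(i => outputLocs(i).isEmpty)
    }

    // A result stage still "misses" every partition whose result the job has not received yet.
    class SimpleResultStage(finished: Array[Boolean]) extends SimpleStage {
      def findMissingPartitions(): Seq[Int] =
        finished.indices.filter(i => !finished(i))
    }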
  • Compute the preferred (data-local) locations for each of those partitions:
    val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
      stage match {
        case s: ShuffleMapStage =>
          partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
        case s: ResultStage =>
          val job = s.activeJob.get
          partitionsToCompute.map { id =>
            val p = s.partitions(id)
            (id, getPreferredLocs(stage.rdd, p))
          }.toMap
      }
    } catch {
      case NonFatal(e) =>
        stage.makeNewStageAttempt(partitionsToCompute.size)
        listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
        abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
        runningStages -= stage
        return
    }
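
getPreferredLocs ultimately draws on the locality information the RDD itself reports. You can inspect the same information from user code through the public RDD.preferredLocations method; the sketch below uses a hypothetical input path (replace it with your own), and for an HDFS-backed RDD it typically lists the hosts holding each block.

    import org.apache.spark.{SparkConf, SparkContext}

    object PreferredLocationsDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("preferred-locations-demo").setMaster("local[2]"))
        // Hypothetical path: HDFS files usually report the hosts that hold each block;
        // purely local files may report nothing.
        val rdd = sc.textFile("hdfs:///tmp/some-input.txt")
        rdd.partitions.foreach { part =>
          println(s"partition ${part.index} -> ${rdd.preferredLocations(part)}")
        }
        sc.stop()
      }
    }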
  • Convert the stage into tasks: one task is created per partition, so a stage with N partitions yields N tasks.
  • A ShuffleMapStage is converted into ShuffleMapTasks.
  • A ResultStage is converted into ResultTasks.
    val tasks: Seq[Task[_]] = try {
      stage match {
        case stage: ShuffleMapStage =>
          stage.pendingPartitions.clear()
          partitionsToCompute.map { id =>
            val locs = taskIdToLocations(id)
            val part = stage.rdd.partitions(id)
            stage.pendingPartitions += id
            new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, stage.internalAccumulators)
          }
        case stage: ResultStage =>
          val job = stage.activeJob.get
          partitionsToCompute.map { id =>
            val p: Int = stage.partitions(id)
            val part = stage.rdd.partitions(p)
            val locs = taskIdToLocations(id)
            new ResultTask(stage.id, stage.latestInfo.attemptId,
              taskBinary, part, locs, id, stage.internalAccumulators)
          }
      }
    }
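
The pattern in this snippet is essentially "one task object per missing partition, with the task flavour chosen by the stage type". A stripped-down sketch of that idea, using hypothetical simplified types (SimpleTask, SimpleShuffleMapTask, SimpleResultTask) rather than Spark's real Task hierarchy:

    // Hypothetical simplified types for illustration only.
    sealed trait SimpleTask { def partitionId: Int }
    case class SimpleShuffleMapTask(stageId: Int, partitionId: Int, locs: Seq[String]) extends SimpleTask
    case class SimpleResultTask(stageId: Int, partitionId: Int, locs: Seq[String], outputId: Int) extends SimpleTask

    def buildTasks(stageId: Int,
                   isShuffleMapStage: Boolean,
                   partitionsToCompute: Seq[Int],
                   taskIdToLocations: Map[Int, Seq[String]]): Seq[SimpleTask] =
      partitionsToCompute.map { id =>
        val locs = taskIdToLocations.getOrElse(id, Nil)
        if (isShuffleMapStage) SimpleShuffleMapTask(stageId, id, locs) // writes shuffle output
        else SimpleResultTask(stageId, id, locs, outputId = id)        // feeds the job's result handler
      }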
  • Wrap the tasks in a TaskSet and submit it to the task scheduler:
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
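
Since one task is created per missing partition, the number of tasks in the submitted TaskSet normally equals the stage's partition count. One way to observe this from user code is a SparkListener; the demo below (StageSubmittedDemo is just an example name) prints the task count of every submitted stage.

    import org.apache.spark.{SparkConf, SparkContext}
    import org.apache.spark.scheduler.{SparkListener, SparkListenerStageSubmitted}

    object StageSubmittedDemo {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("stage-submitted-demo").setMaster("local[2]"))
        sc.addSparkListener(new SparkListener {
          override def onStageSubmitted(event: SparkListenerStageSubmitted): Unit =
            // numTasks reflects how many partitions the DAGScheduler decided to compute
            println(s"stage ${event.stageInfo.stageId} submitted with ${event.stageInfo.numTasks} tasks")
        })
        sc.parallelize(1 to 100, numSlices = 4).map(_ * 2).count() // expect a stage with 4 tasks
        sc.stop()
      }
    }

Run locally, the count() stage should report 4 tasks, matching the 4 partitions of the parallelized RDD.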

The submitMissingTasks method (the listing below comes from an older Spark 1.x DAGScheduler; later versions differ in detail)

    /** Called when stage's parents are available and we can now do its task. */
    private def submitMissingTasks(stage: Stage, jobId: Int) {
      logDebug("submitMissingTasks(" + stage + ")")
      // First figure out the indexes of partition ids to compute.
      val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()

      // Create internal accumulators if the stage has no accumulators initialized.
      // Reset internal accumulators only if this stage is not partially submitted
      // Otherwise, we may override existing accumulator values from some tasks
      if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
        stage.resetInternalAccumulators()
      }

      // Use the scheduling pool, job group, description, etc. from an ActiveJob associated
      // with this Stage
      val properties = jobIdToActiveJob(jobId).properties

      runningStages += stage
      // SparkListenerStageSubmitted should be posted before testing whether tasks are
      // serializable. If tasks are not serializable, a SparkListenerStageCompleted event
      // will be posted, which should always come after a corresponding SparkListenerStageSubmitted
      // event.
      stage match {
        case s: ShuffleMapStage =>
          outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
        case s: ResultStage =>
          outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
      }
      val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
        stage match {
          case s: ShuffleMapStage =>
            partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id)) }.toMap
          case s: ResultStage =>
            val job = s.activeJob.get
            partitionsToCompute.map { id =>
              val p = s.partitions(id)
              (id, getPreferredLocs(stage.rdd, p))
            }.toMap
        }
      } catch {
        case NonFatal(e) =>
          stage.makeNewStageAttempt(partitionsToCompute.size)
          listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
          abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
          runningStages -= stage
          return
      }

      stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
      listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

      // TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
      // Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
      // the serialized copy of the RDD and for each task we will deserialize it, which means each
      // task gets a different copy of the RDD. This provides stronger isolation between tasks that
      // might modify state of objects referenced in their closures. This is necessary in Hadoop
      // where the JobConf/Configuration object is not thread-safe.
      var taskBinary: Broadcast[Array[Byte]] = null
      try {
        // For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
        // For ResultTask, serialize and broadcast (rdd, func).
        val taskBinaryBytes: Array[Byte] = stage match {
          case stage: ShuffleMapStage =>
            closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
          case stage: ResultStage =>
            closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
        }

        taskBinary = sc.broadcast(taskBinaryBytes)
      } catch {
        // In the case of a failure during serialization, abort the stage.
        case e: NotSerializableException =>
          abortStage(stage, "Task not serializable: " + e.toString, Some(e))
          runningStages -= stage

          // Abort execution
          return
        case NonFatal(e) =>
          abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
          runningStages -= stage
          return
      }

      val tasks: Seq[Task[_]] = try {
        stage match {
          case stage: ShuffleMapStage =>
            stage.pendingPartitions.clear()
            partitionsToCompute.map { id =>
              val locs = taskIdToLocations(id)
              val part = stage.rdd.partitions(id)
              stage.pendingPartitions += id
              new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
                taskBinary, part, locs, stage.internalAccumulators)
            }

          case stage: ResultStage =>
            val job = stage.activeJob.get
            partitionsToCompute.map { id =>
              val p: Int = stage.partitions(id)
              val part = stage.rdd.partitions(p)
              val locs = taskIdToLocations(id)
              new ResultTask(stage.id, stage.latestInfo.attemptId,
                taskBinary, part, locs, id, stage.internalAccumulators)
            }
        }
      } catch {
        case NonFatal(e) =>
          abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
          runningStages -= stage
          return
      }

      if (tasks.size > 0) {
        logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +
          s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")
        taskScheduler.submitTasks(new TaskSet(
          tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
        stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
      } else {
        // Because we posted SparkListenerStageSubmitted earlier, we should mark
        // the stage as completed here in case there are no tasks to run
        markStageAsFinished(stage, None)

        val debugString = stage match {
          case stage: ShuffleMapStage =>
            s"Stage ${stage} is actually done; " +
              s"(available: ${stage.isAvailable}," +
              s"available outputs: ${stage.numAvailableOutputs}," +
              s"partitions: ${stage.numPartitions})"
          case stage: ResultStage =>
            s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
        }
        logDebug(debugString)
      }
    }
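
To see submitMissingTasks exercised for both stage types, a single shuffle is enough: everything before reduceByKey runs as a ShuffleMapStage (ShuffleMapTasks), and the final collect() runs as a ResultStage (ResultTasks). A minimal sketch, assuming spark-core is on the classpath and a local master is acceptable:

    import org.apache.spark.{SparkConf, SparkContext}

    object TwoStageWordCount {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(
          new SparkConf().setAppName("two-stage-word-count").setMaster("local[2]"))

        val counts = sc.parallelize(Seq("a b a", "b c"), numSlices = 2)
          .flatMap(_.split(" "))
          .map(word => (word, 1))
          .reduceByKey(_ + _) // shuffle boundary: everything before it runs as a ShuffleMapStage
          .collect()          // the final stage is a ResultStage whose ResultTasks feed collect()

        counts.foreach(println)
        sc.stop()
      }
    }

With the listener from the earlier example attached, this job would be expected to produce two SparkListenerStageSubmitted events, one per stage.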

This concludes the walkthrough of how Spark's DAGScheduler converts a stage into a TaskSet; hopefully it is a useful reference.



