Spark DAGScheduler中stage转换成TaskSet的过程

2024-03-31 13:58

本文主要是介绍Spark DAGScheduler中stage转换成TaskSet的过程,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark DAGScheduler中stage转换成TaskSet的过程

更多资源分享

  • github: https://github.com/opensourceteams/spark-scala-maven
  • csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769

Youtube视频分析

  • Spark DAGScheduler中stage转换成TaskSet的过程(youtube视频) :

Bilibili视频分析

  • Spark DAGScheduler中stage转换成TaskSet的过程(bilibili视频) : https://www.bilibili.com/video/av37442139/?p=18
width="800" height="500" src="//player.bilibili.com/player.html?aid=37442311&page=1" scrolling="no" border="0" allowfullscreen="true">

DAGScheduler 转化stage为TaskSet

  • 得到 partitions(分区信息)
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  • 计算分区的首选位置信息
 val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {stage match {case s: ShuffleMapStage =>partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMapcase s: ResultStage =>val job = s.activeJob.getpartitionsToCompute.map { id =>val p = s.partitions(id)(id, getPreferredLocs(stage.rdd, p))}.toMap}} catch {case NonFatal(e) =>stage.makeNewStageAttempt(partitionsToCompute.size)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))runningStages -= stagereturn}
  • 把stage转换成对应的任务,stage有多少个partition就有多少个任务
  • ShuffleMapStage 转换成对应的 ShuffleMapTask
  • ResultStage 转换成对应的 ResultTask
 val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage =>stage.pendingPartitions.clear()partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)stage.pendingPartitions += idnew ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.internalAccumulators)}case stage: ResultStage =>val job = stage.activeJob.getpartitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, stage.internalAccumulators)}}}
  • 把tasks转换成TaskSet给任务调度器进行提交
   taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())

submitMissingTasks处理方法

 /** Called when stage's parents are available and we can now do its task. */private def submitMissingTasks(stage: Stage, jobId: Int) {logDebug("submitMissingTasks(" + stage + ")")// First figure out the indexes of partition ids to compute.val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()// Create internal accumulators if the stage has no accumulators initialized.// Reset internal accumulators only if this stage is not partially submitted// Otherwise, we may override existing accumulator values from some tasksif (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {stage.resetInternalAccumulators()}// Use the scheduling pool, job group, description, etc. from an ActiveJob associated// with this Stageval properties = jobIdToActiveJob(jobId).propertiesrunningStages += stage// SparkListenerStageSubmitted should be posted before testing whether tasks are// serializable. If tasks are not serializable, a SparkListenerStageCompleted event// will be posted, which should always come after a corresponding SparkListenerStageSubmitted// event.stage match {case s: ShuffleMapStage =>outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)case s: ResultStage =>outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)}val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {stage match {case s: ShuffleMapStage =>partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMapcase s: ResultStage =>val job = s.activeJob.getpartitionsToCompute.map { id =>val p = s.partitions(id)(id, getPreferredLocs(stage.rdd, p))}.toMap}} catch {case NonFatal(e) =>stage.makeNewStageAttempt(partitionsToCompute.size)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))runningStages -= stagereturn}stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast// the serialized copy of the RDD and for each task we will deserialize it, which means each// task gets a different copy of the RDD. This provides stronger isolation between tasks that// might modify state of objects referenced in their closures. This is necessary in Hadoop// where the JobConf/Configuration object is not thread-safe.var taskBinary: Broadcast[Array[Byte]] = nulltry {// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).// For ResultTask, serialize and broadcast (rdd, func).val taskBinaryBytes: Array[Byte] = stage match {case stage: ShuffleMapStage =>closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()case stage: ResultStage =>closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()}taskBinary = sc.broadcast(taskBinaryBytes)} catch {// In the case of a failure during serialization, abort the stage.case e: NotSerializableException =>abortStage(stage, "Task not serializable: " + e.toString, Some(e))runningStages -= stage// Abort executionreturncase NonFatal(e) =>abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))runningStages -= stagereturn}val tasks: Seq[Task[_]] = try {stage match {case stage: ShuffleMapStage =>stage.pendingPartitions.clear()partitionsToCompute.map { id =>val locs = taskIdToLocations(id)val part = stage.rdd.partitions(id)stage.pendingPartitions += idnew ShuffleMapTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, stage.internalAccumulators)}case stage: ResultStage =>val job = stage.activeJob.getpartitionsToCompute.map { id =>val p: Int = stage.partitions(id)val part = stage.rdd.partitions(p)val locs = taskIdToLocations(id)new ResultTask(stage.id, stage.latestInfo.attemptId,taskBinary, part, locs, id, stage.internalAccumulators)}}} catch {case NonFatal(e) =>abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))runningStages -= stagereturn}if (tasks.size > 0) {logInfo(s"Submitting ${tasks.size} missing tasks from $stage (${stage.rdd}) (first 15 " +s"tasks are for partitions ${tasks.take(15).map(_.partitionId)})")taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))stage.latestInfo.submissionTime = Some(clock.getTimeMillis())} else {// Because we posted SparkListenerStageSubmitted earlier, we should mark// the stage as completed here in case there are no tasks to runmarkStageAsFinished(stage, None)val debugString = stage match {case stage: ShuffleMapStage =>s"Stage ${stage} is actually done; " +s"(available: ${stage.isAvailable}," +s"available outputs: ${stage.numAvailableOutputs}," +s"partitions: ${stage.numPartitions})"case stage : ResultStage =>s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"}logDebug(debugString)}}

这篇关于Spark DAGScheduler中stage转换成TaskSet的过程的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/864505

相关文章

将Mybatis升级为Mybatis-Plus的详细过程

《将Mybatis升级为Mybatis-Plus的详细过程》本文详细介绍了在若依管理系统(v3.8.8)中将MyBatis升级为MyBatis-Plus的过程,旨在提升开发效率,通过本文,开发者可实现... 目录说明流程增加依赖修改配置文件注释掉MyBATisConfig里面的Bean代码生成使用IDEA生

C# WinForms存储过程操作数据库的实例讲解

《C#WinForms存储过程操作数据库的实例讲解》:本文主要介绍C#WinForms存储过程操作数据库的实例,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、存储过程基础二、C# 调用流程1. 数据库连接配置2. 执行存储过程(增删改)3. 查询数据三、事务处

JSON Web Token在登陆中的使用过程

《JSONWebToken在登陆中的使用过程》:本文主要介绍JSONWebToken在登陆中的使用过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录JWT 介绍微服务架构中的 JWT 使用结合微服务网关的 JWT 验证1. 用户登录,生成 JWT2. 自定义过滤

java中使用POI生成Excel并导出过程

《java中使用POI生成Excel并导出过程》:本文主要介绍java中使用POI生成Excel并导出过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录需求说明及实现方式需求完成通用代码版本1版本2结果展示type参数为atype参数为b总结注:本文章中代码均为

python dict转换成json格式的实现

《pythondict转换成json格式的实现》本文主要介绍了pythondict转换成json格式的实现,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下... 一开始你变成字典格式data = [ { 'a' : 1, 'b' : 2, 'c编程' : 3,

SpringCloud之LoadBalancer负载均衡服务调用过程

《SpringCloud之LoadBalancer负载均衡服务调用过程》:本文主要介绍SpringCloud之LoadBalancer负载均衡服务调用过程,具有很好的参考价值,希望对大家有所帮助,... 目录前言一、LoadBalancer是什么?二、使用步骤1、启动consul2、客户端加入依赖3、以服务

Oracle存储过程里操作BLOB的字节数据的办法

《Oracle存储过程里操作BLOB的字节数据的办法》该篇文章介绍了如何在Oracle存储过程中操作BLOB的字节数据,作者研究了如何获取BLOB的字节长度、如何使用DBMS_LOB包进行BLOB操作... 目录一、缘由二、办法2.1 基本操作2.2 DBMS_LOB包2.3 字节级操作与RAW数据类型2.

C#原型模式之如何通过克隆对象来优化创建过程

《C#原型模式之如何通过克隆对象来优化创建过程》原型模式是一种创建型设计模式,通过克隆现有对象来创建新对象,避免重复的创建成本和复杂的初始化过程,它适用于对象创建过程复杂、需要大量相似对象或避免重复初... 目录什么是原型模式?原型模式的工作原理C#中如何实现原型模式?1. 定义原型接口2. 实现原型接口3

Spring Security注解方式权限控制过程

《SpringSecurity注解方式权限控制过程》:本文主要介绍SpringSecurity注解方式权限控制过程,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、摘要二、实现步骤2.1 在配置类中添加权限注解的支持2.2 创建Controller类2.3 Us

Spring AI集成DeepSeek三步搞定Java智能应用的详细过程

《SpringAI集成DeepSeek三步搞定Java智能应用的详细过程》本文介绍了如何使用SpringAI集成DeepSeek,一个国内顶尖的多模态大模型,SpringAI提供了一套统一的接口,简... 目录DeepSeek 介绍Spring AI 是什么?Spring AI 的主要功能包括1、环境准备2