Spark TaskSchedulerImpl 任务调度方式(FIFO)

2024-03-31 13:58

本文主要是介绍Spark TaskSchedulerImpl 任务调度方式(FIFO),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark TaskSchedulerImpl 任务调度方式(FIFO)

更多资源

  • SPARK 源码分析技术分享(bilibilid视频汇总套装视频): https://www.bilibili.com/video/av37442139/
  • github: https://github.com/opensourceteams/spark-scala-maven
  • csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769

视频分享

  • Spark TaskSchedulerImpl 任务调度方式(FIFO)(bilibili视频) : https://www.bilibili.com/video/av37442139/?p=19
width="800" height="500" src="//player.bilibili.com/player.html?aid=37442139&cid=66005253&page=19" scrolling="no" border="0" allowfullscreen="true">

图解

FIFO

TaskSchedulerImpl提交任务集

  • 在DAGScheduler.scal中件中的submitMissingTasks()方法中调用 taskScheduler.submitTasks
  • 把任务集通过任务调度器进行提交
 taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  • 任务调度器实现
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)if (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)}hasReceivedTask = true}backend.reviveOffers()}
  • 把任务集放到TaskSetManager(任务集管理器)中
  • TaskSetManager(任务集管理器)继承 Schedulable,(可调度元素,就是把到调度池队列中的一个元素,供调度使用)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
  • 把任务集管理器增加到指定调度类型(FIFO,PAIR)的调度池中,也就是调度池中的调度队列中schedulableQueue
  • 此时,相当于需要调度的任务已有了,存放在调度池中,下面是用具体的调度算法,按指定的顺序调度池中的任务
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  • 任务调度器的submitTasks()方法中调用 backend.reviveOffers()方法,backend为SparkDeploySchedulerBackend,继承CoarseGrainedSchedulerBackend,所以调用的是CoarseGrainedSchedulerBackend中的reviveOffers()方法
 backend.reviveOffers()
  • 相当于是给Driver发送消息ReviveOffers
   override def reviveOffers() {driverEndpoint.send(ReviveOffers)}
  • driverEndpoint 中receive()方法处理消息,调用makeOffers()方法
     case ReviveOffers =>makeOffers()
  • scheduler.resourceOffers(workOffers)会计算出需要启动的任务序列
  • resourceOffers()方法中调用方法得到调度任务的队列(按指定顺序的) rootPool.getSortedTaskSetQueue()
  • launchTasks()方法把启动任务消息发送给executor
   // Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killingval activeExecutors = executorDataMap.filterKeys(executorIsAlive)val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeqlaunchTasks(scheduler.resourceOffers(workOffers))}
  • 按指定的调度算法,对调度池中的调度任务进行排序
  • 返回排序后调度队列
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]val sortedSchedulableQueue =schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)for (schedulable <- sortedSchedulableQueue) {sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue}sortedTaskSetQueue}

FIFO调度算法的实现

  • 默认的调度算法FIFO
  • 按作业id进行比较,id小的放在前,也就是先进来的作业先处理
  • 如果作业id相同,就按stageId比较,StageId小的放在前,也就是从第一个Stage依次开始排列

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}if (res < 0) {true} else {false}}
}

定时任务处理调度池中的任务

  • DriverEndpoint 的 onStart()方法中会每秒调用一次处理调度池中调度任务的方法
  • 通过发送Driver消息ReviveOffers 来触发
   override def onStart() {// Periodically revive offers to allow delay scheduling to workval reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")reviveThread.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReviveOffers))}}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)}

这篇关于Spark TaskSchedulerImpl 任务调度方式(FIFO)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/864506

相关文章

python logging模块详解及其日志定时清理方式

《pythonlogging模块详解及其日志定时清理方式》:本文主要介绍pythonlogging模块详解及其日志定时清理方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地... 目录python logging模块及日志定时清理1.创建logger对象2.logging.basicCo

C#TextBox设置提示文本方式(SetHintText)

《C#TextBox设置提示文本方式(SetHintText)》:本文主要介绍C#TextBox设置提示文本方式(SetHintText),具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑... 目录C#TextBox设置提示文本效果展示核心代码总结C#TextBox设置提示文本效果展示核心代

SpringValidation数据校验之约束注解与分组校验方式

《SpringValidation数据校验之约束注解与分组校验方式》本文将深入探讨SpringValidation的核心功能,帮助开发者掌握约束注解的使用技巧和分组校验的高级应用,从而构建更加健壮和可... 目录引言一、Spring Validation基础架构1.1 jsR-380标准与Spring整合1

Android实现打开本地pdf文件的两种方式

《Android实现打开本地pdf文件的两种方式》在现代应用中,PDF格式因其跨平台、稳定性好、展示内容一致等特点,在Android平台上,如何高效地打开本地PDF文件,不仅关系到用户体验,也直接影响... 目录一、项目概述二、相关知识2.1 PDF文件基本概述2.2 android 文件访问与存储权限2.

Spring中配置ContextLoaderListener方式

《Spring中配置ContextLoaderListener方式》:本文主要介绍Spring中配置ContextLoaderListener方式,具有很好的参考价值,希望对大家有所帮助,如有错误... 目录Spring中配置ContextLoaderLishttp://www.chinasem.cntene

AJAX请求上传下载进度监控实现方式

《AJAX请求上传下载进度监控实现方式》在日常Web开发中,AJAX(AsynchronousJavaScriptandXML)被广泛用于异步请求数据,而无需刷新整个页面,:本文主要介绍AJAX请... 目录1. 前言2. 基于XMLHttpRequest的进度监控2.1 基础版文件上传监控2.2 增强版多

Docker镜像修改hosts及dockerfile修改hosts文件的实现方式

《Docker镜像修改hosts及dockerfile修改hosts文件的实现方式》:本文主要介绍Docker镜像修改hosts及dockerfile修改hosts文件的实现方式,具有很好的参考价... 目录docker镜像修改hosts及dockerfile修改hosts文件准备 dockerfile 文

Linux中的计划任务(crontab)使用方式

《Linux中的计划任务(crontab)使用方式》:本文主要介绍Linux中的计划任务(crontab)使用方式,具有很好的参考价值,希望对大家有所帮助,如有错误或未考虑完全的地方,望不吝赐教... 目录一、前言1、linux的起源与发展2、什么是计划任务(crontab)二、crontab基础1、cro

Win11安装PostgreSQL数据库的两种方式详细步骤

《Win11安装PostgreSQL数据库的两种方式详细步骤》PostgreSQL是备受业界青睐的关系型数据库,尤其是在地理空间和移动领域,:本文主要介绍Win11安装PostgreSQL数据库的... 目录一、exe文件安装 (推荐)下载安装包1. 选择操作系统2. 跳转到EDB(PostgreSQL 的

Java枚举类实现Key-Value映射的多种实现方式

《Java枚举类实现Key-Value映射的多种实现方式》在Java开发中,枚举(Enum)是一种特殊的类,本文将详细介绍Java枚举类实现key-value映射的多种方式,有需要的小伙伴可以根据需要... 目录前言一、基础实现方式1.1 为枚举添加属性和构造方法二、http://www.cppcns.co