Spark TaskSchedulerImpl 任务调度方式(FIFO)

2024-03-31 13:58

本文主要是介绍Spark TaskSchedulerImpl 任务调度方式(FIFO),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark TaskSchedulerImpl 任务调度方式(FIFO)

更多资源

  • SPARK 源码分析技术分享(bilibilid视频汇总套装视频): https://www.bilibili.com/video/av37442139/
  • github: https://github.com/opensourceteams/spark-scala-maven
  • csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769

视频分享

  • Spark TaskSchedulerImpl 任务调度方式(FIFO)(bilibili视频) : https://www.bilibili.com/video/av37442139/?p=19
width="800" height="500" src="//player.bilibili.com/player.html?aid=37442139&cid=66005253&page=19" scrolling="no" border="0" allowfullscreen="true">

图解

FIFO

TaskSchedulerImpl提交任务集

  • 在DAGScheduler.scal中件中的submitMissingTasks()方法中调用 taskScheduler.submitTasks
  • 把任务集通过任务调度器进行提交
 taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  • 任务调度器实现
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)if (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)}hasReceivedTask = true}backend.reviveOffers()}
  • 把任务集放到TaskSetManager(任务集管理器)中
  • TaskSetManager(任务集管理器)继承 Schedulable,(可调度元素,就是把到调度池队列中的一个元素,供调度使用)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
  • 把任务集管理器增加到指定调度类型(FIFO,PAIR)的调度池中,也就是调度池中的调度队列中schedulableQueue
  • 此时,相当于需要调度的任务已有了,存放在调度池中,下面是用具体的调度算法,按指定的顺序调度池中的任务
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
  • 任务调度器的submitTasks()方法中调用 backend.reviveOffers()方法,backend为SparkDeploySchedulerBackend,继承CoarseGrainedSchedulerBackend,所以调用的是CoarseGrainedSchedulerBackend中的reviveOffers()方法
 backend.reviveOffers()
  • 相当于是给Driver发送消息ReviveOffers
   override def reviveOffers() {driverEndpoint.send(ReviveOffers)}
  • driverEndpoint 中receive()方法处理消息,调用makeOffers()方法
     case ReviveOffers =>makeOffers()
  • scheduler.resourceOffers(workOffers)会计算出需要启动的任务序列
  • resourceOffers()方法中调用方法得到调度任务的队列(按指定顺序的) rootPool.getSortedTaskSetQueue()
  • launchTasks()方法把启动任务消息发送给executor
   // Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killingval activeExecutors = executorDataMap.filterKeys(executorIsAlive)val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeqlaunchTasks(scheduler.resourceOffers(workOffers))}
  • 按指定的调度算法,对调度池中的调度任务进行排序
  • 返回排序后调度队列
  override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]val sortedSchedulableQueue =schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)for (schedulable <- sortedSchedulableQueue) {sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue}sortedTaskSetQueue}

FIFO调度算法的实现

  • 默认的调度算法FIFO
  • 按作业id进行比较,id小的放在前,也就是先进来的作业先处理
  • 如果作业id相同,就按stageId比较,StageId小的放在前,也就是从第一个Stage依次开始排列

private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {val priority1 = s1.priorityval priority2 = s2.priorityvar res = math.signum(priority1 - priority2)if (res == 0) {val stageId1 = s1.stageIdval stageId2 = s2.stageIdres = math.signum(stageId1 - stageId2)}if (res < 0) {true} else {false}}
}

定时任务处理调度池中的任务

  • DriverEndpoint 的 onStart()方法中会每秒调用一次处理调度池中调度任务的方法
  • 通过发送Driver消息ReviveOffers 来触发
   override def onStart() {// Periodically revive offers to allow delay scheduling to workval reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")reviveThread.scheduleAtFixedRate(new Runnable {override def run(): Unit = Utils.tryLogNonFatalError {Option(self).foreach(_.send(ReviveOffers))}}, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)}

这篇关于Spark TaskSchedulerImpl 任务调度方式(FIFO)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/864506

相关文章

Mybatis官方生成器的使用方式

《Mybatis官方生成器的使用方式》本文详细介绍了MyBatisGenerator(MBG)的使用方法,通过实际代码示例展示了如何配置Maven插件来自动化生成MyBatis项目所需的实体类、Map... 目录1. MyBATis Generator 简介2. MyBatis Generator 的功能3

Python数据处理之导入导出Excel数据方式

《Python数据处理之导入导出Excel数据方式》Python是Excel数据处理的绝佳工具,通过Pandas和Openpyxl等库可以实现数据的导入、导出和自动化处理,从基础的数据读取和清洗到复杂... 目录python导入导出Excel数据开启数据之旅:为什么Python是Excel数据处理的最佳拍档

SpringBoot项目启动后自动加载系统配置的多种实现方式

《SpringBoot项目启动后自动加载系统配置的多种实现方式》:本文主要介绍SpringBoot项目启动后自动加载系统配置的多种实现方式,并通过代码示例讲解的非常详细,对大家的学习或工作有一定的... 目录1. 使用 CommandLineRunner实现方式:2. 使用 ApplicationRunne

VUE动态绑定class类的三种常用方式及适用场景详解

《VUE动态绑定class类的三种常用方式及适用场景详解》文章介绍了在实际开发中动态绑定class的三种常见情况及其解决方案,包括根据不同的返回值渲染不同的class样式、给模块添加基础样式以及根据设... 目录前言1.动态选择class样式(对象添加:情景一)2.动态添加一个class样式(字符串添加:情

MYSQL行列转置方式

《MYSQL行列转置方式》本文介绍了如何使用MySQL和Navicat进行列转行操作,首先,创建了一个名为`grade`的表,并插入多条数据,然后,通过修改查询SQL语句,使用`CASE`和`IF`函... 目录mysql行列转置开始列转行之前的准备下面开始步入正题总结MYSQL行列转置环境准备:mysq

Linux(Centos7)安装Mysql/Redis/MinIO方式

《Linux(Centos7)安装Mysql/Redis/MinIO方式》文章总结:介绍了如何安装MySQL和Redis,以及如何配置它们为开机自启,还详细讲解了如何安装MinIO,包括配置Syste... 目录安装mysql安装Redis安装MinIO总结安装Mysql安装Redis搜索Red

Java文件上传的多种实现方式

《Java文件上传的多种实现方式》文章主要介绍了文件上传接收接口的使用方法,包括获取文件信息、创建文件夹、保存文件到本地的两种方法,以及如何使用Postman进行接口调用... 目录Java文件上传的多方式1.文件上传接收文件接口2.接口主要内容部分3.postman接口调用总结Java文件上传的多方式1

SSID究竟是什么? WiFi网络名称及工作方式解析

《SSID究竟是什么?WiFi网络名称及工作方式解析》SID可以看作是无线网络的名称,类似于有线网络中的网络名称或者路由器的名称,在无线网络中,设备通过SSID来识别和连接到特定的无线网络... 当提到 Wi-Fi 网络时,就避不开「SSID」这个术语。简单来说,SSID 就是 Wi-Fi 网络的名称。比如

多模块的springboot项目发布指定模块的脚本方式

《多模块的springboot项目发布指定模块的脚本方式》该文章主要介绍了如何在多模块的SpringBoot项目中发布指定模块的脚本,作者原先的脚本会清理并编译所有模块,导致发布时间过长,通过简化脚本... 目录多模块的springboot项目发布指定模块的脚本1、不计成本地全部发布2、指定模块发布总结多模

MySQL中my.ini文件的基础配置和优化配置方式

《MySQL中my.ini文件的基础配置和优化配置方式》文章讨论了数据库异步同步的优化思路,包括三个主要方面:幂等性、时序和延迟,作者还分享了MySQL配置文件的优化经验,并鼓励读者提供支持... 目录mysql my.ini文件的配置和优化配置优化思路MySQL配置文件优化总结MySQL my.ini文件