This article introduces Spark's TaskSchedulerImpl task scheduling mode (FIFO). We hope it serves as a useful reference for developers working on this topic; follow along with us as we walk through it!
Spark TaskSchedulerImpl Task Scheduling (FIFO)
More Resources
- Spark source code analysis tech talks (bilibili video collection): https://www.bilibili.com/video/av37442139/
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn (watch the video collection online): https://blog.csdn.net/thinktothings/article/details/84726769
Video
- Spark TaskSchedulerImpl task scheduling (FIFO) (bilibili video): https://www.bilibili.com/video/av37442139/?p=19
TaskSchedulerImpl Submits the TaskSet
- In the submitMissingTasks() method of DAGScheduler.scala, taskScheduler.submitTasks is called
- The task set is submitted through the task scheduler:
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
- Implementation in the task scheduler:
```scala
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
```
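For context, any RDD action funnels into this method: DAGScheduler.submitMissingTasks() packages each stage's tasks into a TaskSet and hands it to submitTasks(). Below is a minimal, hypothetical driver program that exercises this path (the app name and master are illustrative choices):

```scala
import org.apache.spark.{SparkConf, SparkContext}

// Hypothetical minimal job: the count() action triggers DAGScheduler's
// submitMissingTasks(), which calls TaskSchedulerImpl.submitTasks() above.
object SubmitDemo {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("submit-demo").setMaster("local[2]")
    val sc = new SparkContext(conf)
    // 4 partitions => a TaskSet with 4 tasks for the single result stage
    println(sc.parallelize(1 to 100, 4).map(_ * 2).count())
    sc.stop()
  }
}
```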
- The task set is placed into a TaskSetManager
- TaskSetManager extends Schedulable (a schedulable element, i.e., an entry in the scheduling pool's queue that the scheduler picks from):
val manager = createTaskSetManager(taskSet, maxTaskFailures)
- The TaskSetManager is added to the scheduling pool of the configured scheduling mode (FIFO or FAIR), i.e., appended to the pool's scheduling queue schedulableQueue
- At this point the tasks to be scheduled are in place in the pool; a concrete scheduling algorithm later pulls them out in the specified order (see the builder sketch after the next code line):
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
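For the FIFO mode this builder is trivial: it appends the manager straight to the root pool's queue. The sketch below follows the FIFOSchedulableBuilder in the Spark source of this era (check your Spark version for exact details):

```scala
// Sketch of FIFOSchedulableBuilder, following the Spark 1.x-era source:
// no pool hierarchy is built; every TaskSetManager goes into the root pool.
private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
  extends SchedulableBuilder with Logging {

  override def buildPools() {
    // nothing to build: FIFO uses a single flat root pool
  }

  override def addTaskSetManager(manager: Schedulable, properties: Properties) {
    // enqueue the TaskSetManager into rootPool's schedulableQueue
    rootPool.addSchedulable(manager)
  }
}
```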
- The task scheduler's submitTasks() method then calls backend.reviveOffers(); here backend is SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend, so the reviveOffers() method in CoarseGrainedSchedulerBackend is what runs:
backend.reviveOffers()
- This simply sends the ReviveOffers message to the Driver:
```scala
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
```
- The receive() method of driverEndpoint handles the message and calls makeOffers():
```scala
case ReviveOffers =>
  makeOffers()
```
- scheduler.resourceOffers(workOffers) computes the sequence of tasks to launch
- Inside resourceOffers(), rootPool.getSortedTaskSetQueue() is called to obtain the queue of schedulable task sets (in the specified order)
- launchTasks() then sends the launch-task messages to the executors:
```scala
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
```
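To make the offer/launch step concrete, here is a standalone sketch (with simplified stand-in types, not Spark's real API) of the core idea inside resourceOffers(): walk the task sets in sorted order and hand each pending task a free core. The real method additionally handles locality levels, offer shuffling, and blacklisting.

```scala
import scala.collection.mutable.ArrayBuffer

// Standalone sketch with simplified stand-in types (not Spark's real API):
// tasks from earlier (FIFO-sorted) task sets grab free cores first.
object OfferSketch {
  case class Offer(executorId: String, var freeCores: Int)
  case class PendingTask(taskSetId: String, index: Int)

  def assign(sortedTaskSets: Seq[Seq[PendingTask]],
             offers: Seq[Offer]): Seq[(PendingTask, String)] = {
    val launched = ArrayBuffer[(PendingTask, String)]()
    for (taskSet <- sortedTaskSets; task <- taskSet) {
      // take the first executor that still has a free core
      offers.find(_.freeCores > 0).foreach { offer =>
        offer.freeCores -= 1
        launched += ((task, offer.executorId))
      }
    }
    launched.toSeq
  }
}
```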
- The tasks in the scheduling pool are sorted according to the configured scheduling algorithm
- The sorted scheduling queue is returned:
```scala
override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
  var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
  val sortedSchedulableQueue =
    schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
  for (schedulable <- sortedSchedulableQueue) {
    sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
  }
  sortedTaskSetQueue
}
```
Implementation of the FIFO Scheduling Algorithm
- FIFO is the default scheduling algorithm
- It compares by job ID (the priority field): the smaller ID sorts first, so the job that arrived first is processed first
- If the job IDs are equal, it compares by stageId: the smaller stageId sorts first, so stages are ordered starting from the first one
```scala
private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
  override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
    val priority1 = s1.priority
    val priority2 = s2.priority
    var res = math.signum(priority1 - priority2)
    if (res == 0) {
      val stageId1 = s1.stageId
      val stageId2 = s2.stageId
      res = math.signum(stageId1 - stageId2)
    }
    if (res < 0) {
      true
    } else {
      false
    }
  }
}
```
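A quick standalone demonstration of these two rules, using simplified stand-ins for Schedulable (here priority plays the role of the job ID):

```scala
// Standalone demo of the FIFO ordering rules above (not Spark code).
object FifoOrderDemo {
  case class Sched(priority: Int, stageId: Int) // priority = job id

  def fifoBefore(s1: Sched, s2: Sched): Boolean = {
    var res = math.signum(s1.priority - s2.priority)
    if (res == 0) {
      res = math.signum(s1.stageId - s2.stageId)
    }
    res < 0
  }

  def main(args: Array[String]): Unit = {
    val queue = Seq(Sched(2, 0), Sched(1, 1), Sched(1, 0))
    // earlier job first, then earlier stage within the same job:
    // prints List(Sched(1,0), Sched(1,1), Sched(2,0))
    println(queue.sortWith(fifoBefore))
  }
}
```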
A Timer Task Periodically Processes the Scheduling Pool
- The onStart() method of DriverEndpoint schedules a recurring call (once per second by default) to process the tasks in the scheduling pool
- It triggers this by sending the ReviveOffers message to the Driver:
```scala
override def onStart() {
  // Periodically revive offers to allow delay scheduling to work
  val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")

  reviveThread.scheduleAtFixedRate(new Runnable {
    override def run(): Unit = Utils.tryLogNonFatalError {
      Option(self).foreach(_.send(ReviveOffers))
    }
  }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
}
```
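Since the interval comes from the configuration key read above, the cadence can be tuned when building the application's SparkConf; a hypothetical example (the app name is illustrative):

```scala
import org.apache.spark.SparkConf

// Hypothetical tuning example: make the periodic ReviveOffers fire every 2s
// instead of the 1s default read by onStart() above.
val conf = new SparkConf()
  .setAppName("revive-interval-demo")
  .set("spark.scheduler.revive.interval", "2s")
```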
This concludes our article on Spark TaskSchedulerImpl task scheduling (FIFO); we hope it proves helpful to fellow developers!