Spark TaskSchedulerImpl TaskSet Processing

2024-03-31 13:58

This article walks through how Spark's TaskSchedulerImpl processes a TaskSet, from submission by the DAGScheduler, through launching tasks on executors, to cleaning up the scheduling pool when the TaskSet finishes.


More resources

  • Spark source code analysis series (bilibili video playlist): https://www.bilibili.com/video/av37442139/
  • github: https://github.com/opensourceteams/spark-scala-maven
  • CSDN (index of the videos): https://blog.csdn.net/thinktothings/article/details/84726769

Videos

  • Spark TaskSchedulerImpl TaskSet concepts (bilibili video): https://www.bilibili.com/video/av37442139/?p=20
  • Spark TaskSchedulerImpl TaskSet source code analysis (bilibili video): https://www.bilibili.com/video/av37442139/?p=21
src="//player.bilibili.com/player.html?aid=37442139&cid=66006637&page=20" scrolling="no" border="0" allowfullscreen="true"> src="//player.bilibili.com/player.html?aid=37442139&cid=66008946&page=21" scrolling="no" border="0" allowfullscreen="true">

Diagram

TaskSet processing flow

TaskSchedulerImpl submits the TaskSet

  • DAGScheduler.submitMissingTasks() (in DAGScheduler.scala) calls taskScheduler.submitTasks().
  • The TaskSet is submitted through the task scheduler:
 taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
  • The task scheduler's implementation:
override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    val stage = taskSet.stageId
    val stageTaskSets =
      taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
    stageTaskSets(taskSet.stageAttemptId) = manager
    val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
      ts.taskSet != taskSet && !ts.isZombie
    }
    if (conflictingTaskSet) {
      throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
        s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
    }
    schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)

    if (!isLocal && !hasReceivedTask) {
      starvationTimer.scheduleAtFixedRate(new TimerTask() {
        override def run() {
          if (!hasLaunchedTask) {
            logWarning("Initial job has not accepted any resources; " +
              "check your cluster UI to ensure that workers are registered " +
              "and have sufficient resources")
          } else {
            this.cancel()
          }
        }
      }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
    }
    hasReceivedTask = true
  }
  backend.reviveOffers()
}
  • The TaskSet is wrapped in a TaskSetManager.
  • TaskSetManager extends Schedulable, i.e. it is a schedulable element that is placed into the scheduling pool's queue for the scheduler to pick from:
val manager = createTaskSetManager(taskSet, maxTaskFailures)
  • The TaskSetManager is added to the scheduling pool of the configured scheduling mode (FIFO or FAIR), i.e. appended to the pool's schedulableQueue.
  • At this point the tasks to be scheduled are sitting in the pool; the configured scheduling algorithm then picks them from the pool in the corresponding order (a configuration sketch for choosing the mode follows the snippet below).
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
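The scheduling mode that determines which SchedulableBuilder (and thus which pool ordering) is used comes from the standard spark.scheduler.mode property. A minimal sketch of switching to FAIR scheduling; the app name, master URL, allocation-file path, and pool name are illustrative assumptions, not values from this article:

import org.apache.spark.{SparkConf, SparkContext}

// Minimal sketch: select the FAIR scheduling mode so the pool is ordered
// by the fair-scheduling algorithm instead of FIFO (the default).
val conf = new SparkConf()
  .setAppName("fair-scheduling-sketch")              // illustrative
  .setMaster("local[2]")                              // illustrative
  .set("spark.scheduler.mode", "FAIR")                // FIFO (default) or FAIR
  .set("spark.scheduler.allocation.file", "/path/to/fairscheduler.xml") // illustrative path

val sc = new SparkContext(conf)
// Jobs submitted from this thread go into the named pool (pool name is illustrative).
sc.setLocalProperty("spark.scheduler.pool", "pool_a")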
  • At the end of submitTasks() the task scheduler calls backend.reviveOffers(). Here backend is a SparkDeploySchedulerBackend, which extends CoarseGrainedSchedulerBackend, so the call resolves to CoarseGrainedSchedulerBackend.reviveOffers():
 backend.reviveOffers()
  • This simply sends a ReviveOffers message to the driver endpoint:
override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
  • The driverEndpoint's receive() method handles the message and calls makeOffers():
case ReviveOffers =>
  makeOffers()
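The exchange above is the usual message-driven pattern: the backend sends a case object to an endpoint whose receive partial function pattern-matches on it. A stand-alone sketch of that pattern, with illustrative names only (this is not Spark's RpcEndpoint API):

// Sketch of the send/receive message pattern (illustrative, not Spark's RPC API).
sealed trait SchedulerMessage
case object ReviveOffers extends SchedulerMessage

class MiniDriverEndpoint {
  def receive: PartialFunction[SchedulerMessage, Unit] = {
    case ReviveOffers => makeOffers()
  }
  private def makeOffers(): Unit = println("making resource offers")
  def send(msg: SchedulerMessage): Unit = receive(msg)
}

val driverEndpoint = new MiniDriverEndpoint()
driverEndpoint.send(ReviveOffers)   // prints: making resource offers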
  • scheduler.resourceOffers(workOffers) computes the sequence of tasks that should be launched.
  • Inside resourceOffers(), rootPool.getSortedTaskSetQueue() returns the TaskSetManagers from the pool in scheduling order.
  • launchTasks() then sends the launch-task messages to the executors.
// Make fake resource offers on all executors
private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toIndexedSeq
  launchTasks(scheduler.resourceOffers(workOffers))
}
  • It computes which of the current stage's TaskSet tasks can be launched right now and sends the corresponding execution messages to the executors:
/**
 * Called by cluster manager to offer resources on slaves. We respond by asking our active task
 * sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so
 * that tasks are balanced across the cluster.
 */
def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
  // Mark each slave as alive and remember its hostname
  // Also track if new executor is added
  var newExecAvail = false
  for (o <- offers) {
    if (!hostToExecutors.contains(o.host)) {
      hostToExecutors(o.host) = new HashSet[String]()
    }
    if (!executorIdToTaskCount.contains(o.executorId)) {
      hostToExecutors(o.host) += o.executorId
      executorAdded(o.executorId, o.host)
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount(o.executorId) = 0
      newExecAvail = true
    }
    for (rack <- getRackForHost(o.host)) {
      hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
    }
  }

  // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
  // this here to avoid a separate thread and added synchronization overhead, and also because
  // updating the blacklist is only relevant when task offers are being made.
  blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())

  val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
    offers.filter { offer =>
      !blacklistTracker.isNodeBlacklisted(offer.host) &&
        !blacklistTracker.isExecutorBlacklisted(offer.executorId)
    }
  }.getOrElse(offers)

  // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
  val shuffledOffers = Random.shuffle(filteredOffers)
  // Build a list of tasks to assign to each worker.
  val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
  val availableCpus = shuffledOffers.map(o => o.cores).toArray
  val sortedTaskSets = rootPool.getSortedTaskSetQueue
  for (taskSet <- sortedTaskSets) {
    logDebug("parentName: %s, name: %s, runningTasks: %s".format(
      taskSet.parent.name, taskSet.name, taskSet.runningTasks))
    if (newExecAvail) {
      taskSet.executorAdded()
    }
  }

  // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
  // of locality levels so that it gets a chance to launch local tasks on all of them.
  // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
  for (taskSet <- sortedTaskSets) {
    var launchedAnyTask = false
    var launchedTaskAtCurrentMaxLocality = false
    for (currentMaxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
          taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
        launchedAnyTask |= launchedTaskAtCurrentMaxLocality
      } while (launchedTaskAtCurrentMaxLocality)
    }
    if (!launchedAnyTask) {
      taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
    }
  }

  if (tasks.size > 0) {
    hasLaunchedTask = true
  }
  return tasks
}
  • TaskSchedulerImpl.resourceOffers() calls resourceOfferSingleTaskSet().
  • It works out, from the worker machines' available CPU cores, how many tasks of the current TaskSet can be sent to the executors in each round.
  • For example, if the stage's TaskSet contains 3 tasks and the worker machine has 2 CPU cores, the 3 tasks are launched in two rounds: 2 tasks in the first round and 1 task in the second. The maximum parallelism is bounded by the number of CPU cores (a small stand-alone sketch of this splitting follows the code excerpt below).
for (taskSet <- sortedTaskSets) {
  var launchedAnyTask = false
  var launchedTaskAtCurrentMaxLocality = false
  for (currentMaxLocality <- taskSet.myLocalityLevels) {
    do {
      launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
        taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
      launchedAnyTask |= launchedTaskAtCurrentMaxLocality
    } while (launchedTaskAtCurrentMaxLocality)
  }
  if (!launchedAnyTask) {
    taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
  }
}
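A minimal, self-contained sketch of the same CPU accounting outside of Spark: each offer round spends CPUS_PER_TASK cores per launched task, so a 3-task set on a single 2-core worker needs two rounds. All names here are illustrative, not Spark APIs:

// Minimal sketch (illustrative, not Spark code): splitting a 3-task set
// across a single 2-core worker, one "offer round" at a time.
object OfferRoundsSketch extends App {
  val CPUS_PER_TASK = 1
  val workerCores   = 2
  var pendingTasks  = List(0, 1, 2)          // task indices still to launch

  var round = 0
  while (pendingTasks.nonEmpty) {
    round += 1
    var availableCpus = workerCores          // reset per round, as in a fresh offer
    val launched = scala.collection.mutable.ArrayBuffer[Int]()
    while (pendingTasks.nonEmpty && availableCpus >= CPUS_PER_TASK) {
      launched += pendingTasks.head          // "launch" the next pending task
      pendingTasks = pendingTasks.tail
      availableCpus -= CPUS_PER_TASK
    }
    println(s"round $round launches tasks ${launched.mkString(", ")}")
  }
  // round 1 launches tasks 0, 1
  // round 2 launches tasks 2
}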
  • Now look at taskSet.resourceOffer(): for each offered CPU slot it picks a not-yet-launched task from the current stage's TaskSet and turns it into a TaskDescription to be sent to an executor:
private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
  var launchedTask = false
  // nodes and executors that are blacklisted for the entire application have already been
  // filtered out by this point
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      try {
        for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
          tasks(i) += task
          val tid = task.taskId
          taskIdToTaskSetManager(tid) = taskSet
          taskIdToExecutorId(tid) = execId
          executorIdToTaskCount(execId) += 1
          availableCpus(i) -= CPUS_PER_TASK
          assert(availableCpus(i) >= 0)
          launchedTask = true
        }
      } catch {
        case e: TaskNotSerializableException =>
          logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
          // Do not offer resources for this task, but don't throw an error to allow other
          // task sets to be submitted.
          return launchedTask
      }
    }
  }
  return launchedTask
}
  • In the method below, the key call to follow is dequeueTask():
/**
 * Respond to an offer of a single executor from the scheduler by finding a task
 *
 * NOTE: this function is either called with a maxLocality which
 * would be adjusted by delay scheduling algorithm or it will be with a special
 * NO_PREF locality which will be not modified
 *
 * @param execId the executor Id of the offered resource
 * @param host  the host Id of the offered resource
 * @param maxLocality the maximum locality we want to schedule the tasks at
 */
@throws[TaskNotSerializableException]
def resourceOffer(
    execId: String,
    host: String,
    maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] =
{
  val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
    blacklist.isNodeBlacklistedForTaskSet(host) ||
      blacklist.isExecutorBlacklistedForTaskSet(execId)
  }
  if (!isZombie && !offerBlacklisted) {
    val curTime = clock.getTimeMillis()

    var allowedLocality = maxLocality

    if (maxLocality != TaskLocality.NO_PREF) {
      allowedLocality = getAllowedLocalityLevel(curTime)
      if (allowedLocality > maxLocality) {
        // We're not allowed to search for farther-away tasks
        allowedLocality = maxLocality
      }
    }

    dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
      // Found a task; do some bookkeeping and return a task description
      val task = tasks(index)
      val taskId = sched.newTaskId()
      // Do various bookkeeping
      copiesRunning(index) += 1
      val attemptNum = taskAttempts(index).size
      val info = new TaskInfo(taskId, index, attemptNum, curTime,
        execId, host, taskLocality, speculative)
      taskInfos(taskId) = info
      taskAttempts(index) = info :: taskAttempts(index)
      // Update our locality level for delay scheduling
      // NO_PREF will not affect the variables related to delay scheduling
      if (maxLocality != TaskLocality.NO_PREF) {
        currentLocalityIndex = getLocalityIndex(taskLocality)
        lastLaunchTime = curTime
      }
      // Serialize and return the task
      val startTime = clock.getTimeMillis()
      val serializedTask: ByteBuffer = try {
        Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)
      } catch {
        // If the task cannot be serialized, then there's no point to re-attempt the task,
        // as it will always fail. So just abort the whole task-set.
        case NonFatal(e) =>
          val msg = s"Failed to serialize task $taskId, not attempting to retry it."
          logError(msg, e)
          abort(s"$msg Exception during serialization: $e")
          throw new TaskNotSerializableException(e)
      }
      if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
        !emittedTaskSizeWarning) {
        emittedTaskSizeWarning = true
        logWarning(s"Stage ${task.stageId} contains a task of very large size " +
          s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
          s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
      }
      addRunningTask(taskId)

      // We used to log the time it takes to serialize the task, but task size is already
      // a good proxy to task serialization time.
      // val timeTaken = clock.getTime() - startTime
      val taskName = s"task ${info.id} in stage ${taskSet.id}"
      logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
        s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")

      sched.dagScheduler.taskStarted(task, info)
      new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,
        taskName, index, serializedTask)
    }
  } else {
    None
  }
}
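The getAllowedLocalityLevel() call above implements delay scheduling: the TaskSetManager waits a configurable amount of time at each locality level before falling back to a less local one. A hedged sketch of tuning those waits through the standard spark.locality.wait properties (the values shown are illustrative):

import org.apache.spark.SparkConf

// Sketch: tune how long delay scheduling waits at each locality level
// before degrading to the next one (values are illustrative).
val conf = new SparkConf()
  .set("spark.locality.wait", "3s")          // default wait applied to all levels
  .set("spark.locality.wait.process", "3s")  // PROCESS_LOCAL -> NODE_LOCAL
  .set("spark.locality.wait.node", "3s")     // NODE_LOCAL -> RACK_LOCAL
  .set("spark.locality.wait.rack", "3s")     // RACK_LOCAL -> ANY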
  • In dequeueTask() below, note the TaskLocality.isAllowed(maxLocality, TaskLocality.ANY) branch.
  • allPendingTasks is populated when the TaskSetManager is constructed, and the indices are added in reverse order (..., 2, 1, 0):
  • dequeueTaskFromList() removes the last element of that list, i.e. the task with the lowest index in the TaskSet (see the small sketch after the dequeueTaskFromList() code below).
for (i <- (0 until numTasks).reverse) {
  addPendingTask(i)
}
/**
 * Dequeue a pending task for a given node and return its index and locality level.
 * Only search for tasks matching the given locality constraint.
 *
 * @return An option containing (task index within the task set, locality, is speculative?)
 */
private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value)
  : Option[(Int, TaskLocality.Value, Boolean)] =
{
  for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {
    return Some((index, TaskLocality.PROCESS_LOCAL, false))
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {
    for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {
      return Some((index, TaskLocality.NODE_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {
    // Look for noPref tasks after NODE_LOCAL for minimize cross-rack traffic
    for (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {
      return Some((index, TaskLocality.PROCESS_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {
    for {
      rack <- sched.getRackForHost(host)
      index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))
    } {
      return Some((index, TaskLocality.RACK_LOCAL, false))
    }
  }

  if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {
    for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {
      return Some((index, TaskLocality.ANY, false))
    }
  }

  // find a speculative task if all others tasks have been scheduled
  dequeueSpeculativeTask(execId, host, maxLocality).map {
    case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)
  }
}
  • It checks whether the current stage's TaskSet still has unprocessed tasks; if so, the next one is picked and sent to an executor for execution:
/**
 * Dequeue a pending task from the given list and return its index.
 * Return None if the list is empty.
 * This method also cleans up any tasks in the list that have already
 * been launched, since we want that to happen lazily.
 */
private def dequeueTaskFromList(
    execId: String,
    host: String,
    list: ArrayBuffer[Int]): Option[Int] = {
  var indexOffset = list.size
  while (indexOffset > 0) {
    indexOffset -= 1
    val index = list(indexOffset)
    if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {
      // This should almost always be list.trimEnd(1) to remove tail
      list.remove(indexOffset)
      if (copiesRunning(index) == 0 && !successful(index)) {
        return Some(index)
      }
    }
  }
  None
}
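A minimal stand-alone sketch of why reverse insertion order plus tail removal yields tasks in ascending index order (purely illustrative, no Spark types):

import scala.collection.mutable.ArrayBuffer

// Sketch: the pending list is built by iterating indices in reverse,
// and dequeueing always removes from the tail.
val numTasks = 3
val allPendingTasks = ArrayBuffer[Int]()
for (i <- (0 until numTasks).reverse) {
  allPendingTasks += i                      // buffer becomes [2, 1, 0]
}

def dequeueFromTail(list: ArrayBuffer[Int]): Option[Int] =
  if (list.isEmpty) None
  else Some(list.remove(list.size - 1))     // removes and returns the last element

println(dequeueFromTail(allPendingTasks))   // Some(0)
println(dequeueFromTail(allPendingTasks))   // Some(1)
println(dequeueFromTail(allPendingTasks))   // Some(2)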

Sending the TaskSet's tasks to executors with the LaunchTask message

  • The coarse-grained scheduler backend calls its task-launching method.
  • It sends a LaunchTask message to each target executor:
// Launch tasks returned by a set of resource offers
private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
  for (task <- tasks.flatten) {
    val serializedTask = ser.serialize(task)
    if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
      scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
        try {
          var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
            "spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
            "spark.akka.frameSize or using broadcast variables for large values."
          msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
            AkkaUtils.reservedSizeBytes)
          taskSetMgr.abort(msg)
        } catch {
          case e: Exception => logError("Exception in error callback", e)
        }
      }
    }
    else {
      val executorData = executorDataMap(task.executorId)
      executorData.freeCores -= scheduler.CPUS_PER_TASK
      executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    }
  }
}
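When a serialized TaskDescription exceeds the RPC frame size, the task set is aborted with the message shown above. A hedged sketch of the two remedies that message itself suggests for this Spark version: raising spark.akka.frameSize, or shipping large values as broadcast variables instead of capturing them in the task closure (app name, frame size value, and data are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

// Sketch: avoid oversized serialized tasks (names and values are illustrative).
val conf = new SparkConf()
  .setAppName("frame-size-sketch")
  .set("spark.akka.frameSize", "128")   // RPC frame size in MB (illustrative value)

val sc = new SparkContext(conf)

// Preferred fix: don't capture a large value in the task closure;
// broadcast it once and reference the broadcast handle from the tasks.
val bigLookup = (1 to 1000000).map(i => i -> i.toString).toMap
val bigLookupBc = sc.broadcast(bigLookup)
val rdd = sc.parallelize(1 to 10).map(i => bigLookupBc.value.getOrElse(i, ""))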

Executor

  • CoarseGrainedExecutorBackend receives the LaunchTask message.
  • It is handled in receive():
case LaunchTask(data) =>
  if (executor == null) {
    exitExecutor(1, "Received LaunchTask command but executor was null")
  } else {
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
  • The Executor hands a TaskRunner to its thread pool (a minimal sketch of this thread-pool pattern follows the launchTask() code below).
  • The pool then invokes TaskRunner.run():
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  runningTasks.put(taskId, tr)
  threadPool.execute(tr)
}
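A minimal stand-alone sketch of the same pattern: wrap the work in a Runnable, register it in a concurrent map, and submit it to a thread pool. All names are illustrative, not the Executor's actual fields:

import java.util.concurrent.{ConcurrentHashMap, Executors}

// Sketch of the launchTask pattern: wrap the work in a Runnable,
// register it, and hand it to a thread pool.
object TaskRunnerSketch extends App {
  final class MiniTaskRunner(taskId: Long) extends Runnable {
    override def run(): Unit = {
      // ... deserialize and run the task, then report status back ...
      println(s"task $taskId finished on ${Thread.currentThread().getName}")
    }
  }

  private val runningTasks = new ConcurrentHashMap[Long, MiniTaskRunner]()
  private val threadPool   = Executors.newCachedThreadPool()

  def launchTask(taskId: Long): Unit = {
    val tr = new MiniTaskRunner(taskId)
    runningTasks.put(taskId, tr)
    threadPool.execute(tr)
  }

  (1L to 3L).foreach(launchTask)
  threadPool.shutdown()
}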
  • After TaskRunner.run() finishes, it calls statusUpdate() to report the task state as FINISHED.
  • CoarseGrainedExecutorBackend.statusUpdate() then sends a StatusUpdate message to the driver:
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}

Working backwards: how a TaskSetManager is removed from the scheduling pool

Pool.removeSchedulable()

  • This method is called by TaskSchedulerImpl.taskSetFinished():
override def removeSchedulable(schedulable: Schedulable) {
  schedulableQueue.remove(schedulable)
  schedulableNameToSchedulable.remove(schedulable.name)
}

TaskSchedulerImpl.taskSetFinished()

  • This method is called by TaskSetManager.maybeFinishTaskSet():
/**
 * Called to indicate that all task attempts (including speculated tasks) associated with the
 * given TaskSetManager have completed, so state associated with the TaskSetManager should be
 * cleaned up.
 */
def taskSetFinished(manager: TaskSetManager): Unit = synchronized {
  taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>
    taskSetsForStage -= manager.taskSet.stageAttemptId
    if (taskSetsForStage.isEmpty) {
      taskSetsByStageIdAndAttempt -= manager.taskSet.stageId
    }
  }
  manager.parent.removeSchedulable(manager)
  logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +
    s" ${manager.parent.name}")
}

TaskSetManager.maybeFinishTaskSet()

  • sched.taskSetFinished(this) is only reached when runningTasks is 0, i.e. no task of this set is still running.
  • The condition tasksSuccessful == numTasks means the whole TaskSet only counts as complete once every task has succeeded; TaskSetManager.handleSuccessfulTask() increments tasksSuccessful by 1 each time a task finishes successfully.
  • This method is called from TaskSetManager.handleSuccessfulTask():
private def maybeFinishTaskSet() {
  if (isZombie && runningTasks == 0) {
    sched.taskSetFinished(this)
    if (tasksSuccessful == numTasks) {
      blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(
        taskSet.stageId,
        taskSet.stageAttemptId,
        taskSetBlacklistHelperOpt.get.execToFailures))
    }
  }
}

TaskSetManager.handleSuccessfulTask()

  • Called by TaskResultGetter.enqueueSuccessfulTask().
  • TaskResultGetter.enqueueSuccessfulTask() is in turn called from TaskSchedulerImpl.statusUpdate(),
  • which is triggered by the StatusUpdate message the executor sends after it finishes executing a task:
/**
 * Marks the task as successful and notifies the DAGScheduler that a task has ended.
 */
def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {
  val info = taskInfos(tid)
  val index = info.index
  info.markSuccessful()
  removeRunningTask(tid)
  // This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the
  // "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not
  // "deserialize" the value when holding a lock to avoid blocking other threads. So we call
  // "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.
  // Note: "result.value()" only deserializes the value when it's called at the first time, so
  // here "result.value()" just returns the value and won't block other threads.
  sched.dagScheduler.taskEnded(
    tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)
  if (!successful(index)) {
    tasksSuccessful += 1
    logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +
      s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +
      s" ($tasksSuccessful/$numTasks)")
    // Mark successful and stop if all the tasks have succeeded.
    successful(index) = true
    if (tasksSuccessful == numTasks) {
      isZombie = true
    }
  } else {
    logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +
      " because task " + index + " has already completed successfully")
  }
  maybeFinishTaskSet()
}

The coarse-grained scheduler backend handles the StatusUpdate message

  • The StatusUpdate message is handled in the receive() method of CoarseGrainedSchedulerBackend.DriverEndpoint,
  • which calls scheduler.statusUpdate(taskId, state, data.value):
override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {
  var failedExecutor: Option[String] = None
  synchronized {
    try {
      if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {
        // We lost this entire executor, so remember that it's gone
        val execId = taskIdToExecutorId(tid)
        if (executorIdToTaskCount.contains(execId)) {
          removeExecutor(execId,
            SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))
          failedExecutor = Some(execId)
        }
      }
      taskIdToTaskSetManager.get(tid) match {
        case Some(taskSet) =>
          if (TaskState.isFinished(state)) {
            taskIdToTaskSetManager.remove(tid)
            taskIdToExecutorId.remove(tid).foreach { execId =>
              if (executorIdToTaskCount.contains(execId)) {
                executorIdToTaskCount(execId) -= 1
              }
            }
          }
          if (state == TaskState.FINISHED) {
            taskSet.removeRunningTask(tid)
            taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)
          } else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {
            taskSet.removeRunningTask(tid)
            taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
          }
        case None =>
          logError(
            ("Ignoring update with state %s for TID %s because its task set is gone (this is " +
              "likely the result of receiving duplicate task finished status updates)")
              .format(state, tid))
      }
    } catch {
      case e: Exception => logError("Exception in statusUpdate", e)
    }
  }
  // Update the DAGScheduler without holding a lock on this, since that can deadlock
  if (failedExecutor.isDefined) {
    dagScheduler.executorLost(failedExecutor.get)
    backend.reviveOffers()
  }
}
