本文主要是介绍Spark TaskSchedulerImpl TaskSet处理,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Spark TaskSchedulerImpl TaskSet处理
更多资源
- SPARK 源码分析技术分享(bilibilid视频汇总套装视频): https://www.bilibili.com/video/av37442139/
- github: https://github.com/opensourceteams/spark-scala-maven
- csdn(汇总视频在线看): https://blog.csdn.net/thinktothings/article/details/84726769
视频分享
- Spark TaskSchedulerImpl TaskSet原理分析(bilibili视频) : https://www.bilibili.com/video/av37442139/?p=20
- Spark TaskSchedulerImpl TaskSet原码分析(bilibili视频) : https://www.bilibili.com/video/av37442139/?p=21
图解
TaskSchedulerImpl提交任务集
- 在DAGScheduler.scal中件中的submitMissingTasks()方法中调用 taskScheduler.submitTasks
- 把任务集通过任务调度器进行提交
taskScheduler.submitTasks(new TaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
- 任务调度器实现
override def submitTasks(taskSet: TaskSet) {val tasks = taskSet.taskslogInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")this.synchronized {val manager = createTaskSetManager(taskSet, maxTaskFailures)val stage = taskSet.stageIdval stageTaskSets =taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])stageTaskSets(taskSet.stageAttemptId) = managerval conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>ts.taskSet != taskSet && !ts.isZombie}if (conflictingTaskSet) {throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")}schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)if (!isLocal && !hasReceivedTask) {starvationTimer.scheduleAtFixedRate(new TimerTask() {override def run() {if (!hasLaunchedTask) {logWarning("Initial job has not accepted any resources; " +"check your cluster UI to ensure that workers are registered " +"and have sufficient resources")} else {this.cancel()}}}, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)}hasReceivedTask = true}backend.reviveOffers()}
- 把任务集放到TaskSetManager(任务集管理器)中
- TaskSetManager(任务集管理器)继承 Schedulable,(可调度元素,就是把到调度池队列中的一个元素,供调度使用)
val manager = createTaskSetManager(taskSet, maxTaskFailures)
- 把任务集管理器增加到指定调度类型(FIFO,PAIR)的调度池中,也就是调度池中的调度队列中schedulableQueue
- 此时,相当于需要调度的任务已有了,存放在调度池中,下面是用具体的调度算法,按指定的顺序调度池中的任务
schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
- 任务调度器的submitTasks()方法中调用 backend.reviveOffers()方法,backend为SparkDeploySchedulerBackend,继承CoarseGrainedSchedulerBackend,所以调用的是CoarseGrainedSchedulerBackend中的reviveOffers()方法
backend.reviveOffers()
- 相当于是给Driver发送消息ReviveOffers
override def reviveOffers() {driverEndpoint.send(ReviveOffers)}
- driverEndpoint 中receive()方法处理消息,调用makeOffers()方法
case ReviveOffers =>makeOffers()
- scheduler.resourceOffers(workOffers)会计算出需要启动的任务序列
- resourceOffers()方法中调用方法得到调度任务的队列(按指定顺序的) rootPool.getSortedTaskSetQueue()
- launchTasks()方法把启动任务消息发送给executor
// Make fake resource offers on all executorsprivate def makeOffers() {// Filter out executors under killingval activeExecutors = executorDataMap.filterKeys(executorIsAlive)val workOffers = activeExecutors.map { case (id, executorData) =>new WorkerOffer(id, executorData.executorHost, executorData.freeCores)}.toIndexedSeqlaunchTasks(scheduler.resourceOffers(workOffers))}
- 计算当前stage转换的TaskSet中的部分任务,发送执行任务的消息executor处理
/*** Called by cluster manager to offer resources on slaves. We respond by asking our active task* sets for tasks in order of priority. We fill each node with tasks in a round-robin manner so* that tasks are balanced across the cluster.*/def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {// Mark each slave as alive and remember its hostname// Also track if new executor is addedvar newExecAvail = falsefor (o <- offers) {if (!hostToExecutors.contains(o.host)) {hostToExecutors(o.host) = new HashSet[String]()}if (!executorIdToTaskCount.contains(o.executorId)) {hostToExecutors(o.host) += o.executorIdexecutorAdded(o.executorId, o.host)executorIdToHost(o.executorId) = o.hostexecutorIdToTaskCount(o.executorId) = 0newExecAvail = true}for (rack <- getRackForHost(o.host)) {hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host}}// Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do// this here to avoid a separate thread and added synchronization overhead, and also because// updating the blacklist is only relevant when task offers are being made.blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>offers.filter { offer =>!blacklistTracker.isNodeBlacklisted(offer.host) &&!blacklistTracker.isExecutorBlacklisted(offer.executorId)}}.getOrElse(offers)// Randomly shuffle offers to avoid always placing tasks on the same set of workers.val shuffledOffers = Random.shuffle(filteredOffers)// Build a list of tasks to assign to each worker.val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))val availableCpus = shuffledOffers.map(o => o.cores).toArrayval sortedTaskSets = rootPool.getSortedTaskSetQueuefor (taskSet <- sortedTaskSets) {logDebug("parentName: %s, name: %s, runningTasks: %s".format(taskSet.parent.name, taskSet.name, taskSet.runningTasks))if (newExecAvail) {taskSet.executorAdded()}}// Take each TaskSet in our scheduling order, and then offer it each node in increasing order// of locality levels so that it gets a chance to launch local tasks on all of them.// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANYfor (taskSet <- sortedTaskSets) {var launchedAnyTask = falsevar launchedTaskAtCurrentMaxLocality = falsefor (currentMaxLocality <- taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask |= launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) {taskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}if (tasks.size > 0) {hasLaunchedTask = true}return tasks}
- TaskSchedulerImpl中方法resourceOffers()中调用 resourceOfferSingleTaskSet()方法
- 就算看当前的任务集中任务,按worker机器的cpu内核数进行分配每次发送几个任务给executor进行启动
- 例: stage中TaskSet包含的任务个数是3个,worker 机器的cpu内核数为2,此时就需要把TastSet中的任务3个,拆分成两次,每一次是2个任务,第二次是1个任务,并行任务按cpu内核最大数来决定
for (taskSet <- sortedTaskSets) {var launchedAnyTask = falsevar launchedTaskAtCurrentMaxLocality = falsefor (currentMaxLocality <- taskSet.myLocalityLevels) {do {launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)launchedAnyTask |= launchedTaskAtCurrentMaxLocality} while (launchedTaskAtCurrentMaxLocality)}if (!launchedAnyTask) {taskSet.abortIfCompletelyBlacklisted(hostToExecutors)}}
- 看具体的方法 taskSet.resourceOffer().从当前stage的taskSet还剩下未处理的任务中,取出worker机器分配的cpu数取新的任务,发送给executor执行
private def resourceOfferSingleTaskSet(taskSet: TaskSetManager,maxLocality: TaskLocality,shuffledOffers: Seq[WorkerOffer],availableCpus: Array[Int],tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {var launchedTask = false// nodes and executors that are blacklisted for the entire application have already been// filtered out by this pointfor (i <- 0 until shuffledOffers.size) {val execId = shuffledOffers(i).executorIdval host = shuffledOffers(i).hostif (availableCpus(i) >= CPUS_PER_TASK) {try {for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {tasks(i) += taskval tid = task.taskIdtaskIdToTaskSetManager(tid) = taskSettaskIdToExecutorId(tid) = execIdexecutorIdToTaskCount(execId) += 1availableCpus(i) -= CPUS_PER_TASKassert(availableCpus(i) >= 0)launchedTask = true}} catch {case e: TaskNotSerializableException =>logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")// Do not offer resources for this task, but don't throw an error to allow other// task sets to be submitted.return launchedTask}}}return launchedTask}
- 该方法重点看 dequeueTask()方法
/*** Respond to an offer of a single executor from the scheduler by finding a task** NOTE: this function is either called with a maxLocality which* would be adjusted by delay scheduling algorithm or it will be with a special* NO_PREF locality which will be not modified** @param execId the executor Id of the offered resource* @param host the host Id of the offered resource* @param maxLocality the maximum locality we want to schedule the tasks at*/@throws[TaskNotSerializableException]def resourceOffer(execId: String,host: String,maxLocality: TaskLocality.TaskLocality): Option[TaskDescription] ={val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>blacklist.isNodeBlacklistedForTaskSet(host) ||blacklist.isExecutorBlacklistedForTaskSet(execId)}if (!isZombie && !offerBlacklisted) {val curTime = clock.getTimeMillis()var allowedLocality = maxLocalityif (maxLocality != TaskLocality.NO_PREF) {allowedLocality = getAllowedLocalityLevel(curTime)if (allowedLocality > maxLocality) {// We're not allowed to search for farther-away tasksallowedLocality = maxLocality}}dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>// Found a task; do some bookkeeping and return a task descriptionval task = tasks(index)val taskId = sched.newTaskId()// Do various bookkeepingcopiesRunning(index) += 1val attemptNum = taskAttempts(index).sizeval info = new TaskInfo(taskId, index, attemptNum, curTime,execId, host, taskLocality, speculative)taskInfos(taskId) = infotaskAttempts(index) = info :: taskAttempts(index)// Update our locality level for delay scheduling// NO_PREF will not affect the variables related to delay schedulingif (maxLocality != TaskLocality.NO_PREF) {currentLocalityIndex = getLocalityIndex(taskLocality)lastLaunchTime = curTime}// Serialize and return the taskval startTime = clock.getTimeMillis()val serializedTask: ByteBuffer = try {Task.serializeWithDependencies(task, sched.sc.addedFiles, sched.sc.addedJars, ser)} catch {// If the task cannot be serialized, then there's no point to re-attempt the task,// as it will always fail. So just abort the whole task-set.case NonFatal(e) =>val msg = s"Failed to serialize task $taskId, not attempting to retry it."logError(msg, e)abort(s"$msg Exception during serialization: $e")throw new TaskNotSerializableException(e)}if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&!emittedTaskSizeWarning) {emittedTaskSizeWarning = truelogWarning(s"Stage ${task.stageId} contains a task of very large size " +s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")}addRunningTask(taskId)// We used to log the time it takes to serialize the task, but task size is already// a good proxy to task serialization time.// val timeTaken = clock.getTime() - startTimeval taskName = s"task ${info.id} in stage ${taskSet.id}"logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")sched.dagScheduler.taskStarted(task, info)new TaskDescription(taskId = taskId, attemptNumber = attemptNum, execId,taskName, index, serializedTask)}} else {None}}
- 该方法重点看 TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)
- allPendingTasks中的数据是在TaskSetManager实例方法中调用,并且会按反序增加(…2,1,0)
- 调用 dequeueTaskFromList方法,移除最后一个任务,也就是任务集索引中排在最前的任务
for (i <- (0 until numTasks).reverse) {addPendingTask(i)}
/*** Dequeue a pending task for a given node and return its index and locality level.* Only search for tasks matching the given locality constraint.** @return An option containing (task index within the task set, locality, is speculative?)*/private def dequeueTask(execId: String, host: String, maxLocality: TaskLocality.Value): Option[(Int, TaskLocality.Value, Boolean)] ={for (index <- dequeueTaskFromList(execId, host, getPendingTasksForExecutor(execId))) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}if (TaskLocality.isAllowed(maxLocality, TaskLocality.NODE_LOCAL)) {for (index <- dequeueTaskFromList(execId, host, getPendingTasksForHost(host))) {return Some((index, TaskLocality.NODE_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.NO_PREF)) {// Look for noPref tasks after NODE_LOCAL for minimize cross-rack trafficfor (index <- dequeueTaskFromList(execId, host, pendingTasksWithNoPrefs)) {return Some((index, TaskLocality.PROCESS_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.RACK_LOCAL)) {for {rack <- sched.getRackForHost(host)index <- dequeueTaskFromList(execId, host, getPendingTasksForRack(rack))} {return Some((index, TaskLocality.RACK_LOCAL, false))}}if (TaskLocality.isAllowed(maxLocality, TaskLocality.ANY)) {for (index <- dequeueTaskFromList(execId, host, allPendingTasks)) {return Some((index, TaskLocality.ANY, false))}}// find a speculative task if all others tasks have been scheduleddequeueSpeculativeTask(execId, host, maxLocality).map {case (taskIndex, allowedLocality) => (taskIndex, allowedLocality, true)}}
- 判断当前stage的TaskSet中是示还有未被处理的Task,如果还有就继续找出来发送给Executor执行
/*** Dequeue a pending task from the given list and return its index.* Return None if the list is empty.* This method also cleans up any tasks in the list that have already* been launched, since we want that to happen lazily.*/private def dequeueTaskFromList(execId: String,host: String,list: ArrayBuffer[Int]): Option[Int] = {var indexOffset = list.sizewhile (indexOffset > 0) {indexOffset -= 1val index = list(indexOffset)if (!isTaskBlacklistedOnExecOrNode(index, execId, host)) {// This should almost always be list.trimEnd(1) to remove taillist.remove(indexOffset)if (copiesRunning(index) == 0 && !successful(index)) {return Some(index)}}}None}
TaskSet中的任务发送给Executor消息LaunchTask
- 粗粒度调度器调用启动任务的方法
- 给executor 发送 消息 LaunchTask()
// Launch tasks returned by a set of resource offersprivate def launchTasks(tasks: Seq[Seq[TaskDescription]]) {for (task <- tasks.flatten) {val serializedTask = ser.serialize(task)if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>try {var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +"spark.akka.frameSize or using broadcast variables for large values."msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,AkkaUtils.reservedSizeBytes)taskSetMgr.abort(msg)} catch {case e: Exception => logError("Exception in error callback", e)}}}else {val executorData = executorDataMap(task.executorId)executorData.freeCores -= scheduler.CPUS_PER_TASKexecutorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))}}}
executor
- CoarseGrainedExecutorBackend 收到消息: LaunchTask()
- receive() 消息处理
case LaunchTask(data) =>if (executor == null) {exitExecutor(1, "Received LaunchTask command but executor was null")} else {val taskDesc = ser.deserialize[TaskDescription](data.value)logInfo("Got assigned task " + taskDesc.taskId)executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,taskDesc.name, taskDesc.serializedTask)}
- executor 通过线程池调用 TaskRunner
- TaskRunner的run()会被调用
def launchTask(context: ExecutorBackend,taskId: Long,attemptNumber: Int,taskName: String,serializedTask: ByteBuffer): Unit = {val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,serializedTask)runningTasks.put(taskId, tr)threadPool.execute(tr)}
- launchTask 的run()执行后会调用 statusUpdate()方法,发送任务状态为已完成
- CoarseGrainedExecutorBacker中的 statusUpdate()方法会给Driver发送消息StatusUpdate()
execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {val msg = StatusUpdate(executorId, taskId, state, data)driver match {case Some(driverRef) => driverRef.send(msg)case None => logWarning(s"Drop $msg because has not yet connected to driver")}}
反向推,调度池中的调度任务如何移除
Pool 中有removeScheduler()方法
- 该方法被调用 -> TaskSchedulerImpl中 taskSetFinished()
override def removeSchedulable(schedulable: Schedulable) {schedulableQueue.remove(schedulable)schedulableNameToSchedulable.remove(schedulable.name)}
TaskSchedulerImpl中 taskSetFinished()
- 该方法被调用 -> TaskSetManager.maybeFinishTaskSet()
/*** Called to indicate that all task attempts (including speculated tasks) associated with the* given TaskSetManager have completed, so state associated with the TaskSetManager should be* cleaned up.*/def taskSetFinished(manager: TaskSetManager): Unit = synchronized {taskSetsByStageIdAndAttempt.get(manager.taskSet.stageId).foreach { taskSetsForStage =>taskSetsForStage -= manager.taskSet.stageAttemptIdif (taskSetsForStage.isEmpty) {taskSetsByStageIdAndAttempt -= manager.taskSet.stageId}}manager.parent.removeSchedulable(manager)logInfo(s"Removed TaskSet ${manager.taskSet.id}, whose tasks have all completed, from pool" +s" ${manager.parent.name}")}
TaskSetManager.maybeFinishTaskSet()
- runningTasks 为0时才被调用,说明此时任务都已经运行完了
- && 条件(tasksSuccessful == numTasks) 即,所有的任务都运行成功才说明是整个任务集已完成,TaskSetManager.handleSuccessfulTask()方法中,每一个任务完成成功后,会tasksSuccessful += 1
- 该方法被调用 TaskSetManager.handleSuccessfulTask()
private def maybeFinishTaskSet() {if (isZombie && runningTasks == 0) {sched.taskSetFinished(this)if (tasksSuccessful == numTasks) {blacklistTracker.foreach(_.updateBlacklistForSuccessfulTaskSet(taskSet.stageId,taskSet.stageAttemptId,taskSetBlacklistHelperOpt.get.execToFailures))}}}
TaskSetManager.handleSuccessfulTask()
- 被TaskSetGetter.enqueueSuccessfulTask()调用
- TaskSetGetter.enqueueSuccessfulTask()被TaskSchedulerImpl.statusUpdate()方法调用
- executor 在执行完任务后,触发发送消息: StatusUpdate
/*** Marks the task as successful and notifies the DAGScheduler that a task has ended.*/def handleSuccessfulTask(tid: Long, result: DirectTaskResult[_]): Unit = {val info = taskInfos(tid)val index = info.indexinfo.markSuccessful()removeRunningTask(tid)// This method is called by "TaskSchedulerImpl.handleSuccessfulTask" which holds the// "TaskSchedulerImpl" lock until exiting. To avoid the SPARK-7655 issue, we should not// "deserialize" the value when holding a lock to avoid blocking other threads. So we call// "result.value()" in "TaskResultGetter.enqueueSuccessfulTask" before reaching here.// Note: "result.value()" only deserializes the value when it's called at the first time, so// here "result.value()" just returns the value and won't block other threads.sched.dagScheduler.taskEnded(tasks(index), Success, result.value(), result.accumUpdates, info, result.metrics)if (!successful(index)) {tasksSuccessful += 1logInfo(s"Finished task ${info.id} in stage ${taskSet.id} (TID ${info.taskId}) in" +s" ${info.duration} ms on ${info.host} (executor ${info.executorId})" +s" ($tasksSuccessful/$numTasks)")// Mark successful and stop if all the tasks have succeeded.successful(index) = trueif (tasksSuccessful == numTasks) {isZombie = true}} else {logInfo("Ignoring task-finished event for " + info.id + " in stage " + taskSet.id +" because task " + index + " has already completed successfully")}maybeFinishTaskSet()}
粗粒度后端调度器处理消息StatusUpdate
- CoarseGrainedSchedulerBackend.DriverEndpoint的receive()方法中处理消息:StatusUpdate()
- 调用方法 scheduler.statusUpdate(taskId, state, data.value)
override def receive: PartialFunction[Any, Unit] = {case StatusUpdate(executorId, taskId, state, data) =>scheduler.statusUpdate(taskId, state, data.value)if (TaskState.isFinished(state)) {executorDataMap.get(executorId) match {case Some(executorInfo) =>executorInfo.freeCores += scheduler.CPUS_PER_TASKmakeOffers(executorId)case None =>// Ignoring the update since we don't know about the executor.logWarning(s"Ignored task status update ($taskId state $state) " +s"from unknown executor with ID $executorId")}}
def statusUpdate(tid: Long, state: TaskState, serializedData: ByteBuffer) {var failedExecutor: Option[String] = Nonesynchronized {try {if (state == TaskState.LOST && taskIdToExecutorId.contains(tid)) {// We lost this entire executor, so remember that it's goneval execId = taskIdToExecutorId(tid)if (executorIdToTaskCount.contains(execId)) {removeExecutor(execId,SlaveLost(s"Task $tid was lost, so marking the executor as lost as well."))failedExecutor = Some(execId)}}taskIdToTaskSetManager.get(tid) match {case Some(taskSet) =>if (TaskState.isFinished(state)) {taskIdToTaskSetManager.remove(tid)taskIdToExecutorId.remove(tid).foreach { execId =>if (executorIdToTaskCount.contains(execId)) {executorIdToTaskCount(execId) -= 1}}}if (state == TaskState.FINISHED) {taskSet.removeRunningTask(tid)taskResultGetter.enqueueSuccessfulTask(taskSet, tid, serializedData)} else if (Set(TaskState.FAILED, TaskState.KILLED, TaskState.LOST).contains(state)) {taskSet.removeRunningTask(tid)taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)}case None =>logError(("Ignoring update with state %s for TID %s because its task set is gone (this is " +"likely the result of receiving duplicate task finished status updates)").format(state, tid))}} catch {case e: Exception => logError("Exception in statusUpdate", e)}}// Update the DAGScheduler without holding a lock on this, since that can deadlockif (failedExecutor.isDefined) {dagScheduler.executorLost(failedExecutor.get)backend.reviveOffers()}}
这篇关于Spark TaskSchedulerImpl TaskSet处理的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!