This article follows up on "DAGScheduler及Stage划分提交" (DAGScheduler and Stage partitioning/submission) and analyzes how the Tasks in a Stage are generated and ultimately submitted to the Executors.
Starting from org.apache.spark.scheduler.DAGScheduler#submitMissingTasks, let's analyze how a Stage produces a TaskSet.
private def submitMissingTasks(stage: Stage, jobId: Int) {
logDebug("submitMissingTasks(" + stage + ")")
// Get our pending tasks and remember them in our pendingTasks entry
stage.pendingPartitions.clear()
// First figure out the indexes of partition ids to compute.
val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
// Create internal accumulators if the stage has no accumulators initialized.
// Reset internal accumulators only if this stage is not partially submitted
// Otherwise, we may override existing accumulator values from some tasks
if (stage.internalAccumulators.isEmpty || stage.numPartitions == partitionsToCompute.size) {
stage.resetInternalAccumulators()
}
// Use the scheduling pool, job group, description, etc. from an ActiveJob associated
// with this Stage
val properties = jobIdToActiveJob(jobId).properties
runningStages += stage
// SparkListenerStageSubmitted should be posted before testing whether tasks are
// serializable. If tasks are not serializable, a SparkListenerStageCompleted event
// will be posted, which should always come after a corresponding SparkListenerStageSubmitted
// event.
stage match {
case s: ShuffleMapStage =>
outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
case s: ResultStage =>
outputCommitCoordinator.stageStart(
stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
}
val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
stage match {
case s: ShuffleMapStage =>
partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
case s: ResultStage =>
val job = s.activeJob.get
partitionsToCompute.map { id =>
val p = s.partitions(id)
(id, getPreferredLocs(stage.rdd, p))
}.toMap
}
} catch {
case NonFatal(e) =>
stage.makeNewStageAttempt(partitionsToCompute.size)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))
// TODO: Maybe we can keep the taskBinary in Stage to avoid serializing it multiple times.
// Broadcasted binary for the task, used to dispatch tasks to executors. Note that we broadcast
// the serialized copy of the RDD and for each task we will deserialize it, which means each
// task gets a different copy of the RDD. This provides stronger isolation between tasks that
// might modify state of objects referenced in their closures. This is necessary in Hadoop
// where the JobConf/Configuration object is not thread-safe.
var taskBinary: Broadcast[Array[Byte]] = null
try {
// For ShuffleMapTask, serialize and broadcast (rdd, shuffleDep).
// For ResultTask, serialize and broadcast (rdd, func).
val taskBinaryBytes: Array[Byte] = stage match {
case stage: ShuffleMapStage =>
closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
case stage: ResultStage =>
closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
}
taskBinary = sc.broadcast(taskBinaryBytes)
} catch {
// In the case of a failure during serialization, abort the stage.
case e: NotSerializableException =>
abortStage(stage, "Task not serializable: " + e.toString, Some(e))
runningStages -= stage
// Abort execution
return
case NonFatal(e) =>
abortStage(stage, s"Task serialization failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
val tasks: Seq[Task[_]] = try {
stage match {
case stage: ShuffleMapStage =>
partitionsToCompute.map { id =>
val locs = taskIdToLocations(id)
val part = stage.rdd.partitions(id)
new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, stage.internalAccumulators)
}
case stage: ResultStage =>
val job = stage.activeJob.get
partitionsToCompute.map { id =>
val p: Int = stage.partitions(id)
val part = stage.rdd.partitions(p)
val locs = taskIdToLocations(id)
new ResultTask(stage.id, stage.latestInfo.attemptId,
taskBinary, part, locs, id, stage.internalAccumulators)
}
}
} catch {
case NonFatal(e) =>
abortStage(stage, s"Task creation failed: $e\n${e.getStackTraceString}", Some(e))
runningStages -= stage
return
}
if (tasks.size > 0) {
logInfo("Submitting " + tasks.size + " missing tasks from " + stage + " (" + stage.rdd + ")")
stage.pendingPartitions ++= tasks.map(_.partitionId)
logDebug("New pending partitions: " + stage.pendingPartitions)
taskScheduler.submitTasks(new TaskSet(
tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
} else {
// Because we posted SparkListenerStageSubmitted earlier, we should mark
// the stage as completed here in case there are no tasks to run
markStageAsFinished(stage, None)
val debugString = stage match {
case stage: ShuffleMapStage =>
s"Stage ${stage} is actually done; " +
s"(available: ${stage.isAvailable}," +
s"available outputs: ${stage.numAvailableOutputs}," +
s"partitions: ${stage.numPartitions})"
case stage : ResultStage =>
s"Stage ${stage} is actually done; (partitions: ${stage.numPartitions})"
}
logDebug(debugString)
}
}
The flow of org.apache.spark.scheduler.DAGScheduler#submitMissingTasks is as follows:
- First, determine which partitions of the RDD still need to be computed. For a shuffle-type stage, check whether the stage already has results available; for the final Result stage, check whether each partition of the job has already been computed.
- Serialize the task binary. Executors obtain it through a broadcast variable, and each task deserializes it first when it runs. This way tasks running on different executors are isolated from one another and do not interfere with each other.
- Generate one task for each partition that needs to be computed: for a stage with a shuffle dependency, generate ShuffleMapTask tasks; for a Result stage, generate ResultTask tasks.
- Make sure the tasks are serializable. Because different clusters use different taskSchedulers, checking here simplifies the logic and guarantees that every task in the TaskSet can be serialized.
- Submit the TaskSet through the TaskScheduler.
private[spark] class TaskSet(
    val tasks: Array[Task[_]],
    val stageId: Int,
    val attempt: Int,
    val priority: Int,
    val properties: Properties) {
  val id: String = stageId + "." + attempt

  override def toString: String = "TaskSet " + id
}
The submission then flows through the following call chain:
- org.apache.spark.scheduler.TaskSchedulerImpl#submitTasks
- org.apache.spark.scheduler.SchedulableBuilder#addTaskSetManager
- org.apache.spark.scheduler.cluster.CoarseGrainedSchedulerBackend#reviveOffers

The last step simply sends a ReviveOffers message to the driver endpoint:

override def reviveOffers() {
  driverEndpoint.send(ReviveOffers)
}
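Before looking at that endpoint, here is a trimmed sketch of what happens inside TaskSchedulerImpl#submitTasks on the way to reviveOffers(). This is a simplified reading of the Spark 1.x source, not the verbatim method; error handling, the starvation-check timer, and some bookkeeping are omitted.

override def submitTasks(taskSet: TaskSet) {
  val tasks = taskSet.tasks
  logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
  this.synchronized {
    // Wrap the TaskSet in a TaskSetManager, which tracks per-task state, retries and locality,
    // and register it with the scheduling pool through the SchedulableBuilder (FIFO or FAIR)
    val manager = createTaskSetManager(taskSet, maxTaskFailures)
    schedulableBuilder.addTaskSetManager(manager, manager.properties)
  }
  // Ask the backend (CoarseGrainedSchedulerBackend in cluster mode) to make resource offers
  backend.reviveOffers()
}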
So what exactly is this driverEndpoint? It is a reference to the driver-side endpoint in the RPC layer, of type RpcEndpointRef. It is assigned in the start() method that runs when CoarseGrainedSchedulerBackend starts:
// TODO (prashant) send conf instead of properties
driverEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))

private def getRpcEnvFactory(conf: SparkConf): RpcEnvFactory = {
  // Two implementations are available:
  // akka:  org.apache.spark.rpc.akka.AkkaRpcEnvFactory
  // netty: org.apache.spark.rpc.netty.NettyRpcEnvFactory
  val rpcEnvNames = Map(
    "akka" -> "org.apache.spark.rpc.akka.AkkaRpcEnvFactory",
    "netty" -> "org.apache.spark.rpc.netty.NettyRpcEnvFactory")
  // Selected via the spark.rpc parameter; the default is netty
  val rpcEnvName = conf.get("spark.rpc", "netty")
  val rpcEnvFactoryClassName = rpcEnvNames.getOrElse(rpcEnvName.toLowerCase, rpcEnvName)
  Utils.classForName(rpcEnvFactoryClassName).newInstance().asInstanceOf[RpcEnvFactory]
}
Next, let's look at the outline of the Netty implementation, in NettyRpcEnv's setupEndpoint() method:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
  // Delegate to Dispatcher's registerRpcEndpoint() method to complete the registration
  dispatcher.registerRpcEndpoint(name, endpoint)
}

protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
  new DriverEndpoint(rpcEnv, properties)
}
Why use an RpcEndpoint at all? The reason is simple: task scheduling and execution happen on a distributed cluster, so inter-process communication is naturally required.
Continuing on, how does the driverEndpoint mentioned above actually get its value? Let's look at Dispatcher's registerRpcEndpoint() method, because it is ultimately what returns the RpcEndpointRef that is assigned to driverEndpoint. The code is as follows:
// Register an RpcEndpoint.
// In our case the name is ENDPOINT_NAME and the endpoint is the DriverEndpoint created above.
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
  // Build the RpcEndpointAddress
  val addr = RpcEndpointAddress(nettyEnv.address, name)
  // Create the NettyRpcEndpointRef
  val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
  // Synchronized block
  synchronized {
    if (stopped) {
      throw new IllegalStateException("RpcEnv has been stopped")
    }
    // ConcurrentHashMap's putIfAbsent() guarantees the EndpointData is not created twice
    if (endpoints.putIfAbsent(name, new EndpointData(name, endpoint, endpointRef)) != null) {
      throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
    }
    val data = endpoints.get(name)
    endpointRefs.put(data.endpoint, data.ref)
    receivers.offer(data) // for the OnStart message
  }
  endpointRef
}
(2) The DriverEndpoint's receive() method, which handles the StatusUpdate and ReviveOffers messages:

override def receive: PartialFunction[Any, Unit] = {
  case StatusUpdate(executorId, taskId, state, data) =>
    scheduler.statusUpdate(taskId, state, data.value)
    if (TaskState.isFinished(state)) {
      executorDataMap.get(executorId) match {
        case Some(executorInfo) =>
          executorInfo.freeCores += scheduler.CPUS_PER_TASK
          makeOffers(executorId)
        case None =>
          // Ignoring the update since we don't know about the executor.
          logWarning(s"Ignored task status update ($taskId state $state) " +
            s"from unknown executor with ID $executorId")
      }
    }

  case ReviveOffers =>
    makeOffers()
(3) makeOffers(), which builds a WorkerOffer for every live executor and hands them to the TaskSchedulerImpl:

private def makeOffers() {
  // Filter out executors under killing
  val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
  val workOffers = activeExecutors.map { case (id, executorData) =>
    new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
  }.toSeq
  launchTasks(scheduler.resourceOffers(workOffers)) // this takes us into TaskSchedulerImpl
}
(4) TaskSchedulerImpl#resourceOffers:
def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
// Mark each slave as alive and remember its hostname
// Also track if new executor is added
var newExecAvail = false
for (o <- offers) {
executorIdToHost(o.executorId) = o.host
executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
if (!executorsByHost.contains(o.host)) {
executorsByHost(o.host) = new HashSet[String]()
executorAdded(o.executorId, o.host)
newExecAvail = true
}
for (rack <- getRackForHost(o.host)) {
hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
}
}
// Randomly shuffle offers to avoid always placing tasks on the same set of workers.
val shuffledOffers = Random.shuffle(offers)
// Build a list of tasks to assign to each worker.
val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
val availableCpus = shuffledOffers.map(o => o.cores).toArray
val sortedTaskSets = rootPool.getSortedTaskSetQueue
for (taskSet <- sortedTaskSets) {
logDebug("parentName: %s, name: %s, runningTasks: %s".format(
taskSet.parent.name, taskSet.name, taskSet.runningTasks))
if (newExecAvail) {
taskSet.executorAdded()
}
}
// Take each TaskSet in our scheduling order, and then offer it each node in increasing order
// of locality levels so that it gets a chance to launch local tasks on all of them.
// NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
var launchedTask = false
for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
do {
launchedTask = resourceOfferSingleTaskSet(
taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
} while (launchedTask)
}
if (tasks.size > 0) {
hasLaunchedTask = true
}
return tasks
}
For a detailed walkthrough of resourceOffers, see http://www.jianshu.com/p/9a059ace2f3a/comments/1495052 ([SubStep1]: updating executor, host, and rack information).
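The per-offer assignment in the loop above happens in resourceOfferSingleTaskSet(), whose body is not shown in this article. The following is a simplified sketch of what it does, condensed from the Spark 1.x source; the TaskNotSerializableException handling and some bookkeeping are omitted.

private def resourceOfferSingleTaskSet(
    taskSet: TaskSetManager,
    maxLocality: TaskLocality,
    shuffledOffers: Seq[WorkerOffer],
    availableCpus: Array[Int],
    tasks: Seq[ArrayBuffer[TaskDescription]]): Boolean = {
  var launchedTask = false
  // Walk every offer; if it still has enough free cores, ask the TaskSetManager for a task
  // that satisfies the current locality constraint and record the assignment.
  for (i <- 0 until shuffledOffers.size) {
    val execId = shuffledOffers(i).executorId
    val host = shuffledOffers(i).host
    if (availableCpus(i) >= CPUS_PER_TASK) {
      for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
        tasks(i) += task
        taskIdToTaskSetManager(task.taskId) = taskSet
        taskIdToExecutorId(task.taskId) = execId
        executorIdToTaskCount(execId) += 1
        availableCpus(i) -= CPUS_PER_TASK
        launchedTask = true
      }
    }
  }
  launchedTask
}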
(5) launchTasks():

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
for (task <- tasks.flatten) {
val serializedTask = ser.serialize(task)
if (serializedTask.limit >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
try {
var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
"spark.akka.frameSize (%d bytes) - reserved (%d bytes). Consider increasing " +
"spark.akka.frameSize or using broadcast variables for large values."
msg = msg.format(task.taskId, task.index, serializedTask.limit, akkaFrameSize,
AkkaUtils.reservedSizeBytes)
taskSetMgr.abort(msg)
} catch {
case e: Exception => logError("Exception in error callback", e)
}
}
}
else {
val executorData = executorDataMap(task.executorId)
executorData.freeCores -= scheduler.CPUS_PER_TASK
executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
}
}
}
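As an aside, the abort message above suggests either increasing spark.akka.frameSize or using broadcast variables for large values. Below is a minimal, hypothetical configuration sketch with illustrative values (assuming a Spark 1.x application with the Akka-based RPC; it is not taken from this article's source):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setAppName("frame-size-example") // hypothetical app name
  // Raise the Akka frame size (in MB) that each serialized task must fit into; 128 is illustrative
  .set("spark.akka.frameSize", "128")
val sc = new SparkContext(conf)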
The logic of launchTasks is simple: for the incoming sequence of TaskDescriptions, it loops over every task and does the following:
1. First it serializes the task, obtaining serializedTask;
2. Then it checks the size of serializedTask:
2.1. If the serialized task reaches or exceeds the allowed upper bound, i.e. the framework's maximum Akka message size minus the extra bytes an Akka message must reserve besides the serialized task or task result, it looks up the corresponding TaskSetManager in TaskSchedulerImpl's taskIdToTaskSetManager by the task's taskId and calls its abort() method, marking that TaskSetManager as failed;
2.2. If the serialized task stays within the allowed size, then:
2.2.1. it looks up the executor description executorData in executorDataMap by task.executorId;
2.2.2. it decreases freeCores in executorData accordingly;
2.2.3. using executorData's executorEndpoint, i.e. the driver-side reference to the executor's communication endpoint, it sends a LaunchTask event containing the serialized task, delivering the task to the executor for execution. So how does the executor receive the LaunchTask event? The answer lies in CoarseGrainedExecutorBackend.
private[spark] class CoarseGrainedExecutorBackend(
    override val rpcEnv: RpcEnv,
    driverUrl: String,
    executorId: String,
    hostPort: String,
    cores: Int,
    userClassPath: Seq[URL],
    env: SparkEnv)
  extends ThreadSafeRpcEndpoint with ExecutorBackend with Logging {

/**
 * A pluggable interface used by the Executor to send updates to the cluster scheduler.
 */
private[spark] trait ExecutorBackend {
  // The single statusUpdate() method takes three parameters:
  // a Long taskId, a TaskState state and a ByteBuffer data
  def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer)
}
So it naturally has two main duties: first, as an endpoint, it provides the communication channel between the driver and the executor; second, it reports the executor's task execution status.
What exactly is CoarseGrainedExecutorBackend? We will not dig into it here and will leave that to a later analysis. For now it is enough to know that it is a backend helper process for the Executor, has a one-to-one relationship with it, and provides the Executor with exactly those two basic capabilities: communicating with the Driver and reporting task execution status.
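To make the second duty concrete, here is roughly what CoarseGrainedExecutorBackend#statusUpdate looks like in Spark 1.x. This is a sketch from memory rather than a quote from this article's source; it assumes the driver's RpcEndpointRef obtained at registration time is held in an Option named driver.

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
  // Wrap the update in a StatusUpdate message, the very message that the DriverEndpoint's
  // receive() method shown in (2) above pattern-matches on
  val msg = StatusUpdate(executorId, taskId, state, data)
  driver match {
    case Some(driverRef) => driverRef.send(msg)
    case None => logWarning(s"Drop $msg because has not yet connected to driver")
  }
}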
Next, let's see how CoarseGrainedExecutorBackend handles the LaunchTask event. As an RpcEndpoint, its receive() method, which handles the various events and messages, contains the following:
case LaunchTask(data) =>
  if (executor == null) {
    logError("Received LaunchTask command but executor was null")
    System.exit(1)
  } else {
    // Deserialize the task description, obtaining taskDesc
    val taskDesc = ser.deserialize[TaskDescription](data.value)
    logInfo("Got assigned task " + taskDesc.taskId)
    // Call the executor's launchTask() method to launch the task
    executor.launchTask(this, taskId = taskDesc.taskId, attemptNumber = taskDesc.attemptNumber,
      taskDesc.name, taskDesc.serializedTask)
  }
1. Deserialize the task description, obtaining taskDesc;
2. Call the executor's launchTask() method to launch the task.
So the focus now falls on Executor's launchTask() method; the code is as follows:
def launchTask(
    context: ExecutorBackend,
    taskId: Long,
    attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer): Unit = {
  // Create a new TaskRunner
  val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
    serializedTask)
  // Record the taskId -> TaskRunner mapping in runningTasks
  runningTasks.put(taskId, tr)
  // Execute the TaskRunner on the thread pool
  threadPool.execute(tr)
}
Let's first look at the TaskRunner class, starting with the class definition and its member fields:
class TaskRunner(
    execBackend: ExecutorBackend,
    val taskId: Long,
    val attemptNumber: Int,
    taskName: String,
    serializedTask: ByteBuffer)
  extends Runnable {
  // TaskRunner extends Runnable

  /** Whether this task has been killed. */
  @volatile private var killed = false

  /** How much the JVM process has spent in GC when the task starts to run. */
  @volatile var startGCTime: Long = _

  /**
   * The task to run. This will be set in run() by deserializing the task binary coming
   * from the driver. Once it is set, it will never be changed.
   */
  @volatile var task: Task[Any] = _
}
1. execBackend: the Executor's backend helper, providing the two basic capabilities of communicating with the Driver and reporting status; in practice the instance passed in is a CoarseGrainedExecutorBackend;
2. taskId: the unique identifier of the Task;
3. attemptNumber: the attempt number of the Task run. Like MapReduce, Spark can launch backup attempts for straggler tasks (speculative execution), so taskId plus attemptNumber is needed to uniquely identify one run of a Task (see the configuration sketch after this list);
4. serializedTask: a ByteBuffer holding the serialized Task; deserializing it yields the Task, whose run() method is then executed;
5. killed: a flag marking whether the Task has been killed;
6. task: of type Task[Any], the Task to run; it is set in run() when the task binary from the driver is deserialized, and once set it never changes;
7. startGCTime: how much time the JVM had spent in GC when the task starts to run.
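Speculative execution, mentioned in point 3, is driven by configuration. A minimal, hypothetical sketch of turning it on, with illustrative values that are not part of this article's source:

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Launch backup attempts for tasks that run much slower than the median of their stage
  .set("spark.speculation", "true")
  // Illustrative tuning knobs: how much slower than the median a task must be,
  // and what fraction of tasks must have finished before speculation kicks in
  .set("spark.speculation.multiplier", "1.5")
  .set("spark.speculation.quantile", "0.75")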
In addition, since it is a Runnable executed on a thread, TaskRunner must provide a run() method; this run() method is what gets executed when the TaskRunner is scheduled from the thread pool. Let's look at its definition:
override def run(): Unit = {
  // Step 1: construct the Task and the helper objects it needs at runtime
  // Create the task memory manager
  val taskMemoryManager = new TaskMemoryManager(env.memoryManager, taskId)
  // Deserialization start time
  val deserializeStartTime = System.currentTimeMillis()
  // Set the context class loader on the current thread
  Thread.currentThread.setContextClassLoader(replClassLoader)
  // Get the closure serializer from SparkEnv
  val ser = env.closureSerializer.newInstance()
  logInfo(s"Running $taskName (TID $taskId)")
  // Report TaskState.RUNNING through execBackend
  execBackend.statusUpdate(taskId, TaskState.RUNNING, EMPTY_BYTE_BUFFER)
  var taskStart: Long = 0
  // Record how much time the JVM has spent in GC so far
  startGCTime = computeTotalGcTime()
  try {
    // Call Task's deserializeWithDependencies() method to split the serialized task into the
    // files (taskFiles), jars (taskJars) and task binary (taskBytes) the task needs
    val (taskFiles, taskJars, taskBytes) = Task.deserializeWithDependencies(serializedTask)
    updateDependencies(taskFiles, taskJars)
    // Deserialize the task binary taskBytes to get the task instance
    task = ser.deserialize[Task[Any]](taskBytes, Thread.currentThread.getContextClassLoader)
    // Set the task's memory manager
    task.setTaskMemoryManager(taskMemoryManager)

    // If this task has been killed before we deserialized it, let's quit now. Otherwise,
    // continue executing the task.
    if (killed) {
      // Throw an exception rather than returning, because returning within a try{} block
      // causes a NonLocalReturnControl exception to be thrown. The NonLocalReturnControl
      // exception will be caught by the catch block, leading to an incorrect ExceptionFailure
      // for the task.
      throw new TaskKilledException
    }

    logDebug("Task " + taskId + "'s epoch is " + task.epoch)
    // Update the mapOutputTracker's epoch
    env.mapOutputTracker.updateEpoch(task.epoch)

    // Run the actual task and measure its runtime.
    // Step 2: run the Task
    // Task start time
    taskStart = System.currentTimeMillis()
    // threwException is initialized to true and marks whether the task threw during execution
    var threwException = true
    // Call Task's run() method to actually execute the task, obtaining the result value
    val (value, accumUpdates) = try {
      // Call Task's run() method to actually execute the task
      val res = task.run(
        taskAttemptId = taskId,
        attemptNumber = attemptNumber,
        metricsSystem = env.metricsSystem)
      // Clear the threwException flag
      threwException = false
      // Return res; inside Task's run(), res is defined as (T, AccumulatorUpdates),
      // i.e. the task result and the accumulator updates
      res
    } finally {
      // Release all memory allocated through the task memory manager
      val freedMemory = taskMemoryManager.cleanUpAllAllocatedMemory()
      if (freedMemory > 0) {
        val errMsg = s"Managed memory leak detected; size = $freedMemory bytes, TID = $taskId"
        if (conf.getBoolean("spark.unsafe.exceptionOnMemoryLeak", false) && !threwException) {
          throw new SparkException(errMsg)
        } else {
          logError(errMsg)
        }
      }
    }
    // Task finish time
    val taskFinish = System.currentTimeMillis()

    // If the task has been killed, let's fail it.
    if (task.killed) {
      throw new TaskKilledException
    }

    // Step 3: handle the task's result
    // Get the result serializer from SparkEnv
    val resultSer = env.serializer.newInstance()
    // Timestamp before result serialization
    val beforeSerialization = System.currentTimeMillis()
    // Serialize the task result with the result serializer, obtaining valueBytes
    val valueBytes = resultSer.serialize(value)
    // Timestamp after result serialization
    val afterSerialization = System.currentTimeMillis()

    // Metrics bookkeeping, not covered here
    for (m <- task.metrics) {
      // Deserialization happens in two parts: first, we deserialize a Task object, which
      // includes the Partition. Second, Task.run() deserializes the RDD and function to be run.
      m.setExecutorDeserializeTime(
        (taskStart - deserializeStartTime) + task.executorDeserializeTime)
      // We need to subtract Task.run()'s deserialization time to avoid double-counting
      m.setExecutorRunTime((taskFinish - taskStart) - task.executorDeserializeTime)
      m.setJvmGCTime(computeTotalGcTime() - startGCTime)
      m.setResultSerializationTime(afterSerialization - beforeSerialization)
      m.updateAccumulators()
    }

    // Build a DirectTaskResult containing both the serialized result valueBytes
    // and the accumulator updates accumUpdates
    val directResult = new DirectTaskResult(valueBytes, accumUpdates, task.metrics.orNull)
    // Serialize the DirectTaskResult, obtaining serializedDirectResult
    val serializedDirectResult = ser.serialize(directResult)
    // Size of the serialized task result
    val resultSize = serializedDirectResult.limit

    // directSend = sending directly back to the driver
    val serializedResult: ByteBuffer = {
      // If the result is larger than the maximum allowed total result size, serialize an
      // IndirectTaskResult; an IndirectTaskResult is a reference to a DirectTaskResult
      // stored in the BlockManager on the Worker
      if (maxResultSize > 0 && resultSize > maxResultSize) {
        logWarning(s"Finished $taskName (TID $taskId). Result is larger than maxResultSize " +
          s"(${Utils.bytesToString(resultSize)} > ${Utils.bytesToString(maxResultSize)}), " +
          s"dropping it.")
        ser.serialize(new IndirectTaskResult[Any](TaskResultBlockId(taskId), resultSize))
      }
      // If the result exceeds the maximum Akka message size minus the reserved bytes, write it
      // to the BlockManager, i.e. the result cannot be delivered directly in a message
      else if (resultSize >= akkaFrameSize - AkkaUtils.reservedSizeBytes) {
        val blockId = TaskResultBlockId(taskId)
        env.blockManager.putBytes(
          blockId, serializedDirectResult, StorageLevel.MEMORY_AND_DISK_SER)
        logInfo(
          s"Finished $taskName (TID $taskId). $resultSize bytes result sent via BlockManager)")
        ser.serialize(new IndirectTaskResult[Any](blockId, resultSize))
      }
      // If the result is small enough, return it directly via the message
      else {
        logInfo(s"Finished $taskName (TID $taskId). $resultSize bytes result sent to driver")
        serializedDirectResult
      }
    }

    // Report TaskState.FINISHED through execBackend
    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
  } catch {
    // Handle the various failure cases
    case ffe: FetchFailedException =>
      val reason = ffe.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case _: TaskKilledException | _: InterruptedException if task.killed =>
      logInfo(s"Executor killed $taskName (TID $taskId)")
      execBackend.statusUpdate(taskId, TaskState.KILLED, ser.serialize(TaskKilled))

    case cDE: CommitDeniedException =>
      val reason = cDE.toTaskEndReason
      execBackend.statusUpdate(taskId, TaskState.FAILED, ser.serialize(reason))

    case t: Throwable =>
      // Attempt to exit cleanly by informing the driver of our failure.
      // If anything goes wrong (or this was a fatal exception), we will delegate to
      // the default uncaught exception handler, which will terminate the Executor.
      logError(s"Exception in $taskName (TID $taskId)", t)
      val metrics: Option[TaskMetrics] = Option(task).flatMap { task =>
        task.metrics.map { m =>
          m.setExecutorRunTime(System.currentTimeMillis() - taskStart)
          m.setJvmGCTime(computeTotalGcTime() - startGCTime)
          m.updateAccumulators()
          m
        }
      }
      val serializedTaskEndReason = {
        try {
          ser.serialize(new ExceptionFailure(t, metrics))
        } catch {
          case _: NotSerializableException =>
            // t is not serializable so just send the stacktrace
            ser.serialize(new ExceptionFailure(t, metrics, false))
        }
      }
      // Report TaskState.FAILED through execBackend
      execBackend.statusUpdate(taskId, TaskState.FAILED, serializedTaskEndReason)

      // Don't forcibly exit unless the exception was inherently fatal, to avoid
      // stopping other tasks unnecessarily.
      if (Utils.isFatalError(t)) {
        SparkUncaughtExceptionHandler.uncaughtException(t)
      }
  } finally {
    // Finally, whether the task succeeded or failed, remove it from runningTasks
    runningTasks.remove(taskId)
  }
}
1. Step 1: construct the Task and the helper objects it needs at runtime;
2. Step 2: run the Task;
3. Step 3: handle the Task's result.
Yes, it is that simple. Given time and space constraints we only walk through the main flow here and leave the details to the next article.
Now let's go through the steps one by one, starting with Step 1, constructing the Task and its runtime helper objects, which mainly consists of the following:
1.1. Construct the TaskMemoryManager, i.e. taskMemoryManager;
1.2. Record the deserialization start time;
1.3. Set the context class loader on the current thread;
1.4. Get the serializer ser from SparkEnv;
1.5. Report TaskState.RUNNING through execBackend;
1.6. Record the GC time so far;
1.7. Call Task's deserializeWithDependencies() method to deserialize the Task, obtaining the files (taskFiles), jars (taskJars) and task binary (taskBytes) the Task needs;
1.8. Deserialize the task binary taskBytes, obtaining the task instance;
1.9. Set the task's memory manager;
1.10. If the Task has been killed at this point, throw an exception and exit quickly.
Next comes Step 2, running the Task, whose main flow is:
2.1. Record the task start time;
2.2. Set the flag threwException to true, which marks whether the Task threw an exception during its actual execution;
2.3. Call Task's run() method to actually execute the Task, obtaining the result value and the accumulator updates accumUpdates (a sketch of Task.run() follows this list);
2.4. Set the flag threwException to false;
2.5. Release all memory allocated through the task memory manager taskMemoryManager;
2.6. Record the task finish time;
2.7. If the task has been killed, throw a TaskKilledException.
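For reference, here is a simplified sketch of what Task.run() itself does in Spark 1.x. It is condensed from memory rather than quoted from this article's source, so treat the TaskContextImpl construction details as approximate; the signature matches the call site shown in TaskRunner.run() above.

final def run(
    taskAttemptId: Long,
    attemptNumber: Int,
    metricsSystem: MetricsSystem): (T, AccumulatorUpdates) = {
  // Build and register a TaskContext for this attempt (constructor arguments approximate)
  context = new TaskContextImpl(stageId, partitionId, taskAttemptId, attemptNumber,
    taskMemoryManager, metricsSystem, internalAccumulators)
  TaskContext.setTaskContext(context)
  taskThread = Thread.currentThread()
  if (_killed) {
    kill(interruptThread = false)
  }
  try {
    // runTask() is what ShuffleMapTask and ResultTask actually implement
    (runTask(context), context.collectAccumulators())
  } finally {
    // Mark the task as completed and clear the thread-local TaskContext
    context.markTaskCompleted()
    TaskContext.unset()
  }
}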
The last step, Step 3, handles the Task's result, roughly as follows:
3.1. Get the result serializer from SparkEnv;
3.2. Record the time before result serialization;
3.3. Serialize the task result value with the result serializer, obtaining valueBytes;
3.4. Record the time after result serialization;
3.5. Metrics bookkeeping, not covered here;
3.6. Build a DirectTaskResult containing both the serialized result valueBytes and the accumulator updates;
3.7. Serialize the DirectTaskResult, obtaining serializedDirectResult;
3.8. Get the size of the serialized result;
3.9. Handle the result according to its size (a configuration note follows this walkthrough):
3.9.1. If the result is larger than the maximum allowed total result size, serialize an IndirectTaskResult; an IndirectTaskResult is a reference to a DirectTaskResult stored in the BlockManager on the Worker;
3.9.2. If the result exceeds the maximum Akka message size minus the reserved bytes, write it to the BlockManager, i.e. it cannot be delivered directly in a message, and send back an IndirectTaskResult reference;
3.9.3. If the result is small enough, return it directly via the message;
3.10. Report TaskState.FINISHED through execBackend.
Finally, whether the task succeeded or failed, it is removed from runningTasks.
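The size thresholds in step 3.9 come from configuration. As a closing, hypothetical sketch with illustrative values (not from this article's source): spark.driver.maxResultSize caps the total size of results the driver will accept, and in Spark 1.x the per-message limit is tied to spark.akka.frameSize.

import org.apache.spark.SparkConf

val conf = new SparkConf()
  // Cap on the total size of serialized task results for a single action; "1g" is the usual default
  .set("spark.driver.maxResultSize", "2g")
  // Per-message limit (MB) that a single task result must fit into to be sent directly
  .set("spark.akka.frameSize", "128")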
References:
http://blog.csdn.net/lipeng_bigdata/article/details/50726216
http://blog.csdn.net/anzhsoft/article/details/40238111