Spark任务调度机制论述(转载)

2024-06-17 12:38

本文主要是介绍Spark任务调度机制论述(转载),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Spark任务调度机制论述

在生产环境下,Spark集群的部署方式一般为YARN-Cluster模式。 Driver线程主要是初始化SparkContext对象,准备运行所需的上下文,然后一方面保持与ApplicationMaster的RPC连接,通过ApplicationMaster申请资源,另一方面根据用户业务逻辑开始调度任务,将任务下发到已有的空闲Executor上。
当ResourceManager向ApplicationMaster返回Container资源时,ApplicationMaster就尝试在对应的Container上启动Executor进程,Executor进程起来后,会向Driver反向注册,注册成功后保持与Driver的心跳,同时等待Driver分发任务,当分发的任务执行完毕后,将任务状态上报给Driver。

1. Spark任务调度概述

1.1 基础概念

当Driver起来后,Driver则会根据用户程序逻辑准备任务,并根据Executor资源情况逐步分发任务。在详细阐述任务调度前,首先说明下Spark里的几个概念。一个Spark应用程序包括Job、Stage以及Task三个概念:

job:以 action 方法为界,一个 action 触发一个 job

stage:它是 job 的子集,以 RDD 宽依赖为界,遇到宽依赖即划分 stage

task:它是 stage 的子集,以分区数来衡量,分区数多少,task 就有多少
在这里插入图片描述

1.2 任务调度

spark 任务从发起到执行可用下图表示
在这里插入图片描述

1.3 Client—>ResourceManage

(1). Client 端通过 spark-submit + 参数 发起任务,即向ResourceManage 提交 application,注意该 application 包含了一堆参数,如 Executor 数,Executor 内存,Driver 内存等;

(2). ResourceManage 需要先判断现在资源是否能满足该 application,如果满足,则响应该 application,如果不满足,报错;

(3). 如果资源满足,Client 端准备 ApplicationMaster 的启动上下文,并交给 ResourceManage;

(4). 并且循环监控 application 的状态;

1.4 ResourceManage—>ApplicationMaster

(1). ResourceManage 找一个 worker 启动 ApplicationMaster;

(2). ApplicationMaster 向 ResourceManage 申请 Container;

(3). ResourceManage 收集可用资源,并告诉 ApplicationMaster;

(4). ApplicationMaster 尝试在对应的 Container 上启动 Executor 进程;

1.5 ApplicationMaster-Driver

(1). 有了资源,ApplicationMaster 启动 Driver;

//Driver 线程主要是初始化 SparkContext 对象,准备运行所需上下文,并保持与 ApplicationMaster 的 RPC 连接,通过 ApplicationMaster 申请资源

(2). Driver 启动成功后,告诉 ApplicationMaster;

1.6 Driver-Executor

(1). Executor 启动成功后,反向注册到 Driver 上,并持续向 Driver 发送心跳;

(2). Driver 启动 task,分发给 Executor,并监控 task 状态;

(3). 当 Executor 任务执行完毕后,将任务状态发送给 Driver;

spark 的核心就是资源申请和任务调度,主要通过 ApplicationMaster、Driver、Executor 来完成

spark 任务调度分为两层,一层是 stage 级的调度,一层是 task 级的调度
在这里插入图片描述

RDD 间的血缘关系,代表了计算的流程,构成了 有向无环图,即 DAG;

最后通过 action 触发 job 并调度执行;

DAGScheduler 负责 stage 级的调度,主要是将 DAG 切分成多个 stage,并将 stage 打包成 TaskSet 交给 TaskScheduler;

TaskScheduler 负责 task 级的调度,将 DAGScheduler 发过来的 TaskSet 按照指定的调度策略发送给 Executor;

SchedulerBackend 负责给 调度策略 提供可用资源,调度策略决定把 task 发送给哪个 Executor;【其中 SchedulerBackend 有多种实现,分别对接不同的资源管理系统】

基于上述认知,再来看一张图
在这里插入图片描述

Driver 在启动过程中,除了初始化 SparkContext 外,也初始化了 DAGScheduler、TaskScheduler、 SchedulerBackend 3个调度对象,同时初始化了 HeartbeatReceiver 心跳接收器;

并且各个线程之间保存通信;

SchedulerBackend 向 ApplicationMaster 申请资源,并不间断地从 TaskScheduler 获取 task 并发送给 合适的 Executor;

HeartbeatReceiver 负责接收 Executor 心跳报文,监控 Executor 存活状态;

2. Spark Stage级调度

Spark的任务调度是从DAG切割开始,主要是由DAGScheduler来完成。当遇到一个Action操作后就会触发一个Job的计算,并交给DAGScheduler来提交,下图是涉及到Job提交的相关方法调用流程图。
在这里插入图片描述

  1. Job由最终的RDD和Action方法封装而成;

  2. SparkContext将Job交给DAGScheduler提交,它会根据RDD的血缘关系构成的DAG进行切分,将一个Job划分为若干Stages,具体划分策略是,由最终的RDD不断通过依赖回溯判断父依赖是否是宽依赖,即以Shuffle为界,划分Stage,窄依赖的RDD之间被划分到同一个Stage中,可以进行pipeline式的计算。划分的Stages分两类,一类叫做ResultStage,为DAG最下游的Stage,由Action方法决定,另一类叫做ShuffleMapStage,为下游Stage准备数据,下面看一个简单的例子WordCount。
    在这里插入图片描述

Job由saveAsTextFile触发,该Job由RDD-3和saveAsTextFile方法组成,根据RDD之间的依赖关系从RDD-3开始回溯搜索,直到没有依赖的RDD-0,在回溯搜索过程中,RDD-3依赖RDD-2,并且是宽依赖,所以在RDD-2和RDD-3之间划分Stage,RDD-3被划到最后一个Stage,即ResultStage中,RDD-2依赖RDD-1,RDD-1依赖RDD-0,这些依赖都是窄依赖,所以将RDD-0、RDD-1和RDD-2划分到同一个Stage,形成pipeline操作,。即ShuffleMapStage中,实际执行的时候,数据记录会一气呵成地执行RDD-0到RDD-2的转化。不难看出,其本质上是一个深度优先搜索(Depth First Search)算法。

一个Stage是否被提交,需要判断它的父Stage是否执行,只有在父Stage执行完毕才能提交当前Stage,如果一个Stage没有父Stage,那么从该Stage开始提交。Stage提交时会将Task信息(分区信息以及方法等)序列化并被打包成TaskSet交给TaskScheduler,一个Partition对应一个Task,另一方面TaskScheduler会监控Stage的运行状态,只有Executor丢失或者Task由于Fetch失败才需要重新提交失败的Stage以调度运行失败的任务,其他类型的Task失败会在TaskScheduler的调度过程中重试。

相对来说DAGScheduler做的事情较为简单,仅仅是在Stage层面上划分DAG,提交Stage并监控相关状态信息。TaskScheduler则相对较为复杂,下面详细阐述其细节。

3. Spark Task级调度

Spark Task的调度是由TaskScheduler来完成,由前文可知,DAGScheduler将Stage打包到交给TaskScheTaskSetduler,TaskScheduler会将TaskSet封装为TaskSetManager加入到调度队列中,TaskSetManager结构如下图所示。
在这里插入图片描述

TaskSetManager负责监控管理同一个Stage中的Tasks,TaskScheduler就是以TaskSetManager为单元来调度任务。

前面也提到,TaskScheduler初始化后会启动SchedulerBackend,它负责跟外界打交道,接收Executor的注册信息,并维护Executor的状态,所以说SchedulerBackend是管“粮食”的,同时它在启动后会定期地去“询问”TaskScheduler有没有任务要运行,也就是说,它会定期地“问”TaskScheduler“我有这么余粮,你要不要啊”,TaskScheduler在SchedulerBackend“问”它的时候,会从调度队列中按照指定的调度策略选择TaskSetManager去调度运行,大致方法调用流程如下图所示:
在这里插入图片描述

上图中,将TaskSetManager加入rootPool调度池中之后,调用SchedulerBackend的riviveOffers方法给driverEndpoint发送ReviveOffer消息;driverEndpoint收到ReviveOffer消息后调用makeOffers方法,过滤出活跃状态的Executor(这些Executor都是任务启动时反向注册到Driver的Executor),然后将Executor封装成WorkerOffer对象;准备好计算资源(WorkerOffer)后,taskScheduler基于这些资源调用resourceOffer在Executor上分配task。

3.1 调度策略

TaskScheduler支持两种调度策略,一种是FIFO,也是默认的调度策略,另一种是FAIR。在TaskScheduler初始化过程中会实例化rootPool,表示树的根节点,是Pool类型。

(1) FIFO调度策略

如果是采用FIFO调度策略,则直接简单地将TaskSetManager按照先来先到的方式入队,出队时直接拿出最先进队的TaskSetManager,其树结构如下图所示,TaskSetManager保存在一个FIFO队列中。
在这里插入图片描述

(2) FAIR调度策略

FAIR调度策略的树结构如下图所示:
在这里插入图片描述

FAIR模式中有一个rootPool和多个子Pool,各个子Pool中存储着所有待分配的TaskSetMagager。

在FAIR模式中,需要先对子Pool进行排序,再对子Pool里面的TaskSetMagager进行排序,因为Pool和TaskSetMagager都继承了Schedulable特质,因此使用相同的排序算法。

排序过程的比较是基于Fair-share来比较的,每个要排序的对象包含三个属性: runningTasks值(正在运行的Task数)、minShare值、weight值,比较时会综合考量runningTasks值,minShare值以及weight值。

注意,minShare、weight的值均在公平调度配置文件fairscheduler.xml中被指定,调度池在构建阶段会读取此文件的相关配置。

  1. 如果A对象的runningTasks大于它的minShare,B对象的runningTasks小于它的minShare,那么B排在A前面;(runningTasks比minShare小的先执行)

  2. 如果A、B对象的runningTasks都小于它们的minShare,那么就比较runningTasks与minShare的比值(minShare使用率),谁小谁排前面;(minShare使用率低的先执行)

  3. 如果A、B对象的runningTasks都大于它们的minShare,那么就比较runningTasks与weight的比值(权重使用率),谁小谁排前面。(权重使用率低的先执行)

  4. 如果上述比较均相等,则比较名字。

整体上来说就是通过minShare和weight这两个参数控制比较过程,可以做到让minShare使用率和权重使用率少(实际运行task比例较少)的先运行。

FAIR模式排序完成后,所有的TaskSetManager被放入一个ArrayBuffer里,之后依次被取出并发送给Executor执行。

从调度队列中拿到TaskSetManager后,由于TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出Task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。

3.2 本地化调度

DAGScheduler切割Job,划分Stage, 通过调用submitStage来提交一个Stage对应的tasks,submitStage会调用submitMissingTasks,submitMissingTasks 确定每个需要计算的 task 的preferredLocations,通过调用getPreferrdeLocations()得到partition 的优先位置,由于一个partition对应一个Task,此partition的优先位置就是task的优先位置,对于要提交到TaskScheduler的TaskSet中的每一个Task,该task优先位置与其对应的partition对应的优先位置一致。

从调度队列中拿到TaskSetManager后,那么接下来的工作就是TaskSetManager按照一定的规则一个个取出task给TaskScheduler,TaskScheduler再交给SchedulerBackend去发到Executor上执行。前面也提到,TaskSetManager封装了一个Stage的所有Task,并负责管理调度这些Task。

根据每个Task的优先位置,确定Task的Locality级别,Locality一共有五种,优先级由高到低顺序:
在这里插入图片描述

在调度执行时,Spark调度总是会尽量让每个task以最高的本地性级别来启动,当一个task以X本地性级别启动,但是该本地性级别对应的所有节点都没有空闲资源而启动失败,此时并不会马上降低本地性级别启动而是在某个时间长度内再次以X本地性级别来启动该task,若超过限时时间则降级启动,去尝试下一个本地性级别,依次类推。

可以通过调大每个类别的最大容忍延迟时间,在等待阶段对应的Executor可能就会有相应的资源去执行此task,这就在在一定程度上提到了运行性能。

3.3 失败重试与黑名单机制

除了选择合适的Task调度运行外,还需要监控Task的执行状态,前面也提到,与外部打交道的是SchedulerBackend,Task被提交到Executor启动执行后,Executor会将执行状态上报给SchedulerBackend,SchedulerBackend则告诉TaskScheduler,TaskScheduler找到该Task对应的TaskSetManager,并通知到该TaskSetManager,这样TaskSetManager就知道Task的失败与成功状态,对于失败的Task,会记录它失败的次数,如果失败次数还没有超过最大重试次数,那么就把它放回待调度的Task池子中,否则整个Application失败。

在记录Task失败次数过程中,会记录它上一次失败所在的Executor Id和Host,这样下次再调度这个Task时,会使用黑名单机制,避免它被调度到上一次失败的节点上,起到一定的容错作用。黑名单记录Task上一次失败所在的Executor Id和Host,以及其对应的“拉黑”时间,“拉黑”时间是指这段时间内不要再往这个节点上调度这个Task了。

4. 总结

本图有助于理解job,stage,task工作的原理。Spark通用运行流程图,体现了基本的Spark应用程序在部署中的基本提交流程。
在这里插入图片描述

流程按照如下的核心步骤进行工作的:

  1. 任务提交后,都会先启动Driver程序;

  2. 随后Driver向集群管理器注册应用程序;

  3. 之后集群管理器根据此任务的配置文件分配Executor并启动;

  4. Driver开始执行main函数,Spark查询为懒执行,当执行到Action算子时开始反向推算,根据宽依赖进行Stage的划分,随后每一个Stage对应一个Taskset,Taskset中有多个Task,查找可用资源Executor进行调度;

  5. 根据本地化原则,Task会被分发到指定的Executor去执行,在任务执行的过程中,Executor也会不断与Driver进行通信,报告任务运行情况。

这篇关于Spark任务调度机制论述(转载)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1069465

相关文章

Linux系统稳定性的奥秘:探究其背后的机制与哲学

在计算机操作系统的世界里,Linux以其卓越的稳定性和可靠性著称,成为服务器、嵌入式系统乃至个人电脑用户的首选。那么,是什么造就了Linux如此之高的稳定性呢?本文将深入解析Linux系统稳定性的几个关键因素,揭示其背后的技术哲学与实践。 1. 开源协作的力量Linux是一个开源项目,意味着任何人都可以查看、修改和贡献其源代码。这种开放性吸引了全球成千上万的开发者参与到内核的维护与优化中,形成了

Spring中事务的传播机制

一、前言 首先事务传播机制解决了什么问题 Spring 事务传播机制是包含多个事务的方法在相互调用时,事务是如何在这些方法间传播的。 事务的传播级别有 7 个,支持当前事务的:REQUIRED、SUPPORTS、MANDATORY; 不支持当前事务的:REQUIRES_NEW、NOT_SUPPORTED、NEVER,以及嵌套事务 NESTED,其中 REQUIRED 是默认的事务传播级别。

openfire+spark 在linux下安装,配置

文章转自:点击打开链接 相关软件下载 链接: https://pan.baidu.com/s/1boJs61h 密码: 2wd7 Openfire 在linux下安装和配置 + spark 在windows下配置 本机环境 系统:CentOS 6.7 64 位JDK 1.7 64 位MySQL 5.6 Openfir

多头注意力机制(Multi-Head Attention)

文章目录 多头注意力机制的作用多头注意力机制的工作原理为什么使用多头注意力机制?代码示例 多头注意力机制(Multi-Head Attention)是Transformer架构中的一个核心组件。它在机器翻译、自然语言处理(NLP)等领域取得了显著的成功。多头注意力机制的引入是为了增强模型的能力,使其能够从不同的角度关注输入序列的不同部分,从而捕捉更多层次的信息。 多头注意力机

Linux-笔记 线程同步机制

目录 前言 实现 信号量(Semaphore) 计数型信号量 二值信号量  信号量的原语操作 无名信号量的操作函数 例子 互斥锁(mutex) 互斥锁的操作函数 例子 自旋锁 (Spinlock) 自旋锁与互斥锁的区别 自旋锁的操作函数 例子 前言         线程同步是为了对共享资源的访问进行保护,确保数据的一致性,由于进程中会有多个线程的存在,

Spring 集成 RabbitMQ 与其概念,消息持久化,ACK机制

目录 RabbitMQ 概念exchange交换机机制 什么是交换机binding?Direct Exchange交换机Topic Exchange交换机Fanout Exchange交换机Header Exchange交换机RabbitMQ 的 Hello - Demo(springboot实现)RabbitMQ 的 Hello Demo(spring xml实现)RabbitMQ 在生产环境

Rust:Future、async 异步代码机制示例与分析

0. 异步、并发、并行、进程、协程概念梳理 Rust 的异步机制不是多线程或多进程,而是基于协程(或称为轻量级线程、微线程)的模型,这些协程可以在单个线程内并发执行。这种模型允许在单个线程中通过非阻塞的方式处理多个任务,从而实现高效的并发。 关于“并发”和“并行”的区别,这是两个经常被提及但含义不同的概念: 并发(Concurrency):指的是同时处理多个任务的能力,这些任务可能在同一时

ROS话题通信机制实操C++

ROS话题通信机制实操C++ 创建ROS工程发布方(二狗子)订阅方(翠花)编辑配置文件编译并执行注意订阅的第一条数据丢失 ROS话题通信的理论查阅ROS话题通信流程理论 在ROS话题通信机制实现中,ROS master 不需要实现,且连接的建立也已经被封装了,需要关注的关键点有三个: 发布方(二狗子)订阅方(翠花)数据(此处为普通文本) 创建ROS工程 创建一个ROS工程

Java面试题:内存管理、类加载机制、对象生命周期及性能优化

1. 说一下 JVM 的主要组成部分及其作用? JVM包含两个子系统和两个组件:Class loader(类装载)、Execution engine(执行引擎)、Runtime data area(运行时数据区)、Native Interface(本地接口)。 Class loader(类装载):根据给定的全限定名类名(如:java.lang.Object)装载class文件到Runtim

任务5.1 初识Spark Streaming

实战概述:使用Spark Streaming进行词频统计 1. 项目背景与目标 背景: Spark Streaming是Apache Spark的流处理框架,用于构建可伸缩、高吞吐量的实时数据处理应用。目标: 实现一个实时词频统计系统,能够处理流式数据并统计文本中的单词出现频率。 2. 技术要点 Spark Streaming集成: 与Spark生态的其他组件如Spark SQL、ML