本文主要是介绍第36课:TaskScheduler内幕天机解密,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
内容:
TaskScheduler与SchedulerBackend
FIFO与FAIR两种调度模式
Task数据本地性资源分配
网上笔记详细笔记:
[Spark内核] 第36课:TaskScheduler内幕天机解密:Spark shell案例运行日志详解、TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解等
TaskScheduler内幕天机:Spark shell案例,TaskScheduler和SchedulerBackend、FIFO与FAIR、Task运行时本地性算法详解
一:通过spark-shell运行程序来观察TaskScheduler内幕
1.当我们启动Spark-shell本身的时候命令终端反馈回来的主要是ClientEndpoint和SparkDeploySchedulerBackend,这是因为此时还没有任何job的触发,只是启动Application本身而已,所以主要是实例化SparkContext并注册当前的应用程序给Master且从集群中获得ExecutorBackend的资源(这就是为什么启动日志没有DriverEndpoint信息的原因,因为此时应用程序内部还没有发生具体计算资源的调度);
2.DAGScheduler划分好Stage后会通过TaskSchedulerImpl中的TaskSetManager来管理当前要运行的Stage的所有任务TaskSet,TaskSetManager会根据locality aware来为Task分配计算资源,监控Task的执行状态(例如重试、慢任务进行推测式执行等);
二:TaskScheduler和SchedulerBackend
1.总体的底层任务的调度的过程如下:
a)TaskSchedulerImp.submitTasks--主要的作用是将TaskSet加入到TaskSetManager
b)SchedulableBuilder.addTaskSetManager--SchedulableBuilder会确定TasksetManager的调度顺序,然后按照TasksetManager的locality aware来确定每一个Tasks具体运行在哪个ExecutorBackend中;
c)CoarseGrainedSchedulerBackend.reviewOffers:给DriverEndPoint发送ReviewOffers,ReviewOffers本身是一个空的case object对象,只是起到底层资源调度的作用,有Task提交或者计算资源变动的时候会发送ReviewOffers这个消息作为触发器
d)在driverEndpoint接受ReviewOffers消息并路由到makeOffers具体的方法中;在makeOffers方法中首先准备好所有可以用于计算的workerOffers(代表了所有可用ExcutorBackend中可用的Cores等信息)
e)TaskSchedulerImpl.ResourceOffers:为每一个Task具体分配资源,输入是ExecutorBackend及其可用的Cores,输出TaskDescription的二维数组,在其中确定了每一个Task具体运行在哪一个ExecutorBackend;ResourceOffers到底是如何Task具体运行在哪个ExecutorBackend的?算法的实现如下:
i.通过Random.shuffer方法重新洗牌所有的计算资源以寻求计算资源的均衡;
ii.根据每个ExecutorBackend的Cores的个数声明类型为TaskDescription的ArrayBuffer数组;
iii.如果有新的ExecutorBackend分配给job,此时会调用ExecutorAdded来获得最新的完整的可用的计算资源
iv.通过下面的代码追求最高级别的优先级本地性
v.通过调用TasksetManager的ResourceOffer最终确定每个task具体运行在哪个ExecutorBackend的具体的locality level:
f)通过launchTasks把任务把ExecutorBackend去执行;
补充:
1.Task默认的最大重试次数是4次
def this(sc: SparkContext) = this(sc, sc.conf.get(config.MAX_TASK_FAILURES))private[spark] val MAX_TASK_FAILURES =ConfigBuilder("spark.task.maxFailures").intConf.createWithDefault(4)
2.Spark应用程序目前支持两种调度模式:FIFO、FAIR,可以通过spark-env.sh中的spark.scheduler.mode进行设置具体,默认情况下是FIFO的方式:
// default scheduler is FIFOprivate val schedulingModeConf = conf.get("spark.scheduler.mode", "FIFO")
3.TaskScheduler中要负责为Taask分配计算资源,此时程序已经具备集群中的就算资源了,根据计算本地性原则确定Task具体要运行在哪一个ExecutorBackend中;
4.TaskDescription中已经确定好了Task具体要运行在
而确定Task具体运行在哪一个ExecutorBackend上的算法是由
5.数据本地性优先级从高到底一=依次为:PROCESS_LOCAL,NODE_LOCAL,NO_PREF,RACK_LOCAL,ANY
NO_PREF是指机器本地性
6.每个Task默认是采用一个线程进行计算的:
// CPUs to request per taskval CPUS_PER_TASK = conf.getInt("spark.task.cpus", 1)
7.DAGScheduler是从数据层面考虑prefereLocaltion的,而TaskScheduler是从具体计算的Task角度考虑计算本地性;
/** Returns the configured max frame size for Akka messages in bytes. */def maxFrameSizeBytes(conf: SparkConf): Int = {val frameSizeInMB = conf.getInt("spark.akka.frameSize", 128)
8.Task进行广播时候的AkkFrameSize大小是128MB,如果任务大于等于128MB-200k的话,则Task会直接被丢弃掉,如果小于128MB-200k的话,会通过CoarseGrainedSchedulerBackend去launchTask到具体的ExecutorBackend;
这篇关于第36课:TaskScheduler内幕天机解密的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!