Hadoop TaskScheduler浅析

2023-12-02 00:32
文章标签 浅析 hadoop taskscheduler

本文主要是介绍Hadoop TaskScheduler浅析,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Hadoop TaskScheduler浅析

TaskScheduler,顾名思义,就是MapReduce中的任务调度器。在MapReduce中,JobTracker接收JobClient提交的Job,将它们按InputFormat的划分以及其他相关配置,生成若干个Map和Reduce任务。然后,当一个TaskTracker通过心跳告知JobTracker自己还有空闲的任务Slot时,JobTracker就会向其分派任务。具体应该分派一些什么样的任务给这台TaskTracker,这就是TaskScheduler所需要考虑的事情。

TaskScheduler工作在JobTracker上。在JobTracker启动时,根据配置“mapred.jobtracker.taskScheduler”选定一个TaskScheduler的派生类实例作为任务调度器。任务的分派配由JobTracker的调度框架和TaskScheduler的具体调度策略配合完成。简单来说:
1、JobTracker通过一种Listener机制,将Job的变化情况同步给TaskScheduler。然后TaskScheduler就按照自己的策略将需要调度的Job管理起来;
2、在JobTracker需要向TaskTracker分派任务时,调用TaskScheduler的assignTask()方法来获得应当分派的任务;

listener

首先,JobClient提交的Job被JobTracker维护在一个Map结构中,Job的所有状态变更都会在这个结构中体现。但是Job中的任务的分派却跟这个结构无关,因为分派任务是TaskScheduler的事情,显然不能通过直接操作JobTracker的数据结构来完成。
为此,JobTracker中提供了一套JobInProgressListener的机制。当Map结构中的Job发生变化时(增、删、改),JobTracker就会回调已注册的JobInProgressListener的相应方法。TaskScheduler就使用它来监听JobTracker上Job的变化。
JobInProgressListener有如下方法:
void jobAdded(JobInProgress job);
void jobRemoved(JobInProgress job);
void jobUpdated(JobChangeEvent event);

TaskScheduler在初始化时会向JobTracker注册若干个JobInProgressListener。当JobTracker接收到新的Job时,会回调每个已注册的JobInProgressListener的jobAdded()方法。同样,当Job被移除或者发生更新时,jobRemoved()或jobUpdated()会被回调。
其中,jobUpdated()需要传递一个JobChangeEvent,用于描述Job的变化。这里面主要包含三种状态变化:运行状态改变、优先级改变、开始时间改变。
关于Job的运行状态,它包括PREP、RUNNING、FAILED、KILLED、SUCCEEDED五种。其中,PREP状态是Job的Setup任务执行完成之前的状态,JobTracker接收到Job后,需要等Job的Setup任务执行完成之后才能分派Job的Map和Reduce任务(见《Hadoop OutputFormat浅析》);而RUNNING、FAILED、SUCCEEDED则对应Job的执行中、执行失败和执行成功,这应该是很好理解的;而KILLED状态就是用户手动将Job杀死后的状态。
一般来说,Job的运行状态改变主要是由MapReduce框架来触发的,Job的生老病死都是随着框架而运转。而Job的运行状态改变成KILLED、以及Job的优先级改变和Job的开始时间改变,则都是由用户来发起的(可以通过JobClient来提交相应的请求)。

TaskScheduler可能会向JobTracker注册不止一个JobInProgressListener,每个Listener可能关心不同的信息、做不同的事。不过其实TaskScheduler只注册一个Listener肯定就已经能满足需要了的,之所以要将一个Listener就能完成的功能拆分成多份,目的是将Listener组件化,让不同的TaskScheduler可以共享同一个Listener(换句话说,不同的TaskScheduler可以使用同样的Listener作为其组件)。

下面看一下MapReduce中默认的TaskScheduler--JobQueueTaskScheduler--的做法。
JobQueueTaskScheduler是一个按优先级顺序选择Job的调度器。它在初始化时会向JobTracker注册JobQueueJobInProgressListener、EagerTaskInitializationListener两个Listener:

JobQueueJobInProgressListener比较简单,其成员只有一个Map类型的jobQueue--按优先级顺序排列的Job队列。Map的KEY是JobSchedulingInfo,它包含三个信息:
1、priority。Job的优先级,第一排序因子。由“mapred.job.priority”配置;
2、startTime。Job开始时间、第二排序因子。Job实际开始的时间,也可以在Job执行过程中被用户改写;
3、id。也就是JobId,非排序因子;
回调函数jobAdded()会将新的Job加入到jobQueue中,并排序;而jobUpdated()则会根据Job开始时间或优先级的变化,调整它在jobQueue中的排序。并且当Job的运行状态变为中止状态(KILLED、FALED、SUCCESSED)时会将Job从jobQueue中移除;而jobRemoved()则是什么都不做(因为移除Job的工作已经在jobUpdated()里面做了)。

EagerTaskInitializationListener是一个用于初始化Job的Listener。它维护了若干个工作线程(数目由“mapred.jobinit.threads”配置,默认为4)来初始化Job。回调函数jobAdded()会将Job提交给这些工作线程。
对Job的初始化是通过调用JobTracker.initJob()方法来实现的(注意,就算不使用EagerTaskInitializationListener,也应该通过某种途径来调用这个方法,以便初始化Job),其主要工作就是为Job生成一堆的任务(在这一步之前,只有Job,没有Task):
1、获取JobClient提交的Splits信息(这是JobClient使用InputFormat划分得来的,见《Hadoop InputFormat浅析》);
2、为每一个Split创建一个对应的Map任务,并按照Split的Location信息将这些Map任务归类,为之后的任务分派做准备(这里将网络视作树状的拓扑结构,每个运行TaskTracker的机器都对应到树的叶子节点上,而非叶子节点就是连接它们的交换机。然后,每个Map任务就挂在其Location对应的叶子节点及其所有祖先节点上。这样,在分派任务的时候,就可以从TaskTracker对应的叶子节点出发,逐步向上,从而寻找到一个距离该TaskTracker最近的尚未分派的Map任务);
3、根据Job的配置“mapred.reduce.tasks”,创建若干个Reduce任务:
4、额外创建两个Cleanup任务和两个Setup任务,用于执行OutputCommit中的操作(见《Hadoop OutputFormat浅析》)。为什么Cleanup和Setup任务会各有两个呢?因为它们需要以Map任务或是Reduce任务的身份被分派(其区别是占用Map的Slot或是Reduce的Slot),所以同一个东西有了两种不同的形态;
在这一步Job初始化完成之后,Job下的Task才能被分派给TaskTracker。(然后,首先只能分派一个Setup任务,并且等它执行完成之后,其他的Map和Reduce任务才能被分派,见《Hadoop OutputFormat浅析》。)
EagerTaskInitializationListener完成了对Job的初始化,很多TaskScheduler都会使用到它。
此外,在EagerTaskInitializationListener中,提交给工作线程初始化的Job会先放到一个按优先级排序的Queue里面,同样是以priority为第一排序因子、以startTime为第二排序因子。然后jobUpdated()会根据Job的priority或startTime的变化来调整Job在Queue中的排序;而jobRemoved()则会将Job从Queue中移除。
不过这里的优先级其实作用不是很大,因为相比于Job的执行时间,Job的初始化时间基本上可以忽略不计。(Job在Queue中发生竞争的可能性不大。)

任务分派

在TaskTracker通过心跳向JobTracker告知尚有空余的任务Slot时,JobTracker就会考虑向TaskTracker分派任务。
虽说Task的分派主要由TaskScheduler来负责,但是JobTracker会优先进行Setup和Cleanup任务的分派,如果有Job需要分派Setup或Cleanup任务,JobTracker会优先分派它们(而不考虑Job的优先级)。不过一般需要执行的Setup和Cleanup任务数量很少,而且执行时间很短,所以影响不大。重头戏还是在后面,JobTracker调用TaskScheduler的assignTask()方法,来分派任务。

具体的分派策略就跟具体的TaskScheduler相关了。还是以默认的JobQueueTaskScheduler为例:
1、顺序遍历JobQueueJobInProgressListener.jobQueue中的每一个Job(前面说过,已按优先级排好序),选择第一个RUNNING状态的Job(注意,jobQueue中可能有PREP状态的Job,它们尚未被EagerTaskInitializationListener初始化好),尝试从中分派任务。如果没有任务可以分派,则再选择下一个RUNNING状态的Job;
2、每次从一个Job中分派Map任务时,遵序如下步骤:
2.1、优先分派执行失败后等待重试的任务。对于这些需要重试的任务,并不考虑Location问题(因为要保证这些重试任务的优先顺序。另外,这个任务第一次分派的时候已经考虑了Location问题,但是失败了。如果重试还要考虑Location,那么这个任务可能很又会被分派到之前执行失败过的那台TaskTracker上。这显然是不好的);
2.2、从未分派的Map任务中进行分派,根据Location优先选择输入Split离TaskTracker最近的Map任务(前面已经介绍了,Map任务按照其Location信息放到网络树中);
2.3、如果启用了推断执行(由“mapred.map.tasks.speculative.execution”配置,默认为true),则尝试对已分派的Task进行再次分派(如果该Map执行进度比较落后的话);
3、Reduce任务的分派逻辑也是类似的,但是不需要考虑Location问题;

总的来说,任务分派是要将TaskTracker上空闲的Slot都填满,让MapReduce集群的计算能力尽量投入到Job的执行中。但是,在一次assignTask()的过程中并不总是一口气把TaskTracker上空闲的Slot全部填满,里面会有一些break逻辑。遇到下列情况时,分派流程将会break:
1、对于非Data-Local、非Rack-Local(同一机架)的Map任务,最多只会分派一个;
2、Reduce任务最多只会分派一个;
3、JobQueueTaskScheduler会为每个TaskTracker预留少量的计算能力,专为最高优先级的Job服务(也就是jobQueue中第一个能够分派任务的Job)。如果当前TaskTracker用完了非预留的计算能力,则最多只会分派一个任务(当然,分派的任务一定属于最高优先级的Job,因为分派过程是按Job的优先级顺序来进行的);

break之后,本次分派的任务可能就填不满TaskTracker的空闲Slot了,不过没关系,等这个TaskTracker下一次来心跳的时候,JobTracker又会再给它分派任务。这样的设计使得:
1、Map任务趋于分派到更合适的TaskTracker上。如果没有合适的Map任务分派给当前TaskTracker,则延迟其他Map任务的分派。那么这些延迟分派的Map任务可能可以分派到其他更合适的TaskTracker上;
2、Reduce任务趋于平均分派到每一个TaskTracker上。因为每个TaskTracker每次心跳都最多领走一个Reduce任务,只要每个TaskTracker心跳的频率相当,Reduce任务就能平均分派;

其他调度器

除了前面介绍的默认调度器--JobQueueTaskScheduler,MapReduce中还有提供了几种调度器可供选择。当然,你也可以实现自己的调度器。就像前面所介绍的那样,实现一组Listener、再实现assignTask()方法,基本上也就差不多了。
下面就简要介绍几种调度器:

1、LimitTasksPerJobTaskScheduler
这个调度器比较简单,它继承了JobQueueTaskScheduler,并重写了assignTask()方法。其思想是:给每个Job设置一个执行任务的限额(由“mapred.jobtracker.taskScheduler.maxRunningTasksPerJob”配置)。在以Job优先级顺序分派任务的基础上,如果某个Job分派出去的任务大于任务限额,则暂时跳过它,而考虑优先级更低的任务。如果不存在低优先级的Job、或者低优先级的Job分派不出任务,那么就算高优先级的Job超过了任务限额,也能继续分派任务。
这样一来,就既可以保证高优先级的Job得到较多的计算资源,又避免低优先级的Job过于饥饿。

2、FairScheduler
公平调度,其思想是:让每个Job能够得到公平的资源分配。不过,说起来简单,FairScheduler的实现却是非常复杂的。除了最基本的公平任务分配,还支持按Pool划分Job(每个Pool可以设置不同的配额),还支持在不公平情况下的任务抢占,等等。有兴趣的朋友可以参阅:http://hadoop.apache.org/common/docs/r0.20.2/fair_scheduler.html。
值得一提的是,FairScheduler也使用了EagerTaskInitializationListener来初始化Job。

3、CapacityScheduler
假设我们有多个为不同应用服务的MapReduce集群。一般来说,为了满足应用的峰值需求,集群总是需要配备足够的计算资源。但是大多数情况下,应用对计算资源的需求往往是远低于峰值需求的,这就带来了很大的资源浪费。于是,我们可以将这些集群合并起来,以期最大限度的提高集群的利用率。
但是简单的集群合并没法保证各个应用所需要的最低计算资源,特别是当多个应用的高峰同时到来时,“倒霉”的应用可能会饿死。而使用CapacityScheduler就能提供这样的保证。CapacityScheduler可以配置若干个Queue,以Queue为单位来设置最低容量保证。具体可以参阅:http://hadoop.apache.org/common/docs/r0.20.2/capacity_scheduler.html。

这篇关于Hadoop TaskScheduler浅析的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/443279

相关文章

浅析Python中的绝对导入与相对导入

《浅析Python中的绝对导入与相对导入》这篇文章主要为大家详细介绍了Python中的绝对导入与相对导入的相关知识,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1 Imports快速介绍2 import语句的语法2.1 基本使用2.2 导入声明的样式3 绝对import和相对i

浅析如何使用Swagger生成带权限控制的API文档

《浅析如何使用Swagger生成带权限控制的API文档》当涉及到权限控制时,如何生成既安全又详细的API文档就成了一个关键问题,所以这篇文章小编就来和大家好好聊聊如何用Swagger来生成带有... 目录准备工作配置 Swagger权限控制给 API 加上权限注解查看文档注意事项在咱们的开发工作里,API

浅析Rust多线程中如何安全的使用变量

《浅析Rust多线程中如何安全的使用变量》这篇文章主要为大家详细介绍了Rust如何在线程的闭包中安全的使用变量,包括共享变量和修改变量,文中的示例代码讲解详细,有需要的小伙伴可以参考下... 目录1. 向线程传递变量2. 多线程共享变量引用3. 多线程中修改变量4. 总结在Rust语言中,一个既引人入胜又可

浅析Spring Security认证过程

类图 为了方便理解Spring Security认证流程,特意画了如下的类图,包含相关的核心认证类 概述 核心验证器 AuthenticationManager 该对象提供了认证方法的入口,接收一个Authentiaton对象作为参数; public interface AuthenticationManager {Authentication authenticate(Authenti

Hadoop企业开发案例调优场景

需求 (1)需求:从1G数据中,统计每个单词出现次数。服务器3台,每台配置4G内存,4核CPU,4线程。 (2)需求分析: 1G / 128m = 8个MapTask;1个ReduceTask;1个mrAppMaster 平均每个节点运行10个 / 3台 ≈ 3个任务(4    3    3) HDFS参数调优 (1)修改:hadoop-env.sh export HDFS_NAMENOD

Hadoop集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性) plan后面带的节点的名字必须是已经存在的,并且是需要均衡的节点。 如果节点不存在,会报如下错误: 如果节点只有一个硬盘的话,不会创建均衡计划: (1)生成均衡计划 hdfs diskbalancer -plan hadoop102 (2)执行均衡计划 hd

hadoop开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。 开启回收站功能参数说明 (1)默认值fs.trash.interval = 0,0表示禁用回收站;其他值表示设置文件的存活时间。 (2)默认值fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为0,则该值设置和fs.trash.interval的参数值相等。

Hadoop数据压缩使用介绍

一、压缩原则 (1)运算密集型的Job,少用压缩 (2)IO密集型的Job,多用压缩 二、压缩算法比较 三、压缩位置选择 四、压缩参数配置 1)为了支持多种压缩/解压缩算法,Hadoop引入了编码/解码器 2)要在Hadoop中启用压缩,可以配置如下参数

(入门篇)JavaScript 网页设计案例浅析-简单的交互式图片轮播

网页设计已经成为了每个前端开发者的必备技能,而 JavaScript 作为前端三大基础之一,更是为网页赋予了互动性和动态效果。本篇文章将通过一个简单的 JavaScript 案例,带你了解网页设计中的一些常见技巧和技术原理。今天就说一说一个常见的图片轮播效果。相信大家在各类电商网站、个人博客或者展示页面中,都看到过这种轮播图。它的核心功能是展示多张图片,并且用户可以通过点击按钮,左右切换图片。

【Hadoop|MapReduce篇】MapReduce概述

1. MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。 MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 2. MapReduce优缺点 2.1 优点 MapReduce易于编程 它简单的实现一些接口,就可以完成一个分布式