【Mapred】 JobTracker 内部类RecoveryManager简介(下)

2024-04-21 08:38

本文主要是介绍【Mapred】 JobTracker 内部类RecoveryManager简介(下),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

接上一篇,jt在offerService方法中执行作业recovery操作,需要注意的是,下面介绍的recovery操作是在jt的rpc server启动之前,即jt对外提供服务之前。

记得十一节前排查发现ecomon集群堆积4000个待恢复作业时还搞不清楚为啥有问题,通过下面的介绍就可以发现,恢复那么多的作业,jt启动之后很长时间将不能提供服务。

recovery的过程其实也不复杂,简单来讲是这样:首先要找到history存放点,这里是放到hdfs上,然后遍历里边的文件,找到待恢复作业的相关文件,然后逐个解析,文件并不复杂,里边保存了这个作业的执行信息,以确定从哪开始恢复,到底是整个作业重跑还是重跑哪些task等等。然后根据作业用户名和作业号(重跑的作业号不变)重新构建jobInProcess对象,加入到正常的待执行作业队列里,使其回到正常的作业调度序列。但是中间确有很多细节需要阐释,这里仅把大致流程贴出来,回头陆续介绍这些细节。

    public void recover() {if (!shouldRecover()) {// clean up jobs structurejobsToRecover.clear();return;}//如果没有需要重跑的作业,或者没有配置作业恢复机制,那么把RecoveryManager保存的jobID全部清空,即清空jobsToRecover。LOG.info("Restart count of the jobtracker : " + restartCount);//cache 住每个作业的history日志文件Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();Iterator<JobID> idIter = jobsToRecover.iterator();JobInProgress job = null;File jobIdFile = null;boolean isJobIdFileStoreLocal="".equals(conf.get("hadoop.job.history.location",""));//作业的history文件可以放到集群上,也可以放到jobtracker本地,如果不配hadoop.job.history.location,就是默认本地//实际应用是放到hdfs的/jobtracker/job.history/路径下while (idIter.hasNext()) {JobID id = idIter.next();LOG.info("Trying to recover details of job " + id);try {String logFileName = JobHistory.JobInfo.getJobHistoryFileNameByID(conf, id);//根据jobID找到其对应的history文件名if (logFileName == null) {LOG.info("No history file found for job " + id);idIter.remove(); // 没有历史文件的作业就不能恢复了continue;}// 找到作业的用户名,重新构建JobInProgress结构String user = null;if(isJobIdFileStoreLocal){//history日志在本机情况处理jobIdFile = new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());//如果是jt本机磁盘保存作业历史文件,那么这样处理,直接读计算目录下的相应的文件,里边记录了作业的用户if (jobIdFile != null && jobIdFile.exists()) {LOG.info("File " + jobIdFile + " exists for job " + id);FileInputStream in = new FileInputStream(jobIdFile);BufferedReader reader = null;try {reader = new BufferedReader(new InputStreamReader(in));user = reader.readLine();LOG.info("Recovered user " + user + " for job " + id);} finally {if (reader != null) {reader.close();}in.close();}}}else{String[] fields=logFileName.split("_");if(fields!=null && fields.length>4){user=fields[4];}}//作业的history文件名包含了用户名,例如,以"_"切分,第5个就是用户名,seo//job_201305281414_832629_1387852833833_seo_share%5F0if (user == null) {throw new RuntimeException("Incomplete job " + id);}// Create the job/* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE* BACKPORTED (MAPREDUCE-873)*/job = new JobInProgress(JobTracker.this, conf,new JobInfo((org.apache.hadoop.mapreduce.JobID) id,new Text(user), new Path(getStagingAreaDirInternal(user,id.toString()))),restartCount, new Credentials() /*HACK*/);//restartCount计数器,用于统计这个作业被重新跑了多少次,每个作业都有统计
//JobInfo创建了所谓的staging Area Dir 这里配置的就是/user/.staging/jobid// 2.检查用户的合法性// Get the user group info for the job's ownerUserGroupInformation ugi =UserGroupInformation.createRemoteUser(job.getJobConf().getUser());LOG.info("Submitting job " + id + " on behalf of user "+ ugi.getShortUserName() + " in groups : "+ StringUtils.arrayToString(ugi.getGroupNames()));// check the accesstry {aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);} catch (Throwable t) {LOG.warn("Access denied for user " + ugi.getShortUserName() + " in groups : [" + StringUtils.arrayToString(ugi.getGroupNames()) + "]");throw t;}// 3. Get the log file and the file pathPath jobHistoryFilePath = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);// 4. Recover the history file. This involved//     - deleting file.recover if file exists//     - renaming file.recover to file if file doesnt exist// This makes sure that the (master) file existsJobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), jobHistoryFilePath);//确保作业的history文件要准备好// 5. Cache the history file name as it costs one dfs accessjobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);// 6. Sumbit the job to the jobtrackeraddJob(id, job);} catch (Throwable t) {LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);idIter.remove();if (jobIdFile != null) {jobIdFile.delete();jobIdFile = null;}if (job != null) {job.fail();job = null;}continue;}}//到这里,把所有需要恢复的作业弄了一圈,history文件cache住,然后重新构造JobInProgress对象,//并加到jt管理的JobInProgress cache中.long recoveryStartTime = clock.getTime();// II. Recover each jobidIter = jobsToRecover.iterator();while (idIter.hasNext()) {JobID id = idIter.next();JobInProgress pJob = getJob(id);// 1. Get the required info// Get the recovered history filePath jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());String logFileName = jobHistoryFilePath.getName();FileSystem fs;try {fs = jobHistoryFilePath.getFileSystem(conf);} catch (IOException ioe) {LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",ioe);continue;}// 2. Parse the history file// Note that this also involves job updateJobRecoveryListener listener = new JobRecoveryListener(pJob);try {JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), listener, fs);//解析hdfs上的每一个history文件,这个history文件很重要,我日,里边保存了恢复信息,//里边回答了你要恢复整个作业,还是task,是map还是reduce?//从哪个attempt恢复等等,这样的一个东西叫做一个event} catch (Throwable t) {LOG.info("Error reading history file of job " + pJob.getJobID() + ". Ignoring the error and continuing.", t);}// 3. Close the listenerlistener.close();// 4. Update the recovery metrictotalEventsRecovered += listener.getNumEventsRecovered();// 5. Cleanup history// Delete the master log file as an indication that the new file// should be used in futuretry {synchronized (pJob) {JobHistory.JobInfo.checkpointRecovery(logFileName, pJob.getJobConf());//history路径下保存了一个作业的两个文件,一个是.recover另一个是作业主文件}} catch (Throwable t) {LOG.warn("Failed to delete log file (" + logFileName + ") for job " + id + ". Continuing.", t);}if (pJob.isComplete()) {idIter.remove(); // no need to keep this job info as its successful}}recoveryDuration = clock.getTime() - recoveryStartTime;hasRecovered = true;// III. Finalize the recoverysynchronized (trackerExpiryQueue) {// Make sure that the tracker statuses in the expiry-tracker queue// are updatedlong now = clock.getTime();int size = trackerExpiryQueue.size();for (int i = 0; i < size ; ++i) {// Get the first tasktrackerTaskTrackerStatus taskTracker = trackerExpiryQueue.first();// Remove ittrackerExpiryQueue.remove(taskTracker);// Set the new timetaskTracker.setLastSeen(now);// Add back to get the sorted listtrackerExpiryQueue.add(taskTracker);}}LOG.info("Restoration complete");}


这篇关于【Mapred】 JobTracker 内部类RecoveryManager简介(下)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/922639

相关文章

ASIO网络调试助手之一:简介

多年前,写过几篇《Boost.Asio C++网络编程》的学习文章,一直没机会实践。最近项目中用到了Asio,于是抽空写了个网络调试助手。 开发环境: Win10 Qt5.12.6 + Asio(standalone) + spdlog 支持协议: UDP + TCP Client + TCP Server 独立的Asio(http://www.think-async.com)只包含了头文件,不依

业务协同平台--简介

一、使用场景         1.多个系统统一在业务协同平台定义协同策略,由业务协同平台代替人工完成一系列的单据录入         2.同时业务协同平台将执行任务推送给pda、pad等执行终端,通知各人员、设备进行作业执行         3.作业过程中,可设置完成时间预警、作业节点通知,时刻了解作业进程         4.做完再给你做过程分析,给出优化建议         就问你这一套下

容器编排平台Kubernetes简介

目录 什么是K8s 为什么需要K8s 什么是容器(Contianer) K8s能做什么? K8s的架构原理  控制平面(Control plane)         kube-apiserver         etcd         kube-scheduler         kube-controller-manager         cloud-controlle

【Tools】AutoML简介

摇来摇去摇碎点点的金黄 伸手牵来一片梦的霞光 南方的小巷推开多情的门窗 年轻和我们歌唱 摇来摇去摇着温柔的阳光 轻轻托起一件梦的衣裳 古老的都市每天都改变模样                      🎵 方芳《摇太阳》 AutoML(自动机器学习)是一种使用机器学习技术来自动化机器学习任务的方法。在大模型中的AutoML是指在大型数据集上使用自动化机器学习技术进行模型训练和优化。

STM32内部闪存FLASH(内部ROM)、IAP

1 FLASH简介  1 利用程序存储器的剩余空间来保存掉电不丢失的用户数据 2 通过在程序中编程(IAP)实现程序的自我更新 (OTA) 3在线编程(ICP把整个程序都更新掉) 1 系统的Bootloader写死了,只能用串口下载到指定的位置,启动方式也不方便需要配置BOOT引脚触发启动  4 IAP(自己写的Bootloader,实现程序升级) 1 比如蓝牙转串口,

FreeRTOS内部机制学习03(事件组内部机制)

文章目录 事件组使用的场景事件组的核心以及Set事件API做的事情事件组的特殊之处事件组为什么不关闭中断xEventGroupSetBitsFromISR内部是怎么做的? 事件组使用的场景 学校组织秋游,组长在等待: 张三:我到了 李四:我到了 王五:我到了 组长说:好,大家都到齐了,出发! 秋游回来第二天就要提交一篇心得报告,组长在焦急等待:张三、李四、王五谁先写好就交谁的

java线程深度解析(一)——java new 接口?匿名内部类给你答案

http://blog.csdn.net/daybreak1209/article/details/51305477 一、内部类 1、内部类初识 一般,一个类里主要包含类的方法和属性,但在Java中还提出在类中继续定义类(内部类)的概念。 内部类的定义:类的内部定义类 先来看一个实例 [html]  view plain copy pu

SaaS、PaaS、IaaS简介

云计算、云服务、云平台……现在“云”已成了一个家喻户晓的概念,但PaaS, IaaS 和SaaS的区别估计还没有那么多的人分得清,下面就分别向大家普及一下它们的基本概念: SaaS 软件即服务 SaaS是Software-as-a-Service的简称,意思是软件即服务。随着互联网技术的发展和应用软件的成熟, 在21世纪开始兴起的一种完全创新的软件应用模式。 它是一种通过Internet提供

LIBSVM简介

LIBSVM简介 支持向量机所涉及到的数学知识对一般的化学研究者来说是比较难的,自己编程实现该算法难度就更大了。但是现在的网络资源非常发达,而且国际上的科学研究者把他们的研究成果已经放在网络上,免费提供给用于研究目的,这样方便大多数的研究者,不必要花费大量的时间理解SVM算法的深奥数学原理和计算机程序设计。目前有关SVM计算的相关软件有很多,如LIBSVM、mySVM、SVMLight等,这些

urllib与requests爬虫简介

urllib与requests爬虫简介 – 潘登同学的爬虫笔记 文章目录 urllib与requests爬虫简介 -- 潘登同学的爬虫笔记第一个爬虫程序 urllib的基本使用Request对象的使用urllib发送get请求实战-喜马拉雅网站 urllib发送post请求 动态页面获取数据请求 SSL证书验证伪装自己的爬虫-请求头 urllib的底层原理伪装自己的爬虫-设置代理爬虫coo