本文主要是介绍【Mapred】 JobTracker 内部类RecoveryManager简介(下),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
接上一篇,jt在offerService方法中执行作业recovery操作,需要注意的是,下面介绍的recovery操作是在jt的rpc server启动之前,即jt对外提供服务之前。
记得十一节前排查发现ecomon集群堆积4000个待恢复作业时还搞不清楚为啥有问题,通过下面的介绍就可以发现,恢复那么多的作业,jt启动之后很长时间将不能提供服务。
recovery的过程其实也不复杂,简单来讲是这样:首先要找到history存放点,这里是放到hdfs上,然后遍历里边的文件,找到待恢复作业的相关文件,然后逐个解析,文件并不复杂,里边保存了这个作业的执行信息,以确定从哪开始恢复,到底是整个作业重跑还是重跑哪些task等等。然后根据作业用户名和作业号(重跑的作业号不变)重新构建jobInProcess对象,加入到正常的待执行作业队列里,使其回到正常的作业调度序列。但是中间确有很多细节需要阐释,这里仅把大致流程贴出来,回头陆续介绍这些细节。
public void recover() {if (!shouldRecover()) {// clean up jobs structurejobsToRecover.clear();return;}//如果没有需要重跑的作业,或者没有配置作业恢复机制,那么把RecoveryManager保存的jobID全部清空,即清空jobsToRecover。LOG.info("Restart count of the jobtracker : " + restartCount);//cache 住每个作业的history日志文件Map<JobID, Path> jobHistoryFilenameMap = new HashMap<JobID, Path>();Iterator<JobID> idIter = jobsToRecover.iterator();JobInProgress job = null;File jobIdFile = null;boolean isJobIdFileStoreLocal="".equals(conf.get("hadoop.job.history.location",""));//作业的history文件可以放到集群上,也可以放到jobtracker本地,如果不配hadoop.job.history.location,就是默认本地//实际应用是放到hdfs的/jobtracker/job.history/路径下while (idIter.hasNext()) {JobID id = idIter.next();LOG.info("Trying to recover details of job " + id);try {String logFileName = JobHistory.JobInfo.getJobHistoryFileNameByID(conf, id);//根据jobID找到其对应的history文件名if (logFileName == null) {LOG.info("No history file found for job " + id);idIter.remove(); // 没有历史文件的作业就不能恢复了continue;}// 找到作业的用户名,重新构建JobInProgress结构String user = null;if(isJobIdFileStoreLocal){//history日志在本机情况处理jobIdFile = new File(lDirAlloc.getLocalPathToRead(SUBDIR + "/" + id, conf).toString());//如果是jt本机磁盘保存作业历史文件,那么这样处理,直接读计算目录下的相应的文件,里边记录了作业的用户if (jobIdFile != null && jobIdFile.exists()) {LOG.info("File " + jobIdFile + " exists for job " + id);FileInputStream in = new FileInputStream(jobIdFile);BufferedReader reader = null;try {reader = new BufferedReader(new InputStreamReader(in));user = reader.readLine();LOG.info("Recovered user " + user + " for job " + id);} finally {if (reader != null) {reader.close();}in.close();}}}else{String[] fields=logFileName.split("_");if(fields!=null && fields.length>4){user=fields[4];}}//作业的history文件名包含了用户名,例如,以"_"切分,第5个就是用户名,seo//job_201305281414_832629_1387852833833_seo_share%5F0if (user == null) {throw new RuntimeException("Incomplete job " + id);}// Create the job/* THIS PART OF THE CODE IS USELESS. JOB RECOVERY SHOULD BE* BACKPORTED (MAPREDUCE-873)*/job = new JobInProgress(JobTracker.this, conf,new JobInfo((org.apache.hadoop.mapreduce.JobID) id,new Text(user), new Path(getStagingAreaDirInternal(user,id.toString()))),restartCount, new Credentials() /*HACK*/);//restartCount计数器,用于统计这个作业被重新跑了多少次,每个作业都有统计
//JobInfo创建了所谓的staging Area Dir 这里配置的就是/user/.staging/jobid// 2.检查用户的合法性// Get the user group info for the job's ownerUserGroupInformation ugi =UserGroupInformation.createRemoteUser(job.getJobConf().getUser());LOG.info("Submitting job " + id + " on behalf of user "+ ugi.getShortUserName() + " in groups : "+ StringUtils.arrayToString(ugi.getGroupNames()));// check the accesstry {aclsManager.checkAccess(job, ugi, Operation.SUBMIT_JOB);} catch (Throwable t) {LOG.warn("Access denied for user " + ugi.getShortUserName() + " in groups : [" + StringUtils.arrayToString(ugi.getGroupNames()) + "]");throw t;}// 3. Get the log file and the file pathPath jobHistoryFilePath = JobHistory.JobInfo.getJobHistoryLogLocation(logFileName);// 4. Recover the history file. This involved// - deleting file.recover if file exists// - renaming file.recover to file if file doesnt exist// This makes sure that the (master) file existsJobHistory.JobInfo.recoverJobHistoryFile(job.getJobConf(), jobHistoryFilePath);//确保作业的history文件要准备好// 5. Cache the history file name as it costs one dfs accessjobHistoryFilenameMap.put(job.getJobID(), jobHistoryFilePath);// 6. Sumbit the job to the jobtrackeraddJob(id, job);} catch (Throwable t) {LOG.warn("Failed to recover job " + id + " Ignoring the job.", t);idIter.remove();if (jobIdFile != null) {jobIdFile.delete();jobIdFile = null;}if (job != null) {job.fail();job = null;}continue;}}//到这里,把所有需要恢复的作业弄了一圈,history文件cache住,然后重新构造JobInProgress对象,//并加到jt管理的JobInProgress cache中.long recoveryStartTime = clock.getTime();// II. Recover each jobidIter = jobsToRecover.iterator();while (idIter.hasNext()) {JobID id = idIter.next();JobInProgress pJob = getJob(id);// 1. Get the required info// Get the recovered history filePath jobHistoryFilePath = jobHistoryFilenameMap.get(pJob.getJobID());String logFileName = jobHistoryFilePath.getName();FileSystem fs;try {fs = jobHistoryFilePath.getFileSystem(conf);} catch (IOException ioe) {LOG.warn("Failed to get the filesystem for job " + id + ". Ignoring.",ioe);continue;}// 2. Parse the history file// Note that this also involves job updateJobRecoveryListener listener = new JobRecoveryListener(pJob);try {JobHistory.parseHistoryFromFS(jobHistoryFilePath.toString(), listener, fs);//解析hdfs上的每一个history文件,这个history文件很重要,我日,里边保存了恢复信息,//里边回答了你要恢复整个作业,还是task,是map还是reduce?//从哪个attempt恢复等等,这样的一个东西叫做一个event} catch (Throwable t) {LOG.info("Error reading history file of job " + pJob.getJobID() + ". Ignoring the error and continuing.", t);}// 3. Close the listenerlistener.close();// 4. Update the recovery metrictotalEventsRecovered += listener.getNumEventsRecovered();// 5. Cleanup history// Delete the master log file as an indication that the new file// should be used in futuretry {synchronized (pJob) {JobHistory.JobInfo.checkpointRecovery(logFileName, pJob.getJobConf());//history路径下保存了一个作业的两个文件,一个是.recover另一个是作业主文件}} catch (Throwable t) {LOG.warn("Failed to delete log file (" + logFileName + ") for job " + id + ". Continuing.", t);}if (pJob.isComplete()) {idIter.remove(); // no need to keep this job info as its successful}}recoveryDuration = clock.getTime() - recoveryStartTime;hasRecovered = true;// III. Finalize the recoverysynchronized (trackerExpiryQueue) {// Make sure that the tracker statuses in the expiry-tracker queue// are updatedlong now = clock.getTime();int size = trackerExpiryQueue.size();for (int i = 0; i < size ; ++i) {// Get the first tasktrackerTaskTrackerStatus taskTracker = trackerExpiryQueue.first();// Remove ittrackerExpiryQueue.remove(taskTracker);// Set the new timetaskTracker.setLastSeen(now);// Add back to get the sorted listtrackerExpiryQueue.add(taskTracker);}}LOG.info("Restoration complete");}
这篇关于【Mapred】 JobTracker 内部类RecoveryManager简介(下)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!