本文主要是介绍【Mapred】 JobTracker 内部类RecoveryManager简介(上),希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
Hadoop1.0.3版本
一直知道jobtracker有任务恢复、作业重跑功能等,那么jobtracker是如何实现的呢?
今天通过对jobtracker启动过程的跟踪,来一探究竟。
jobtracker有一个内部类叫做RecoveryManager ,同时jobtracker有个私有成员:
RecoveryManager recoveryManager;
在初始化的时候,对其构造:
recoveryManager = new RecoveryManager();
构造方法其实啥也没干,对其成员变量进行初始化,new一个容器用于保存需要重跑的jobId
public RecoveryManager() {jobsToRecover = new TreeSet<JobID>();}
到这里似乎还看不到它到底在干啥,接着往下看,初始化程序开始找hdfs上的所谓系统路径
public String getSystemDir() {Path sysDir = new Path(conf.get("mapred.system.dir", "/tmp/hadoop/mapred/system")); return fs.makeQualified(sysDir).toString();}
for (FileStatus status : systemDirData) {try {recoveryManager.checkAndAddJob(status);} catch (Throwable t) {LOG.warn("Failed to add the job " + status.getPath().getName(), t);}}
到这里,RecoveryManager 开始干活了,要对这个路径下的所有目录进行检查,进去看看到底检查啥东西:
// checks if the job dir has the required filespublic void checkAndAddJob(FileStatus status) throws IOException {String fileName = status.getPath().getName();if (isJobNameValid(fileName)) {recoveryManager.addJobForRecovery(JobID.forName(fileName));shouldRecover = true;}}
这里先要搞清楚集群上mapred.system.dir配置的路径下到底存了些什么玩意儿:
drwx------ - hadoop supergroup 0 2013-12-24 01:09 /jobtracker/system/job_201305281414_821530
drwx------ - hadoop supergroup 0 2013-12-24 01:12 /jobtracker/system/job_201305281414_821531
drwx------ - hadoop supergroup 0 2013-12-24 01:12 /jobtracker/system/job_201305281414_821532
drwx------ - hadoop supergroup 0 2013-12-24 01:12 /jobtracker/system/job_201305281414_821533
drwx------ - hadoop supergroup 0 2013-12-24 01:12 /jobtracker/system/job_201305281414_821534
drwx------ - hadoop supergroup 0 2013-12-24 01:12 /jobtracker/system/job_201305281414_821535
-rw------- 3 hadoop supergroup 4 2013-05-28 14:14 /jobtracker/system/jobtracker.info
看了一眼,除了jobtracker.info,全是以jobid 命名的目录,里边都有个jobToken文件,并且是个二进制文件,至于这是个什么吊东西,回头再说。
// checks if the job dir has the required filespublic void checkAndAddJob(FileStatus status) throws IOException {String fileName = status.getPath().getName();if (isJobNameValid(fileName)) {//对以作业号命名的目录进行处理recoveryManager.addJobForRecovery(JobID.forName(fileName));//我日,全部加到初始化的容器里了shouldRecover = true;}}
这样,凡是作业号出现在mapred.system.dir中的作业都收集起来,并且等待重跑。
注意这里重跑与否要看配置
<property><name>mapred.jobtracker.restart.recover</name><value>true</value><description>"true" to enable (job) recovery upon restart, "false" to start afresh </description>
</property>
否则,jobtracker会清空那个mapred.system.dir目录。
if (!hasRestarted) {jobConf.deleteLocalFiles(SUBDIR);}//如果没有需要重跑的作业,那么清空jobtracker本机上的所有计算目录,//1.0.3版本是配置的所有目录下的jobTracker目录SUBDIR="jobTracker"
这样,在jobtracker初始化中,基本上没有RecoveryManager 的事了,转到jobtracker的offerService方法中,一上来就是它,擦:
while (true) {try {recoveryManager.updateRestartCount();break;} catch (IOException ioe) {LOG.warn("Failed to initialize recovery manager. ", ioe);// wait for some timeThread.sleep(FS_ACCESS_RETRY_PERIOD);LOG.warn("Retrying...");}}
updateRestartCount()方法就不讲了,比较啰嗦,就是干了一件事,把要重跑的作业数加1,再写到new Path(getSystemDir(), "jobtracker.info");文件中。
中间弄了个jobtracker.info.recover来回倒换。
从这里可以看出来,一定是那些需要重跑的作业以及总的个数都放到mapred.system.dir里了,至于啥时候放的,再哪放的,后续继续讨论。
下一篇接着讲这些需要重跑的作业到底怎么recover的,打个飞机睡了...
这篇关于【Mapred】 JobTracker 内部类RecoveryManager简介(上)的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!