Nutch Study Notes: Reading the Source of Injector.java

2024-01-14 20:50


Injector.java injects URLs into the crawldb; each URL can optionally carry its own metadata. The class is built on MapReduce and on Nutch's plugin mechanism.
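A seed file is just plain text with one URL per line; metadata, when present, is appended after the URL as tab-separated name=value pairs. A hypothetical urls/seed.txt (the file name and values are made up for illustration; the fields are separated by real tab characters):

# lines starting with # are ignored
http://www.example.com/
http://www.example.org/	nutch.score=2.5	nutch.fetchInterval=86400	category=news

nutch.score and nutch.fetchInterval are recognized specially by the mapper below; category=news ends up as ordinary metadata on the CrawlDatum.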


 Injector.inject(...):

public void inject(Path crawlDb, Path urlDir) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  // Log the start time, the crawldb path, and the URL directory to inject
  if (LOG.isInfoEnabled()) {
    LOG.info("Injector: starting at " + sdf.format(start));
    LOG.info("Injector: crawlDb: " + crawlDb);
    LOG.info("Injector: urlDir: " + urlDir);
  }
  /*
   * Create a temporary directory whose name is the value of the
   * mapred.temp.dir property (default ".") + "/inject-temp-" + a random
   * number, e.g. inject-temp-236972440.
   *
   * At this point getConf() has loaded:
   * Configuration: core-default.xml, core-site.xml, nutch-default.xml,
   * nutch-site.xml
   */
  Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
      + "/inject-temp-"
      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  // map text input file to a <url,CrawlDatum> file
  if (LOG.isInfoEnabled()) {
    LOG.info("Injector: Converting injected urls to crawl db entries.");
  }
  /*
   * Initialize a NutchJob; getConf() now loads
   * Configuration: core-default.xml, core-site.xml, mapred-default.xml,
   * mapred-site.xml, nutch-default.xml, nutch-site.xml
   *
   * The job's input is the file of urls and its output is the temporary
   * directory tempDir; the output key type is Text (think String) and the
   * value type is CrawlDatum.
   *
   * Finally, runJob fires the job, which takes us into the InjectMapper
   * inner class.
   */
  JobConf sortJob = new NutchJob(getConf());
  sortJob.setJobName("inject " + urlDir);
  FileInputFormat.addInputPath(sortJob, urlDir);
  sortJob.setMapperClass(InjectMapper.class);
  FileOutputFormat.setOutputPath(sortJob, tempDir);
  sortJob.setOutputFormat(SequenceFileOutputFormat.class);
  sortJob.setOutputKeyClass(Text.class);
  sortJob.setOutputValueClass(CrawlDatum.class);
  sortJob.setLong("injector.current.time", System.currentTimeMillis());
  RunningJob mapJob = JobClient.runJob(sortJob);
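As a side note, here is a minimal sketch of how inject() gets called; the paths are placeholders, and it assumes the Injector(Configuration) constructor found in Nutch 1.x. In practice you would run the equivalent command line, bin/nutch inject crawl/crawldb urls.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.nutch.crawl.Injector;
import org.apache.nutch.util.NutchConfiguration;

public class InjectDemo {
  public static void main(String[] args) throws Exception {
    // NutchConfiguration.create() loads the nutch-*.xml files listed above
    Configuration conf = NutchConfiguration.create();
    Injector injector = new Injector(conf);
    // Hypothetical paths: an existing/target crawldb and a seed directory
    injector.inject(new Path("crawl/crawldb"), new Path("urls"));
  }
}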

Running sortJob takes us into the static inner class InjectMapper:

/** Normalize and filter injected urls. */
public static class InjectMapper implements
    Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
  private URLNormalizers urlNormalizers;
  private int interval;
  private float scoreInjected;
  private JobConf jobConf;
  private URLFilters filters;
  private ScoringFilters scfilters;
  private long curTime;

  /*
   * On entering this inner class, configure() is called first. The part
   * worth explaining is the plugin setup: urlNormalizers and filters both
   * go through the plugin mechanism. Loading a filter means loading an
   * extension point,
   *   <extension-point id="org.apache.nutch.net.URLFilter" name="Nutch URL Filter"/>
   * and then the extension class bound to it, here
   * org.apache.nutch.urlfilter.regex.RegexURLFilter. (The normalizers are
   * loaded the same way, so that is not repeated.)
   *
   * urlNormalizers:
   *   org.apache.nutch.net.urlnormalizer.basic.BasicURLNormalizer
   *   org.apache.nutch.net.urlnormalizer.pass.PassURLNormalizer
   *   org.apache.nutch.net.urlnormalizer.regex.RegexURLNormalizer
   * filters:
   *   org.apache.nutch.urlfilter.regex.RegexURLFilter
   *
   * All of these are declared in the plugin.includes block of
   * nutch-default.xml:
   *   <property>
   *     <name>plugin.includes</name>
   *     <value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|indexer-solr|scoring-opic|urlnormalizer-(pass|regex|basic)</value>
   *     <description></description>
   *   </property>
   *
   * urlNormalizers corresponds to urlnormalizer-(pass|regex|basic);
   * filters corresponds to urlfilter-regex.
   */
  public void configure(JobConf job) {
    this.jobConf = job;
    urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
    interval = jobConf.getInt("db.fetch.interval.default", 2592000);
    filters = new URLFilters(jobConf);
    scfilters = new ScoringFilters(jobConf);
    scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
    curTime = job.getLong("injector.current.time",
        System.currentTimeMillis());
  }
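Before moving on to map(), here is a minimal standalone sketch of the two plugin chains configure() just built. The sample URL is invented, and what actually happens depends on your plugin.includes value and regex rule files:

import org.apache.hadoop.conf.Configuration;
import org.apache.nutch.net.URLFilters;
import org.apache.nutch.net.URLNormalizers;
import org.apache.nutch.util.NutchConfiguration;

public class NormalizeFilterDemo {
  public static void main(String[] args) {
    Configuration conf = NutchConfiguration.create();
    URLNormalizers normalizers =
        new URLNormalizers(conf, URLNormalizers.SCOPE_INJECT);
    URLFilters urlFilters = new URLFilters(conf);

    String url = "HTTP://Example.COM/a/./b/../index.html"; // hypothetical seed
    try {
      // e.g. BasicURLNormalizer lowercases the host and resolves ./ and ../
      url = normalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
      // filter() returns the url if every filter accepts it, or null
      url = urlFilters.filter(url);
    } catch (Exception e) {
      url = null; // the mapper treats any failure as "filtered out"
    }
    System.out.println(url == null ? "filtered out" : url);
  }
}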

  /*
   * This is the job's real work (overridden here). key is the byte offset
   * into your URL text file (e.g. 0 on the first call) and value is the
   * line of text that was read (e.g. http://www.soho.com/ on the first
   * call). Think of this map() as reading your url.txt line by line until
   * the file is exhausted. (I have not dug deeply into the last two
   * parameters; readers are welcome to fill that in.)
   */
  public void map(WritableComparable<?> key, Text value,
      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {
    System.out.println("map key:" + key);
    String url = value.toString(); // value is line of text
    System.out.println("map:value" + value);
    // Lines that are commented out with # are skipped
    if (url != null && url.trim().startsWith("#")) {
      /* Ignore line that start with # */
      return;
    }
    // Handle any metadata carried on the line:
    // if tabs : metadata that could be stored
    // must be name=value and separated by \t
    float customScore = -1f;
    int customInterval = interval;
    int fixedInterval = -1;
    Map<String, String> metadata = new TreeMap<String, String>();
    if (url.indexOf("\t") != -1) {
      // Metadata is present: split on tabs; the first field is always the
      // url itself
      String[] splits = url.split("\t");
      url = splits[0];
      // Walk the metadata fields after the url (note that s starts at 1);
      // each field has the form "key=value"
      for (int s = 1; s < splits.length; s++) {
        // find separation between name and value
        int indexEquals = splits[s].indexOf("=");
        if (indexEquals == -1) {
          // skip anything without a =
          continue;
        }
        String metaname = splits[s].substring(0, indexEquals);
        String metavalue = splits[s].substring(indexEquals + 1);
        // a key named nutch.score carries a custom float score
        if (metaname.equals(nutchScoreMDName)) {
          try {
            customScore = Float.parseFloat(metavalue);
          } catch (NumberFormatException nfe) {
          }
        }
        // a key named nutch.fetchInterval carries a custom int interval
        else if (metaname.equals(nutchFetchIntervalMDName)) {
          try {
            customInterval = Integer.parseInt(metavalue);
          } catch (NumberFormatException nfe) {
          }
        }
        // a key named nutch.fetchInterval.fixed carries a fixed int interval
        else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
          try {
            fixedInterval = Integer.parseInt(metavalue);
          } catch (NumberFormatException nfe) {
          }
        } else
          // any key beyond those three goes into the metadata map, which
          // holds the not-yet-handled metadata and is attached to the
          // datum at the end
          metadata.put(metaname, metavalue);
      }
    }
    // Metadata done; now run the url through the plugins loaded in
    // configure() above
    try {
      url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
      url = filters.filter(url); // filter the url
    } catch (Exception e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Skipping " + url + ":" + e);
      }
      url = null;
    }
    // If the url was rejected by the plugins, bump the filtered counter
    if (url == null) {
      reporter.getCounter("injector", "urls_filtered").increment(1);
    } else { // if it passes
      // The url passed the plugin checks: overwrite value with the
      // normalized url and mark the new datum as injected
      value.set(url); // collect it
      CrawlDatum datum = new CrawlDatum();
      datum.setStatus(CrawlDatum.STATUS_INJECTED);
      // Is interval custom? Then set as meta data
      if (fixedInterval > -1) {
        // Set writable using float. Float is used by AdaptiveFetchSchedule
        datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
            new FloatWritable(fixedInterval));
        datum.setFetchInterval(fixedInterval);
      } else {
        datum.setFetchInterval(customInterval);
      }
      datum.setFetchTime(curTime);
      // now add the metadata
      Iterator<String> keysIter = metadata.keySet().iterator();
      while (keysIter.hasNext()) {
        String keymd = keysIter.next();
        String valuemd = metadata.get(keymd);
        datum.getMetaData().put(new Text(keymd), new Text(valuemd));
      }
      if (customScore != -1)
        datum.setScore(customScore);
      else
        datum.setScore(scoreInjected);
      try {
        scfilters.injectedScore(value, datum);
      } catch (ScoringFilterException e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Cannot filter injected score for url " + url
              + ", using default (" + e.getMessage() + ")");
        }
      }
      // Collect the output and bump the injected counter
      reporter.getCounter("injector", "urls_injected").increment(1);
      output.collect(value, datum);
    }
  }
}
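To make the flow concrete, trace the metadata-carrying seed line from the example near the top through this map(): nutch.score sets customScore to 2.5, nutch.fetchInterval sets customInterval to 86400, and category=news is left in the metadata map. Assuming the url survives normalization and filtering, the collected pair would dump roughly like this (the format follows CrawlDatum.toString(); the fetch time is whatever curTime was):

key:   http://www.example.org/
value: Version: 7 Status: 66 (injected) Fetch time: <curTime>
       Retries since fetch: 0 Retry interval: 86400 seconds
       Score: 2.5 Signature: null Metadata: category=news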
... In the reducer below, update and overwrite both default to false.


/** Combine multiple new entries for a url. */
public static class InjectReducer implements
    Reducer<Text, CrawlDatum, Text, CrawlDatum> {
  private int interval;
  private float scoreInjected;
  private boolean overwrite = false;
  private boolean update = false;

  public void configure(JobConf job) {
    interval = job.getInt("db.fetch.interval.default", 2592000);
    scoreInjected = job.getFloat("db.score.injected", 1.0f);
    overwrite = job.getBoolean("db.injector.overwrite", false);
    update = job.getBoolean("db.injector.update", false);
  }

  public void close() {
  }

  private CrawlDatum old = new CrawlDatum();
  private CrawlDatum injected = new CrawlDatum();

  /*
   * Here key is the url (e.g. http://blog.tianya.com/) and values can hold
   * more than one datum: the one injected by the map step above plus any
   * datum injected earlier. Sample dumps:
   *
   * injected earlier:
   * 1i----------- Version: 7 Status: 2 (db_fetched) Fetch time: Wed Dec 10
   * 15:02:27 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries
   * since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0
   * Signature: e7193c90eb7a5ca3d0a0969ed1a444d2 Metadata: Content-Type:
   * text/html _pst_: success(1), lastModified=0
   *
   * injected by the map step above:
   * 2i----------- Version: 7 Status: 66 (injected) Fetch time: Wed Nov 12
   * 18:29:30 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries
   * since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0
   * Signature: null Metadata:
   *
   * injected by the map step above (a duplicate injection):
   * 3i----------- Version: 7 Status: 66 (injected) Fetch time: Wed Nov 12
   * 18:29:30 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries
   * since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0
   * Signature: null Metadata:
   *
   * When both a new and an old datum exist, the old one is kept by
   * default, and each url ends up with exactly one datum (old by default);
   * in other words, this reduce acts as a de-duplication filter.
   *
   * The "old" datum comes from the current directory produced by the
   * previous run, while the datums for this injection round all live in
   * tempDir.
   */
  public void reduce(Text key, Iterator<CrawlDatum> values,
      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {
    System.out.println("key" + key + "\n values" + values);
    System.out.println("output" + output + "---reporter" + reporter);
    System.out.println("reporter.getCounter(injector urls_injected)"
        + reporter.getCounter("injector", "urls_injected").getValue());
    System.out.println("reporter.getCounter( injector urls_filtered )"
        + reporter.getCounter("injector", "urls_filtered").getCounter());
    boolean oldSet = false;
    boolean injectedSet = false;
    int i = 1;
    while (values.hasNext()) {
      i++;
      CrawlDatum val = values.next();
      System.out.println(i + "i-----------" + val);
      if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
        injected.set(val);
        injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        injectedSet = true;
      } else {
        old.set(val);
        oldSet = true;
      }
    }
    CrawlDatum res = null;
    /*
     * Whether to overwrite, ignore or update existing records
     *
     * @see https://issues.apache.org/jira/browse/NUTCH-1405
     */
    // Injected record already exists and overwrite but not update
    if (injectedSet && oldSet && overwrite) {
      res = injected;
      if (update) {
        LOG.info(key.toString()
            + " overwritten with injected record but update was specified.");
      }
    }
    // Injected record already exists and update but not overwrite
    if (injectedSet && oldSet && update && !overwrite) {
      res = old;
      old.putAllMetaData(injected);
      old.setScore(injected.getScore() != scoreInjected
          ? injected.getScore() : old.getScore());
      old.setFetchInterval(injected.getFetchInterval() != interval
          ? injected.getFetchInterval() : old.getFetchInterval());
    }
    // Old default behaviour
    if (injectedSet && !oldSet) {
      res = injected;
    } else {
      res = old;
    }
    output.collect(key, res);
  }
}
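Both flags are plain configuration keys, read in configure() above, so they can be flipped in nutch-site.xml. For example, to have a re-injected url update the score and fetch interval of an existing entry instead of being ignored:

<property>
  <name>db.injector.update</name>
  <value>true</value>
</property>

db.injector.overwrite works the same way and replaces the existing record outright.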




That wraps up the map/reduce in this class. At the end, inject() deletes tempDir and renames the merged result to current (as in the image at the top of the post). The remaining code is simple enough to read on your own.
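For completeness, that tail of inject() looks roughly like this in Nutch 1.x (reconstructed from memory of the same source file, so check your version): a merge job feeds both the existing crawldb and tempDir into InjectReducer, CrawlDb.install() swaps the result in as current, and the temporary directory is removed.

// merge with existing crawl db
if (LOG.isInfoEnabled()) {
  LOG.info("Injector: Merging injected urls into crawl db.");
}
JobConf mergeJob = CrawlDb.createJob(getConf(), crawlDb);
FileInputFormat.addInputPath(mergeJob, tempDir);
mergeJob.setReducerClass(InjectReducer.class);
JobClient.runJob(mergeJob);
CrawlDb.install(mergeJob, crawlDb);

// clean up the temporary directory
FileSystem fs = FileSystem.get(getConf());
fs.delete(tempDir, true);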




