Nutch学习——读源码 Injector.java

2024-01-14 20:50

本文主要是介绍Nutch学习——读源码 Injector.java,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

Injector.java主要是向crawldb注入 URL,这些URL也可以选择性的带上对应当metadata。里面用到了MapReduce和插件机制


 Injector.inject(...):

public void inject(Path crawlDb, Path urlDir) throws IOException {SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");long start = System.currentTimeMillis();/** 打印了开始的时间;注入的路径;要注入的url文档*/if (LOG.isInfoEnabled()) {LOG.info("Injector: starting at " + sdf.format(start));LOG.info("Injector: crawlDb: " + crawlDb);LOG.info("Injector: urlDir: " + urlDir);}/** * 创建一个临时文件夹 以mapred.temp.dir 属性对应的值(如果没有则默认用“.”) + “/inject-temp-” +* 一个随机数、* 为这个临时文件夹名字* inject-temp-236972440* * 对于getConf()加载:* Configuration: core-default.xml, core-site.xml, nutch-default.xml, nutch-site.xml*/Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")+ "/inject-temp-"+ Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));// map text input file to a <url,CrawlDatum> fileif (LOG.isInfoEnabled()) {LOG.info("Injector: Converting injected urls to crawl db entries.");}/*初始化一个NutchJob,并在getConf() 中加载 * Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, nutch-default.xml, nutch-site.xml* * 其中还设置了输入:存放url的文件,输出临时文件夹tempDir;输出的key为Text类型(可以理解为String)value为crawlDatum类型。* * 最后 runJob————将会跳入InjectMapper这个内部类中* * */JobConf sortJob = new NutchJob(getConf());sortJob.setJobName("inject " + urlDir);FileInputFormat.addInputPath(sortJob, urlDir);sortJob.setMapperClass(InjectMapper.class);FileOutputFormat.setOutputPath(sortJob, tempDir);sortJob.setOutputFormat(SequenceFileOutputFormat.class);sortJob.setOutputKeyClass(Text.class);sortJob.setOutputValueClass(CrawlDatum.class);sortJob.setLong("injector.current.time", System.currentTimeMillis());RunningJob mapJob = JobClient.runJob(sortJob);

启动sortJob,将进入静态内部类:InjectMapper 类: 

/** Normalize and filter injected urls. */public static class InjectMapper implementsMapper<WritableComparable<?>, Text, Text, CrawlDatum> {private URLNormalizers urlNormalizers;private int interval;private float scoreInjected;private JobConf jobConf;private URLFilters filters;private ScoringFilters scfilters;private long curTime;/** 进入这个内部类,先调用configure方法,主要说一下插件的配置: urlNormalizers ,filters涉及插件机制,会加载插件:* **( 加载一个extensionPoint: <extension-point id="org.apache.nutch.net.URLFilter" name="Nutch URL Filter"/>* 当然下边会加载对应的扩展类org.apache.nutch.urlfilter.regex.RegexURLFilter * 另一个同理,不赘述。)** * * urlNormalizers:* org.apache.nutch.net.urlnormalizer.basic.BasicURLNormalizer* org.apache.nutch.net.urlnormalizer.pass.PassURLNormalizer* org.apache.nutch.net.urlnormalizer.regex.RegexURLNormalizer filters:* * filters:* org.apache.nutch.urlfilter.regex.RegexURLFilter* * * 这些都是 配置于nutch-default.xml中 plugin.includes的配置块* <property> <name>plugin.includes</name><value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|indexer-solr|scoring-opic|urlnormalizer-(pass|regex|basic)</value><description> </description> </property>* * -------urlNormalizers:--对应------urlnormalizer-(pass|regex|basic)* -------filters:--对应----urlfilter-regex -----*/public void configure(JobConf job) {this.jobConf = job;urlNormalizers = new URLNormalizers(job,URLNormalizers.SCOPE_INJECT);interval = jobConf.getInt("db.fetch.interval.default", 2592000);filters = new URLFilters(jobConf);scfilters = new ScoringFilters(jobConf);scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);curTime = job.getLong("injector.current.time",System.currentTimeMillis());}

		/** 这里才是这个job要做的具体内容(此处重写了),key是你的url文本的偏移量(e.g.首次:0),value为读取的文本(e.g.首次 :http://www.soho.com/)。 * 这个map你可以理解为一行一行读你的url.txt,直到读完* 对于后两个入参我理解的不够深入,希望读者补充*/public void map(WritableComparable<?> key, Text value,OutputCollector<Text, CrawlDatum> output, Reporter reporter)throws IOException {System.out.println("map key:"+key);String url = value.toString(); // value is line of text/* 如果读入的文本,即url。为null,为#开头(即注掉了) 都无效返回 */System.out.println("map:value"+value);if (url != null && url.trim().startsWith("#")) {/* Ignore line that start with # */return;}/* 这里是处理读入文本中的metadata信息 */// if tabs : metadata that could be stored// must be name=value and separated by \tfloat customScore = -1f;int customInterval = interval;int fixedInterval = -1;Map<String, String> metadata = new TreeMap<String, String>();if (url.indexOf("\t") != -1) {/* 如果有metadata信息,则按照预定分开,第一个必定是url链接了 */String[] splits = url.split("\t");url = splits[0];/** 然后对url链接后的metadata组成的数组遍历,注意 int s=1,另外metadata 的格式 都是* "key=value"*/for (int s = 1; s < splits.length; s++) {// find separation between name and valueint indexEquals = splits[s].indexOf("=");if (indexEquals == -1) {// skip anything without a =continue;}String metaname = splits[s].substring(0, indexEquals);String metavalue = splits[s].substring(indexEquals + 1);/* 如果这个 metadata key 名为nutch.score,则获取对应的float类型值 */if (metaname.equals(nutchScoreMDName)) {try {customScore = Float.parseFloat(metavalue);} catch (NumberFormatException nfe) {}}/* 如果这个 metadata key 名为nutch.fetchInterval,则获取对应的Int类型值 */else if (metaname.equals(nutchFetchIntervalMDName)) {try {customInterval = Integer.parseInt(metavalue);} catch (NumberFormatException nfe) {}}/** 如果这个 metadata key* 名为nutch.fetchInterval.fixed,则获取对应的Int类型值*/else if (metaname.equals(nutchFixedFetchIntervalMDName)) {try {fixedInterval = Integer.parseInt(metavalue);} catch (NumberFormatException nfe) {}} else/* 如果有除了上述三种key属性,还有其他的,则加入新定义的metadata,用来存放没有被解析的metadata,最后会处理 */metadata.put(metaname, metavalue);}}/* 处理完了metadata(如果metadata存在),接下来我们处理url。这里涉及到插件机制,上边加载配置时已经说过了 */try {url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);url = filters.filter(url); // filter the url} catch (Exception e) {if (LOG.isWarnEnabled()) {LOG.warn("Skipping " + url + ":" + e);}url = null;}/*如果url没有通过上边的插件检查被过滤掉了,则计数increment(1)*/if (url == null) {reporter.getCounter("injector", "urls_filtered").increment(1);} else { // if it passes/*如果url通过上边的插件检查,则在覆盖掉value,也就最终的value值,同时将对应的datum设置为injected状态,表示新注入)*/value.set(url); // collect itCrawlDatum datum = new CrawlDatum();datum.setStatus(CrawlDatum.STATUS_INJECTED);// Is interval custom? Then set as meta dataif (fixedInterval > -1) {// Set writable using float. Flaot is used by// AdaptiveFetchScheduledatum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,new FloatWritable(fixedInterval));datum.setFetchInterval(fixedInterval);} else {datum.setFetchInterval(customInterval);}datum.setFetchTime(curTime);// now add the metadataIterator<String> keysIter = metadata.keySet().iterator();while (keysIter.hasNext()) {String keymd = keysIter.next();String valuemd = metadata.get(keymd);datum.getMetaData().put(new Text(keymd), new Text(valuemd));}if (customScore != -1)datum.setScore(customScore);elsedatum.setScore(scoreInjected);try {scfilters.injectedScore(value, datum);} catch (ScoringFilterException e) {if (LOG.isWarnEnabled()) {LOG.warn("Cannot filter injected score for url " + url+ ", using default (" + e.getMessage() + ")");}}/*放入输出 并计数 +1*/reporter.getCounter("injector", "urls_injected").increment(1);output.collect(value, datum);}}}
...下边reduce中 update override 默认为false 


	/** Combine multiple new entries for a url. */public static class InjectReducer implementsReducer<Text, CrawlDatum, Text, CrawlDatum> {private int interval;private float scoreInjected;private boolean overwrite = false;private boolean update = false;public void configure(JobConf job) {interval = job.getInt("db.fetch.interval.default", 2592000);scoreInjected = job.getFloat("db.score.injected", 1.0f);overwrite = job.getBoolean("db.injector.overwrite", false);update = job.getBoolean("db.injector.update", false);}public void close() {}private CrawlDatum old = new CrawlDatum();private CrawlDatum injected = new CrawlDatum();/** 这里参数 key 是url(e.g. keyhttp://blog.tianya.com/ ) 而 values 不止一个* :来自上一步map中的注入的datum 和以前注入的datum* * 以前注入的* 1i-----------Version: 7 Status: 2 (db_fetched) Fetch time: Wed Dec 10* 15:02:27 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries* since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0* Signature: e7193c90eb7a5ca3d0a0969ed1a444d2 Metadata: Content-Type:* text/html_pst_: success(1), lastModified=0* * 上一步map注入的* 2i-----------Version: 7 Status: 66 (injected) Fetch time: Wed Nov 12* 18:29:30 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries* since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0* Signature: null Metadata:* * 上一步map注入的(重复注入的)* 3i-----------Version: 7 Status: 66 (injected) Fetch time: Wed Nov 12* 18:29:30 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries* since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0* Signature: null Metadata:* * 当新的和老的都有,会默认保留老的。并且一个url 对应 一个datum(old default),这不就是一个过滤的作用嘛。* * 所谓老的datum 是来自上一次产生的current,而这次注入url对应的datum都在tempDir中*/public void reduce(Text key, Iterator<CrawlDatum> values,OutputCollector<Text, CrawlDatum> output, Reporter reporter)throws IOException {System.out.println("key" + key + "//n values" + values);System.out.println("output" + output + "---reporter" + reporter);System.out.println("reporter.getCounter(injector urls_injected)"+ reporter.getCounter("injector", "urls_injected").getValue());System.out.println("reporter.getCounter( injector urls_filtered )"+ reporter.getCounter("injector", "urls_filtered").getCounter());boolean oldSet = false;boolean injectedSet = false;int i = 1;while (values.hasNext()) {i++;CrawlDatum val = values.next();System.out.println(i + "i-----------" + val);if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {injected.set(val);injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);injectedSet = true;} else {old.set(val);oldSet = true;}}CrawlDatum res = null;/*** Whether to overwrite, ignore or update existing records* * @see https://issues.apache.org/jira/browse/NUTCH-1405*/// Injected record already exists and overwrite but not updateif (injectedSet && oldSet && overwrite) {res = injected;if (update) {LOG.info(key.toString()+ " overwritten with injected record but update was specified.");}}// Injected record already exists and update but not overwriteif (injectedSet && oldSet && update && !overwrite) {res = old;old.putAllMetaData(injected);old.setScore(injected.getScore() != scoreInjected ? injected.getScore() : old.getScore());old.setFetchInterval(injected.getFetchInterval() != interval ? injected.getFetchInterval() : old.getFetchInterval());}// Old default behaviourif (injectedSet && !oldSet) {res = injected;} else {res = old;}output.collect(key, res);}}




好了,这里边的map/reduce就讲完了。最后删除tempDir,把合并后的改名为Current ,如顶图。 剩下的部分很简单,自己看吧!

这篇关于Nutch学习——读源码 Injector.java的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/606472

相关文章

Java实现检查多个时间段是否有重合

《Java实现检查多个时间段是否有重合》这篇文章主要为大家详细介绍了如何使用Java实现检查多个时间段是否有重合,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录流程概述步骤详解China编程步骤1:定义时间段类步骤2:添加时间段步骤3:检查时间段是否有重合步骤4:输出结果示例代码结语作

Java中String字符串使用避坑指南

《Java中String字符串使用避坑指南》Java中的String字符串是我们日常编程中用得最多的类之一,看似简单的String使用,却隐藏着不少“坑”,如果不注意,可能会导致性能问题、意外的错误容... 目录8个避坑点如下:1. 字符串的不可变性:每次修改都创建新对象2. 使用 == 比较字符串,陷阱满

Java判断多个时间段是否重合的方法小结

《Java判断多个时间段是否重合的方法小结》这篇文章主要为大家详细介绍了Java中判断多个时间段是否重合的方法,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录判断多个时间段是否有间隔判断时间段集合是否与某时间段重合判断多个时间段是否有间隔实体类内容public class D

IDEA编译报错“java: 常量字符串过长”的原因及解决方法

《IDEA编译报错“java:常量字符串过长”的原因及解决方法》今天在开发过程中,由于尝试将一个文件的Base64字符串设置为常量,结果导致IDEA编译的时候出现了如下报错java:常量字符串过长,... 目录一、问题描述二、问题原因2.1 理论角度2.2 源码角度三、解决方案解决方案①:StringBui

Java覆盖第三方jar包中的某一个类的实现方法

《Java覆盖第三方jar包中的某一个类的实现方法》在我们日常的开发中,经常需要使用第三方的jar包,有时候我们会发现第三方的jar包中的某一个类有问题,或者我们需要定制化修改其中的逻辑,那么应该如何... 目录一、需求描述二、示例描述三、操作步骤四、验证结果五、实现原理一、需求描述需求描述如下:需要在

Java中ArrayList和LinkedList有什么区别举例详解

《Java中ArrayList和LinkedList有什么区别举例详解》:本文主要介绍Java中ArrayList和LinkedList区别的相关资料,包括数据结构特性、核心操作性能、内存与GC影... 目录一、底层数据结构二、核心操作性能对比三、内存与 GC 影响四、扩容机制五、线程安全与并发方案六、工程

JavaScript中的reduce方法执行过程、使用场景及进阶用法

《JavaScript中的reduce方法执行过程、使用场景及进阶用法》:本文主要介绍JavaScript中的reduce方法执行过程、使用场景及进阶用法的相关资料,reduce是JavaScri... 目录1. 什么是reduce2. reduce语法2.1 语法2.2 参数说明3. reduce执行过程

如何使用Java实现请求deepseek

《如何使用Java实现请求deepseek》这篇文章主要为大家详细介绍了如何使用Java实现请求deepseek功能,文中的示例代码讲解详细,感兴趣的小伙伴可以跟随小编一起学习一下... 目录1.deepseek的api创建2.Java实现请求deepseek2.1 pom文件2.2 json转化文件2.2

Java调用DeepSeek API的最佳实践及详细代码示例

《Java调用DeepSeekAPI的最佳实践及详细代码示例》:本文主要介绍如何使用Java调用DeepSeekAPI,包括获取API密钥、添加HTTP客户端依赖、创建HTTP请求、处理响应、... 目录1. 获取API密钥2. 添加HTTP客户端依赖3. 创建HTTP请求4. 处理响应5. 错误处理6.

Spring AI集成DeepSeek的详细步骤

《SpringAI集成DeepSeek的详细步骤》DeepSeek作为一款卓越的国产AI模型,越来越多的公司考虑在自己的应用中集成,对于Java应用来说,我们可以借助SpringAI集成DeepSe... 目录DeepSeek 介绍Spring AI 是什么?1、环境准备2、构建项目2.1、pom依赖2.2