Learning Nutch: Reading the Injector.java Source

2024-01-14 20:50


Injector.java injects URLs into the crawldb. Each URL can optionally carry its own metadata. The class relies on MapReduce and on Nutch's plugin mechanism.


 Injector.inject(...):

public void inject(Path crawlDb, Path urlDir) throws IOException {
  SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
  long start = System.currentTimeMillis();
  /* Log the start time, the crawlDb path, and the directory of URLs to inject. */
  if (LOG.isInfoEnabled()) {
    LOG.info("Injector: starting at " + sdf.format(start));
    LOG.info("Injector: crawlDb: " + crawlDb);
    LOG.info("Injector: urlDir: " + urlDir);
  }

  /*
   * Create a temporary directory whose name is
   *   <value of mapred.temp.dir, or "." if unset> + "/inject-temp-" + <random integer>
   * for example: inject-temp-236972440
   *
   * At this point getConf() has loaded:
   * Configuration: core-default.xml, core-site.xml, nutch-default.xml, nutch-site.xml
   */
  Path tempDir = new Path(getConf().get("mapred.temp.dir", ".")
      + "/inject-temp-"
      + Integer.toString(new Random().nextInt(Integer.MAX_VALUE)));

  // map text input file to a <url,CrawlDatum> file
  if (LOG.isInfoEnabled()) {
    LOG.info("Injector: Converting injected urls to crawl db entries.");
  }

  /*
   * Initialize a NutchJob. Its configuration now loads:
   * Configuration: core-default.xml, core-site.xml, mapred-default.xml, mapred-site.xml, nutch-default.xml, nutch-site.xml
   *
   * The input is the directory holding the URL files; the output is the temporary
   * directory tempDir. The output key is a Text (think of it as a String) and the
   * output value is a CrawlDatum.
   *
   * Finally, runJob hands control to the nested class InjectMapper.
   */
  JobConf sortJob = new NutchJob(getConf());
  sortJob.setJobName("inject " + urlDir);
  FileInputFormat.addInputPath(sortJob, urlDir);
  sortJob.setMapperClass(InjectMapper.class);
  FileOutputFormat.setOutputPath(sortJob, tempDir);
  sortJob.setOutputFormat(SequenceFileOutputFormat.class);
  sortJob.setOutputKeyClass(Text.class);
  sortJob.setOutputValueClass(CrawlDatum.class);
  sortJob.setLong("injector.current.time", System.currentTimeMillis());
  RunningJob mapJob = JobClient.runJob(sortJob);
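The temporary directory name is just string concatenation, so it can be reproduced without Hadoop. The sketch below is a minimal illustration under that assumption: the class `InjectTempDirName` and its `build` method are hypothetical names, and a plain `String` stands in for Hadoop's `Path`.

```java
import java.util.Random;

// Hypothetical helper, not part of Nutch: rebuilds the temp-dir name as a plain String.
final class InjectTempDirName {

    // baseDir plays the role of the mapred.temp.dir value ("." when the property is unset).
    static String build(String baseDir, Random random) {
        // Same pieces as the quoted code: base + "/inject-temp-" + a non-negative random int.
        return baseDir + "/inject-temp-" + Integer.toString(random.nextInt(Integer.MAX_VALUE));
    }

    public static void main(String[] args) {
        // Prints something of the form ./inject-temp-<number>; the number differs per run.
        System.out.println(build(".", new Random()));
    }
}
```

Because the suffix is random rather than checked for uniqueness, two concurrent inject jobs could in principle collide on the same name, though the range of `nextInt(Integer.MAX_VALUE)` makes that unlikely.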

Running sortJob enters the static nested class InjectMapper:

/** Normalize and filter injected urls. */
public static class InjectMapper implements
    Mapper<WritableComparable<?>, Text, Text, CrawlDatum> {
  private URLNormalizers urlNormalizers;
  private int interval;
  private float scoreInjected;
  private JobConf jobConf;
  private URLFilters filters;
  private ScoringFilters scfilters;
  private long curTime;

  /*
   * When this class is entered, configure() is called first. The part worth
   * explaining is the plugin setup: urlNormalizers and filters both go through
   * the plugin mechanism, which loads the plugins.
   *
   * (An extension point is loaded:
   *   <extension-point id="org.apache.nutch.net.URLFilter" name="Nutch URL Filter"/>
   * and then the matching extension class
   *   org.apache.nutch.urlfilter.regex.RegexURLFilter
   * is loaded. The other one works the same way, so it is not repeated here.)
   *
   * urlNormalizers:
   *   org.apache.nutch.net.urlnormalizer.basic.BasicURLNormalizer
   *   org.apache.nutch.net.urlnormalizer.pass.PassURLNormalizer
   *   org.apache.nutch.net.urlnormalizer.regex.RegexURLNormalizer
   *
   * filters:
   *   org.apache.nutch.urlfilter.regex.RegexURLFilter
   *
   * All of these come from the plugin.includes property in nutch-default.xml:
   * <property>
   *   <name>plugin.includes</name>
   *   <value>protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)|indexer-solr|scoring-opic|urlnormalizer-(pass|regex|basic)</value>
   *   <description> </description>
   * </property>
   *
   * urlNormalizers corresponds to urlnormalizer-(pass|regex|basic)
   * filters        corresponds to urlfilter-regex
   */
  public void configure(JobConf job) {
    this.jobConf = job;
    urlNormalizers = new URLNormalizers(job, URLNormalizers.SCOPE_INJECT);
    interval = jobConf.getInt("db.fetch.interval.default", 2592000);
    filters = new URLFilters(jobConf);
    scfilters = new ScoringFilters(jobConf);
    scoreInjected = jobConf.getFloat("db.score.injected", 1.0f);
    curTime = job.getLong("injector.current.time", System.currentTimeMillis());
  }
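To see why `urlnormalizer-(pass|regex|basic)` pulls in three normalizers while only one URL filter is active, it helps to treat the `plugin.includes` value as a regular expression matched against whole plugin ids. The sketch below does exactly that with `java.util.regex`. It assumes, without quoting Nutch's plugin loader, that inclusion is decided by a full match of the id; the class `PluginIncludesSketch` and the sample id `urlfilter-domain` are illustrative only.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper, not Nutch code: filters plugin ids with the plugin.includes regex.
final class PluginIncludesSketch {

    // Value of plugin.includes as quoted in the configure() comment.
    static final String INCLUDES =
        "protocol-http|urlfilter-regex|parse-(html|tika)|index-(basic|anchor)"
        + "|indexer-solr|scoring-opic|urlnormalizer-(pass|regex|basic)";

    // Returns the ids that the regex accepts, preserving input order.
    static List<String> included(List<String> pluginIds) {
        Pattern includes = Pattern.compile(INCLUDES);
        List<String> result = new ArrayList<>();
        for (String id : pluginIds) {
            // matches() requires the whole id to match one alternative, not just a prefix.
            if (includes.matcher(id).matches()) {
                result.add(id);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // "urlfilter-domain" is only a sample id that the regex does not list.
        System.out.println(included(Arrays.asList(
            "urlnormalizer-basic", "urlnormalizer-pass", "urlnormalizer-regex",
            "urlfilter-regex", "urlfilter-domain")));
    }
}
```

Under this reading, enabling another URL filter means adding its id to the `plugin.includes` alternation (typically overridden in nutch-site.xml), not changing Injector itself.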

  /*
   * This is where the job's real work happens (map is overridden here). key is the
   * offset of the line within the URL text file (e.g. 0 for the first line), and
   * value is the text of that line (e.g. http://www.soho.com/ for the first line).
   * You can think of map as reading your url.txt line by line until it is finished.
   * I do not understand the last two parameters deeply; readers are welcome to add to this.
   */
  public void map(WritableComparable<?> key, Text value,
      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {
    System.out.println("map key:" + key);
    String url = value.toString(); // value is line of text
    /* A line that starts with # is commented out and is skipped.
     * The null check only guards the trim() call. */
    System.out.println("map:value" + value);
    if (url != null && url.trim().startsWith("#")) {
      /* Ignore line that start with # */
      return;
    }

    /* Handle the metadata carried on the line. */
    // if tabs : metadata that could be stored
    // must be name=value and separated by \t
    float customScore = -1f;
    int customInterval = interval;
    int fixedInterval = -1;
    Map<String, String> metadata = new TreeMap<String, String>();
    if (url.indexOf("\t") != -1) {
      /* Metadata is present: split on tabs; the first field is always the URL. */
      String[] splits = url.split("\t");
      url = splits[0];
      /*
       * Walk the metadata fields that follow the URL. Note that s starts at 1,
       * and that every metadata field has the form "key=value".
       */
      for (int s = 1; s < splits.length; s++) {
        // find separation between name and value
        int indexEquals = splits[s].indexOf("=");
        if (indexEquals == -1) {
          // skip anything without a =
          continue;
        }
        String metaname = splits[s].substring(0, indexEquals);
        String metavalue = splits[s].substring(indexEquals + 1);
        /* Key nutch.score: read the value as a float. */
        if (metaname.equals(nutchScoreMDName)) {
          try {
            customScore = Float.parseFloat(metavalue);
          } catch (NumberFormatException nfe) {
          }
        }
        /* Key nutch.fetchInterval: read the value as an int. */
        else if (metaname.equals(nutchFetchIntervalMDName)) {
          try {
            customInterval = Integer.parseInt(metavalue);
          } catch (NumberFormatException nfe) {
          }
        }
        /* Key nutch.fetchInterval.fixed: read the value as an int. */
        else if (metaname.equals(nutchFixedFetchIntervalMDName)) {
          try {
            fixedInterval = Integer.parseInt(metavalue);
          } catch (NumberFormatException nfe) {
          }
        } else
          /* Any key other than the three above is stored in the metadata map
           * and attached to the datum further down. */
          metadata.put(metaname, metavalue);
      }
    }

    /*
     * The metadata (if any) is done; next comes the URL itself. This goes through
     * the plugin mechanism already described for configure().
     */
    try {
      url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
      url = filters.filter(url); // filter the url
    } catch (Exception e) {
      if (LOG.isWarnEnabled()) {
        LOG.warn("Skipping " + url + ":" + e);
      }
      url = null;
    }
    /* If the plugins filtered the URL out, increment the urls_filtered counter. */
    if (url == null) {
      reporter.getCounter("injector", "urls_filtered").increment(1);
    } else { // if it passes
      /*
       * The URL passed the plugins: overwrite value with the normalized URL (this is
       * the final output key) and set the datum's status to injected, meaning
       * newly injected.
       */
      value.set(url); // collect it
      CrawlDatum datum = new CrawlDatum();
      datum.setStatus(CrawlDatum.STATUS_INJECTED);

      // Is interval custom? Then set as meta data
      if (fixedInterval > -1) {
        // Set writable using float. Float is used by
        // AdaptiveFetchSchedule
        datum.getMetaData().put(Nutch.WRITABLE_FIXED_INTERVAL_KEY,
            new FloatWritable(fixedInterval));
        datum.setFetchInterval(fixedInterval);
      } else {
        datum.setFetchInterval(customInterval);
      }

      datum.setFetchTime(curTime);
      // now add the metadata
      Iterator<String> keysIter = metadata.keySet().iterator();
      while (keysIter.hasNext()) {
        String keymd = keysIter.next();
        String valuemd = metadata.get(keymd);
        datum.getMetaData().put(new Text(keymd), new Text(valuemd));
      }
      if (customScore != -1)
        datum.setScore(customScore);
      else
        datum.setScore(scoreInjected);
      try {
        scfilters.injectedScore(value, datum);
      } catch (ScoringFilterException e) {
        if (LOG.isWarnEnabled()) {
          LOG.warn("Cannot filter injected score for url " + url
              + ", using default (" + e.getMessage() + ")");
        }
      }
      /* Count the URL and emit it. */
      reporter.getCounter("injector", "urls_injected").increment(1);
      output.collect(value, datum);
    }
  }
}
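The metadata handling in map() is easier to follow on a concrete seed line. The sketch below is a stand-alone model of just that parsing step, with no Hadoop or Nutch types. The class `SeedLineSketch` and the URL `http://www.example.com/` are hypothetical, the three reserved key names are the ones given in the comments above, and comment lines, normalization, and filtering are left out on purpose.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-alone model of the metadata parsing in map(); not Nutch code.
final class SeedLineSketch {
    String url;
    float customScore = -1f;   // -1 means "no nutch.score given"
    int customInterval;        // starts at the default fetch interval
    int fixedInterval = -1;    // -1 means "no nutch.fetchInterval.fixed given"
    final Map<String, String> metadata = new TreeMap<>();

    static SeedLineSketch parse(String line, int defaultInterval) {
        SeedLineSketch r = new SeedLineSketch();
        r.customInterval = defaultInterval;
        String[] splits = line.split("\t");
        r.url = splits[0];                      // the first field is always the URL
        for (int s = 1; s < splits.length; s++) {
            int eq = splits[s].indexOf('=');
            if (eq == -1) {
                continue;                       // skip fields without "="
            }
            String name = splits[s].substring(0, eq);
            String value = splits[s].substring(eq + 1);
            try {
                if (name.equals("nutch.score")) {
                    r.customScore = Float.parseFloat(value);
                } else if (name.equals("nutch.fetchInterval")) {
                    r.customInterval = Integer.parseInt(value);
                } else if (name.equals("nutch.fetchInterval.fixed")) {
                    r.fixedInterval = Integer.parseInt(value);
                } else {
                    r.metadata.put(name, value); // everything else is free-form metadata
                }
            } catch (NumberFormatException nfe) {
                // Like the quoted code: a malformed number leaves the default in place.
            }
        }
        return r;
    }

    public static void main(String[] args) {
        SeedLineSketch r = parse(
            "http://www.example.com/\tnutch.score=2.5\tnutch.fetchInterval=86400\tlang=en",
            2592000);
        System.out.println(r.url + " score=" + r.customScore
            + " interval=" + r.customInterval + " metadata=" + r.metadata);
    }
}
```

In a seed file this corresponds to one line per URL, with the URL first and each `name=value` pair separated from it by a tab character.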
In the reducer below, update and overwrite both default to false.


/** Combine multiple new entries for a url. */
public static class InjectReducer implements
    Reducer<Text, CrawlDatum, Text, CrawlDatum> {
  private int interval;
  private float scoreInjected;
  private boolean overwrite = false;
  private boolean update = false;

  public void configure(JobConf job) {
    interval = job.getInt("db.fetch.interval.default", 2592000);
    scoreInjected = job.getFloat("db.score.injected", 1.0f);
    overwrite = job.getBoolean("db.injector.overwrite", false);
    update = job.getBoolean("db.injector.update", false);
  }

  public void close() {
  }

  private CrawlDatum old = new CrawlDatum();
  private CrawlDatum injected = new CrawlDatum();

  /*
   * Here key is the URL (e.g. http://blog.tianya.com/) and values holds more than
   * one datum: the ones injected by the map step above and the one injected earlier.
   *
   * Injected earlier:
   * 1i-----------Version: 7 Status: 2 (db_fetched) Fetch time: Wed Dec 10
   * 15:02:27 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries
   * since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0
   * Signature: e7193c90eb7a5ca3d0a0969ed1a444d2 Metadata: Content-Type:
   * text/html_pst_: success(1), lastModified=0
   *
   * Injected by the map step above:
   * 2i-----------Version: 7 Status: 66 (injected) Fetch time: Wed Nov 12
   * 18:29:30 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries
   * since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0
   * Signature: null Metadata:
   *
   * Injected by the map step above (a duplicate injection):
   * 3i-----------Version: 7 Status: 66 (injected) Fetch time: Wed Nov 12
   * 18:29:30 CST 2014 Modified time: Thu Jan 01 08:00:00 CST 1970 Retries
   * since fetch: 0 Retry interval: 2592000 seconds (30 days) Score: 1.0
   * Signature: null Metadata:
   *
   * When both a new and an old datum exist, the old one is kept by default, and
   * each URL ends up with exactly one datum (the old default behaviour), so the
   * reducer effectively deduplicates.
   *
   * The "old" datum comes from the current directory produced by the previous run;
   * the datums for the URLs injected this time are all in tempDir.
   */
  public void reduce(Text key, Iterator<CrawlDatum> values,
      OutputCollector<Text, CrawlDatum> output, Reporter reporter)
      throws IOException {
    System.out.println("key" + key + "\n values" + values);
    System.out.println("output" + output + "---reporter" + reporter);
    System.out.println("reporter.getCounter(injector urls_injected)"
        + reporter.getCounter("injector", "urls_injected").getValue());
    System.out.println("reporter.getCounter( injector urls_filtered )"
        + reporter.getCounter("injector", "urls_filtered").getCounter());
    boolean oldSet = false;
    boolean injectedSet = false;
    int i = 1;
    while (values.hasNext()) {
      i++;
      CrawlDatum val = values.next();
      System.out.println(i + "i-----------" + val);
      if (val.getStatus() == CrawlDatum.STATUS_INJECTED) {
        injected.set(val);
        injected.setStatus(CrawlDatum.STATUS_DB_UNFETCHED);
        injectedSet = true;
      } else {
        old.set(val);
        oldSet = true;
      }
    }

    CrawlDatum res = null;

    /**
     * Whether to overwrite, ignore or update existing records
     *
     * @see https://issues.apache.org/jira/browse/NUTCH-1405
     */
    // Injected record already exists and overwrite but not update
    if (injectedSet && oldSet && overwrite) {
      res = injected;
      if (update) {
        LOG.info(key.toString()
            + " overwritten with injected record but update was specified.");
      }
    }

    // Injected record already exists and update but not overwrite
    if (injectedSet && oldSet && update && !overwrite) {
      res = old;
      old.putAllMetaData(injected);
      old.setScore(injected.getScore() != scoreInjected
          ? injected.getScore() : old.getScore());
      old.setFetchInterval(injected.getFetchInterval() != interval
          ? injected.getFetchInterval() : old.getFetchInterval());
    }

    // Old default behaviour
    // Note: as quoted here, this if/else runs unconditionally, so whenever oldSet is
    // true res ends up as old, even after the overwrite branch set res = injected.
    if (injectedSet && !oldSet) {
      res = injected;
    } else {
      res = old;
    }

    output.collect(key, res);
  }
}
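The three `if` blocks at the end of reduce() interact in a way that is easy to misread, so it can help to trace them with plain booleans. The sketch below copies the control flow of the listing as quoted in this article, not necessarily that of every Nutch release. `InjectResolveSketch` and its string results are hypothetical stand-ins for the `CrawlDatum` objects.

```java
// Hypothetical model of the branch logic in InjectReducer.reduce(); not Nutch code.
final class InjectResolveSketch {

    // Returns which record would be emitted: "injected", "old", or "old (updated)"
    // when the old record was first merged with the injected metadata/score/interval.
    static String resolve(boolean injectedSet, boolean oldSet,
                          boolean overwrite, boolean update) {
        String res = null;
        boolean oldUpdated = false;

        // Injected record already exists and overwrite is requested.
        if (injectedSet && oldSet && overwrite) {
            res = "injected";
        }
        // Injected record already exists and update (but not overwrite) is requested.
        if (injectedSet && oldSet && update && !overwrite) {
            res = "old";
            oldUpdated = true; // models old.putAllMetaData(...), setScore, setFetchInterval
        }
        // "Old default behaviour": in the quoted listing this block always runs last.
        if (injectedSet && !oldSet) {
            res = "injected";
        } else {
            res = "old";
        }
        return (res.equals("old") && oldUpdated) ? "old (updated)" : res;
    }

    public static void main(String[] args) {
        System.out.println("new URL only:         " + resolve(true, false, false, false));
        System.out.println("existing, defaults:   " + resolve(true, true, false, false));
        System.out.println("existing, update:     " + resolve(true, true, false, true));
        System.out.println("existing, overwrite:  " + resolve(true, true, true, false));
    }
}
```

Traced this way, the update path still takes effect because it mutates the old record in place before the final block selects it, while the overwrite path's choice of the injected record is replaced by the final `else`. If a different Nutch version chains these conditions with `else if`, the overwrite case would differ from this trace.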




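That covers the map/reduce logic in this class. At the end, tempDir is deleted and the merged output is renamed to current, as in the figure at the top. The remaining code is simple enough to read on your own.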




http://www.chinasem.cn/article/606472
