[Nutch] InjectorJob

    xiaoxiao  2021-12-14

    Source Code Analysis

    package org.apache.nutch.crawl;

    First of all, the InjectorJob class lives in the package org.apache.nutch.crawl.

    public class InjectorJob extends NutchTool implements Tool

    It extends the NutchTool class and implements the Tool interface (the Tool from Hadoop's util package), providing both NutchTool's run(Map&lt;String, Object&gt;) method and Tool's run(String[]) method. Nothing surprising here.
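    The two run methods form a common pattern: the command-line entry point (Tool's run(String[])) parses the arguments and delegates to the programmatic entry point (NutchTool's run(Map&lt;String, Object&gt;)). The self-contained sketch below shows that pattern with stand-in types; Tool and NutchToolLike here are simplified imitations for illustration, not the real Hadoop/Nutch interfaces:

```java
import java.util.HashMap;
import java.util.Map;

public class RunPatternDemo {
    // Stand-in for org.apache.hadoop.util.Tool
    interface Tool {
        int run(String[] args) throws Exception;
    }

    // Stand-in for NutchTool's programmatic entry point
    static abstract class NutchToolLike {
        abstract Map<String, Object> run(Map<String, Object> args) throws Exception;
    }

    static class InjectorLike extends NutchToolLike implements Tool {
        @Override
        Map<String, Object> run(Map<String, Object> args) {
            Map<String, Object> results = new HashMap<>();
            results.put("injected_from", args.get("seedDir")); // pretend we ran the job
            return results;
        }

        @Override
        public int run(String[] args) throws Exception {
            if (args.length < 1) return -1; // usage error
            Map<String, Object> toolArgs = new HashMap<>();
            toolArgs.put("seedDir", args[0]); // CLI argument -> programmatic argument
            run(toolArgs);                    // delegate to the Map-based entry point
            return 0;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println(new InjectorLike().run(new String[] { "urls/" })); // 0
    }
}
```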

    Next, let's take a look at its Mapper and Reducer classes.

    public static class UrlMapper extends
        Mapper<LongWritable, Text, String, WebPage> {
      private URLNormalizers urlNormalizers;
      private int interval;
      private float scoreInjected;
      private URLFilters filters;
      private ScoringFilters scfilters;
      private long curTime;

      @Override
      protected void setup(Context context) throws IOException,
          InterruptedException {
        /* Preparation work. Most of the defaults live in nutch-default.xml
         * in the conf folder under the Nutch root directory. */
        urlNormalizers = new URLNormalizers(context.getConfiguration(),
            URLNormalizers.SCOPE_INJECT); /* normalizes URLs */
        interval = context.getConfiguration().getInt(
            "db.fetch.interval.default",
            2592000); /* default interval between two fetches of the same page: 30 days */
        filters = new URLFilters(context.getConfiguration()); /* filters out invalid URLs */
        scfilters = new ScoringFilters(context.getConfiguration()); /* computes score values */
        scoreInjected = context.getConfiguration().getFloat("db.score.injected",
            1.0f); /* score given to new pages added by the injector */
        curTime = context.getConfiguration().getLong("injector.current.time",
            System.currentTimeMillis()); /* time of injection */
      }

      protected void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String url = value.toString().trim(); // value is line of text

        if (url != null && (url.length() == 0 || url.startsWith("#"))) {
          /* Ignore line that start with # */
          return;
        }

        // if tabs : metadata that could be stored
        // must be name=value and separated by \t
        float customScore = -1f;
        int customInterval = interval;
        Map<String, String> metadata = new TreeMap<String, String>(); /* maps metadata names to values */
        if (url.indexOf("\t") != -1) {
          String[] splits = url.split("\t");
          url = splits[0];
          for (int s = 1; s < splits.length; s++) { /* for each split name=value field */
            // find separation between name and value
            int indexEquals = splits[s].indexOf("=");
            if (indexEquals == -1) {
              // skip anything without a =
              continue;
            }
            String metaname = splits[s].substring(0, indexEquals);
            String metavalue = splits[s].substring(indexEquals + 1);
            /* user-defined score for this particular URL
             * (public static String nutchScoreMDName = "nutch.score";) */
            if (metaname.equals(nutchScoreMDName)) {
              try {
                customScore = Float.parseFloat(metavalue);
              } catch (NumberFormatException nfe) {
              }
            }
            /* user-defined fetch interval for this particular URL
             * (public static String nutchFetchIntervalMDName = "nutch.fetchInterval";) */
            else if (metaname.equals(nutchFetchIntervalMDName)) {
              try {
                customInterval = Integer.parseInt(metavalue);
              } catch (NumberFormatException nfe) {
              }
            } else
              metadata.put(metaname, metavalue);
          }
        }
        try {
          /* here is the core URL normalization and filtering step */
          url = urlNormalizers.normalize(url, URLNormalizers.SCOPE_INJECT);
          url = filters.filter(url); // filter the url
        } catch (Exception e) {
          url = null;
        }
        if (url == null) {
          context.getCounter("injector", "urls_filtered").increment(1); /* one more filtered URL */
          return;
        } else { // if it passes
          /* If the URL passes, the information obtained from it is packed,
           * together with the other fields, into a WebPage row for storage. */
          String reversedUrl = TableUtil.reverseUrl(url); // collect it
          WebPage row = WebPage.newBuilder().build();
          row.setFetchTime(curTime);
          row.setFetchInterval(customInterval);

          // now add the metadata
          Iterator<String> keysIter = metadata.keySet().iterator();
          while (keysIter.hasNext()) {
            String keymd = keysIter.next();
            String valuemd = metadata.get(keymd);
            row.getMetadata().put(new Utf8(keymd),
                ByteBuffer.wrap(valuemd.getBytes()));
          }

          if (customScore != -1)
            row.setScore(customScore);
          else
            row.setScore(scoreInjected);
          try {
            /* compute a fresh initial score when injecting a new page */
            scfilters.injectedScore(url, row);
          } catch (ScoringFilterException e) {
          }
          context.getCounter("injector", "urls_injected").increment(1); /* one more injected URL */
          row.getMarkers()
              .put(DbUpdaterJob.DISTANCE, new Utf8(String.valueOf(0)));
          Mark.INJECT_MARK.putMark(row, YES_STRING);
          /* Mapper output: <String(reversedUrl), WebPage(row)> */
          context.write(reversedUrl, row);
        }
      }
    }

    First of all, attentive readers will have noticed that, at storage time, the URL has already been converted by the call

    String reversedUrl = TableUtil.reverseUrl(url);

    into a reversedUrl.

    This reverses the URL's domain name, which makes storage in HBase more convenient: scans within the same domain become faster. To borrow the example from the source itself:

    "http://bar.foo.com:8983/to/index.html?a=b" becomes "com.foo.bar:8983:http/to/index.html?a=b"
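    As a rough sketch of what this reversal does (the real TableUtil.reverseUrl handles more edge cases; ReverseDemo below is illustrative only):

```java
import java.net.URL;

public class ReverseDemo {
    // Reverse the host part of a URL, Nutch-style:
    // "http://bar.foo.com:8983/to/index.html?a=b"
    //   -> "com.foo.bar:8983:http/to/index.html?a=b"
    static String reverseUrl(String urlString) throws Exception {
        URL url = new URL(urlString);
        // reverse the dot-separated host labels
        String[] labels = url.getHost().split("\\.");
        StringBuilder sb = new StringBuilder();
        for (int i = labels.length - 1; i >= 0; i--) {
            sb.append(labels[i]);
            if (i > 0) sb.append('.');
        }
        // append port (if any), protocol, then path and query
        if (url.getPort() != -1) sb.append(':').append(url.getPort());
        sb.append(':').append(url.getProtocol());
        sb.append(url.getPath());
        if (url.getQuery() != null) sb.append('?').append(url.getQuery());
        return sb.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(reverseUrl("http://bar.foo.com:8983/to/index.html?a=b"));
        // com.foo.bar:8983:http/to/index.html?a=b
    }
}
```

    With the host reversed, all rows of the same domain sort next to each other in the key space, which is exactly what makes same-domain scans cheap.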

    Next, three somewhat unfamiliar classes appear among the member variables: URLNormalizers, URLFilters, and ScoringFilters.

    All three are in fact entry points into Nutch's plugin system. Later we will devote a separate article to Nutch's plugin mechanism, using the URLNormalizers or URLFilters class as the example.

    For now, a few words about the ScoringFilters class, since the page "score" in my comments may have left some readers confused. If, for example, you have not set the scoring.filter.order property in conf/nutch-default.xml, the program will by default call the bundled scoring-opic plugin under src/plugin, using its org.apache.nutch.scoring.opic.OPICScoringFilter class as the implementation of ScoringFilter. This is the invocation of Nutch's internal page-scoring mechanism, the OPIC algorithm (Online Page Importance Computation). Its implementation is based on the paper "Adaptive On-Line Page Importance Computation."
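    Very roughly, OPIC gives every page some "cash"; when a page is fetched, its cash is distributed evenly among its outlinks, and the cash a page has accumulated over time (its history) estimates its importance. A toy sketch of one distribution step, simplified from the paper and not Nutch's actual implementation:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class OpicDemo {
    // One OPIC step: page 'from' spreads its current cash evenly over its
    // outlinks, adds the spent cash to its history, and resets its cash to 0.
    static void distribute(Map<String, Double> cash, Map<String, Double> history,
                           String from, List<String> outlinks) {
        double c = cash.getOrDefault(from, 0.0);
        if (!outlinks.isEmpty()) {
            double share = c / outlinks.size();
            for (String to : outlinks) {
                cash.merge(to, share, Double::sum);
            }
        }
        history.merge(from, c, Double::sum); // the importance estimate grows
        cash.put(from, 0.0);
    }

    public static void main(String[] args) {
        Map<String, Double> cash = new HashMap<>();
        Map<String, Double> history = new HashMap<>();
        cash.put("A", 1.0);
        distribute(cash, history, "A", List.of("B", "C"));
        System.out.println(cash.get("B") + " " + cash.get("C")); // 0.5 0.5
    }
}
```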

    With that in mind, it is easy to see why it is a Filter: it filters out pages whose scores are too low.

    The URLNormalizers class by default invokes the urlnormalizer-(pass|regex|basic) plugins. So what does normalizing a URL mean? An example:

    urlnormalizer-basic is used to:

    - remove dot segments in the path: /./ or /../
    - remove default ports, e.g. port 80 for the http:// protocol
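    Those two normalizations can be sketched with the JDK's java.net.URI (the real BasicURLNormalizer does considerably more; NormalizeDemo is an illustration only):

```java
import java.net.URI;

public class NormalizeDemo {
    // Illustrative sketch of two normalizations done by urlnormalizer-basic:
    // resolving dot segments (/./ and /../) and dropping default ports.
    static String normalize(String urlString) throws Exception {
        URI uri = new URI(urlString).normalize(); // resolves /./ and /../ segments
        int port = uri.getPort();
        // drop the port if it is the protocol's default (80 for http, 443 for https)
        if (("http".equals(uri.getScheme()) && port == 80)
                || ("https".equals(uri.getScheme()) && port == 443)) {
            uri = new URI(uri.getScheme(), uri.getUserInfo(), uri.getHost(),
                    -1, uri.getPath(), uri.getQuery(), uri.getFragment());
        }
        return uri.toString();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(normalize("http://example.com:80/a/./b/../c.html"));
        // http://example.com/a/c.html
    }
}
```

    Normalization matters because "http://example.com:80/a/./b/../c.html" and "http://example.com/a/c.html" are the same page; without it, the crawl database would store duplicates.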

    The URLFilters class by default invokes the urlfilter-regex plugin. Nutch ships with five filter plugins: DomainURLFilter, RegexURLFilter, AutomatonURLFilter, PrefixURLFilter, and SuffixURLFilter. Their rule files are domain-urlfilter.txt, regex-urlfilter.txt, automaton-urlfilter.txt, prefix-urlfilter.txt, and suffix-urlfilter.txt, respectively. Which rule file belongs to which filter is again defined in nutch-default.xml. The urlfilter.order property defines the order in which the filters are applied; all filters are combined with AND, so a URL must pass every one of them.
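    The AND semantics can be sketched as a chain in which the first filter returning null rejects the URL (the UrlFilter interface and the three sample filters below are illustrative stand-ins, not the real Nutch plugin classes):

```java
import java.util.List;
import java.util.regex.Pattern;

public class FilterChainDemo {
    // Minimal stand-in for a URL filter: it returns the (possibly rewritten)
    // URL, or null to reject it.
    interface UrlFilter {
        String filter(String url);
    }

    // Filters are ANDed: the first one returning null kills the URL.
    static String applyAll(List<UrlFilter> filters, String url) {
        for (UrlFilter f : filters) {
            if (url == null) return null;
            url = f.filter(url);
        }
        return url;
    }

    public static void main(String[] args) {
        UrlFilter prefix = u -> u.startsWith("http") ? u : null;             // like prefix-urlfilter
        UrlFilter suffix = u -> u.endsWith(".gif") ? null : u;               // like suffix-urlfilter
        UrlFilter regex  = u -> Pattern.matches(".*apache.*", u) ? u : null; // like regex-urlfilter
        List<UrlFilter> chain = List.of(prefix, suffix, regex);

        System.out.println(applyAll(chain, "http://nutch.apache.org/"));      // passes all three
        System.out.println(applyAll(chain, "http://nutch.apache.org/x.gif")); // rejected: null
    }
}
```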

    Next, let's look at its Job.

    currentJob = new NutchJob(getConf(), "inject " + input); /* NutchJob is an extension of the Hadoop Job */
    FileInputFormat.addInputPath(currentJob, input);
    currentJob.setMapperClass(UrlMapper.class);
    currentJob.setMapOutputKeyClass(String.class);
    currentJob.setMapOutputValueClass(WebPage.class);
    currentJob.setOutputFormatClass(GoraOutputFormat.class);
    DataStore<String, WebPage> store = StorageUtils.createWebStore(
        currentJob.getConfiguration(), String.class, WebPage.class);
    GoraOutputFormat.setOutput(currentJob, store, true);
    currentJob.setReducerClass(Reducer.class);
    currentJob.setNumReduceTasks(0); /* as you can see here, this Job does not use a Reducer at all */
    currentJob.waitForCompletion(true); /* run job */

    Since this Job has no reduce phase, it is clear from the code above that the output is written directly into Gora's DataStore.

    To sum up: InjectorJob reads the seed URLs from the input, normalizes, filters, and scores them, and finally stores them.

    Hands-on Example

    seed.txt

    http://nutch.apache.org/	nutch.score=0.172	nutch.fetchInterval=3600

    (the URL and each name=value metadata field must be separated by tab characters, as the mapper splits on \t)

    result:

    http://nutch.apache.org/
    key:	org.apache.nutch:http/
    baseUrl:	null
    status:	0 (null)
    fetchTime:	1440072184529
    prevFetchTime:	0
    fetchInterval:	3600
    retriesSinceFetch:	0
    modifiedTime:	0
    prevModifiedTime:	0
    protocolStatus:	(null)
    parseStatus:	(null)
    title:	null
    score:	0.172
    marker _injmrk_ :	y
    marker dist :	0
    reprUrl:	null
    metadata _csh_ :	>0 (followed by unprintable bytes; the score serialized in binary)
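    The seed line above is parsed exactly the way UrlMapper does it: split on tabs, then split each metadata field at the first '='. A standalone sketch (SeedLineDemo is just an illustration; the parsing logic mirrors the mapper code shown earlier):

```java
import java.util.Map;
import java.util.TreeMap;

public class SeedLineDemo {
    // Split a seed line the way UrlMapper does: tab-separated fields,
    // the first field is the URL, the rest are name=value metadata.
    static Map<String, String> parseMetadata(String line) {
        String[] splits = line.split("\t");
        Map<String, String> metadata = new TreeMap<>();
        for (int s = 1; s < splits.length; s++) {
            int eq = splits[s].indexOf('=');
            if (eq == -1) continue; // skip anything without '='
            metadata.put(splits[s].substring(0, eq), splits[s].substring(eq + 1));
        }
        return metadata;
    }

    public static void main(String[] args) {
        // fields must be separated by real tab characters
        String line = "http://nutch.apache.org/\tnutch.score=0.172\tnutch.fetchInterval=3600";
        Map<String, String> md = parseMetadata(line);
        System.out.println(md.get("nutch.score"));         // 0.172
        System.out.println(md.get("nutch.fetchInterval")); // 3600
    }
}
```

    The two parsed values are exactly what show up in the result dump above: score 0.172 and fetchInterval 3600.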
