Mahout源码分析之 -- 文档向量化TF-IDF

2024-06-12 20:18

本文主要是介绍Mahout源码分析之 -- 文档向量化TF-IDF,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

 Mahout之SparseVectorsFromSequenceFiles源码分析

一、原理

TF-IDF是一种统计方法,用以评估一字词对于一个文件集或一个语料库中的其中一份文件的重要程度。字词的重要性随着它在文件中出现的次数成正比增加,但同时会随着它在语料库中出现的频率成反比下降。

TFIDF的主要思想是:如果某个词或短语在一篇文章中出现的频率TF高,并且在其他文章中很少出现,则认为此词或者短语具有很好的类别区分能力,适合用来分类。

TFIDF实际上是:TF * IDF,TF词频(Term Frequency),IDF逆向文件频率(Inverse Document Frequency)。

词频 (TF) 指的是某一个给定的词语在文件中出现的次数。这个数字通常会被归一化,以防止它偏向长的文件。(同一个词语在长文件里可能会比短文件有更高的词频,而不管该词语重要与否。)

逆向文件频率(IDF)是一个词语普遍重要性的度量,其主要思想是:如果包含词条t的文档越少,也就是n越小,IDF越大,则说明词条t具有很好的类别区分能力。

对于在某一特定文件里的词语  来说,它的重要性可表示为:


以上式子中  是该词在文件中的出现次数,而分母则是在文件中所有字词的出现次数之和(分母也可以是词出现次数的最大值)。

逆向文件频率(inverse document frequency,IDF)是一个词语普遍重要性的度量。某一特定词语的IDF,可以由总文件数目除以包含该词语之文件的数目,再将得到的商取对数得到:


其中

  • |D|:语料库中的文件总数
  • :包含词语的文件数目(即的文件数目)如果该词语不在语料库中,就会导致分母为零,因此一般情况下使用

然后


某一特定文件内的高词语频率,以及该词语在整个文件集合中的低文件频率,可以产生出高权重的TF-IDF。因此,TF-IDF倾向于过滤掉常见的词语,保留重要的词语。

二、源码分析

目标:将一个给定的sequence文件集合转化为SparseVectors

1、对文档分词

1.1)使用最新的{@link org.apache.lucene.util.Version}创建一个Analyzer,用来下文1.2分词;

复制代码
      Class<? extends Analyzer> analyzerClass = StandardAnalyzer.class;if (cmdLine.hasOption(analyzerNameOpt)) {String className = cmdLine.getValue(analyzerNameOpt).toString();analyzerClass = Class.forName(className).asSubclass(Analyzer.class);// try instantiating it, b/c there isn't any point in setting it if// you can't instantiate it
        AnalyzerUtils.createAnalyzer(analyzerClass);}
复制代码

1.2)使用{@link StringTuple}将input documents转化为token数组(input documents必须是{@link org.apache.hadoop.io.SequenceFile}格式);

DocumentProcessor.tokenizeDocuments(inputDir, analyzerClass, tokenizedPath, conf);

输入:inputDir     输出:tokenizedPath

SequenceFileTokenizerMapper

复制代码
 //将input documents按Analyzer进行分词,并将分得的词放在一个StringTuple中TokenStream stream = analyzer.tokenStream(key.toString(), new StringReader(value.toString()));CharTermAttribute termAtt = stream.addAttribute(CharTermAttribute.class);stream.reset();StringTuple document = new StringTuple();//StringTuple是一个能够被用于Hadoop Map/Reduce Job的String类型有序Listwhile (stream.incrementToken()) {if (termAtt.length() > 0) {document.add(new String(termAtt.buffer(), 0, termAtt.length()));}}
复制代码

2、创建TF向量(Term Frequency Vectors)---多个Map/Reduce Job

复制代码
        DictionaryVectorizer.createTermFrequencyVectors(tokenizedPath,outputDir,tfDirName,conf,minSupport,maxNGramSize,minLLRValue,-1.0f,false,reduceTasks,chunkSize,sequentialAccessOutput,namedVectors);
复制代码

2.1)全局词统计(TF)

startWordCounting(input, dictionaryJobPath, baseConf, minSupport);

使用Map/Reduce并行地统计全局的词频,这里只考虑(maxNGramSize == 1) 

输入:tokenizedPath   输出:wordCountPath

TermCountMapper

复制代码
  //统计一个文本文档中的词频OpenObjectLongHashMap<String> wordCount = new OpenObjectLongHashMap<String>();for (String word : value.getEntries()) {if (wordCount.containsKey(word)) {wordCount.put(word, wordCount.get(word) + 1);} else {wordCount.put(word, 1);}}wordCount.forEachPair(new ObjectLongProcedure<String>() {@Overridepublic boolean apply(String first, long second) {try {context.write(new Text(first), new LongWritable(second));} catch (IOException e) {context.getCounter("Exception", "Output IO Exception").increment(1);} catch (InterruptedException e) {context.getCounter("Exception", "Interrupted Exception").increment(1);}return true;}});
复制代码

TermCountCombiner:( 同 TermCountReducer)

TermCountReducer

复制代码
//汇总所有的words和单词的weights,并将同一word的权重sumlong sum = 0;for (LongWritable value : values) {sum += value.get();}if (sum >= minSupport) {//TermCountCombiner没有这个过滤)context.write(key, new LongWritable(sum));}
复制代码

2.2)创建词典

 List<Path> dictionaryChunks;dictionaryChunks =createDictionaryChunks(dictionaryJobPath, output, baseConf, chunkSizeInMegabytes, maxTermDimension);

读取2.1词频Job的feature frequency List,并给它们指定id

输入:wordCountPath   输出:dictionaryJobPath

复制代码
 /*** Read the feature frequency List which is built at the end of the Word Count Job and assign ids to them.* This will use constant memory and will run at the speed of your disk read*/private static List<Path> createDictionaryChunks(Path wordCountPath,Path dictionaryPathBase,Configuration baseConf,int chunkSizeInMegabytes,int[] maxTermDimension) throws IOException {List<Path> chunkPaths = Lists.newArrayList();Configuration conf = new Configuration(baseConf);FileSystem fs = FileSystem.get(wordCountPath.toUri(), conf);long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;//默认64Mint chunkIndex = 0;Path chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);chunkPaths.add(chunkPath);SequenceFile.Writer dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);try {long currentChunkSize = 0;Path filesPattern = new Path(wordCountPath, OUTPUT_FILES_PATTERN);int i = 0;for (Pair<Writable,Writable> record: new SequenceFileDirIterable<Writable,Writable>(filesPattern, PathType.GLOB, null, null, true, conf)) {if (currentChunkSize > chunkSizeLimit) {//生成新的词典文件Closeables.close(dictWriter, false);chunkIndex++;chunkPath = new Path(dictionaryPathBase, DICTIONARY_FILE + chunkIndex);chunkPaths.add(chunkPath);dictWriter = new SequenceFile.Writer(fs, conf, chunkPath, Text.class, IntWritable.class);currentChunkSize = 0;}Writable key = record.getFirst();int fieldSize = DICTIONARY_BYTE_OVERHEAD + key.toString().length() * 2 + Integer.SIZE / 8;currentChunkSize += fieldSize;dictWriter.append(key, new IntWritable(i++));//指定id}maxTermDimension[0] = i;//记录最大word数目} finally {Closeables.close(dictWriter, false);}return chunkPaths;}
复制代码

2.3)构造PartialVectors(TF)

复制代码
int partialVectorIndex = 0;Collection<Path> partialVectorPaths = Lists.newArrayList();for (Path dictionaryChunk : dictionaryChunks) {Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);partialVectorPaths.add(partialVectorOutputPath);makePartialVectors(input, baseConf, maxNGramSize, dictionaryChunk, partialVectorOutputPath,maxTermDimension[0], sequentialAccess, namedVectors, numReducers);}
复制代码

将input documents使用a chunk of features创建a partial vector

(这是由于词典文件被分成了多个文件,每个文件只能构造总的vector的一部分,其中每一部分叫一个partial vector)

输入:tokenizedPath   输出:partialVectorPaths

Mapper:(Mapper)

TFPartialVectorReducer

复制代码
    //读取词典文件
//MAHOUT-1247Path dictionaryFile = HadoopUtil.getSingleCachedFile(conf);// key is word value is idfor (Pair<Writable, IntWritable> record: new SequenceFileIterable<Writable, IntWritable>(dictionaryFile, true, conf)) {dictionary.put(record.getFirst().toString(), record.getSecond().get());}
复制代码
复制代码
//转化a document为a sparse vectorStringTuple value = it.next();Vector vector = new RandomAccessSparseVector(dimension, value.length()); // guess at initial sizefor (String term : value.getEntries()) {if (!term.isEmpty() && dictionary.containsKey(term)) { // unigramint termId = dictionary.get(term);vector.setQuick(termId, vector.getQuick(termId) + 1);}}
复制代码

2.4)合并PartialVectors(TF)

    Configuration conf = new Configuration(baseConf);Path outputDir = new Path(output, tfVectorsFolderName);PartialVectorMerger.mergePartialVectors(partialVectorPaths, outputDir, conf, normPower, logNormalize,maxTermDimension[0], sequentialAccess, namedVectors, numReducers);

合并所有的partial {@link org.apache.mahout.math.RandomAccessSparseVector}s为完整的{@link org.apache.mahout.math.RandomAccessSparseVector}

输入:partialVectorPaths   输出:tfVectorsFolder

Mapper:(Mapper)

PartialVectorMergeReducer:

//合并partial向量为完整的TF向量Vector vector = new RandomAccessSparseVector(dimension, 10);for (VectorWritable value : values) {vector.assign(value.get(), Functions.PLUS);//将包含不同word的向量合并为一个}

 3、创建IDF向量(document frequency Vectors)---多个Map/Reduce Job

复制代码
      Pair<Long[], List<Path>> docFrequenciesFeatures = null;// Should document frequency features be processedif (shouldPrune || processIdf) {log.info("Calculating IDF");docFrequenciesFeatures =TFIDFConverter.calculateDF(new Path(outputDir, tfDirName), outputDir, conf, chunkSize);}
复制代码

3.1)统计DF词频

Path wordCountPath = new Path(output, WORDCOUNT_OUTPUT_FOLDER);

startDFCounting(input, wordCountPath, baseConf);

输入:tfDir  输出:featureCountPath

 TermDocumentCountMapper

复制代码
 //为一个文档中的每个word计数1、文档数1Vector vector = value.get();for (Vector.Element e : vector.nonZeroes()) {out.set(e.index());context.write(out, ONE);}context.write(TOTAL_COUNT, ONE);
复制代码

Combiner:(TermDocumentCountReducer)

TermDocumentCountReducer

   //将每个word的文档频率和文档总数sumlong sum = 0;for (LongWritable value : values) {sum += value.get();}

3.2)df词频分块

 return createDictionaryChunks(wordCountPath, output, baseConf, chunkSizeInMegabytes);

将df词频分块存放到多个文件,记录word总数、文档总数

输入:featureCountPath    输出:dictionaryPathBase

复制代码
  /*** Read the document frequency List which is built at the end of the DF Count Job. This will use constant* memory and will run at the speed of your disk read*/private static Pair<Long[], List<Path>> createDictionaryChunks(Path featureCountPath,Path dictionaryPathBase,Configuration baseConf,int chunkSizeInMegabytes) throws IOException {List<Path> chunkPaths = Lists.newArrayList();Configuration conf = new Configuration(baseConf);FileSystem fs = FileSystem.get(featureCountPath.toUri(), conf);long chunkSizeLimit = chunkSizeInMegabytes * 1024L * 1024L;int chunkIndex = 0;Path chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);chunkPaths.add(chunkPath);SequenceFile.Writer freqWriter =new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);try {long currentChunkSize = 0;long featureCount = 0;long vectorCount = Long.MAX_VALUE;Path filesPattern = new Path(featureCountPath, OUTPUT_FILES_PATTERN);for (Pair<IntWritable,LongWritable> record: new SequenceFileDirIterable<IntWritable,LongWritable>(filesPattern,PathType.GLOB,null,null,true,conf)) {if (currentChunkSize > chunkSizeLimit) {Closeables.close(freqWriter, false);chunkIndex++;chunkPath = new Path(dictionaryPathBase, FREQUENCY_FILE + chunkIndex);chunkPaths.add(chunkPath);freqWriter = new SequenceFile.Writer(fs, conf, chunkPath, IntWritable.class, LongWritable.class);currentChunkSize = 0;}int fieldSize = SEQUENCEFILE_BYTE_OVERHEAD + Integer.SIZE / 8 + Long.SIZE / 8;currentChunkSize += fieldSize;IntWritable key = record.getFirst();LongWritable value = record.getSecond();if (key.get() >= 0) {freqWriter.append(key, value);} else if (key.get() == -1) {//文档数目vectorCount = value.get();}featureCount = Math.max(key.get(), featureCount);}featureCount++;Long[] counts = {featureCount, vectorCount};//word数目、文档数目return new Pair<Long[], List<Path>>(counts, chunkPaths);} finally {Closeables.close(freqWriter, false);}}
复制代码

4、创建TFIDF(Term Frequency-Inverse Document Frequency (Tf-Idf) Vectors)

        TFIDFConverter.processTfIdf(new Path(outputDir, DictionaryVectorizer.DOCUMENT_VECTOR_OUTPUT_FOLDER),outputDir, conf, docFrequenciesFeatures, minDf, maxDF, norm, logNormalize,sequentialAccessOutput, namedVectors, reduceTasks);

4.1)生成PartialVectors(TFIDF)

复制代码
  int partialVectorIndex = 0;List<Path> partialVectorPaths = Lists.newArrayList();List<Path> dictionaryChunks = datasetFeatures.getSecond();for (Path dictionaryChunk : dictionaryChunks) {Path partialVectorOutputPath = new Path(output, VECTOR_OUTPUT_FOLDER + partialVectorIndex++);partialVectorPaths.add(partialVectorOutputPath);makePartialVectors(input,baseConf,datasetFeatures.getFirst()[0],datasetFeatures.getFirst()[1],minDf,maxDF,dictionaryChunk,partialVectorOutputPath,sequentialAccessOutput,namedVector);}
复制代码

使用a chunk of features创建a partial tfidf vector

输入:tfVectorsFolder   输出:partialVectorOutputPath

    DistributedCache.setCacheFiles(new URI[] {dictionaryFilePath.toUri()}, conf);//缓存df分块文件

Mapper:(Mapper)

TFIDFPartialVectorReducer

复制代码
  //计算每个文档中每个word的TFIDF值
Vector value = it.next().get();Vector vector = new RandomAccessSparseVector((int) featureCount, value.getNumNondefaultElements());for (Vector.Element e : value.nonZeroes()) {if (!dictionary.containsKey(e.index())) {continue;}long df = dictionary.get(e.index());if (maxDf > -1 && (100.0 * df) / vectorCount > maxDf) {continue;}if (df < minDf) {df = minDf;}vector.setQuick(e.index(), tfidf.calculate((int) e.get(), (int) df, (int) featureCount, (int) vectorCount));}
复制代码

 4.2)合并partial向量(TFIDF)

复制代码
    Configuration conf = new Configuration(baseConf);Path outputDir = new Path(output, DOCUMENT_VECTOR_OUTPUT_FOLDER);PartialVectorMerger.mergePartialVectors(partialVectorPaths,outputDir,baseConf,normPower,logNormalize,datasetFeatures.getFirst()[0].intValue(),sequentialAccessOutput,namedVector,numReducers);
复制代码

合并所有的partial向量为一个完整的文档向量

 输入:partialVectorOutputPath   输出:outputDir

 Mapper:Mapper

 PartialVectorMergeReducer

    //汇总TFIDF向量Vector vector = new RandomAccessSparseVector(dimension, 10);for (VectorWritable value : values) {vector.assign(value.get(), Functions.PLUS);}

 

这篇关于Mahout源码分析之 -- 文档向量化TF-IDF的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/1055225

相关文章

性能分析之MySQL索引实战案例

文章目录 一、前言二、准备三、MySQL索引优化四、MySQL 索引知识回顾五、总结 一、前言 在上一讲性能工具之 JProfiler 简单登录案例分析实战中已经发现SQL没有建立索引问题,本文将一起从代码层去分析为什么没有建立索引? 开源ERP项目地址:https://gitee.com/jishenghua/JSH_ERP 二、准备 打开IDEA找到登录请求资源路径位置

JAVA智听未来一站式有声阅读平台听书系统小程序源码

智听未来,一站式有声阅读平台听书系统 🌟&nbsp;开篇:遇见未来,从“智听”开始 在这个快节奏的时代,你是否渴望在忙碌的间隙,找到一片属于自己的宁静角落?是否梦想着能随时随地,沉浸在知识的海洋,或是故事的奇幻世界里?今天,就让我带你一起探索“智听未来”——这一站式有声阅读平台听书系统,它正悄悄改变着我们的阅读方式,让未来触手可及! 📚&nbsp;第一站:海量资源,应有尽有 走进“智听

活用c4d官方开发文档查询代码

当你问AI助手比如豆包,如何用python禁止掉xpresso标签时候,它会提示到 这时候要用到两个东西。https://developers.maxon.net/论坛搜索和开发文档 比如这里我就在官方找到正确的id描述 然后我就把参数标签换过来

Java ArrayList扩容机制 (源码解读)

结论:初始长度为10,若所需长度小于1.5倍原长度,则按照1.5倍扩容。若不够用则按照所需长度扩容。 一. 明确类内部重要变量含义         1:数组默认长度         2:这是一个共享的空数组实例,用于明确创建长度为0时的ArrayList ,比如通过 new ArrayList<>(0),ArrayList 内部的数组 elementData 会指向这个 EMPTY_EL

如何在Visual Studio中调试.NET源码

今天偶然在看别人代码时,发现在他的代码里使用了Any判断List<T>是否为空。 我一般的做法是先判断是否为null,再判断Count。 看了一下Count的源码如下: 1 [__DynamicallyInvokable]2 public int Count3 {4 [__DynamicallyInvokable]5 get

计算机毕业设计 大学志愿填报系统 Java+SpringBoot+Vue 前后端分离 文档报告 代码讲解 安装调试

🍊作者:计算机编程-吉哥 🍊简介:专业从事JavaWeb程序开发,微信小程序开发,定制化项目、 源码、代码讲解、文档撰写、ppt制作。做自己喜欢的事,生活就是快乐的。 🍊心愿:点赞 👍 收藏 ⭐评论 📝 🍅 文末获取源码联系 👇🏻 精彩专栏推荐订阅 👇🏻 不然下次找不到哟~Java毕业设计项目~热门选题推荐《1000套》 目录 1.技术选型 2.开发工具 3.功能

SWAP作物生长模型安装教程、数据制备、敏感性分析、气候变化影响、R模型敏感性分析与贝叶斯优化、Fortran源代码分析、气候数据降尺度与变化影响分析

查看原文>>>全流程SWAP农业模型数据制备、敏感性分析及气候变化影响实践技术应用 SWAP模型是由荷兰瓦赫宁根大学开发的先进农作物模型,它综合考虑了土壤-水分-大气以及植被间的相互作用;是一种描述作物生长过程的一种机理性作物生长模型。它不但运用Richard方程,使其能够精确的模拟土壤中水分的运动,而且耦合了WOFOST作物模型使作物的生长描述更为科学。 本文让更多的科研人员和农业工作者

MOLE 2.5 分析分子通道和孔隙

软件介绍 生物大分子通道和孔隙在生物学中发挥着重要作用,例如在分子识别和酶底物特异性方面。 我们介绍了一种名为 MOLE 2.5 的高级软件工具,该工具旨在分析分子通道和孔隙。 与其他可用软件工具的基准测试表明,MOLE 2.5 相比更快、更强大、功能更丰富。作为一项新功能,MOLE 2.5 可以估算已识别通道的物理化学性质。 软件下载 https://pan.quark.cn/s/57

工厂ERP管理系统实现源码(JAVA)

工厂进销存管理系统是一个集采购管理、仓库管理、生产管理和销售管理于一体的综合解决方案。该系统旨在帮助企业优化流程、提高效率、降低成本,并实时掌握各环节的运营状况。 在采购管理方面,系统能够处理采购订单、供应商管理和采购入库等流程,确保采购过程的透明和高效。仓库管理方面,实现库存的精准管理,包括入库、出库、盘点等操作,确保库存数据的准确性和实时性。 生产管理模块则涵盖了生产计划制定、物料需求计划、

衡石分析平台使用手册-单机安装及启动

单机安装及启动​ 本文讲述如何在单机环境下进行 HENGSHI SENSE 安装的操作过程。 在安装前请确认网络环境,如果是隔离环境,无法连接互联网时,请先按照 离线环境安装依赖的指导进行依赖包的安装,然后按照本文的指导继续操作。如果网络环境可以连接互联网,请直接按照本文的指导进行安装。 准备工作​ 请参考安装环境文档准备安装环境。 配置用户与安装目录。 在操作前请检查您是否有 sud