基于LSTM的新闻中文文本分类——基于textCNN与textRNN

2024-04-15 09:12

本文主要是介绍基于LSTM的新闻中文文本分类——基于textCNN与textRNN,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

构建词语字典

def build_vocab(file_path, tokenizer, max_size, min_freq):# 定义词汇表字典:使用 vocab_dic = {} 初始化一个空字典,用于存储每个词及其出现频率vocab_dic = {}with open(file_path, 'r', encoding='UTF-8') as f:for line in tqdm(f):lin = line.strip()if not lin:continuecontent = lin.split('\t')[0]"""分割与计数:对每行内容进行处理,先用 strip() 去除首尾空白符,然后分割出需要处理的文本内容(默认以制表符\t分割)。使用 tokenizer(content) 对内容进行分词,然后统计每个词的出现次数"""for word in tokenizer(content):vocab_dic[word] = vocab_dic.get(word, 0) + 1"""词汇按出现频率筛选(频率大于等于 min_freq)并排序(按频率降序),最多保留 max_size 个词汇"""vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq], key=lambda x: x[1], reverse=True)[:max_size]'''重构词汇表:将筛选并排序后的词汇列表转换为字典,每个词汇映射到一个唯一的索引。词汇表中还额外添加了特殊标记 UNK(未知词)和 PAD(填充符),它们分别在词汇表的末尾添加。'''vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})#  k:v ===> 词:索引return vocab_dic

数据集构建

def build_dataset(config, ues_word):if ues_word:tokenizer = lambda x: x.split(' ')  # 以空格隔开,word-levelelse:tokenizer = lambda x: [y for y in x]  # char-levelif os.path.exists(config.vocab_path):vocab = pkl.load(open(config.vocab_path, 'rb'))else:vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)pkl.dump(vocab, open(config.vocab_path, 'wb'))print(f"Vocab size: {len(vocab)}")train = load_dataset(config.train_path,vocab, config.pad_size)dev = load_dataset(config.dev_path,vocab, config.pad_size)test = load_dataset(config.test_path,vocab, config.pad_size)return vocab, train, dev, test

数据预处理

数据格式

首先数据格式:
文本内容以及对应过的标签

data label

在这里插入图片描述

数据预处理
        数据预处理:去除空行: 忽略空行。分割行: 将每一行通过制表符\t分割为content(内容)和label(标签)。文本转换: 使用tokenizer函数将content分词。序列填充或截断: 根据pad_size参数(默认为32),如果分词后的序列长度小于pad_size,则用vocab字典中的PAD标记进行填充;如果长度大于pad_size,则进行截断。
def load_dataset(path,vocab,tokenizer, pad_size=32):contents = []with open(path, 'r', encoding='UTF-8') as f:for line in tqdm(f):lin = line.strip()if not lin:continuecontent, label = lin.split('\t')words_line = []token = tokenizer(content)seq_len = len(token)if pad_size:if len(token) < pad_size:token.extend([vocab.get(PAD)] * (pad_size - len(token)))else:token = token[:pad_size]seq_len = pad_size# word to idfor word in token:words_line.append(vocab.get(word, vocab.get(UNK)))contents.append((words_line, int(label), seq_len))return contents  # [([...], 0), ([...], 1), ...]
数据接口类
class DatasetIterater(object):def __init__(self, batches, batch_size, device):self.batch_size = batch_sizeself.batches = batchesself.n_batches = len(batches) // batch_sizeself.residue = False  # 记录batch数量是否为整数 if len(batches) % self.n_batches != 0:self.residue = Trueself.index = 0self.device = devicedef _to_tensor(self, datas):# xx = [xxx[2] for xxx in datas]# indexx = np.argsort(xx)[::-1]# datas = np.array(datas)[indexx]x = torch.LongTensor([_[0] for _ in datas]).to(self.device)y = torch.LongTensor([_[1] for _ in datas]).to(self.device)bigram = torch.LongTensor([_[3] for _ in datas]).to(self.device)trigram = torch.LongTensor([_[4] for _ in datas]).to(self.device)# pad前的长度(超过pad_size的设为pad_size)seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)return (x, seq_len, bigram, trigram), ydef __next__(self):if self.residue and self.index == self.n_batches:batches = self.batches[self.index * self.batch_size: len(self.batches)]self.index += 1batches = self._to_tensor(batches)return batcheselif self.index > self.n_batches:self.index = 0raise StopIterationelse:batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]self.index += 1batches = self._to_tensor(batches)return batchesdef __iter__(self):return selfdef __len__(self):if self.residue:return self.n_batches + 1else:return self.n_batches

加载预训练模型

  1. 设置文件路径和参数:

    • vocab_dir:词汇表的文件路径,这个文件包含从词汇到索引的映射。
    • pretrain_dir:预训练词向量文件的路径。
    • emb_dim:词向量的维度,这里设为300。
    • filename_trimmed_dir:压缩后保存新词向量的文件路径。
  2. 加载词汇表:

    • 使用picklepkl)加载词汇表,得到word_to_id字典,它将词汇映射到一个唯一的索引。
  3. 初始化词向量矩阵:

    • 创建一个随机初始化的词向量矩阵embeddings,其形状为词汇表长度×词向量维度(len(word_to_id), emb_dim)。
  4. 读取预训练的词向量:

    • 打开预训练词向量文件,按行读取。
    • 对于每一行,去掉首尾空白并分割空格,得到一个列表lin,其中lin[0]是词汇,lin[1:301]是对应的300维词向量。
  5. 更新词向量矩阵:

    • 如果词汇lin[0]存在于word_to_id中,找到对应的索引idx
    • lin[1:301]中的字符串转换为浮点数,形成新的词向量emb
    • 更新embeddings矩阵中的idx行,即用新的词向量替换原来的随机向量。
  6. 保存词向量矩阵:

    • 使用numpysavez_compressed方法,将更新后的embeddings矩阵压缩保存到指定路径。
    '''提取预训练词向量'''vocab_dir = "./THUCNews/data/vocab.pkl"pretrain_dir = "./THUCNews/data/sgns.sogou.char"emb_dim = 300filename_trimmed_dir = "./THUCNews/data/vocab.embedding.sougou"word_to_id = pkl.load(open(vocab_dir, 'rb'))embeddings = np.random.rand(len(word_to_id), emb_dim)f = open(pretrain_dir, "r", encoding='UTF-8')for i, line in enumerate(f.readlines()):# if i == 0:  # 若第一行是标题,则跳过#     continuelin = line.strip().split(" ")if lin[0] in word_to_id:idx = word_to_id[lin[0]]emb = [float(x) for x in lin[1:(emb_dim+1)]]embeddings[idx] = np.asarray(emb, dtype='float32')f.close()np.savez_compressed(filename_trimmed_dir, embeddings=embeddings)

模型定义

输入文本先通过embedding层转换为词向量表示。
添加一个维度以适配卷积操作(unsqueeze(1))。
应用多个卷积层和池化层(conv_and_pool),然后将结果拼接。
应用Dropout。
通过全连接层得到最终分类结果。

textcnn
# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as npclass Config(object):"""配置参数"""def __init__(self, dataset, embedding):self.model_name = 'TextCNN'self.train_path = dataset + '/data/train.txt'                                # 训练集self.dev_path = dataset + '/data/dev.txt'                                    # 验证集self.test_path = dataset + '/data/test.txt'                                  # 测试集self.class_list = [x.strip() for x in open(dataset + '/data/class.txt').readlines()]                                # 类别名单self.vocab_path = dataset + '/data/vocab.pkl'                                # 词表self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'        # 模型训练结果self.log_path = dataset + '/log/' + self.model_nameself.embedding_pretrained = torch.tensor(np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\if embedding != 'random' else None                                       # 预训练词向量self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')   # 设备self.dropout = 0.5                                              # 随机失活self.require_improvement = 1000                                 # 若超过1000batch效果还没提升,则提前结束训练self.num_classes = len(self.class_list)                         # 类别数self.n_vocab = 0                                                # 词表大小,在运行时赋值self.num_epochs = 20                                            # epoch数self.batch_size = 128                                           # mini-batch大小self.pad_size = 32                                              # 每句话处理成的长度(短填长切)self.learning_rate = 1e-3                                       # 学习率self.embed = self.embedding_pretrained.size(1)\if self.embedding_pretrained is not None else 300           # 字向量维度self.filter_sizes = (2, 3, 4)                                   # 卷积核尺寸self.num_filters = 256                                          # 卷积核数量(channels数)'''Convolutional Neural Networks for Sentence Classification'''class Model(nn.Module):def __init__(self, config):super(Model, self).__init__()if config.embedding_pretrained is not None:self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)else:self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)self.convs = nn.ModuleList([nn.Conv2d(1, config.num_filters, (k, config.embed)) for k in config.filter_sizes])self.dropout = nn.Dropout(config.dropout)self.fc = nn.Linear(config.num_filters * len(config.filter_sizes), config.num_classes)def conv_and_pool(self, x, conv):x = F.relu(conv(x)).squeeze(3)x = F.max_pool1d(x, x.size(2)).squeeze(2)return xdef forward(self, x):#print (x[0].shape)out = self.embedding(x[0])out = out.unsqueeze(1)out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1)out = self.dropout(out)out = self.fc(out)return out
textRnn模型定义
# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as npclass Config(object):"""配置参数"""def __init__(self, dataset, embedding):self.model_name = 'TextRNN'self.train_path = dataset + '/data/train.txt'                                # 训练集self.dev_path = dataset + '/data/dev.txt'                                    # 验证集self.test_path = dataset + '/data/test.txt'                                  # 测试集self.class_list = [x.strip() for x in open(dataset + '/data/class.txt').readlines()]                                # 类别名单self.vocab_path = dataset + '/data/vocab.pkl'                                # 词表self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt'        # 模型训练结果self.log_path = dataset + '/log/' + self.model_nameself.embedding_pretrained = torch.tensor(np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\if embedding != 'random' else None                                       # 预训练词向量self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')   # 设备self.dropout = 0.5                                              # 随机失活self.require_improvement = 1000                                 # 若超过1000batch效果还没提升,则提前结束训练self.num_classes = len(self.class_list)                         # 类别数self.n_vocab = 0                                                # 词表大小,在运行时赋值self.num_epochs = 10                                            # epoch数self.batch_size = 128                                           # mini-batch大小self.pad_size = 32                                              # 每句话处理成的长度(短填长切)self.learning_rate = 1e-3                                       # 学习率self.embed = self.embedding_pretrained.size(1)\if self.embedding_pretrained is not None else 300           # 字向量维度, 若使用了预训练词向量,则维度统一self.hidden_size = 128                                          # lstm隐藏层self.num_layers = 2                                             # lstm层数'''Recurrent Neural Network for Text Classification with Multi-Task Learning'''class Model(nn.Module):def __init__(self, config):super(Model, self).__init__()if config.embedding_pretrained is not None:self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)else:self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers,bidirectional=True, batch_first=True, dropout=config.dropout)self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)def forward(self, x):x, _ = xout = self.embedding(x)  # [batch_size, seq_len, embeding]=[128, 32, 300]out, _ = self.lstm(out)out = self.fc(out[:, -1, :])  # 句子最后时刻的 hidden statereturn out

训练、测试、验证

# coding: UTF-8
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
import time
from utils import get_time_dif
from tensorboardX import SummaryWriter# 权重初始化,默认xavier
def init_network(model, method='xavier', exclude='embedding', seed=123):for name, w in model.named_parameters():if exclude not in name:if 'weight' in name:if method == 'xavier':nn.init.xavier_normal_(w)elif method == 'kaiming':nn.init.kaiming_normal_(w)else:nn.init.normal_(w)elif 'bias' in name:nn.init.constant_(w, 0)else:passdef train(config, model, train_iter, dev_iter, test_iter,writer):start_time = time.time()model.train()optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)# 学习率指数衰减,每次epoch:学习率 = gamma * 学习率# scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)total_batch = 0  # 记录进行到多少batchdev_best_loss = float('inf')last_improve = 0  # 记录上次验证集loss下降的batch数flag = False  # 记录是否很久没有效果提升#writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))for epoch in range(config.num_epochs):print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))# scheduler.step() # 学习率衰减for i, (trains, labels) in enumerate(train_iter):#print (trains[0].shape)outputs = model(trains)model.zero_grad()loss = F.cross_entropy(outputs, labels)loss.backward()optimizer.step()if total_batch % 100 == 0:# 每多少轮输出在训练集和验证集上的效果true = labels.data.cpu()predic = torch.max(outputs.data, 1)[1].cpu()train_acc = metrics.accuracy_score(true, predic)dev_acc, dev_loss = evaluate(config, model, dev_iter)if dev_loss < dev_best_loss:dev_best_loss = dev_losstorch.save(model.state_dict(), config.save_path)improve = '*'last_improve = total_batchelse:improve = ''time_dif = get_time_dif(start_time)msg = 'Iter: {0:>6},  Train Loss: {1:>5.2},  Train Acc: {2:>6.2%},  Val Loss: {3:>5.2},  Val Acc: {4:>6.2%},  Time: {5} {6}'print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))writer.add_scalar("loss/train", loss.item(), total_batch)writer.add_scalar("loss/dev", dev_loss, total_batch)writer.add_scalar("acc/train", train_acc, total_batch)writer.add_scalar("acc/dev", dev_acc, total_batch)model.train()total_batch += 1if total_batch - last_improve > config.require_improvement:# 验证集loss超过1000batch没下降,结束训练print("No optimization for a long time, auto-stopping...")flag = Truebreakif flag:breakwriter.close()test(config, model, test_iter)def test(config, model, test_iter):# testmodel.load_state_dict(torch.load(config.save_path))model.eval()start_time = time.time()test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)msg = 'Test Loss: {0:>5.2},  Test Acc: {1:>6.2%}'print(msg.format(test_loss, test_acc))print("Precision, Recall and F1-Score...")print(test_report)print("Confusion Matrix...")print(test_confusion)time_dif = get_time_dif(start_time)print("Time usage:", time_dif)def evaluate(config, model, data_iter, test=False):model.eval()loss_total = 0predict_all = np.array([], dtype=int)labels_all = np.array([], dtype=int)with torch.no_grad():for texts, labels in data_iter:outputs = model(texts)loss = F.cross_entropy(outputs, labels)loss_total += losslabels = labels.data.cpu().numpy()predic = torch.max(outputs.data, 1)[1].cpu().numpy()labels_all = np.append(labels_all, labels)predict_all = np.append(predict_all, predic)acc = metrics.accuracy_score(labels_all, predict_all)if test:report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)confusion = metrics.confusion_matrix(labels_all, predict_all)return acc, loss_total / len(data_iter), report, confusionreturn acc, loss_total / len(data_iter)

github项目地址

这篇关于基于LSTM的新闻中文文本分类——基于textCNN与textRNN的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/905420

相关文章

中文分词jieba库的使用与实景应用(一)

知识星球:https://articles.zsxq.com/id_fxvgc803qmr2.html 目录 一.定义: 精确模式(默认模式): 全模式: 搜索引擎模式: paddle 模式(基于深度学习的分词模式): 二 自定义词典 三.文本解析   调整词出现的频率 四. 关键词提取 A. 基于TF-IDF算法的关键词提取 B. 基于TextRank算法的关键词提取

基于人工智能的图像分类系统

目录 引言项目背景环境准备 硬件要求软件安装与配置系统设计 系统架构关键技术代码示例 数据预处理模型训练模型预测应用场景结论 1. 引言 图像分类是计算机视觉中的一个重要任务,目标是自动识别图像中的对象类别。通过卷积神经网络(CNN)等深度学习技术,我们可以构建高效的图像分类系统,广泛应用于自动驾驶、医疗影像诊断、监控分析等领域。本文将介绍如何构建一个基于人工智能的图像分类系统,包括环境

认识、理解、分类——acm之搜索

普通搜索方法有两种:1、广度优先搜索;2、深度优先搜索; 更多搜索方法: 3、双向广度优先搜索; 4、启发式搜索(包括A*算法等); 搜索通常会用到的知识点:状态压缩(位压缩,利用hash思想压缩)。

Vue3项目开发——新闻发布管理系统(六)

文章目录 八、首页设计开发1、页面设计2、登录访问拦截实现3、用户基本信息显示①封装用户基本信息获取接口②用户基本信息存储③用户基本信息调用④用户基本信息动态渲染 4、退出功能实现①注册点击事件②添加退出功能③数据清理 5、代码下载 八、首页设计开发 登录成功后,系统就进入了首页。接下来,也就进行首页的开发了。 1、页面设计 系统页面主要分为三部分,左侧为系统的菜单栏,右侧

vscode中文乱码问题,注释,终端,调试乱码一劳永逸版

忘记咋回事突然出现了乱码问题,很多方法都试了,注释乱码解决了,终端又乱码,调试窗口也乱码,最后经过本人不懈努力,终于全部解决了,现在分享给大家我的方法。 乱码的原因是各个地方用的编码格式不统一,所以把他们设成统一的utf8. 1.电脑的编码格式 开始-设置-时间和语言-语言和区域 管理语言设置-更改系统区域设置-勾选Bata版:使用utf8-确定-然后按指示重启 2.vscode

解决Office Word不能切换中文输入

我们在使用WORD的时可能会经常碰到WORD中无法输入中文的情况。因为,虽然我们安装了搜狗输入法,但是到我们在WORD中使用搜狗的输入法的切换中英文的按键的时候会发现根本没有效果,无法将输入法切换成中文的。下面我就介绍一下如何在WORD中把搜狗输入法切换到中文。

Level3 — PART 3 — 自然语言处理与文本分析

目录 自然语言处理概要 分词与词性标注 N-Gram 分词 分词及词性标注的难点 法则式分词法 全切分 FMM和BMM Bi-direction MM 优缺点 统计式分词法 N-Gram概率模型 HMM概率模型 词性标注(Part-of-Speech Tagging) HMM 文本挖掘概要 信息检索(Information Retrieval) 全文扫描 关键词

用Pytho解决分类问题_DBSCAN聚类算法模板

一:DBSCAN聚类算法的介绍 DBSCAN(Density-Based Spatial Clustering of Applications with Noise)是一种基于密度的聚类算法,DBSCAN算法的核心思想是将具有足够高密度的区域划分为簇,并能够在具有噪声的空间数据库中发现任意形状的簇。 DBSCAN算法的主要特点包括: 1. 基于密度的聚类:DBSCAN算法通过识别被低密

sqlite不支持中文排序,采用java排序

方式一 不支持含有重复字段进行排序 /*** sqlite不支持中文排序,改用java排序* 根据指定的对象属性字段,排序对象集合,顺序* @param list* @param field* @return*/public static List sortListByField(List<?> list,String field){List temp = new ArrayList(

PMP–一、二、三模–分类–14.敏捷–技巧–看板面板与燃尽图燃起图

文章目录 技巧一模14.敏捷--方法--看板(类似卡片)1、 [单选] 根据项目的特点,项目经理建议选择一种敏捷方法,该方法限制团队成员在任何给定时间执行的任务数。此方法还允许团队提高工作过程中问题和瓶颈的可见性。项目经理建议采用以下哪种方法? 易错14.敏捷--精益、敏捷、看板(类似卡片)--敏捷、精益和看板方法共同的重点在于交付价值、尊重人、减少浪费、透明化、适应变更以及持续改善等方面。