本文主要是介绍基于LSTM的新闻中文文本分类——基于textCNN与textRNN,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
构建词语字典
def build_vocab(file_path, tokenizer, max_size, min_freq):# 定义词汇表字典:使用 vocab_dic = {} 初始化一个空字典,用于存储每个词及其出现频率vocab_dic = {}with open(file_path, 'r', encoding='UTF-8') as f:for line in tqdm(f):lin = line.strip()if not lin:continuecontent = lin.split('\t')[0]"""分割与计数:对每行内容进行处理,先用 strip() 去除首尾空白符,然后分割出需要处理的文本内容(默认以制表符\t分割)。使用 tokenizer(content) 对内容进行分词,然后统计每个词的出现次数"""for word in tokenizer(content):vocab_dic[word] = vocab_dic.get(word, 0) + 1"""词汇按出现频率筛选(频率大于等于 min_freq)并排序(按频率降序),最多保留 max_size 个词汇"""vocab_list = sorted([_ for _ in vocab_dic.items() if _[1] >= min_freq], key=lambda x: x[1], reverse=True)[:max_size]'''重构词汇表:将筛选并排序后的词汇列表转换为字典,每个词汇映射到一个唯一的索引。词汇表中还额外添加了特殊标记 UNK(未知词)和 PAD(填充符),它们分别在词汇表的末尾添加。'''vocab_dic = {word_count[0]: idx for idx, word_count in enumerate(vocab_list)}vocab_dic.update({UNK: len(vocab_dic), PAD: len(vocab_dic) + 1})# k:v ===> 词:索引return vocab_dic
数据集构建
def build_dataset(config, ues_word):if ues_word:tokenizer = lambda x: x.split(' ') # 以空格隔开,word-levelelse:tokenizer = lambda x: [y for y in x] # char-levelif os.path.exists(config.vocab_path):vocab = pkl.load(open(config.vocab_path, 'rb'))else:vocab = build_vocab(config.train_path, tokenizer=tokenizer, max_size=MAX_VOCAB_SIZE, min_freq=1)pkl.dump(vocab, open(config.vocab_path, 'wb'))print(f"Vocab size: {len(vocab)}")train = load_dataset(config.train_path,vocab, config.pad_size)dev = load_dataset(config.dev_path,vocab, config.pad_size)test = load_dataset(config.test_path,vocab, config.pad_size)return vocab, train, dev, test
数据预处理
数据格式
首先数据格式:
文本内容以及对应过的标签
data label
数据预处理
数据预处理:去除空行: 忽略空行。分割行: 将每一行通过制表符\t分割为content(内容)和label(标签)。文本转换: 使用tokenizer函数将content分词。序列填充或截断: 根据pad_size参数(默认为32),如果分词后的序列长度小于pad_size,则用vocab字典中的PAD标记进行填充;如果长度大于pad_size,则进行截断。
def load_dataset(path,vocab,tokenizer, pad_size=32):contents = []with open(path, 'r', encoding='UTF-8') as f:for line in tqdm(f):lin = line.strip()if not lin:continuecontent, label = lin.split('\t')words_line = []token = tokenizer(content)seq_len = len(token)if pad_size:if len(token) < pad_size:token.extend([vocab.get(PAD)] * (pad_size - len(token)))else:token = token[:pad_size]seq_len = pad_size# word to idfor word in token:words_line.append(vocab.get(word, vocab.get(UNK)))contents.append((words_line, int(label), seq_len))return contents # [([...], 0), ([...], 1), ...]
数据接口类
class DatasetIterater(object):def __init__(self, batches, batch_size, device):self.batch_size = batch_sizeself.batches = batchesself.n_batches = len(batches) // batch_sizeself.residue = False # 记录batch数量是否为整数 if len(batches) % self.n_batches != 0:self.residue = Trueself.index = 0self.device = devicedef _to_tensor(self, datas):# xx = [xxx[2] for xxx in datas]# indexx = np.argsort(xx)[::-1]# datas = np.array(datas)[indexx]x = torch.LongTensor([_[0] for _ in datas]).to(self.device)y = torch.LongTensor([_[1] for _ in datas]).to(self.device)bigram = torch.LongTensor([_[3] for _ in datas]).to(self.device)trigram = torch.LongTensor([_[4] for _ in datas]).to(self.device)# pad前的长度(超过pad_size的设为pad_size)seq_len = torch.LongTensor([_[2] for _ in datas]).to(self.device)return (x, seq_len, bigram, trigram), ydef __next__(self):if self.residue and self.index == self.n_batches:batches = self.batches[self.index * self.batch_size: len(self.batches)]self.index += 1batches = self._to_tensor(batches)return batcheselif self.index > self.n_batches:self.index = 0raise StopIterationelse:batches = self.batches[self.index * self.batch_size: (self.index + 1) * self.batch_size]self.index += 1batches = self._to_tensor(batches)return batchesdef __iter__(self):return selfdef __len__(self):if self.residue:return self.n_batches + 1else:return self.n_batches
加载预训练模型
-
设置文件路径和参数:
vocab_dir
:词汇表的文件路径,这个文件包含从词汇到索引的映射。pretrain_dir
:预训练词向量文件的路径。emb_dim
:词向量的维度,这里设为300。filename_trimmed_dir
:压缩后保存新词向量的文件路径。
-
加载词汇表:
- 使用
pickle
(pkl
)加载词汇表,得到word_to_id
字典,它将词汇映射到一个唯一的索引。
- 使用
-
初始化词向量矩阵:
- 创建一个随机初始化的词向量矩阵
embeddings
,其形状为词汇表长度×词向量维度(len(word_to_id), emb_dim
)。
- 创建一个随机初始化的词向量矩阵
-
读取预训练的词向量:
- 打开预训练词向量文件,按行读取。
- 对于每一行,去掉首尾空白并分割空格,得到一个列表
lin
,其中lin[0]
是词汇,lin[1:301]
是对应的300维词向量。
-
更新词向量矩阵:
- 如果词汇
lin[0]
存在于word_to_id
中,找到对应的索引idx
。 - 将
lin[1:301]
中的字符串转换为浮点数,形成新的词向量emb
。 - 更新
embeddings
矩阵中的idx
行,即用新的词向量替换原来的随机向量。
- 如果词汇
-
保存词向量矩阵:
- 使用
numpy
的savez_compressed
方法,将更新后的embeddings
矩阵压缩保存到指定路径。
- 使用
'''提取预训练词向量'''vocab_dir = "./THUCNews/data/vocab.pkl"pretrain_dir = "./THUCNews/data/sgns.sogou.char"emb_dim = 300filename_trimmed_dir = "./THUCNews/data/vocab.embedding.sougou"word_to_id = pkl.load(open(vocab_dir, 'rb'))embeddings = np.random.rand(len(word_to_id), emb_dim)f = open(pretrain_dir, "r", encoding='UTF-8')for i, line in enumerate(f.readlines()):# if i == 0: # 若第一行是标题,则跳过# continuelin = line.strip().split(" ")if lin[0] in word_to_id:idx = word_to_id[lin[0]]emb = [float(x) for x in lin[1:(emb_dim+1)]]embeddings[idx] = np.asarray(emb, dtype='float32')f.close()np.savez_compressed(filename_trimmed_dir, embeddings=embeddings)
模型定义
输入文本先通过embedding层转换为词向量表示。
添加一个维度以适配卷积操作(unsqueeze(1))。
应用多个卷积层和池化层(conv_and_pool),然后将结果拼接。
应用Dropout。
通过全连接层得到最终分类结果。
textcnn
# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as npclass Config(object):"""配置参数"""def __init__(self, dataset, embedding):self.model_name = 'TextCNN'self.train_path = dataset + '/data/train.txt' # 训练集self.dev_path = dataset + '/data/dev.txt' # 验证集self.test_path = dataset + '/data/test.txt' # 测试集self.class_list = [x.strip() for x in open(dataset + '/data/class.txt').readlines()] # 类别名单self.vocab_path = dataset + '/data/vocab.pkl' # 词表self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt' # 模型训练结果self.log_path = dataset + '/log/' + self.model_nameself.embedding_pretrained = torch.tensor(np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\if embedding != 'random' else None # 预训练词向量self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 设备self.dropout = 0.5 # 随机失活self.require_improvement = 1000 # 若超过1000batch效果还没提升,则提前结束训练self.num_classes = len(self.class_list) # 类别数self.n_vocab = 0 # 词表大小,在运行时赋值self.num_epochs = 20 # epoch数self.batch_size = 128 # mini-batch大小self.pad_size = 32 # 每句话处理成的长度(短填长切)self.learning_rate = 1e-3 # 学习率self.embed = self.embedding_pretrained.size(1)\if self.embedding_pretrained is not None else 300 # 字向量维度self.filter_sizes = (2, 3, 4) # 卷积核尺寸self.num_filters = 256 # 卷积核数量(channels数)'''Convolutional Neural Networks for Sentence Classification'''class Model(nn.Module):def __init__(self, config):super(Model, self).__init__()if config.embedding_pretrained is not None:self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)else:self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)self.convs = nn.ModuleList([nn.Conv2d(1, config.num_filters, (k, config.embed)) for k in config.filter_sizes])self.dropout = nn.Dropout(config.dropout)self.fc = nn.Linear(config.num_filters * len(config.filter_sizes), config.num_classes)def conv_and_pool(self, x, conv):x = F.relu(conv(x)).squeeze(3)x = F.max_pool1d(x, x.size(2)).squeeze(2)return xdef forward(self, x):#print (x[0].shape)out = self.embedding(x[0])out = out.unsqueeze(1)out = torch.cat([self.conv_and_pool(out, conv) for conv in self.convs], 1)out = self.dropout(out)out = self.fc(out)return out
textRnn模型定义
# coding: UTF-8
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as npclass Config(object):"""配置参数"""def __init__(self, dataset, embedding):self.model_name = 'TextRNN'self.train_path = dataset + '/data/train.txt' # 训练集self.dev_path = dataset + '/data/dev.txt' # 验证集self.test_path = dataset + '/data/test.txt' # 测试集self.class_list = [x.strip() for x in open(dataset + '/data/class.txt').readlines()] # 类别名单self.vocab_path = dataset + '/data/vocab.pkl' # 词表self.save_path = dataset + '/saved_dict/' + self.model_name + '.ckpt' # 模型训练结果self.log_path = dataset + '/log/' + self.model_nameself.embedding_pretrained = torch.tensor(np.load(dataset + '/data/' + embedding)["embeddings"].astype('float32'))\if embedding != 'random' else None # 预训练词向量self.device = torch.device('cuda' if torch.cuda.is_available() else 'cpu') # 设备self.dropout = 0.5 # 随机失活self.require_improvement = 1000 # 若超过1000batch效果还没提升,则提前结束训练self.num_classes = len(self.class_list) # 类别数self.n_vocab = 0 # 词表大小,在运行时赋值self.num_epochs = 10 # epoch数self.batch_size = 128 # mini-batch大小self.pad_size = 32 # 每句话处理成的长度(短填长切)self.learning_rate = 1e-3 # 学习率self.embed = self.embedding_pretrained.size(1)\if self.embedding_pretrained is not None else 300 # 字向量维度, 若使用了预训练词向量,则维度统一self.hidden_size = 128 # lstm隐藏层self.num_layers = 2 # lstm层数'''Recurrent Neural Network for Text Classification with Multi-Task Learning'''class Model(nn.Module):def __init__(self, config):super(Model, self).__init__()if config.embedding_pretrained is not None:self.embedding = nn.Embedding.from_pretrained(config.embedding_pretrained, freeze=False)else:self.embedding = nn.Embedding(config.n_vocab, config.embed, padding_idx=config.n_vocab - 1)self.lstm = nn.LSTM(config.embed, config.hidden_size, config.num_layers,bidirectional=True, batch_first=True, dropout=config.dropout)self.fc = nn.Linear(config.hidden_size * 2, config.num_classes)def forward(self, x):x, _ = xout = self.embedding(x) # [batch_size, seq_len, embeding]=[128, 32, 300]out, _ = self.lstm(out)out = self.fc(out[:, -1, :]) # 句子最后时刻的 hidden statereturn out
训练、测试、验证
# coding: UTF-8
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from sklearn import metrics
import time
from utils import get_time_dif
from tensorboardX import SummaryWriter# 权重初始化,默认xavier
def init_network(model, method='xavier', exclude='embedding', seed=123):for name, w in model.named_parameters():if exclude not in name:if 'weight' in name:if method == 'xavier':nn.init.xavier_normal_(w)elif method == 'kaiming':nn.init.kaiming_normal_(w)else:nn.init.normal_(w)elif 'bias' in name:nn.init.constant_(w, 0)else:passdef train(config, model, train_iter, dev_iter, test_iter,writer):start_time = time.time()model.train()optimizer = torch.optim.Adam(model.parameters(), lr=config.learning_rate)# 学习率指数衰减,每次epoch:学习率 = gamma * 学习率# scheduler = torch.optim.lr_scheduler.ExponentialLR(optimizer, gamma=0.9)total_batch = 0 # 记录进行到多少batchdev_best_loss = float('inf')last_improve = 0 # 记录上次验证集loss下降的batch数flag = False # 记录是否很久没有效果提升#writer = SummaryWriter(log_dir=config.log_path + '/' + time.strftime('%m-%d_%H.%M', time.localtime()))for epoch in range(config.num_epochs):print('Epoch [{}/{}]'.format(epoch + 1, config.num_epochs))# scheduler.step() # 学习率衰减for i, (trains, labels) in enumerate(train_iter):#print (trains[0].shape)outputs = model(trains)model.zero_grad()loss = F.cross_entropy(outputs, labels)loss.backward()optimizer.step()if total_batch % 100 == 0:# 每多少轮输出在训练集和验证集上的效果true = labels.data.cpu()predic = torch.max(outputs.data, 1)[1].cpu()train_acc = metrics.accuracy_score(true, predic)dev_acc, dev_loss = evaluate(config, model, dev_iter)if dev_loss < dev_best_loss:dev_best_loss = dev_losstorch.save(model.state_dict(), config.save_path)improve = '*'last_improve = total_batchelse:improve = ''time_dif = get_time_dif(start_time)msg = 'Iter: {0:>6}, Train Loss: {1:>5.2}, Train Acc: {2:>6.2%}, Val Loss: {3:>5.2}, Val Acc: {4:>6.2%}, Time: {5} {6}'print(msg.format(total_batch, loss.item(), train_acc, dev_loss, dev_acc, time_dif, improve))writer.add_scalar("loss/train", loss.item(), total_batch)writer.add_scalar("loss/dev", dev_loss, total_batch)writer.add_scalar("acc/train", train_acc, total_batch)writer.add_scalar("acc/dev", dev_acc, total_batch)model.train()total_batch += 1if total_batch - last_improve > config.require_improvement:# 验证集loss超过1000batch没下降,结束训练print("No optimization for a long time, auto-stopping...")flag = Truebreakif flag:breakwriter.close()test(config, model, test_iter)def test(config, model, test_iter):# testmodel.load_state_dict(torch.load(config.save_path))model.eval()start_time = time.time()test_acc, test_loss, test_report, test_confusion = evaluate(config, model, test_iter, test=True)msg = 'Test Loss: {0:>5.2}, Test Acc: {1:>6.2%}'print(msg.format(test_loss, test_acc))print("Precision, Recall and F1-Score...")print(test_report)print("Confusion Matrix...")print(test_confusion)time_dif = get_time_dif(start_time)print("Time usage:", time_dif)def evaluate(config, model, data_iter, test=False):model.eval()loss_total = 0predict_all = np.array([], dtype=int)labels_all = np.array([], dtype=int)with torch.no_grad():for texts, labels in data_iter:outputs = model(texts)loss = F.cross_entropy(outputs, labels)loss_total += losslabels = labels.data.cpu().numpy()predic = torch.max(outputs.data, 1)[1].cpu().numpy()labels_all = np.append(labels_all, labels)predict_all = np.append(predict_all, predic)acc = metrics.accuracy_score(labels_all, predict_all)if test:report = metrics.classification_report(labels_all, predict_all, target_names=config.class_list, digits=4)confusion = metrics.confusion_matrix(labels_all, predict_all)return acc, loss_total / len(data_iter), report, confusionreturn acc, loss_total / len(data_iter)
github项目地址
这篇关于基于LSTM的新闻中文文本分类——基于textCNN与textRNN的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!