本文主要是介绍Word2vec之skip-gram训练词向量,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
参考自哈工大车万翔等老师编写的《自然语言处理-基于预训练模型的方法》
# coding: utf-8
# Name: tesst2
# Author: dell
# Data: 2021/10/12# 基于负采样的skip-garm模型
import torch
import torch.nn.functional as F
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from collections import defaultdict
from tqdm import tqdm
import torch.optim as optimBOS_TOKEN = "<bos>"
EOS_TOKEN = "<eos>"
PAD_TOKEN = "<pad>"def load_reuters():# 从NLTK中导入Reuters数据处理模块from nltk.corpus import reuters# 获取Reuters数据中的所有句子(已完成标记解析)text = reuters.sents()# (可选)将预料中的词转换为小写text = [[word.lower() for word in sentence] for sentence in text]# 构建词表,并传入预留标记vocab = Vocab.build(text, reserved_tokens=[PAD_TOKEN, BOS_TOKEN, EOS_TOKEN])# 利用词表将文本数据转换为id表示corpus = [vocab.convert_tokens_to_ids(sentence) for sentence in text]return corpus, vocabclass Vocab:def __init__(self, tokens=None):self.idx_to_token = list()self.token_to_idx = dict()if tokens is not None:if "<unk>" not in tokens:tokens = tokens + ["<unk>"]for token in tokens:self.idx_to_token.append(token)self.token_to_idx[token] = len(self.idx_to_token) - 1self.unk = self.token_to_idx["<unk>"]@classmethoddef build(cls, text, min_freq=1, reserved_tokens=None):token_freqs = defaultdict(int)for sentence in text:for token in sentence:token_freqs[token] += 1uniq_tokens = ["<unk>"] + (reserved_tokens if reserved_tokens else [])uniq_tokens += [token for token, freq in token_freqs.items() if freq >= min_freq and token != "<unk>"]return cls(uniq_tokens)def __len__(self):# 返回词表的大小,即词表中有多少个互不相同的标记return len(self.idx_to_token)def __getitem__(self, token):# 查找输入标记对应的索引值,如果该标记不存在,返回<unk>对应的索引值return self.token_to_idx.get(token, self.unk)def convert_tokens_to_ids(self, tokens):# 查找一系列输入标记对应的索引值,此处直接使用self即可,会调用__getitem__得到idreturn [self[token] for token in tokens]# 或者是# return [self.token_to_idx[token] for token in tokens]def convert_ids_to_tokens(self, indices):# 查找一系列索引值对应的标记return [self.idx_to_token[index] for index in indices]class SGNSDataset(Dataset):def __init__(self, corpus, vocab, context_size=2, n_negatives=5, ns_dist=None):self.data = []self.bos = vocab[BOS_TOKEN]self.eos = vocab[EOS_TOKEN]self.pad = vocab[PAD_TOKEN]for sentence in tqdm(corpus, desc="Dataset Construction"):sentence = [self.bos] + sentence + [self.eos]for i in range(1, len(sentence)-1):# 模型输入: (w, context)w = sentence[i]left_context_index = max(0, i-context_size)right_context_index = min(len(sentence), i+context_size)context = sentence[left_context_index:i] + sentence[i+1:right_context_index+1]context += [self.pad] * (2*context_size - len(context_size))self.data.append((w, context))# 负采样数量self.n_negatives = n_negatives# 负采样分布:若参数ns_dist为None,则使用均匀分布(从词表中均匀采样)self.ns_dist = ns_dist if ns_dist else torch.ones(len(vocab))def __len__(self):return len(self.data)def __getitem__(self, i):return self.data[i]def collate_fn(self, examples):words = torch.tensor([ex[0] for ex in examples], dtype=torch.long)contexts = torch.tensor([ex[1] for ex in examples], dtype=torch.long)batch_size, context_size = contexts.shapeneg_contexts = []# 对批次内的样本分别进行负采样for i in range(batch_size):# 保证负样本不包含当前样本中的contextns_dist = self.ns_dist.index_fill(0, contexts[i], .0) # dim=0, 需要填充的tensor的索引-contexts[i], vale=.0# torch.multinomial--对ns_dist中的值,有放回(replacement=True)地抽取self.n_negatives * context_sizeneg_contexts.append(torch.multinomial(ns_dist, self.n_negatives * context_size, replacement=True))neg_contexts = torch.stack(neg_contexts, dim=0)return words, contexts, neg_contextsclass SGNSModel(nn.Module):def __init__(self, vocab_size, embedding_dim):super(SGNSModel, self).__init__()# 词向量self.w_embeddings = nn.Embedding(vocab_size, embedding_dim)# 上下文向量self.c_embeddings = nn.Embedding(vocab_size, embedding_dim)def forward_w(self, words):w_embeds = self.w_embeddings(words)return w_embedsdef forward_c(self, contexts):c_embeds = self.c_embeddings(contexts)return c_embedsdef get_unigram_distribution(corpus, vocab_size):# 从给定的语料中计算Unigram概率分布token_counts = torch.tensor([0]*vocab_size)total_count = 0for sentence in corpus:total_count += len(sentence)for token in sentence:token_counts[token] += 1unigram_dist = torch.div(token_counts.float(), total_count)return unigram_distdef save_pretrained(vocab, embeds, save_path):with open(save_path, "w") as writer:# 记录词向量大小writer.write(f"{embeds.shape[0]} {embeds.shape[1]}\n")for idx, token in enumerate(vocab.idx_to_token):vec = " ".join([f"{x}" for x in embeds[idx]])# 每一行对应一个单词以及由空格分隔的词向量writer.write(f"{token} {vec}\n")def main():# 设置超参数embedding_dim = 128context_size = 3batch_size = 1024n_negatives = 5 # 负样本数量num_epoch = 10# 读取文本数据corpus, vocab = load_reuters()# 计算Unigram概率分布unigram_dist = get_unigram_distribution(corpus, len(vocab))# 根据Unigram概率分布计算负采样分布: p(w)**0.75# 为了防止低频单词被忽略-->通过取 0.75 次方,低频单词的概率将稍微变大。negative_sampling_dist = unigram_dist ** 0.75negative_sampling_dist /= negative_sampling_dist.sum()# 构建SGNS训练数据集dataset = SGNSDataset(corpus, vocab, context_size=context_size, n_negatives=n_negatives, ns_dist=negative_sampling_dist)# data_loader = get_loader(dataset, batch_size)data_loader = DataLoader(dataset, batch_size)model = SGNSModel(len(vocab), embedding_dim)device = "cuda" if torch.cuda.is_available() else "cpu"model.to(device)optimizer = optim.Adam(model.parameters(), lr=0.001)model.train()for epoch in range(num_epoch):total_loss = 0for batch in tqdm(data_loader, desc=f"Training Epoch {epoch}"):words, contexts, neg_contexts = [x.to(device) for x in batch]optimizer.zero_grad()batch_size = words.shape[0]# 分贝提取batch内词、上下文和负样本的向量表示word_embeds = model.forward_w(words).unsqueeze(dim=2) # [batch_size, word_embedding, 1]context_embeds = model.forward_c(contexts) # [batch_size, context_num, context_word_embedding]neg_context_embeds = model.forward_c(neg_contexts) # [batch_size, neg_context_word_embedding]# 正样本的分类(对数)似然context_loss = F.logsigmoid(torch.bmm(context_embeds, word_embeds).seqeeze(dim=2)) # [batch_size, context_num]-->预测上下文的词context_loss = context_loss.mean(dim=1)# 负样本的分类(对数)似然# torch.neg()--->按元素取负-->output = -1 * inputneg_context_loss = F.logsigmoid(torch.bmm(neg_context_embeds, word_embeds).squeeze(dim=2).neg())neg_context_loss = neg_context_loss.view(batch_size, -1, n_negatives).sum(2) # [batch_size, context_size, n_negatives]neg_context_loss = neg_context_loss.mean(dim=1)# 总体损失loss = -(context_loss + neg_context_loss).mean()loss.backward()optimizer.step()total_loss += loss.item()print(f"Loss: {total_loss:.2f}")# 合并词向量矩阵与上下文向量矩阵,作为最终的预训练词向量combined_embeds = model.w_embeddings.weight + model.c_embeddings.weight# 将词向量保存至sgns,vec文件save_pretrained(vocab, combined_embeds.data, "sgns.vec")if __name__ == "__main__":main()
这篇关于Word2vec之skip-gram训练词向量的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!