本文主要是介绍LLM手撕,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
LayerNorm
import torch
from torch import nnclass LayerNorm(nn.Module):def __init__(self, hidden_size, eps=1e-6):super().__init__()self.hidden_size = hidden_size # 隐藏状态的大小self.eps = eps # 用于数值稳定性的一个小值# 初始化可学习的缩放和平移参数self.gamma = nn.Parameter(torch.ones(hidden_size)) # 缩放参数,初始值为全1self.beta = nn.Parameter(torch.zeros(hidden_size)) # 平移参数,初始值为全0def forward(self, x):# x 形状: (batch_size, seq_len, hidden_size)# 计算每个样本的均值和方差mean = x.mean(dim=-1, keepdim=True) # 计算最后一个维度的均值,形状: (batch_size, seq_len, 1)variance = x.var(dim=-1, keepdim=True, unbiased=False) # 计算最后一个维度的方差,形状: (batch_size, seq_len, 1)# 进行归一化x_normalized = (x - mean) / torch.sqrt(variance + self.eps) # 归一化,形状: (batch_size, seq_len, hidden_size)# 应用缩放和平移参数output = self.gamma * x_normalized + self.beta # 形状: (batch_size, seq_len, hidden_size)return outputdef test_layer_norm():batch_size = 2seq_len = 4hidden_size = 8# 随机生成输入数据x = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)# 创建 LayerNorm 模块layer_norm = LayerNorm(hidden_size)# 计算 LayerNorm 输出output = layer_norm(x)print("Input shape:", x.shape)print("Output shape:", output.shape)if __name__ == "__main__":test_layer_norm()
BatchNorm
import torch
from torch import nnclass BatchNorm(nn.Module):def __init__(self, hidden_size, eps=1e-5, momentum=0.1):super().__init__()self.hidden_size = hidden_size # 隐藏状态的大小self.eps = eps # 用于数值稳定性的一个小值self.momentum = momentum # 用于计算运行时均值和方差的动量# 初始化可学习的缩放和平移参数self.gamma = nn.Parameter(torch.ones(hidden_size)) # 缩放参数,初始值为全1self.beta = nn.Parameter(torch.zeros(hidden_size)) # 平移参数,初始值为全0# 初始化运行时均值和方差self.running_mean = torch.zeros(hidden_size) # 运行时均值,初始值为全0self.running_var = torch.ones(hidden_size) # 运行时方差,初始值为全1def forward(self, x):# x 形状: (batch_size, seq_len, hidden_size)if self.training:# 计算当前批次的均值和方差batch_mean = x.mean(dim=(0, 1), keepdim=False) # 计算前两个维度的均值,形状: (hidden_size)batch_var = x.var(dim=(0, 1), keepdim=False, unbiased=False) # 计算前两个维度的方差,形状: (hidden_size)# 更新运行时均值和方差self.running_mean = (1 - self.momentum) * self.running_mean + self.momentum * batch_meanself.running_var = (1 - self.momentum) * self.running_var + self.momentum * batch_varmean = batch_meanvariance = batch_varelse:# 使用运行时均值和方差mean = self.running_meanvariance = self.running_var# 进行归一化x_normalized = (x - mean) / torch.sqrt(variance + self.eps) # 归一化,形状: (batch_size, seq_len, hidden_size)# 应用缩放和平移参数output = self.gamma * x_normalized + self.beta # 形状: (batch_size, seq_len, hidden_size)return outputdef test_batch_norm():batch_size = 2seq_len = 4hidden_size = 8# 随机生成输入数据x = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)# 创建 BatchNorm 模块batch_norm = BatchNorm(hidden_size)# 计算 BatchNorm 输出output = batch_norm(x)print("Input shape:", x.shape)print("Output shape:", output.shape)if __name__ == "__main__":test_batch_norm()
Dropout
import torch
from torch import nnclass Dropout(nn.Module):def __init__(self, dropout_prob=0.1):super().__init__()self.dropout_prob = dropout_prob # Dropout 的概率def forward(self, x):if self.training:# 生成与输入形状相同的掩码,元素为 0 或 1,按照 dropout_prob 的概率为 0mask = (torch.rand(x.shape) > self.dropout_prob).float() # 掩码,形状与 x 相同# 归一化掩码,使得训练阶段和推理阶段的一致性output = mask * x / (1.0 - self.dropout_prob) # 形状与 x 相同else:output = x # 推理阶段,不进行 Dropoutreturn outputdef test_dropout():batch_size = 2seq_len = 4hidden_size = 8# 随机生成输入数据x = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)# 创建 Dropout 模块dropout = Dropout(dropout_prob=0.1)# 设置为训练模式dropout.train()output_train = dropout(x)# 设置为推理模式dropout.eval()output_eval = dropout(x)print("Input shape:", x.shape)print("Output shape during training:", output_train.shape)print("Output shape during evaluation:", output_eval.shape)if __name__ == "__main__":test_dropout()
Transformer位置编码
def sinusoidal_position_embedding(batch_size, nums_head, max_len, output_dim, device):# (max_len, 1)position = torch.arange(0, max_len, dtype=torch.float).unsqueeze(-1)# (output_dim//2)ids = torch.arange(0, output_dim // 2, dtype=torch.float) # 即公式里的i, i的范围是 [0,d/2]theta = torch.pow(10000, -2 * ids / output_dim)# (max_len, output_dim//2)embeddings = position * theta # 即公式里的:pos / (10000^(2i/d))# (max_len, output_dim//2, 2)embeddings = torch.stack([torch.sin(embeddings), torch.cos(embeddings)], dim=-1)# (bs, head, max_len, output_dim//2, 2)embeddings = embeddings.repeat((batch_size, nums_head, *([1] * len(embeddings.shape)))) # 在bs维度重复,其他维度都是1不重复# (bs, head, max_len, output_dim)# reshape后就是:偶数sin, 奇数cos了embeddings = torch.reshape(embeddings, (batch_size, nums_head, max_len, output_dim))embeddings = embeddings.to(device)return embeddings
RoPE
Self-attention
from math import sqrt
import torch
import torch.nn as nnclass Self_Attention(nn.Module):def __init__(self, input_dim, dim_k, dim_v):super(Self_Attention, self).__init__()self.q = nn.Linear(input_dim, dim_k)self.k = nn.Linear(input_dim, dim_k)self.v = nn.Linear(input_dim, dim_v)self._norm_fact = 1 / sqrt(dim_k)def forward(self, x):Q = self.q(x) # Q: batch_size * seq_len * dim_kK = self.k(x) # K: batch_size * seq_len * dim_kV = self.v(x) # V: batch_size * seq_len * dim_v# Q * K.T() / sqrt(dim_k)atten = torch.bmm(Q, K.permute(0, 2, 1)) * self._norm_fact # batch_size * seq_len * seq_len# 计算 Softmaxatten = torch.softmax(atten, dim=-1)# 计算输出output = torch.bmm(atten, V) # Q * K.T() * V # batch_size * seq_len * dim_vreturn output# 创建一个 Self_Attention 对象
input_dim = 64
dim_k = 32
dim_v = 32
self_attention = Self_Attention(input_dim, dim_k, dim_v)# 创建一个示例输入张量,形状为 batch_size * seq_len * input_dim
batch_size = 2
seq_len = 10
x = torch.randn(batch_size, seq_len, input_dim)# 运行前向传播
output = self_attention(x)print("Input shape:", x.shape)
print("Output shape:", output.shape)
Scaled Cross Product
import torch
from torch import nnclass ScaledDotProductAttention(nn.Module):def __init__(self):super().__init__()def forward(self, query, key, value, attention_mask=None):# query, key, value 形状: (batch_size, seq_len, hidden_size)# 计算注意力分数# key.transpose(-1, -2) 将最后两个维度进行转置,以进行点积# attention_scores 形状: (batch_size, seq_len, seq_len)d_k = query.size(-1) # 获取 hidden_sizeattention_scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(torch.tensor(d_k, dtype=torch.float32))# 添加注意力掩码(seq_len, seq_len),掩码位置(1)的值为负无穷if attention_mask is not None:attention_scores += attention_mask * -1e9# 对注意力分数进行归一化,得到注意力概率attention_probs = torch.softmax(attention_scores, dim=-1) # (batch_size, num_heads, seq_len, seq_len)# 计算注意力输出,通过注意力概率加权值attention_output = torch.matmul(attention_probs, value) # (batch_size, num_heads, seq_len, hidden_size)return attention_outputdef test_attn():batch_size = 128seq_len = 512hidden_size = 1024query = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)key = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)value = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)sdpa = ScaledDotProductAttention()output = sdpa(query, key, value)print("Query shape:", query.shape)print("Key shape:", key.shape)print("Value shape:", value.shape)print("Output shape:", output.shape)if __name__ == "__main__":test_attn()
MHA
import torch
from torch import nnclass MultiHeadAttention(torch.nn.Module):def __init__(self, hidden_size, num_heads):super().__init__()self.num_heads = num_headsself.head_dim = hidden_size // num_heads # 每个头的维度,二者必须整除# 初始化 Q、K、V 的投影矩阵,将输入词向量线性变换为 Q、K、V,维度保持一致self.q_linear = nn.Linear(hidden_size, hidden_size) self.k_linear = nn.Linear(hidden_size, hidden_size)self.v_linear = nn.Linear(hidden_size, hidden_size)# 输出线性层,将拼接后的多头注意力输出变换为所需的输出维度,这里维度保持一致self.o_linear = nn.Linear(hidden_size, hidden_size)def forward(self, hidden_state, attention_mask=None):# hidden_state 形状: (batch_size, seq_len, hidden_size)batch_size = hidden_state.size(0) # 获取批量大小# 计算 Q、K、V,线性变换query = self.q_linear(hidden_state) # (batch_size, seq_len, hidden_size)key = self.k_linear(hidden_state) # (batch_size, seq_len, hidden_size)value = self.v_linear(hidden_state) # (batch_size, seq_len, hidden_size)# 分割多头,将每个头的维度拆分出来query = self.split_head(query) # (batch_size, num_heads, seq_len, head_dim)key = self.split_head(key) # (batch_size, num_heads, seq_len, head_dim)value = self.split_head(value) # (batch_size, num_heads, seq_len, head_dim)# 计算注意力分数,使用缩放点积注意力机制# attention_scores 形状: (batch_size, num_heads, seq_len, seq_len)attention_scores = torch.matmul(query, key.transpose(-1, -2)) / torch.sqrt(torch.tensor(self.head_dim, dtype=torch.float32))# 添加注意力掩码(seq_len, seq_len),掩码位置(1)的值为负无穷if attention_mask is not None:attention_scores += attention_mask * -1e9# 对注意力分数进行归一化,得到注意力概率attention_probs = torch.softmax(attention_scores, dim=-1) # (batch_size, num_heads, seq_len, seq_len)# 计算注意力输出,通过注意力概率加权值output = torch.matmul(attention_probs, value) # (batch_size, num_heads, seq_len, head_dim)# 对多头注意力输出进行拼接# output.transpose(1, 2) 将 num_heads 和 seq_len 维度转置# 将形状调整为 (batch_size, seq_len, hidden_size)output = output.transpose(1, 2).reshape(batch_size, -1, self.head_dim * self.num_heads)# 通过线性层将拼接后的输出变换为所需的输出维度output = self.o_linear(output) # (batch_size, seq_len, hidden_size)return outputdef split_head(self, x):batch_size = x.size(0) # 获取批量大小# x 形状: (batch_size, seq_len, hidden_size)# 将 hidden_size 分割为 num_heads 和 head_dimreturn x.reshape(batch_size, -1, self.num_heads, self.head_dim).transpose(1, 2)# 返回形状: (batch_size, num_heads, seq_len, head_dim)def test_MHA():batch_size = 128seq_len = 512hidden_size = 1024num_heads = 8# 随机生成输入数据hidden_state = torch.randn(batch_size, seq_len, hidden_size) # (batch_size, seq_len, hidden_size)# 创建多头注意力模块mha = MultiHeadAttention(hidden_size, num_heads)# 计算多头注意力输出output = mha(hidden_state)print("Input shape:", hidden_state.shape)print("Output shape:", output.shape)if __name__ == "__main__":test_MHA()
Softmax
import torchdef softmax(x):# 计算输入张量的指数exp_x = torch.exp(x)# 计算所有指数之和sum_exp_x = torch.sum(exp_x, dim=0)# 将每个元素的指数除以总和softmax_x = exp_x / sum_exp_xreturn softmax_x# 假设我们有一个张量
x = torch.tensor([1.0, 2.0, 3.0])
# 使用自己实现的 softmax 函数
softmax_x = softmax(x)
print(softmax_x)
MSE
import torchdef mse_loss(y_true, y_pred):# 计算平方误差squared_diff = (y_true - y_pred) ** 2# 返回平均平方误差return torch.mean(squared_diff)# 测试均方误差损失函数
y_true = torch.tensor([3.0, -0.5, 2.0, 7.0])
y_pred = torch.tensor([2.5, 0.0, 2.0, 8.0])loss = mse_loss(y_true, y_pred)
print(f"Mean Squared Error: {loss.item()}")
Cross entropy
import torchdef cross_entropy_loss(y_true, y_pred):# 防止 log(0) 的情况epsilon = 1e-12y_pred = torch.clamp(y_pred, epsilon, 1. - epsilon)# 计算交叉熵ce_loss = -torch.sum(y_true * torch.log(y_pred), dim=-1)# 返回平均损失return torch.mean(ce_loss)
y_true = torch.tensor([[1, 0, 0], [0, 1, 0]], dtype=torch.float32)
y_pred = torch.tensor([[0.8, 0.1, 0.1], [0.2, 0.7, 0.1]], dtype=torch.float32)loss = cross_entropy_loss(y_true, y_pred)
print(f"Cross-Entropy Loss: {loss.item()}")
这篇关于LLM手撕的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!