Lab 5: Recurrent Neural Networks
Tasks
- Understand how sequence data is processed, fill in the missing pieces of the object-oriented code, and wrap the data into a dataloader using torch's built-in data utilities
- Implement RNN, LSTM, and GRU both by hand and via the built-in interfaces, and run experiments on at least one dataset
- Compare RNN, LSTM, and GRU on the same dataset in terms of training time, prediction accuracy, loss curves, etc. (preferably with charts)
- Compare different hyperparameters (hidden_size, batch_size, lr, etc.), analyzing at least one or two of them
1. Dataset Processing
This lab uses the traffic-flow highway dataset to set up a regression task: predicting future traffic flow from historical flow measurements.
1.1 Approach and Code
import os
import numpy as np
import pandas as pd
import torch
from torch import nn
import torch.utils.data.dataset as dataset
import torch.utils.data.dataloader as dataloader
from sklearn.metrics import accuracy_score, recall_score, f1_score

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f'Current device: {device}')

import warnings
warnings.filterwarnings("ignore")
# Data preprocessing
raw_data = np.load('dataset/traffic-flow/traffic.npz')['data']
print(raw_data.shape)
target = 0       # predict the first feature dimension
window_size = 16
sensor_num = 3   # use sensor 3

train_x = []
train_y = []
test_x = []
test_y = []

len_train = int(raw_data.shape[0] * 0.6)
train_seqs = raw_data[:len_train]
test_seqs = raw_data[len_train:]

# Build sliding-window samples: each input is window_size steps,
# and the label is the target feature at the following step
for i in range(train_seqs.shape[0] - window_size):
    train_x.append(train_seqs[i:i+window_size, sensor_num, :].squeeze())
    train_y.append(train_seqs[i+window_size, sensor_num, target].squeeze())
for i in range(test_seqs.shape[0] - window_size):
    test_x.append(test_seqs[i:i+window_size, sensor_num, :].squeeze())
    test_y.append(test_seqs[i+window_size, sensor_num, target].squeeze())

train_x = torch.Tensor(train_x)
train_y = torch.Tensor(train_y)
test_x = torch.Tensor(test_x)
test_y = torch.Tensor(test_y)
# Normalize with training-set statistics
mean = train_x.mean(dim=(0, 1))
std = train_x.std(dim=(0, 1))
train_x = (train_x - mean) / std
train_y = (train_y - mean[target]) / std[target]
test_x = (test_x - mean) / std
test_y = (test_y - mean[target]) / std[target]
print(train_x.shape)
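Because both inputs and targets are standardized with the training-set mean and std, model predictions come out in normalized units. A minimal sketch for mapping them back to raw traffic-flow values (denorm is a hypothetical helper, not part of the original code):
# Hypothetical helper: undo the target normalization applied above
def denorm(y_norm, mean, std, target=0):
    return y_norm * std[target] + mean[target]

# e.g. raw_pred = denorm(predictions[:, -1, 0].cpu(), mean, std)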
from torch.utils.data import Dataset, DataLoader
# Assemble the Dataset and DataLoader
class TimeSeriesDataset(Dataset):
    # train_x/test_x are already windowed to shape (N, window_size, features),
    # so each item is simply one pre-built window and its label.
    def __init__(self, data, target, window_size):
        self.data = data
        self.target = target
        self.window_size = window_size

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        return self.data[idx], self.target[idx]

# Build the training and test datasets
train_dataset = TimeSeriesDataset(train_x, train_y, window_size)
test_dataset = TimeSeriesDataset(test_x, test_y, window_size)

# Build the DataLoaders
batch_size = 16  # adjust the batch size as needed
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)
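As a quick sanity check (a minimal sketch, not in the original write-up), one batch drawn from the loader should have shape (batch_size, window_size, 3):
# Inspect the shape of one training batch
xb, yb = next(iter(train_loader))
print(xb.shape, yb.shape)  # expected: torch.Size([16, 16, 3]) torch.Size([16])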
from sklearn.metrics import mean_absolute_error, mean_squared_error
import time
import math
# Move the data to the target device
train_x, train_y = train_x.to(device), train_y.to(device)
test_x, test_y = test_x.to(device), test_y.to(device)

# Train the model (full batch: the whole training set is fed in at once)
def train_and_eval(model, epochs=10, lr=0.001):
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    train_loss = []
    score_list = []
    start_time = time.time()
    for epoch in range(epochs):
        model.train()
        optimizer.zero_grad()
        # Forward pass over the whole training set
        output, _ = model(train_x)
        # Loss on the output of the last time step
        loss = criterion(output[:, -1, :], train_y.view(-1, 1))
        train_loss.append(loss.detach().to('cpu'))  # detach so the graph is not kept alive
        # Backward pass and parameter update
        loss.backward()
        optimizer.step()
        # Log progress
        if (epoch + 1) % 5 == 0:
            print(f'Epoch [{epoch + 1}/{epochs}], Loss: {loss.item()}')
    end_time = time.time()
    # Evaluation
    model.eval()
    print(f'Training time: {end_time - start_time:.2f}s')
    with torch.no_grad():
        # Forward pass on the test set
        predictions, _ = model(test_x)
        # Metrics: MSE, plus RMSE as an example
        mse = criterion(predictions[:, -1, :], test_y.view(-1, 1))
        rmse = math.sqrt(mse.item())
        score_list.append([mse.to('cpu'), rmse])
        print(f'Mean Squared Error on Test Data: {mse.item()}')
    return train_loss, score_list
from sklearn.metrics import mean_absolute_error as mae_fn

# Mini-batch training variant: batches come from the DataLoaders and are moved to the device one at a time
def train_and_eval2(model, epochs=100, lr=0.001, output_model=None):
    train_loss, test_loss, val_score_list = [], [], []
    criterion = nn.MSELoss()
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)
    for epoch in range(epochs):
        epoch_loss = 0
        batch_count = 0
        model.train()
        for X, Y in train_loader:
            X = X.to(device)
            Y = Y.to(device)
            X = X.view(-1, window_size, X.shape[-1])
            optimizer.zero_grad()  # reset gradients for each batch
            # Forward pass; output_model is an optional projection head
            output, _ = model(X)
            if output_model is not None:
                output = output_model(output)
            # Loss on the last time step's output
            loss = criterion(output[:, -1, :].view(-1, 1), Y.view(-1, 1))
            epoch_loss += loss.item()
            # Backward pass and parameter update
            loss.backward()
            optimizer.step()
            batch_count += 1
        train_loss.append(epoch_loss / batch_count)
        if (epoch + 1) % 2 == 0:
            print(f'Epoch [{epoch + 1}/{epochs}], Loss: {epoch_loss / batch_count}')
        # Evaluation
        epoch_loss = 0
        eval_count = 0
        rmse = 0
        model.eval()
        with torch.no_grad():
            for XX, YY in test_loader:
                XX = XX.to(device)
                YY = YY.to(device)
                if len(XX) < batch_size:  # skip the ragged final batch
                    continue
                predictions, _ = model(XX)
                if output_model is not None:
                    predictions = output_model(predictions)
                mse = criterion(predictions[:, -1, :].view(-1, 1), YY.view(-1, 1))
                epoch_loss += mse.item()
                rmse += math.sqrt(mse.item())
                eval_count += 1
        test_loss.append(epoch_loss / eval_count)
        val_score_list.append([epoch_loss / eval_count, rmse / eval_count])
    return train_loss, test_loss, val_score_list
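train_and_eval2 is not actually invoked in the experiments below, which all use the full-batch train_and_eval; a hypothetical call (names assumed, not from the original lab) would look like this:
# Hypothetical usage of the mini-batch loop (not run in the original write-up)
# model = My_LSTM(input_size=3, hidden_size=128, output_size=1).to(device)
# tr_loss, te_loss, scores = train_and_eval2(model, epochs=20, lr=0.001)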
# Plotting helpers, adapted from the functions in utils.py
import matplotlib.pyplot as plt

def visualize(num_epochs, train_data, x_label='epoch', y_label='loss'):
    # Detach each stored loss tensor before plotting
    temp_list1 = []
    for i in range(len(train_data)):
        temp_list1.append(train_data[i].detach().numpy())
    plt.plot(temp_list1, 'b-', label=f'train_{y_label}')
    plt.xlabel(x_label)
    plt.ylabel(y_label)
    plt.legend()
    plt.show()

def plot_metric(score_log):
    score_log = np.array(score_log)
    plt.figure(figsize=(10, 6), dpi=100)
    plt.subplot(2, 2, 1)
    plt.plot(score_log[:, 0], c='#d28ad4')
    plt.ylabel('MSE')
    plt.subplot(2, 2, 2)
    plt.plot(score_log[:, 1], c='#6b016d')
    plt.ylabel('RMSE')
    plt.show()
input_size = 3
hidden_size = 128
output_size = 1
lr = 0.001
epochs = 400
2. Implementing an RNN
2.1 Approach and Code
# Manual RNN implementation
class MyRNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # Learnable parameters, analogous to a fully connected layer:
        # their shapes depend on the input features and the chosen hidden size.
        self.w_h = nn.Parameter(torch.rand(input_size, hidden_size))
        self.u_h = nn.Parameter(torch.rand(hidden_size, hidden_size))
        self.b_h = nn.Parameter(torch.zeros(hidden_size))
        self.w_y = nn.Parameter(torch.rand(hidden_size, output_size))
        self.b_y = nn.Parameter(torch.zeros(output_size))
        # Activation functions (Dropout would also be an option here)
        self.tanh = nn.Tanh()
        self.leaky_relu = nn.LeakyReLU()
        # Optional: a better parameter initialization scheme
        for param in self.parameters():
            if param.dim() > 1:
                nn.init.xavier_uniform_(param)

    def forward(self, x):
        """
        :param x: input sequence, generally of shape (batch, seq_len, features)
        """
        batch_size = x.size(0)
        seq_len = x.size(1)
        # Initialize the hidden state to zeros; since it is created here,
        # it must be moved to the same device as the input.
        h = torch.zeros(batch_size, self.hidden_size).to(x.device)
        # An RNN can only process the sequence one step at a time, hence the loop.
        y_list = []
        for i in range(seq_len):
            h = self.tanh(torch.matmul(x[:, i, :], self.w_h) + torch.matmul(h, self.u_h) + self.b_h)  # (batch_size, hidden_size)
            y = self.leaky_relu(torch.matmul(h, self.w_y) + self.b_y)  # (batch_size, output_size)
            y_list.append(y)
        # By convention, return the per-step outputs and the final hidden state.
        return torch.stack(y_list, dim=1), h
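Each iteration of the loop applies the standard Elman RNN update, with a LeakyReLU output head on top of the hidden state:

$$h_t = \tanh(x_t W_h + h_{t-1} U_h + b_h),\qquad y_t = \mathrm{LeakyReLU}(h_t W_y + b_y)$$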
rnn1 = MyRNN(input_size=input_size, hidden_size=hidden_size, output_size=output_size)
rnn1 = rnn1.to(device)
train11, score11 = train_and_eval(rnn1, epochs=epochs, lr=lr)
visualize(epochs, train_data=train11)
# RNN via the built-in interface
rnn2 = nn.RNN(input_size=input_size, hidden_size=hidden_size, num_layers=2, batch_first=True)
rnn2 = rnn2.to(device)
train12, score12 = train_and_eval(rnn2, epochs=epochs, lr=lr)
visualize(epochs, train_data=train12)
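One caveat with the built-in modules: nn.RNN (like nn.LSTM and nn.GRU) outputs hidden states of width hidden_size rather than a single predicted value, so the MSE above compares a 128-wide output against a broadcast scalar target. A minimal sketch of a wrapper that adds a linear projection head (RNNWithHead is our own name, not part of the lab code):
# Wrap a built-in recurrent layer and project each hidden state to output_size
class RNNWithHead(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.rnn = nn.RNN(input_size=input_size, hidden_size=hidden_size,
                          num_layers=2, batch_first=True)
        self.head = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        out, h = self.rnn(x)      # out: (batch, seq_len, hidden_size)
        return self.head(out), h  # projected to (batch, seq_len, output_size)

# rnn2b = RNNWithHead(input_size, hidden_size, output_size).to(device)
# train12b, score12b = train_and_eval(rnn2b, epochs=epochs, lr=lr)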
3. Implementing an LSTM
3.1 Approach and Code
# Manual LSTM implementation (separate parameters per gate)
class My_legacyLSTM(nn.Module):
    def __init__(self, input_size, hidden_size):
        super().__init__()
        self.hidden_size = hidden_size
        # Forget gate parameters
        self.w_f = nn.Parameter(torch.rand(input_size, hidden_size))
        self.u_f = nn.Parameter(torch.rand(hidden_size, hidden_size))
        self.b_f = nn.Parameter(torch.zeros(hidden_size))
        # Input gate parameters
        self.w_i = nn.Parameter(torch.rand(input_size, hidden_size))
        self.u_i = nn.Parameter(torch.rand(hidden_size, hidden_size))
        self.b_i = nn.Parameter(torch.zeros(hidden_size))
        # Output gate parameters
        self.w_o = nn.Parameter(torch.rand(input_size, hidden_size))
        self.u_o = nn.Parameter(torch.rand(hidden_size, hidden_size))
        self.b_o = nn.Parameter(torch.zeros(hidden_size))
        # Candidate cell parameters
        self.w_c = nn.Parameter(torch.rand(input_size, hidden_size))
        self.u_c = nn.Parameter(torch.rand(hidden_size, hidden_size))
        self.b_c = nn.Parameter(torch.zeros(hidden_size))
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        for param in self.parameters():
            if param.dim() > 1:
                nn.init.xavier_uniform_(param)

    def forward(self, x):
        batch_size = x.size(0)
        seq_len = x.size(1)
        # Both the hidden state and the cell state need to be initialized
        h = torch.zeros(batch_size, self.hidden_size).to(x.device)
        c = torch.zeros(batch_size, self.hidden_size).to(x.device)
        y_list = []
        for i in range(seq_len):
            forget_gate = self.sigmoid(torch.matmul(x[:, i, :], self.w_f) + torch.matmul(h, self.u_f) + self.b_f)  # (batch_size, hidden_size)
            input_gate = self.sigmoid(torch.matmul(x[:, i, :], self.w_i) + torch.matmul(h, self.u_i) + self.b_i)
            output_gate = self.sigmoid(torch.matmul(x[:, i, :], self.w_o) + torch.matmul(h, self.u_o) + self.b_o)
            # Here you can see how the gates operate:
            # each acts elementwise (Hadamard product) on every dimension.
            c = forget_gate * c + input_gate * self.tanh(torch.matmul(x[:, i, :], self.w_c) + torch.matmul(h, self.u_c) + self.b_c)
            h = output_gate * self.tanh(c)
            y_list.append(h)
        return torch.stack(y_list, dim=1), (h, c)
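In equation form, the loop implements the standard LSTM cell:

$$f_t = \sigma(x_t W_f + h_{t-1} U_f + b_f),\quad i_t = \sigma(x_t W_i + h_{t-1} U_i + b_i),\quad o_t = \sigma(x_t W_o + h_{t-1} U_o + b_o)$$

$$c_t = f_t \odot c_{t-1} + i_t \odot \tanh(x_t W_c + h_{t-1} U_c + b_c),\qquad h_t = o_t \odot \tanh(c_t)$$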
# Manual LSTM implementation (fused gates)
class My_LSTM(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # A single linear layer computes all four gates at once
        self.gates = nn.Linear(input_size + hidden_size, hidden_size * 4)
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        self.output = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, output_size)
        )
        for param in self.parameters():
            if param.dim() > 1:
                nn.init.xavier_uniform_(param)

    def forward(self, x):
        batch_size = x.size(0)
        seq_len = x.size(1)
        h, c = (torch.zeros(batch_size, self.hidden_size).to(x.device) for _ in range(2))
        y_list = []
        for i in range(seq_len):
            # One matmul produces all gate pre-activations; chunk splits them apart
            forget_gate, input_gate, output_gate, candidate_cell = \
                self.gates(torch.cat([x[:, i, :], h], dim=-1)).chunk(4, -1)
            forget_gate, input_gate, output_gate = (
                self.sigmoid(g) for g in (forget_gate, input_gate, output_gate))
            c = forget_gate * c + input_gate * self.tanh(candidate_cell)
            h = output_gate * self.tanh(c)
            y_list.append(self.output(h))
        return torch.stack(y_list, dim=1), (h, c)
lstm1 = My_legacyLSTM(input_size=input_size, hidden_size=hidden_size).to(device)
train21, score21 = train_and_eval(lstm1, epochs=epochs, lr=lr)
visualize(epochs, train21)
lstm2 = My_LSTM(input_size=input_size, hidden_size=hidden_size, output_size=output_size).to(device)
train22, score22 = train_and_eval(lstm2, epochs=epochs, lr=lr)
visualize(epochs, train22)
# LSTM via the built-in interface
lstm3 = nn.LSTM(input_size=input_size, hidden_size=hidden_size, num_layers=2, batch_first=True).to(device)
train23, score23 = train_and_eval(lstm3, epochs=epochs, lr=lr)
visualize(epochs, train23)
4. Implementing a GRU
4.1 Approach and Code
# Manual GRU implementation
class My_GRU(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.hidden_size = hidden_size
        # One linear layer computes the update and reset gates together
        self.gates = nn.Linear(input_size + hidden_size, hidden_size * 2)
        # Computes the candidate hidden state
        self.hidden_transform = nn.Linear(input_size + hidden_size, hidden_size)
        self.sigmoid = nn.Sigmoid()
        self.tanh = nn.Tanh()
        self.output = nn.Sequential(
            nn.Linear(hidden_size, hidden_size // 2),
            nn.ReLU(),
            nn.Linear(hidden_size // 2, output_size)
        )
        for param in self.parameters():
            if param.dim() > 1:
                nn.init.xavier_uniform_(param)

    def forward(self, x):
        batch_size = x.size(0)
        seq_len = x.size(1)
        h = torch.zeros(batch_size, self.hidden_size).to(x.device)
        y_list = []
        for i in range(seq_len):
            update_gate, reset_gate = self.gates(torch.cat([x[:, i, :], h], dim=-1)).chunk(2, -1)
            update_gate, reset_gate = (self.sigmoid(gate) for gate in (update_gate, reset_gate))
            candidate_hidden = self.tanh(self.hidden_transform(
                torch.cat([x[:, i, :], reset_gate * h], dim=-1)))
            h = (1 - update_gate) * h + update_gate * candidate_hidden
            y_list.append(self.output(h))
        return torch.stack(y_list, dim=1), h
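In equation form, each step computes the following (note that this implementation weights the candidate state by the update gate, i.e. the roles of $z_t$ and $1-z_t$ are swapped relative to some textbook presentations):

$$[z_t,\, r_t] = \sigma([x_t,\, h_{t-1}]\, W_g + b_g),\qquad \tilde h_t = \tanh([x_t,\, r_t \odot h_{t-1}]\, W + b)$$

$$h_t = (1 - z_t) \odot h_{t-1} + z_t \odot \tilde h_t$$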
gru1 = My_GRU(input_size=input_size, hidden_size=hidden_size, output_size=output_size).to(device)
train31, score31 = train_and_eval(gru1, epochs=epochs, lr=lr)
visualize(epochs, train31)
# GRU via the built-in interface
gru2 = nn.GRU(input_size=input_size, hidden_size=hidden_size, num_layers=2, batch_first=True).to(device)
train32, score32 = train_and_eval(gru2, epochs=epochs, lr=lr)
visualize(epochs, train32)
5. Comparative Analysis
5.1 Model Comparison
With the same training set, test set, number of epochs, and learning rate, the different models performed as shown in the table below:
Model | Configuration | Test MSE | Training time |
---|---|---|---|
Manual RNN | hidden_size=128 | 0.5115 | 6.23s |
Built-in RNN | num_layers=2 | 0.1227 | 7.77s |
Manual LSTM (per-gate) | hidden_size=128 | 0.1828 | 16.80s |
Manual LSTM (fused-gate) | hidden_size=128 | 0.0409 | 15.96s |
Built-in LSTM | num_layers=2 | 0.1246 | 34.35s |
Manual GRU | hidden_size=128 | 0.0407 | 14.67s |
Built-in GRU | num_layers=2 | 0.1221 | 24.04s |
- For the RNN, the built-in implementation achieves a clearly lower MSE than the manual one at comparable training time, so overall it performs better. Concretely, torch's built-in RNN optimizes the gradient flow and backward pass, uses the underlying CUDA kernels, and has better numerical stability.
- For the LSTM, the fused-gate implementation (lstm2) performs best, the per-gate implementation (lstm1) has a noticeably higher MSE, and the built-in LSTM (lstm3) takes much longer to train. A likely reason is that lstm2 shares the gate parameters through a single nn.Linear layer, making the model more compact and simpler; differences in activation functions and initialization strategy also play a role.
- For the GRU, the manual implementation clearly outperforms the built-in one. Like the manual LSTM, it uses Xavier initialization, which may help convergence speed and final performance; it also uses ReLU (in the output head) together with Tanh, which may suit this task better than torch's defaults.
- Overall, the manually defined models converge faster, train in less time, and achieve better training and test results on this task. Comparing across model families, GRU performs best on this regression problem, LSTM is close behind, and RNN performs worst but trains fastest. Overlaying the loss curves makes this easy to see, as in the sketch below.
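A minimal plotting sketch (assuming train11, train22, and train31 from the runs above still hold the per-epoch loss tensors):
# Overlay the training-loss curves of the three model families
plt.figure(figsize=(8, 5))
for losses, name in [(train11, 'RNN'), (train22, 'LSTM'), (train31, 'GRU')]:
    plt.plot([l.detach().cpu().item() for l in losses], label=name)
plt.xlabel('epoch')
plt.ylabel('train MSE')
plt.legend()
plt.show()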
5.2 Hyperparameter Analysis
Taking the LSTM model as an example, we compare different training hyperparameters.
# Learning rate
lstm_11 = My_LSTM(input_size=input_size, hidden_size=hidden_size, output_size=output_size).to(device)
train51, score51 = train_and_eval(lstm_11, epochs=100, lr=0.00001)
visualize(100, train51)
# A flat loss curve means the learning rate is too small
# Learning rate
lstm_11 = My_LSTM(input_size=input_size, hidden_size=hidden_size, output_size=output_size).to(device)
train51, score51 = train_and_eval(lstm_11, epochs=100, lr=0.1)
visualize(100, train51)
# Spikes in the loss and an overly steep curve mean the learning rate is too high and should be lowered
# Learning rate
lstm_11 = My_LSTM(input_size=input_size, hidden_size=hidden_size, output_size=output_size).to(device)
train51, score51 = train_and_eval(lstm_11, epochs=100, lr=0.001)
visualize(100, train51)
# This loss curve looks about right
# Hidden size
lstm_21 = My_LSTM(input_size=input_size, hidden_size=2, output_size=output_size).to(device)
train52, score52 = train_and_eval(lstm_21, epochs=100, lr=0.01)
lstm_22 = My_LSTM(input_size=input_size, hidden_size=1024, output_size=output_size).to(device)
train53, score53 = train_and_eval(lstm_22, epochs=100, lr=0.01)
lstm_23 = My_LSTM(input_size=input_size, hidden_size=256, output_size=output_size).to(device)
train54, score54 = train_and_eval(lstm_23, epochs=100, lr=0.1)
print(score52[0][0])
print(score53[0][0])
print(score54[0][0])
Hidden size | MSE |
---|---|
2 | 0.9881 |
256 | 0.0628 |
1024 | 0.1543 |
Conclusions:
- Increasing the hidden size increases the model's capacity, letting it fit more complex training data; with more capacity, the model can capture more intricate patterns and features in the input.
- Too large a hidden layer, however, can lead to overfitting, along with exploding or vanishing gradients.
- The table shows that raising the hidden size to 256 first brings the MSE down and the model improves; raising it further to 1024 pushes the MSE back up, indicating overfitting.
Finding a suitable hidden size therefore takes repeated tuning; the parameter-count sketch below shows how quickly capacity grows.
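A quick check (a sketch, not part of the original lab) of how My_LSTM's trainable parameter count scales with the hidden size:
# Count trainable parameters of My_LSTM for a few hidden sizes
for hs in (2, 256, 1024):
    m = My_LSTM(input_size=3, hidden_size=hs, output_size=1)
    n_params = sum(p.numel() for p in m.parameters())
    print(f'hidden_size={hs}: {n_params} parameters')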
Summary
In this lab I became familiar with the workflow of organizing and preprocessing data and building training sets, then implemented RNN, LSTM, and GRU both by hand and via the built-in interfaces, gaining practical deep learning experience and a deeper understanding of how RNN, LSTM, and GRU work under the hood.
After building the models, I compared the manual implementations against torch's built-in ones, and then compared the three model families with each other, which made the characteristics of each model clear.
Finally, I tuned the hyperparameters and saw first-hand how the learning rate, hidden size, and other settings affect training, improving my ability to design and train models.