自定义gym环境并使用RL训练--寻找宝石

2024-03-02 08:10

本文主要是介绍自定义gym环境并使用RL训练--寻找宝石,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!


完整代码已上传到 github
result_polyDL.mp4.gif


最近有项目需要用到RL相关的一些东西,于是就开始尝试自己搭建一个自定义的gym环境,并使用入门的DQN网络对这个环境进行训练,这个是我入门的第一个项目,可能有一些地方理解的不够的或者有问题的,希望见谅并能指正。

其中环境的自定义部分参考了csdn extremebingo的文章,模型建立与训练过程参考了: pytorch official tutorials,训练结果的展示参考了:tensorflow org tutorials

寻找宝石游戏

绿色的小圆圈代表机器人,红色圈圈表示火坑,蓝色圆圈表示宝石,褐色圈圈表示石柱,其中环境每次重置机器人便会出生在任意一个空白的格子中,机器人需要找到含有宝石的格子获得奖励结束游戏。在寻找的过程中如果踩入火坑游戏结束获得负奖励,机器人无法移动到石柱所在的格子中。

自定义gym环境

自定义gym环境模块主要参考了csdn extremebingo的文章,可以直接点击查看自定义的具体流程介绍,也可以参考github Readme 的gym Env set up模块介绍中的操作流程。这里就不再赘述,下面主要介绍下使用这个流程中可能有的坑:

  1. 将自定义的文件拷贝到环境中可能不生效,可以尝试在这个路径同样进行一遍操作:
    C:\Users\xxx\AppData\Roaming\Python\Python37\site-packages\gym\envs

  2. extremebingo 构建的环境中有部分代码存在一些笔误还有一些bug,这里进行了一些修改,修改后的环境代码

模型构建与训练

数据收集

训练数据主要有:(state, action, next_state, reward)

  • state 当前环境的状态
  • action 当前状态时,机器人执行的动作
  • next_state 执行该动作后的状态
  • reward 执行该动作后获得的激励

(这里用环境render的图表示state 见get_screen,actions = ['n', 'e', 's', 'w'] 含意为:n 上 s下 w左 e 右 reward 找到宝石+1,踩到火坑-1,增加步数在训练的过程中进行适度的惩罚

数据收集过程中的action 根据当前训练的状态按照概率选择使用模型结果或者随机选择动作执行下一步操作
这个概率值由EPS_END EPS_STAR EPS_DECAY 还有steps_done 共同控制 结果按照指数进行衰减
这里我使用的值为:

    EPS_START = 0.9EPS_END = 0.05EPS_DECAY = 20000

选择随机策略的概率随着训练次数steps_done 的变化如下图所示:
eps.png
这里eps_decay 改为了20000而不是torch offical tutorials里的200,主要是因为这个环境比小车的稍微复杂,因此前期需要更多的随机策略的样本训练,offical turorials 里概率的变化曲线如下:

eps.jpg

,当我们在test模型时,主要应选取模型的输出作为下一个action 因此 我在代码中增加了eval时eps_threshold=0.001

def select_action(state, eval=False):global steps_donesample = random.random()eps_threshold = EPS_END + (EPS_START - EPS_END) * \math.exp(-1. * steps_done / EPS_DECAY)if eval:eps_threshold = 0.001print("eps_threshold:{} ,steps_done:{}".format(eps_threshold, steps_done))steps_done += 1if sample > eps_threshold:print("select Model")with torch.no_grad():# t.max(1) will return largest column value of each row.# second column on max result is index of where max element was# found, so we pick action with the larger expected reward.if eval:return target_net(state).max(1)[1].view(1, 1)return policy_net(state).max(1)[1].view(1, 1)else:print("select random")return torch.tensor([[random.randrange(n_actions)]], device=device, dtype=torch.long)

运行过程中生产的数据放到一个存储类中,每次随机采样batchSize条数据训练:

class ReplayMemory(object):def __init__(self, capacity):self.capacity = capacityself.memory = []self.position = 0def push(self, *args):"""Saves a transition."""if len(self.memory) < self.capacity:self.memory.append(None)self.memory[self.position] = Transition(*args)self.position = (self.position + 1) % self.capacitydef sample(self, batch_size):return random.sample(self.memory, batch_size)def __len__(self):return len(self.memory)

get_screen 函数主要获取环境状态改变时的图像

def get_screen():# Returned screen requested by gym is 400x600x3, but is sometimes larger# such as 800x1200x3. Transpose it into torch order (CHW).screen = env.render(mode='rgb_array').transpose((2, 0, 1))# Cart is in the lower half, so strip off the top and bottom of the screen_, screen_height, screen_width = screen.shape# print("screen_height {}, screen_width {}".format(screen_height,screen_width))screen = screen[:, int(screen_height * 0):int(screen_height * 0.9)]view_width = int(screen_width * 0.6)# Strip off the edges, so that we have a square image centered on a cart# screen = screen[:, :, slice_range]# Convert to float, rescale, convert to torch tensor# (this doesn't require a copy)screen = np.ascontiguousarray(screen, dtype=np.float32) / 255screen = torch.from_numpy(screen)# Resize, and add a batch dimension (BCHW)return resize(screen).unsqueeze(0).to(device)

模型构建

DQN 网络使用三层卷积,根据状态 预测下一步采取各个行动的收益


class DQN(nn.Module):def __init__(self, h, w, outputs):super(DQN, self).__init__()self.conv1 = nn.Conv2d(3, 16, kernel_size=5, stride=2)self.bn1 = nn.BatchNorm2d(16)self.conv2 = nn.Conv2d(16, 32, kernel_size=5, stride=2)self.bn2 = nn.BatchNorm2d(32)self.conv3 = nn.Conv2d(32, 32, kernel_size=5, stride=2)self.bn3 = nn.BatchNorm2d(32)# Number of Linear input connections depends on output of conv2d layers# and therefore the input image size, so compute it.def conv2d_size_out(size, kernel_size=5, stride=2):return (size - (kernel_size - 1) - 1) // stride + 1convw = conv2d_size_out(conv2d_size_out(conv2d_size_out(w)))convh = conv2d_size_out(conv2d_size_out(conv2d_size_out(h)))linear_input_size = convw * convh * 32self.head = nn.Linear(linear_input_size, outputs)# Called with either one element to determine next action, or a batch# during optimization. Returns tensor([[left0exp,right0exp]...]).def forward(self, x):x = F.relu(self.bn1(self.conv1(x)))x = F.relu(self.bn2(self.conv2(x)))x = F.relu(self.bn3(self.conv3(x)))return self.head(x.view(x.size(0), -1))

训练过程模型参数更新

通过policy_net (参数实时更新的net)根据batch数据中的state信息预测下一步采取的每个行动的收益,生成bx4(action 可选择的个数4)的矩阵,根据batch 中 的action 的index 选择 这一action 模型预测的值(Q(s_t, a) - model computes Q(s_t)):

 state_action_values = policy_net(state_batch).gather(1, action_batch)

使用target_net (参数更新copy from policy net延迟的net) 使用next state信息(过滤掉 状态为none)预测最大收益的行动:next_state_values

当前状态的收益期望值 = 下一状态预测的行动最大收益(next_state_values)*GAMMA + 当前状态行为的实际收益 reward_batch 如下所示:

expected_state_action_values = (next_state_values * GAMMA) + reward_batch

根据当前网络预测的动作收益 state_action_values 与实际期望的收益的误差作为模型的loss 更新整个策略网络

    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))print("loss:{}".format(loss.item()))# Optimize the modeloptimizer.zero_grad()  loss.backward()for param in policy_net.parameters():param.grad.data.clamp_(-1, 1)optimizer.step()

该函数optimize_model完整代码如下:

def optimize_model():if len(memory) < BATCH_SIZE:returntransitions = memory.sample(BATCH_SIZE)# Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for# detailed explanation). This converts batch-array of Transitions# to Transition of batch-arrays.batch = Transition(*zip(*transitions))# Compute a mask of non-final states and concatenate the batch elements# (a final state would've been the one after which simulation ended)non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,batch.next_state)), device=device, dtype=torch.bool)non_final_next_states = torch.cat([s for s in batch.next_stateif s is not None])state_batch = torch.cat(batch.state)action_batch = torch.cat(batch.action)reward_batch = torch.cat(batch.reward)# Compute Q(s_t, a) - the model computes Q(s_t), then we select the# columns of actions taken. These are the actions which would've been taken# for each batch state according to policy_netstate_action_values = policy_net(state_batch).gather(1, action_batch)# Compute V(s_{t+1}) for all next states.# Expected values of actions for non_final_next_states are computed based# on the "older" target_net; selecting their best reward with max(1)[0].# This is merged based on the mask, such that we'll have either the expected# state value or 0 in case the state was final.next_state_values = torch.zeros(BATCH_SIZE, device=device)next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0].detach()# Compute the expected Q valuesexpected_state_action_values = (next_state_values * GAMMA) + reward_batch# Compute Huber lossloss = F.smooth_l1_loss(state_action_values, expected_state_action_values.unsqueeze(1))print("loss:{}".format(loss.item()))# Optimize the modeloptimizer.zero_grad()loss.backward()for param in policy_net.parameters():param.grad.data.clamp_(-1, 1)optimizer.step()

这篇关于自定义gym环境并使用RL训练--寻找宝石的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/765466

相关文章

如何使用celery进行异步处理和定时任务(django)

《如何使用celery进行异步处理和定时任务(django)》文章介绍了Celery的基本概念、安装方法、如何使用Celery进行异步任务处理以及如何设置定时任务,通过Celery,可以在Web应用中... 目录一、celery的作用二、安装celery三、使用celery 异步执行任务四、使用celery

使用Python绘制蛇年春节祝福艺术图

《使用Python绘制蛇年春节祝福艺术图》:本文主要介绍如何使用Python的Matplotlib库绘制一幅富有创意的“蛇年有福”艺术图,这幅图结合了数字,蛇形,花朵等装饰,需要的可以参考下... 目录1. 绘图的基本概念2. 准备工作3. 实现代码解析3.1 设置绘图画布3.2 绘制数字“2025”3.3

Jsoncpp的安装与使用方式

《Jsoncpp的安装与使用方式》JsonCpp是一个用于解析和生成JSON数据的C++库,它支持解析JSON文件或字符串到C++对象,以及将C++对象序列化回JSON格式,安装JsonCpp可以通过... 目录安装jsoncppJsoncpp的使用Value类构造函数检测保存的数据类型提取数据对json数

python使用watchdog实现文件资源监控

《python使用watchdog实现文件资源监控》watchdog支持跨平台文件资源监控,可以检测指定文件夹下文件及文件夹变动,下面我们来看看Python如何使用watchdog实现文件资源监控吧... python文件监控库watchdogs简介随着Python在各种应用领域中的广泛使用,其生态环境也

Python中构建终端应用界面利器Blessed模块的使用

《Python中构建终端应用界面利器Blessed模块的使用》Blessed库作为一个轻量级且功能强大的解决方案,开始在开发者中赢得口碑,今天,我们就一起来探索一下它是如何让终端UI开发变得轻松而高... 目录一、安装与配置:简单、快速、无障碍二、基本功能:从彩色文本到动态交互1. 显示基本内容2. 创建链

springboot整合 xxl-job及使用步骤

《springboot整合xxl-job及使用步骤》XXL-JOB是一个分布式任务调度平台,用于解决分布式系统中的任务调度和管理问题,文章详细介绍了XXL-JOB的架构,包括调度中心、执行器和Web... 目录一、xxl-job是什么二、使用步骤1. 下载并运行管理端代码2. 访问管理页面,确认是否启动成功

使用Nginx来共享文件的详细教程

《使用Nginx来共享文件的详细教程》有时我们想共享电脑上的某些文件,一个比较方便的做法是,开一个HTTP服务,指向文件所在的目录,这次我们用nginx来实现这个需求,本文将通过代码示例一步步教你使用... 在本教程中,我们将向您展示如何使用开源 Web 服务器 Nginx 设置文件共享服务器步骤 0 —

Java中switch-case结构的使用方法举例详解

《Java中switch-case结构的使用方法举例详解》:本文主要介绍Java中switch-case结构使用的相关资料,switch-case结构是Java中处理多个分支条件的一种有效方式,它... 目录前言一、switch-case结构的基本语法二、使用示例三、注意事项四、总结前言对于Java初学者

Golang使用minio替代文件系统的实战教程

《Golang使用minio替代文件系统的实战教程》本文讨论项目开发中直接文件系统的限制或不足,接着介绍Minio对象存储的优势,同时给出Golang的实际示例代码,包括初始化客户端、读取minio对... 目录文件系统 vs Minio文件系统不足:对象存储:miniogolang连接Minio配置Min

使用Python绘制可爱的招财猫

《使用Python绘制可爱的招财猫》招财猫,也被称为“幸运猫”,是一种象征财富和好运的吉祥物,经常出现在亚洲文化的商店、餐厅和家庭中,今天,我将带你用Python和matplotlib库从零开始绘制一... 目录1. 为什么选择用 python 绘制?2. 绘图的基本概念3. 实现代码解析3.1 设置绘图画