深度强化学习之基于DRQN玩Doom游戏

2023-10-10 21:40

本文主要是介绍深度强化学习之基于DRQN玩Doom游戏,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

DRQN

  为何在希望DQN按人类水平玩Atari游戏时需要DRQN?要回答这个问题,首先要了解什么是部分可观测马尔科夫决策过程(POMDP)。当对环境只有有限信息时,该环境就称为POMDP。到目前为止,在前面的内容中,已了解一个完全可观测的MDP是已知所有可能的行为和状态,尽管智能体不知道转移概率和奖励概率,但对环境信息是完全已知的,例如,在建一个冰冻湖环境中,完全已知关于环境的所有状态和行为,那么就可以很容易地将环境建模为一个可观测的MDP。但大多数真实世界中的环境只能部分可观测,不能观测到所有状态。假设智能体要在真实世界环境中学习行走,显然,智能体不具备环境的完备信息。在POMDP中,状态只提供部分信息,但在过去的状态中保留的信息可有助于智能体更好地理解环境特征,从而改进策略。因此,在POMDP中,需要保留先前状态的相关信息,以采取最佳行为。

  因此,通过增加LSTM层来改进DQN架构,以更好地理解先前信息。在DQN架构中,用LSTM RNN替代第一个后卷积全连接层。通过这种方式,还可以解决部分可观测问题,因为现在的智能体具有记忆过去状态的能力,从而可以改进策略。

DRQN架构

与DQN非常类似,只是用LSTM RNN替代了第一个后卷积全连接层

在这里插入图片描述
  这时,将游戏画面作为卷积层的输入。卷积层对图像进行卷积运算,并产生特征图。然后,所得到的特征图传递到LSTM层。LSTM层具有保存信息的记忆功能。在LSTM层保留有关先前游戏状态的重要信息,并根据需要随时间更新其记忆。经过一个全连接层后输出Q值。因此,与DQN不同,无需直接估计 Q ( s t , a t ) Q(s_t,a_t) Q(stat),而是估计 Q ( h t , a t ) Q(h_t,a_t) Q(htat),其中, h t h_t ht是由网络在上一时间步返回的输入,即 h t = L S T M ( h t − 1 , a t ) h_t=LSTM(h_{t-1},a_t) ht=LSTM(ht1at)。由于是使用RNN,因此是通过基于时间的反向传播来训练网络的。

  那么经验回放缓存会是什么情况呢?在DQN中,为避免经验关联,采用了经验回放来保存游戏状态转移信息,并使用随机的批量经验来训练网络。在DRQN情况下,是将整个场景保存在经验缓存中,并从随机批量场景中随机采样n个时间步。这样,就既能适应随机性,又能获得另一种经验。

训练一个玩Doom游戏的智能体
在这里插入图片描述
基本的Doom游戏

加载必要的库:

from vizdoom import *
import random
import time

创建一个DoomGame实例:

game = DoomGame()

已知VIZDoom提供大量的Doom场景,在此加载一个基本场景:

game.load_config("basic.cfg")

通过init()方法初始化包含场景的游戏:

game.init()

定义一个热编码actions:

shoot = [0, 0, 1]
left = [1, 0, 0]
right = [0, 1, 0]
actions = [shoot, left, right]

开始游戏:

no_of_episodes = 10for i in range(no_of_episodes):     # for each episode start the gamegame.new_episode()# loop until the episode is overwhile not game.is_episode_finished():# get the game statestate = game.get_state()img = state.screen_buffer# get the game variablesmisc = state.game_variables# perform some action randomly and receuve reward 执行某一随机的行为并且获得奖励reward = game.make_action(random.choice(actions))        print(reward)# we will set some time before starting the next epiosdetime.sleep(2)

基于DRQN的Doom游戏

成功杀死怪物会得到正面奖励,而丧命、自杀和无弹药会得到负面奖励

First let us import all necessary libraries

import tensorflow as tf
import numpy as np
import math
from vizdoom import *
import timeit
import math
import os
import sys

接下来,定义function get_input_shape 函数来计算经卷积层卷积运算后输入图像的最终形式:

def get_input_shape(Image,Filter,Stride):layer1 = math.ceil(((Image - Filter + 1) / Stride)) o1 = math.ceil((layer1 / Stride))    layer2 = math.ceil(((o1 - Filter + 1) / Stride))    o2 = math.ceil((layer2 / Stride))    layer3 = math.ceil(((o2 - Filter + 1) / Stride))    o3 = math.ceil((layer3  / Stride))return int(o3)

这时,定义实现DRQN算法的DRQN

class DRQN():def __init__(self, input_shape, num_actions, inital_learning_rate):# first, we initialize all the hyperparameters(超参数)self.tfcast_type = tf.float32# shape of our input which would be (length, width, channels)(长度、宽度、通道)self.input_shape = input_shape# number of actions in the environmentself.num_actions = num_actions# learning rate for the neural networkself.learning_rate = inital_learning_rate# now we will define the hyperparameters of the convolutional neural network# filter size  滤波器的大小self.filter_size = 5# number of filters  滤波器的个数self.num_filters = [16, 32, 64]# stride size  步幅大小self.stride = 2# pool size    池化层大小self.poolsize = 2# shape of our convolutional layer  卷积层形状self.convolution_shape = get_input_shape(input_shape[0], self.filter_size, self.stride) * get_input_shape(input_shape[1], self.filter_size, self.stride) * self.num_filters[2]# now we define the hyperparameters of our recurrent neural network and the final feed forward layer 定义RNN和最后前馈层的超参数# number of neuronsself.cell_size = 100# number of hidden layersself.hidden_layer = 50# drop out probabilityself.dropout_probability = [0.3, 0.2]# hyperparameters for optimization  优化超参数self.loss_decay_rate = 0.96self.loss_decay_steps = 180# initialize all the variables for the CNN 初始化CNN所有变量# we initialize the placeholder for input whose shape would be (length, width, channel)  初始化形式为(长度、宽度、通道)的输入占位符self.input = tf.placeholder(shape=(self.input_shape[0], self.input_shape[1], self.input_shape[2]), dtype=self.tfcast_type)# we will also initialize the shape of the target vector whose shape is equal to the number of actions  初始化目标向量的形式,其形式与行为个数相同 self.target_vector = tf.placeholder(shape=(self.num_actions, 1), dtype=self.tfcast_type)# initialize feature maps for our corresponding 3 filters  初始化3个滤波器对应的特征图self.features1 = tf.Variable(initial_value = np.random.rand(self.filter_size, self.filter_size, input_shape[2], self.num_filters[0]),dtype = self.tfcast_type)self.features2 = tf.Variable(initial_value = np.random.rand(self.filter_size, self.filter_size, self.num_filters[0], self.num_filters[1]),dtype = self.tfcast_type)self.features3 = tf.Variable(initial_value = np.random.rand(self.filter_size, self.filter_size, self.num_filters[1], self.num_filters[2]),dtype = self.tfcast_type)# initialize variables for RNN# recall how RNN works from chapter 7self.h = tf.Variable(initial_value=np.zeros((1, self.cell_size)), dtype = self.tfcast_type)# hidden to hidden weight matrix  隐层到隐层的权重矩阵self.rW = tf.Variable(initial_value = np.random.uniform(low = -np.sqrt(6. / (self.convolution_shape + self.cell_size)),high = np.sqrt(6. / (self.convolution_shape + self.cell_size)),size = (self.convolution_shape, self.cell_size)),dtype = self.tfcast_type)# input to hidden weight matrix  输入层到隐层的权重矩阵self.rU = tf.Variable(initial_value = np.random.uniform(low = -np.sqrt(6. / (2 * self.cell_size)),high = np.sqrt(6. / (2 * self.cell_size)),size = (self.cell_size, self.cell_size)),dtype = self.tfcast_type)# hiddent to output weight matrix  隐层到输出层的权重矩阵self.rV = tf.Variable(initial_value = np.random.uniform(low = -np.sqrt(6. / (2 * self.cell_size)),high = np.sqrt(6. / (2 * self.cell_size)),size = (self.cell_size, self.cell_size)),dtype = self.tfcast_type)# bias self.rb = tf.Variable(initial_value = np.zeros(self.cell_size), dtype = self.tfcast_type)self.rc = tf.Variable(initial_value = np.zeros(self.cell_size), dtype = self.tfcast_type)# initialize weights and bias of feed forward network  初始化前馈网络的权重和偏置# weightsself.fW = tf.Variable(initial_value = np.random.uniform(low = -np.sqrt(6. / (self.cell_size + self.num_actions)),high = np.sqrt(6. / (self.cell_size + self.num_actions)),size = (self.cell_size, self.num_actions)),dtype = self.tfcast_type)# biasself.fb = tf.Variable(initial_value = np.zeros(self.num_actions), dtype = self.tfcast_type)# learning rateself.step_count = tf.Variable(initial_value = 0, dtype = self.tfcast_type)self.learning_rate = tf.train.exponential_decay(self.learning_rate,self.step_count,self.loss_decay_steps,self.loss_decay_steps,staircase = False)# now let us build the network# first convolutional layerself.conv1 = tf.nn.conv2d(input = tf.reshape(self.input, shape = (1, self.input_shape[0], self.input_shape[1], self.input_shape[2])), filter = self.features1, strides = [1, self.stride, self.stride, 1], padding = "VALID")self.relu1 = tf.nn.relu(self.conv1)self.pool1 = tf.nn.max_pool(self.relu1, ksize = [1, self.poolsize, self.poolsize, 1], strides = [1, self.stride, self.stride, 1], padding = "SAME")# second convolutional layerself.conv2 = tf.nn.conv2d(input = self.pool1, filter = self.features2, strides = [1, self.stride, self.stride, 1], padding = "VALID")self.relu2 = tf.nn.relu(self.conv2)self.pool2 = tf.nn.max_pool(self.relu2, ksize = [1, self.poolsize, self.poolsize, 1], strides = [1, self.stride, self.stride, 1], padding = "SAME")# third convolutional layerself.conv3 = tf.nn.conv2d(input = self.pool2, filter = self.features3, strides = [1, self.stride, self.stride, 1], padding = "VALID")self.relu3 = tf.nn.relu(self.conv3)self.pool3 = tf.nn.max_pool(self.relu3, ksize = [1, self.poolsize, self.poolsize, 1], strides = [1, self.stride, self.stride, 1], padding = "SAME")# add dropout and reshape the input  添加dropout并重新整理输入self.drop1 = tf.nn.dropout(self.pool3, self.dropout_probability[0])self.reshaped_input = tf.reshape(self.drop1, shape = [1, -1])# now we build recurrent neural network which takes the input from the last layer of convolutional network  构建RNN,输入来自于卷积网络的最后一层self.h = tf.tanh(tf.matmul(self.reshaped_input, self.rW) + tf.matmul(self.h, self.rU) + self.rb)self.o = tf.nn.softmax(tf.matmul(self.h, self.rV) + self.rc)# add drop out to RNN 对RNN添加退出self.drop2 = tf.nn.dropout(self.o, self.dropout_probability[1])# we feed the result of RNN to the feed forward layer  将RNN的结果馈入前馈层self.output = tf.reshape(tf.matmul(self.drop2, self.fW) + self.fb, shape = [-1, 1])self.prediction = tf.argmax(self.output)# compute loss  计算损失self.loss = tf.reduce_mean(tf.square(self.target_vector - self.output))# we use Adam optimizer for minimizing the errorself.optimizer = tf.train.AdamOptimizer(self.learning_rate)# compute gradients of the loss and update the gradientsself.gradients = self.optimizer.compute_gradients(self.loss)self.update = self.optimizer.apply_gradients(self.gradients)self.parameters = (self.features1, self.features2, self.features3,self.rW, self.rU, self.rV, self.rb, self.rc,self.fW, self.fb)

定义ExperienceReplay类来实现经验回放缓存。在经验回放缓存中保存智能体的所有经验,即状态、行为和奖励,然后采样小批量的经验来训练网络:

class ExperienceReplay():def __init__(self, buffer_size):# buffer for holding the transistion  保存转移信息的缓存self.buffer = []       # size of the buffer  缓存大小self.buffer_size = buffer_size# we remove the old transistion if buffer size has reached it's limit. Think off the buffer as a queue when new# one comes, old one goes off  若缓存达到最大容量,将删除旧的缓存信息。将缓存看成一个队列,先进先出def appendToBuffer(self, memory_tuplet):if len(self.buffer) > self.buffer_size: for i in range(len(self.buffer) - self.buffer_size):self.buffer.remove(self.buffer[0])     self.buffer.append(memory_tuplet)  # define a function called sample for sampling some random n number of transistions   定义一个sample函数来随机采样n个转移信息def sample(self, n):memories = []for i in range(n):memory_index = np.random.randint(0, len(self.buffer))       memories.append(self.buffer[memory_index])return memories

定义train函数来训练网络 :

def train(num_episodes, episode_length, learning_rate, scenario="deathmatch.cfg", map_path='map02', render=False):# discount parameter for Q-value computation   Q值计算的折扣参数discount_factor = .99# frequency for updating the experience in the buffer  缓存中经验信息的更新频率update_frequency = 5store_frequency = 50# for printing the output  打印输出print_frequency = 1000# initialize variables for storing total rewards and total loss  初始化保存总奖励和总损失的变量total_reward = 0total_loss = 0old_q_value = 0# initialize lists for storing the episodic rewards and losses 初始化保存情景奖励和损失的列表rewards = []losses = []# okay, now let us get to the action!# first, we initialize our doomgame environmentgame = DoomGame()# specify the path where our scenario file is located  指定场景文件的存放路径game.set_doom_scenario_path(scenario)# specify the path of map file  指定地图文件的路径game.set_doom_map(map_path)# then we set screen resolution and screen format  设置屏幕分辨率和屏幕的格式game.set_screen_resolution(ScreenResolution.RES_256X160)game.set_screen_format(ScreenFormat.RGB24)# we can add particles and effetcs we needed by simply setting them to true or false  通过简单的设置True或者False来添加所需的粒子和效果game.set_render_hud(False)game.set_render_minimal_hud(False)game.set_render_crosshair(False)game.set_render_weapon(True)game.set_render_decals(False)game.set_render_particles(False)game.set_render_effects_sprites(False)game.set_render_messages(False)game.set_render_corpses(False)game.set_render_screen_flashes(True)# now we will specify buttons that should be available to the agent  设置智能体可用的按钮game.add_available_button(Button.MOVE_LEFT)game.add_available_button(Button.MOVE_RIGHT)game.add_available_button(Button.TURN_LEFT)game.add_available_button(Button.TURN_RIGHT)game.add_available_button(Button.MOVE_FORWARD)game.add_available_button(Button.MOVE_BACKWARD)game.add_available_button(Button.ATTACK)# 在此再添加一个称为delta按钮。上述按钮只能类似于键盘按键,仅具有布尔值。delta按钮将模拟鼠标,具有正负值,这将在探索环境时非常有用game.add_available_button(Button.TURN_LEFT_RIGHT_DELTA, 90)game.add_available_button(Button.LOOK_UP_DOWN_DELTA, 90)# initialize an array for actions  初始化行为数组actions = np.zeros((game.get_available_buttons_size(), game.get_available_buttons_size()))count = 0for i in actions:i[count] = 1count += 1actions = actions.astype(int).tolist()# then we add the game variables, ammo, health, and killcount  添加游戏变量:弹药、生命力和杀死怪兽个数game.add_available_game_variable(GameVariable.AMMO0)game.add_available_game_variable(GameVariable.HEALTH)game.add_available_game_variable(GameVariable.KILLCOUNT)# 设置episode_timeout,在经过一些时间步之后终止情景。# 另外,还设置episode_start_time,这对于省略初始事件非常有用game.set_episode_timeout(6 * episode_length)game.set_episode_start_time(10)game.set_window_visible(render)# 通过设置 set_sound_enable to true为真,可以播放声音 game.set_sound_enabled(False)game.set_sound_enabled(False)# we set living reward to 0 which the agent for each move it does even though the move is not useful# 设存活奖励为0,用于奖励智能体的每次移动,即使该移动无用game.set_living_reward(0)# doom有效具有不同模式,如玩家、观众、非同步玩家、非同步观众# 在观众模式下,是人来操作,而智能体进行学习# 在玩家模式下,智能体将真正玩游戏,因此,在此采用玩家模式game.set_mode(Mode.PLAYER)# okay, So now we, initialize the game environmentgame.init()# 创建一个DRQN类的实例,并创建行为者和目标DRQN网络actionDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)targetDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)# 另外,还需创建一个ExperienceReplay类的实例,其缓存大小为1000experiences = ExperienceReplay(1000)# 保存模型saver = tf.train.Saver({v.name: v for v in actionDRQN.parameters}, max_to_keep = 1)# now let us start the training process# 初始化从经验缓存中采样和在经验缓存中保存转移信息的变量sample = 5store = 50# start the tensorflow session(会话)with tf.Session() as sess:# initialize all tensorflow variablessess.run(tf.global_variables_initializer())for episode in range(num_episodes):# start the new episodegame.new_episode()# play the episode till it reaches the episode lengthfor frame in range(episode_length):# get the game statestate = game.get_state()s = state.screen_buffer# select the actiona = actionDRQN.prediction.eval(feed_dict = {actionDRQN.input: s})[0]action = actions[a]# perform the action and store the rewardreward = game.make_action(action)# update total rewadtotal_reward += reward# if the episode is over then breakif game.is_episode_finished():break# store transistion to our experience buffer   将转移信息保存到经验缓存中if (frame % store) == 0:experiences.appendToBuffer((s, action, reward))# sample experience form the experience buffer  从经验缓存中采样经验if (frame % sample) == 0:memory = experiences.sample(1)mem_frame = memory[0][0]mem_reward = memory[0][2]# now, train the networkQ1 = actionDRQN.output.eval(feed_dict = {actionDRQN.input: mem_frame})Q2 = targetDRQN.output.eval(feed_dict = {targetDRQN.input: mem_frame})# set learning ratelearning_rate = actionDRQN.learning_rate.eval()# calculate Q valueQtarget = old_q_value + learning_rate * (mem_reward + discount_factor * Q2 - old_q_value)# update old Q valueold_q_value = Qtarget# compute Lossloss = actionDRQN.loss.eval(feed_dict = {actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})# update total losstotal_loss += loss# update both networksactionDRQN.update.run(feed_dict = {actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})targetDRQN.update.run(feed_dict = {targetDRQN.target_vector: Qtarget, targetDRQN.input: mem_frame})rewards.append((episode, total_reward))losses.append((episode, total_loss))print("Episode %d - Reward = %.3f, Loss = %.3f." % (episode, total_reward, total_loss))total_reward = 0total_loss = 0

训练10000个场景,每个场景的长度为300:

train(num_episodes = 10000, episode_length = 300, learning_rate = 0.01, render = False)

https://github.com/PacktPublishing/Hands-On-Reinforcement-Learning-with-Python/blob/master/Chapter09/9.5%20Doom%20Game%20Using%20DRQN.ipynb

这篇关于深度强化学习之基于DRQN玩Doom游戏的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/183273

相关文章

SpringCloud动态配置注解@RefreshScope与@Component的深度解析

《SpringCloud动态配置注解@RefreshScope与@Component的深度解析》在现代微服务架构中,动态配置管理是一个关键需求,本文将为大家介绍SpringCloud中相关的注解@Re... 目录引言1. @RefreshScope 的作用与原理1.1 什么是 @RefreshScope1.

Python 中的异步与同步深度解析(实践记录)

《Python中的异步与同步深度解析(实践记录)》在Python编程世界里,异步和同步的概念是理解程序执行流程和性能优化的关键,这篇文章将带你深入了解它们的差异,以及阻塞和非阻塞的特性,同时通过实际... 目录python中的异步与同步:深度解析与实践异步与同步的定义异步同步阻塞与非阻塞的概念阻塞非阻塞同步

Redis中高并发读写性能的深度解析与优化

《Redis中高并发读写性能的深度解析与优化》Redis作为一款高性能的内存数据库,广泛应用于缓存、消息队列、实时统计等场景,本文将深入探讨Redis的读写并发能力,感兴趣的小伙伴可以了解下... 目录引言一、Redis 并发能力概述1.1 Redis 的读写性能1.2 影响 Redis 并发能力的因素二、

最新Spring Security实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)

《最新SpringSecurity实战教程之表单登录定制到处理逻辑的深度改造(最新推荐)》本章节介绍了如何通过SpringSecurity实现从配置自定义登录页面、表单登录处理逻辑的配置,并简单模拟... 目录前言改造准备开始登录页改造自定义用户名密码登陆成功失败跳转问题自定义登出前后端分离适配方案结语前言

Java进阶学习之如何开启远程调式

《Java进阶学习之如何开启远程调式》Java开发中的远程调试是一项至关重要的技能,特别是在处理生产环境的问题或者协作开发时,:本文主要介绍Java进阶学习之如何开启远程调式的相关资料,需要的朋友... 目录概述Java远程调试的开启与底层原理开启Java远程调试底层原理JVM参数总结&nbsMbKKXJx

Redis 内存淘汰策略深度解析(最新推荐)

《Redis内存淘汰策略深度解析(最新推荐)》本文详细探讨了Redis的内存淘汰策略、实现原理、适用场景及最佳实践,介绍了八种内存淘汰策略,包括noeviction、LRU、LFU、TTL、Rand... 目录一、 内存淘汰策略概述二、内存淘汰策略详解2.1 ​noeviction(不淘汰)​2.2 ​LR

Python与DeepSeek的深度融合实战

《Python与DeepSeek的深度融合实战》Python作为最受欢迎的编程语言之一,以其简洁易读的语法、丰富的库和广泛的应用场景,成为了无数开发者的首选,而DeepSeek,作为人工智能领域的新星... 目录一、python与DeepSeek的结合优势二、模型训练1. 数据准备2. 模型架构与参数设置3

Java深度学习库DJL实现Python的NumPy方式

《Java深度学习库DJL实现Python的NumPy方式》本文介绍了DJL库的背景和基本功能,包括NDArray的创建、数学运算、数据获取和设置等,同时,还展示了如何使用NDArray进行数据预处理... 目录1 NDArray 的背景介绍1.1 架构2 JavaDJL使用2.1 安装DJL2.2 基本操

最长公共子序列问题的深度分析与Java实现方式

《最长公共子序列问题的深度分析与Java实现方式》本文详细介绍了最长公共子序列(LCS)问题,包括其概念、暴力解法、动态规划解法,并提供了Java代码实现,暴力解法虽然简单,但在大数据处理中效率较低,... 目录最长公共子序列问题概述问题理解与示例分析暴力解法思路与示例代码动态规划解法DP 表的构建与意义动

Go中sync.Once源码的深度讲解

《Go中sync.Once源码的深度讲解》sync.Once是Go语言标准库中的一个同步原语,用于确保某个操作只执行一次,本文将从源码出发为大家详细介绍一下sync.Once的具体使用,x希望对大家有... 目录概念简单示例源码解读总结概念sync.Once是Go语言标准库中的一个同步原语,用于确保某个操