Deep Reinforcement Learning: Playing Doom with DRQN

2023-10-10 21:40

This article introduces deep reinforcement learning with a Deep Recurrent Q-Network (DRQN) that learns to play Doom, and is intended as a practical reference for developers who want to follow along.

DRQN

  Why do we need DRQN when DQN can already play Atari games at a human level? To answer this question, we first need to understand the partially observable Markov decision process (POMDP). An environment is a POMDP when we have only limited information about it. So far, we have dealt with fully observable MDPs, where all possible states and actions are known; even though the agent does not know the transition and reward probabilities, the environment itself is fully observable. For example, in the Frozen Lake environment all states and actions are completely known, so it can easily be modeled as a fully observable MDP. Most real-world environments, however, are only partially observable: not every state can be observed. Suppose an agent has to learn to walk in the real world; clearly, it does not have complete information about its environment. In a POMDP, a state provides only partial information, but retaining information from past states helps the agent understand the environment better and improve its policy. Therefore, in a POMDP we need to keep information about previous states in order to take the optimal action.
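As a toy illustration of partial observability (a hypothetical example, not part of the Doom setup), consider a one-dimensional corridor in which the agent only observes the shading of the square it stands on. Two different positions can produce the same observation, so a single observation does not identify the state, whereas a history of observations does:

corridor_shading = ["dark", "light", "dark", "light", "goal"]

def observe(position):
    # the agent never sees its position, only the shading of the current square
    return corridor_shading[position]

print(observe(0), observe(2))                  # both print 'dark', although the underlying states differ

# remembering past observations disambiguates the two situations
history_a = [observe(p) for p in (0,)]         # the agent just started at position 0
history_b = [observe(p) for p in (0, 1, 2)]    # the agent has walked two steps to the right
print(history_a, history_b)                    # different histories -> different beliefs about the state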

  Therefore, the DQN architecture is improved by adding an LSTM layer so that the network can make better use of past information. In the DQN architecture, the first post-convolutional fully connected layer is replaced by an LSTM RNN. This also mitigates the partial observability problem, because the agent can now remember past states and improve its policy accordingly.

The DRQN architecture

The architecture is very similar to DQN, except that the first post-convolutional fully connected layer is replaced by an LSTM RNN.

[Figure: DRQN architecture — convolutional layers followed by an LSTM layer and a fully connected layer that outputs Q values]
  The game screen is fed as input to the convolutional layers. The convolutional layers convolve the image and produce feature maps. The resulting feature maps are then passed to the LSTM layer. The LSTM layer has memory for holding information: it retains important information about previous game states and updates its memory over time as needed. A fully connected layer then outputs the Q values. Therefore, unlike DQN, we do not estimate $Q(s_t, a_t)$ directly; instead, we estimate $Q(h_t, a_t)$, where $h_t$ is the recurrent hidden state computed from the hidden state returned by the network at the previous time step and the current observation, i.e., $h_t = \mathrm{LSTM}(h_{t-1}, o_t)$. Since we use an RNN, the network is trained with backpropagation through time.
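To make the recurrence concrete, below is a minimal NumPy sketch of a DRQN-style forward pass. It uses a plain tanh recurrence instead of a full LSTM, and all sizes and random weights are illustrative assumptions rather than the network built later in this chapter: the convolutional features x_t of the current frame and the previous hidden state h_{t-1} are combined into h_t, from which the Q values are read out.

import numpy as np

def drqn_step(x_t, h_prev, W_xh, W_hh, b_h, W_hq, b_q):
    # recurrent update: mix the current frame's features with the previous hidden state
    h_t = np.tanh(x_t @ W_xh + h_prev @ W_hh + b_h)
    # Q values are read out from the hidden state, i.e. we estimate Q(h_t, a_t) rather than Q(s_t, a_t)
    q_t = h_t @ W_hq + b_q
    return h_t, q_t

# illustrative sizes: 384 convolutional features, 100 hidden units, 3 actions
rng = np.random.default_rng(0)
W_xh = rng.normal(scale=0.05, size=(384, 100))
W_hh = rng.normal(scale=0.05, size=(100, 100))
W_hq = rng.normal(scale=0.05, size=(100, 3))
b_h, b_q = np.zeros(100), np.zeros(3)

h = np.zeros((1, 100))                 # initial hidden state
for t in range(4):                     # pretend we observe four consecutive frames
    x = rng.normal(size=(1, 384))      # stand-in for the conv features of frame t
    h, q = drqn_step(x, h, W_xh, W_hh, b_h, W_hq, b_q)
    print(t, q.round(2))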

  What about the experience replay buffer? In DQN, to avoid correlated experiences, experience replay stores the game transitions, and the network is trained on random mini-batches of experience. In DRQN, entire episodes are stored in the experience buffer, and n consecutive time steps are sampled from a random batch of episodes. This way we satisfy the need for randomization while also obtaining experiences that actually follow one another in time, as sketched below.
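The following sketch illustrates this episode-level storage: whole episodes are kept in the buffer, and a random contiguous window of n time steps is drawn from a randomly chosen episode. The class name and field layout are assumptions for illustration; the simpler ExperienceReplay class used later in this chapter stores individual transitions instead.

import random

class EpisodicReplayBuffer:
    def __init__(self, max_episodes=1000):
        self.episodes = []               # each entry is a list of (state, action, reward) tuples
        self.max_episodes = max_episodes

    def add_episode(self, transitions):
        # store the whole episode; drop the oldest one when the buffer is full
        if len(self.episodes) >= self.max_episodes:
            self.episodes.pop(0)
        self.episodes.append(transitions)

    def sample(self, n_steps):
        # pick a random episode, then a random contiguous window of n_steps transitions from it
        episode = random.choice(self.episodes)
        if len(episode) <= n_steps:
            return episode
        start = random.randint(0, len(episode) - n_steps)
        return episode[start:start + n_steps]

buffer = EpisodicReplayBuffer()
buffer.add_episode([("s%d" % t, t % 3, 1.0) for t in range(10)])   # one toy episode of 10 steps
print(buffer.sample(4))                                            # 4 consecutive transitions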

Training an agent to play Doom

The basic Doom game

Load the necessary libraries:

from vizdoom import *
import random
import time

Create a DoomGame instance:

game = DoomGame()

ViZDoom provides many Doom scenarios; here we load the basic scenario:

game.load_config("basic.cfg")

Initialize the game with this scenario via the init() method:

game.init()

Define the one-hot encoded actions:

shoot = [0, 0, 1]
left = [1, 0, 0]
right = [0, 1, 0]
actions = [shoot, left, right]

Start playing the game:

no_of_episodes = 10

for i in range(no_of_episodes):
    # for each episode, start the game
    game.new_episode()

    # loop until the episode is over
    while not game.is_episode_finished():
        # get the game state
        state = game.get_state()
        img = state.screen_buffer

        # get the game variables
        misc = state.game_variables

        # perform a random action and receive the reward
        reward = game.make_action(random.choice(actions))
        print(reward)

    # wait for some time before starting the next episode
    time.sleep(2)
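As a small variant of the loop above (assuming the same game, actions, and no_of_episodes already defined), the accumulated return of each episode can be printed with DoomGame.get_total_reward(), and the engine released afterwards with close():

for i in range(no_of_episodes):
    game.new_episode()
    while not game.is_episode_finished():
        game.get_state()
        game.make_action(random.choice(actions))
    # get_total_reward() returns the reward accumulated over the whole episode
    print("Episode %d finished, total reward: %.1f" % (i, game.get_total_reward()))

# shut down the game engine when done
game.close()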

Doom with DRQN

The agent gets a positive reward for successfully killing a monster, and negative rewards for losing life, committing suicide, and running out of ammo.

First, let us import all the necessary libraries:

import tensorflow as tf
import numpy as np
import math
from vizdoom import *
import timeit
import os
import sys

Next, define the get_input_shape function to compute the final shape of the input image after it has passed through the convolutional layers:

def get_input_shape(Image, Filter, Stride):
    layer1 = math.ceil(((Image - Filter + 1) / Stride))
    o1 = math.ceil((layer1 / Stride))
    layer2 = math.ceil(((o1 - Filter + 1) / Stride))
    o2 = math.ceil((layer2 / Stride))
    layer3 = math.ceil(((o2 - Filter + 1) / Stride))
    o3 = math.ceil((layer3 / Stride))
    return int(o3)
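For example, with the 160×256 frames, filter size 5, stride 2, and 64 filters in the last convolutional layer used by the DRQN class below, the helper gives the spatial size of the final convolutional output, and hence the length of the flattened feature vector:

print(get_input_shape(160, 5, 2))    # 2
print(get_input_shape(256, 5, 2))    # 3
# flattened convolutional output: 2 * 3 * 64 filters = 384 features
print(get_input_shape(160, 5, 2) * get_input_shape(256, 5, 2) * 64)    # 384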

Now, define the DRQN class, which implements the DRQN algorithm:

class DRQN():
    def __init__(self, input_shape, num_actions, inital_learning_rate):
        # first, we initialize all the hyperparameters
        self.tfcast_type = tf.float32

        # shape of our input, which would be (length, width, channels)
        self.input_shape = input_shape
        # number of actions in the environment
        self.num_actions = num_actions
        # learning rate for the neural network
        self.learning_rate = inital_learning_rate

        # now we will define the hyperparameters of the convolutional neural network
        # filter size
        self.filter_size = 5
        # number of filters
        self.num_filters = [16, 32, 64]
        # stride size
        self.stride = 2
        # pool size
        self.poolsize = 2
        # shape of our convolutional layer
        self.convolution_shape = get_input_shape(input_shape[0], self.filter_size, self.stride) * get_input_shape(input_shape[1], self.filter_size, self.stride) * self.num_filters[2]

        # now we define the hyperparameters of our recurrent neural network and the final feed forward layer
        # number of neurons
        self.cell_size = 100
        # number of hidden layers
        self.hidden_layer = 50
        # dropout probability
        self.dropout_probability = [0.3, 0.2]

        # hyperparameters for optimization
        self.loss_decay_rate = 0.96
        self.loss_decay_steps = 180

        # initialize all the variables for the CNN
        # we initialize the placeholder for input whose shape would be (length, width, channel)
        self.input = tf.placeholder(shape=(self.input_shape[0], self.input_shape[1], self.input_shape[2]), dtype=self.tfcast_type)
        # we will also initialize the target vector whose shape is equal to the number of actions
        self.target_vector = tf.placeholder(shape=(self.num_actions, 1), dtype=self.tfcast_type)

        # initialize feature maps for our corresponding 3 filters
        self.features1 = tf.Variable(initial_value=np.random.rand(self.filter_size, self.filter_size, input_shape[2], self.num_filters[0]),
                                     dtype=self.tfcast_type)
        self.features2 = tf.Variable(initial_value=np.random.rand(self.filter_size, self.filter_size, self.num_filters[0], self.num_filters[1]),
                                     dtype=self.tfcast_type)
        self.features3 = tf.Variable(initial_value=np.random.rand(self.filter_size, self.filter_size, self.num_filters[1], self.num_filters[2]),
                                     dtype=self.tfcast_type)

        # initialize variables for the RNN
        # recall how an RNN works from chapter 7
        self.h = tf.Variable(initial_value=np.zeros((1, self.cell_size)), dtype=self.tfcast_type)

        # input to hidden weight matrix
        self.rW = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (self.convolution_shape + self.cell_size)),
                                  high=np.sqrt(6. / (self.convolution_shape + self.cell_size)),
                                  size=(self.convolution_shape, self.cell_size)),
                              dtype=self.tfcast_type)
        # hidden to hidden weight matrix
        self.rU = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (2 * self.cell_size)),
                                  high=np.sqrt(6. / (2 * self.cell_size)),
                                  size=(self.cell_size, self.cell_size)),
                              dtype=self.tfcast_type)
        # hidden to output weight matrix
        self.rV = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (2 * self.cell_size)),
                                  high=np.sqrt(6. / (2 * self.cell_size)),
                                  size=(self.cell_size, self.cell_size)),
                              dtype=self.tfcast_type)
        # biases
        self.rb = tf.Variable(initial_value=np.zeros(self.cell_size), dtype=self.tfcast_type)
        self.rc = tf.Variable(initial_value=np.zeros(self.cell_size), dtype=self.tfcast_type)

        # initialize weights and bias of the feed forward network
        # weights
        self.fW = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (self.cell_size + self.num_actions)),
                                  high=np.sqrt(6. / (self.cell_size + self.num_actions)),
                                  size=(self.cell_size, self.num_actions)),
                              dtype=self.tfcast_type)
        # bias
        self.fb = tf.Variable(initial_value=np.zeros(self.num_actions), dtype=self.tfcast_type)

        # learning rate with exponential decay
        self.step_count = tf.Variable(initial_value=0, dtype=self.tfcast_type)
        self.learning_rate = tf.train.exponential_decay(self.learning_rate,
                                                        self.step_count,
                                                        self.loss_decay_steps,
                                                        self.loss_decay_rate,
                                                        staircase=False)

        # now let us build the network
        # first convolutional layer
        self.conv1 = tf.nn.conv2d(input=tf.reshape(self.input, shape=(1, self.input_shape[0], self.input_shape[1], self.input_shape[2])),
                                  filter=self.features1, strides=[1, self.stride, self.stride, 1], padding="VALID")
        self.relu1 = tf.nn.relu(self.conv1)
        self.pool1 = tf.nn.max_pool(self.relu1, ksize=[1, self.poolsize, self.poolsize, 1], strides=[1, self.stride, self.stride, 1], padding="SAME")

        # second convolutional layer
        self.conv2 = tf.nn.conv2d(input=self.pool1, filter=self.features2, strides=[1, self.stride, self.stride, 1], padding="VALID")
        self.relu2 = tf.nn.relu(self.conv2)
        self.pool2 = tf.nn.max_pool(self.relu2, ksize=[1, self.poolsize, self.poolsize, 1], strides=[1, self.stride, self.stride, 1], padding="SAME")

        # third convolutional layer
        self.conv3 = tf.nn.conv2d(input=self.pool2, filter=self.features3, strides=[1, self.stride, self.stride, 1], padding="VALID")
        self.relu3 = tf.nn.relu(self.conv3)
        self.pool3 = tf.nn.max_pool(self.relu3, ksize=[1, self.poolsize, self.poolsize, 1], strides=[1, self.stride, self.stride, 1], padding="SAME")

        # add dropout and reshape the input
        self.drop1 = tf.nn.dropout(self.pool3, self.dropout_probability[0])
        self.reshaped_input = tf.reshape(self.drop1, shape=[1, -1])

        # now we build the recurrent neural network, which takes the input from the last layer of the convolutional network
        self.h = tf.tanh(tf.matmul(self.reshaped_input, self.rW) + tf.matmul(self.h, self.rU) + self.rb)
        self.o = tf.nn.softmax(tf.matmul(self.h, self.rV) + self.rc)

        # add dropout to the RNN
        self.drop2 = tf.nn.dropout(self.o, self.dropout_probability[1])

        # we feed the result of the RNN to the feed forward layer
        self.output = tf.reshape(tf.matmul(self.drop2, self.fW) + self.fb, shape=[-1, 1])
        self.prediction = tf.argmax(self.output)

        # compute loss
        self.loss = tf.reduce_mean(tf.square(self.target_vector - self.output))

        # we use the Adam optimizer for minimizing the error
        self.optimizer = tf.train.AdamOptimizer(self.learning_rate)

        # compute gradients of the loss and apply them
        self.gradients = self.optimizer.compute_gradients(self.loss)
        self.update = self.optimizer.apply_gradients(self.gradients)

        self.parameters = (self.features1, self.features2, self.features3,
                           self.rW, self.rU, self.rV, self.rb, self.rc,
                           self.fW, self.fb)
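As a quick smoke test of the class — a sketch that assumes TensorFlow 1.x, where tf.placeholder and tf.Session are available — the graph can be built for the 160×256×3 frames and the 7 usable actions used in train() below, and a prediction run on a random frame:

tf.reset_default_graph()

# 160 x 256 RGB frames; 7 actions corresponds to the available buttons minus the two delta buttons
net = DRQN((160, 256, 3), 7, 0.01)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    frame = np.random.rand(160, 256, 3)
    q_values = sess.run(net.output, feed_dict={net.input: frame})
    best_action = sess.run(net.prediction, feed_dict={net.input: frame})
    print(q_values.shape, best_action)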

Define an ExperienceReplay class to implement the experience replay buffer. All of the agent's experience, that is, state, action, and reward, is stored in the buffer, and mini-batches of experience are sampled to train the network:

class ExperienceReplay():
    def __init__(self, buffer_size):
        # buffer for holding the transitions
        self.buffer = []
        # size of the buffer
        self.buffer_size = buffer_size

    # we remove old transitions if the buffer size has reached its limit. Think of the buffer as a queue:
    # when a new one comes in, the old one goes out
    def appendToBuffer(self, memory_tuplet):
        if len(self.buffer) > self.buffer_size:
            for i in range(len(self.buffer) - self.buffer_size):
                self.buffer.remove(self.buffer[0])
        self.buffer.append(memory_tuplet)

    # define a function called sample for sampling n random transitions
    def sample(self, n):
        memories = []
        for i in range(n):
            memory_index = np.random.randint(0, len(self.buffer))
            memories.append(self.buffer[memory_index])
        return memories
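A toy usage example of the class, with strings standing in for the (frame, action, reward) tuples stored during training:

replay = ExperienceReplay(buffer_size=5)

# push a few toy transitions; the strings stand in for screen buffers
for t in range(8):
    replay.appendToBuffer(("frame_%d" % t, [0, 0, 1], float(t)))

print(len(replay.buffer))    # the buffer is trimmed once it grows past buffer_size
print(replay.sample(2))      # two randomly drawn transitions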

Define the train function to train the network:

def train(num_episodes, episode_length, learning_rate, scenario="deathmatch.cfg", map_path='map02', render=False):
    # discount parameter for Q-value computation
    discount_factor = .99
    # frequency for updating the experience in the buffer
    update_frequency = 5
    store_frequency = 50
    # for printing the output
    print_frequency = 1000

    # initialize variables for storing the total rewards and total loss
    total_reward = 0
    total_loss = 0
    old_q_value = 0

    # initialize lists for storing the episodic rewards and losses
    rewards = []
    losses = []

    # okay, now let us get to the action!
    # first, we initialize our doom game environment
    game = DoomGame()

    # specify the path where our scenario file is located
    game.set_doom_scenario_path(scenario)

    # specify the path of the map file
    game.set_doom_map(map_path)

    # then we set the screen resolution and screen format
    game.set_screen_resolution(ScreenResolution.RES_256X160)
    game.set_screen_format(ScreenFormat.RGB24)

    # we can add the particles and effects we need by simply setting them to true or false
    game.set_render_hud(False)
    game.set_render_minimal_hud(False)
    game.set_render_crosshair(False)
    game.set_render_weapon(True)
    game.set_render_decals(False)
    game.set_render_particles(False)
    game.set_render_effects_sprites(False)
    game.set_render_messages(False)
    game.set_render_corpses(False)
    game.set_render_screen_flashes(True)

    # now we will specify the buttons that should be available to the agent
    game.add_available_button(Button.MOVE_LEFT)
    game.add_available_button(Button.MOVE_RIGHT)
    game.add_available_button(Button.TURN_LEFT)
    game.add_available_button(Button.TURN_RIGHT)
    game.add_available_button(Button.MOVE_FORWARD)
    game.add_available_button(Button.MOVE_BACKWARD)
    game.add_available_button(Button.ATTACK)

    # we also add two delta buttons. The preceding buttons work like keyboard keys and only take boolean values;
    # the delta buttons emulate a mouse and take positive and negative values, which is useful for exploring the environment
    game.add_available_button(Button.TURN_LEFT_RIGHT_DELTA, 90)
    game.add_available_button(Button.LOOK_UP_DOWN_DELTA, 90)

    # initialize an array of one-hot encoded actions
    actions = np.zeros((game.get_available_buttons_size(), game.get_available_buttons_size()))
    count = 0
    for i in actions:
        i[count] = 1
        count += 1
    actions = actions.astype(int).tolist()

    # then we add the game variables: ammo, health, and kill count
    game.add_available_game_variable(GameVariable.AMMO0)
    game.add_available_game_variable(GameVariable.HEALTH)
    game.add_available_game_variable(GameVariable.KILLCOUNT)

    # we set episode_timeout to terminate the episode after some time steps.
    # we also set episode_start_time, which is useful for skipping the initial events
    game.set_episode_timeout(6 * episode_length)
    game.set_episode_start_time(10)
    game.set_window_visible(render)

    # sound can be enabled by setting set_sound_enabled to true
    game.set_sound_enabled(False)

    # we set the living reward to 0, which is given to the agent for each move it makes even if the move is not useful
    game.set_living_reward(0)

    # doom has different modes, such as player, spectator, asynchronous player, and asynchronous spectator
    # in spectator mode, a human plays while the agent learns
    # in player mode, the agent actually plays the game, so we use player mode
    game.set_mode(Mode.PLAYER)

    # okay, so now we initialize the game environment
    game.init()

    # create instances of the DRQN class for the action DRQN and the target DRQN networks
    # (the two delta buttons are excluded from the action count)
    actionDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)
    targetDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)

    # we also create an instance of the ExperienceReplay class with a buffer size of 1000
    experiences = ExperienceReplay(1000)

    # for saving the model
    saver = tf.train.Saver({v.name: v for v in actionDRQN.parameters}, max_to_keep=1)

    # now let us start the training process
    # initialize variables for sampling from and storing transitions in the experience buffer
    sample = 5
    store = 50

    # start the tensorflow session
    with tf.Session() as sess:
        # initialize all tensorflow variables
        sess.run(tf.global_variables_initializer())

        for episode in range(num_episodes):
            # start a new episode
            game.new_episode()

            # play the episode until it reaches the episode length
            for frame in range(episode_length):
                # get the game state
                state = game.get_state()
                s = state.screen_buffer

                # select the action
                a = actionDRQN.prediction.eval(feed_dict={actionDRQN.input: s})[0]
                action = actions[a]

                # perform the action and store the reward
                reward = game.make_action(action)

                # update the total reward
                total_reward += reward

                # if the episode is over, then break
                if game.is_episode_finished():
                    break

                # store the transition in our experience buffer
                if (frame % store) == 0:
                    experiences.appendToBuffer((s, action, reward))

                # sample experience from the experience buffer
                if (frame % sample) == 0:
                    memory = experiences.sample(1)
                    mem_frame = memory[0][0]
                    mem_reward = memory[0][2]

                    # now, train the network
                    Q1 = actionDRQN.output.eval(feed_dict={actionDRQN.input: mem_frame})
                    Q2 = targetDRQN.output.eval(feed_dict={targetDRQN.input: mem_frame})

                    # set the learning rate
                    learning_rate = actionDRQN.learning_rate.eval()

                    # calculate the Q value
                    Qtarget = old_q_value + learning_rate * (mem_reward + discount_factor * Q2 - old_q_value)

                    # update the old Q value
                    old_q_value = Qtarget

                    # compute the loss
                    loss = actionDRQN.loss.eval(feed_dict={actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})

                    # update the total loss
                    total_loss += loss

                    # update both networks
                    actionDRQN.update.run(feed_dict={actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})
                    targetDRQN.update.run(feed_dict={targetDRQN.target_vector: Qtarget, targetDRQN.input: mem_frame})

            rewards.append((episode, total_reward))
            losses.append((episode, total_loss))

            print("Episode %d - Reward = %.3f, Loss = %.3f." % (episode, total_reward, total_loss))

            total_reward = 0
            total_loss = 0

Train for 10,000 episodes, with an episode length of 300:

train(num_episodes = 10000, episode_length = 300, learning_rate = 0.01, render = False)

The complete notebook for this example is available at https://github.com/PacktPublishing/Hands-On-Reinforcement-Learning-with-Python/blob/master/Chapter09/9.5%20Doom%20Game%20Using%20DRQN.ipynb

That concludes this article on playing Doom with DRQN in deep reinforcement learning; I hope it is helpful.


