Deep Reinforcement Learning: Playing Doom with DRQN

2023-10-10 21:40

This article introduces how to play the game Doom with DRQN, a deep reinforcement learning method, and is intended as a practical reference for developers who want to follow along.

DRQN

  Why do we need DRQN when DQN can already play Atari games at a human level? To answer this question, we first need to understand what a partially observable Markov decision process (POMDP) is. An environment is a POMDP when we have only limited information about it. So far, we have dealt with fully observable MDPs, where all possible actions and states are known; even though the agent may not know the transition and reward probabilities, it has complete knowledge of the environment. For example, in the Frozen Lake environment, all states and actions are fully known, so it is easy to model the environment as a fully observable MDP. But most real-world environments are only partially observable: we cannot observe all the states. Suppose an agent has to learn to walk in a real-world environment; clearly, it does not have complete information about that environment. In a POMDP, the states provide only partial information, but retaining information from past states can help the agent better understand the nature of the environment and thus improve its policy. Therefore, in a POMDP, we need to keep information about previous states in order to take the best action.

  Therefore, we improve the DQN architecture by adding an LSTM layer so that the network can better exploit past information. In the DQN architecture, the first post-convolutional fully connected layer is replaced with an LSTM RNN. This also addresses the partial observability problem, because the agent now has the ability to remember past states and can improve its policy accordingly.

The DRQN Architecture

The DRQN architecture is very similar to DQN, except that the first post-convolutional fully connected layer is replaced with an LSTM RNN.

(Figure: the DRQN architecture)
  Here, the game screen is fed as input to the convolutional layers. The convolutional layers convolve the image and produce feature maps, which are then passed to the LSTM layer. The LSTM layer has memory for holding information: it retains the important information about previous game states and updates its memory over time as needed. The Q values are output after a fully connected layer. Therefore, unlike DQN, we do not estimate $Q(s_t, a_t)$ directly; instead, we estimate $Q(h_t, a_t)$, where $h_t$ is the recurrent state returned by the network at the previous time step, that is, $h_t = \mathrm{LSTM}(h_{t-1}, o_t)$, with $o_t$ the observation (the convolved game screen) at time $t$. Since we are using an RNN, the network is trained with backpropagation through time.
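To make the layer ordering concrete, below is a minimal, illustrative sketch of the same convolution, LSTM, and fully connected stack written with tf.keras. This is only a sketch under my own assumptions (the function name, the use of tf.keras and TimeDistributed, the single convolutional layer, and the layer sizes are not from this article); the chapter's own implementation later in this article builds the graph by hand with low-level TensorFlow 1.x ops and three convolutional layers.

import tensorflow as tf

def build_drqn_sketch(frame_shape=(160, 256, 3), num_actions=3, cell_size=100):
    # a sequence of game frames: (batch, time, height, width, channels)
    frames = tf.keras.Input(shape=(None,) + frame_shape)
    # convolutional feature extractor applied to every frame in the sequence
    conv = tf.keras.layers.TimeDistributed(
        tf.keras.layers.Conv2D(32, 5, strides=2, activation="relu"))(frames)
    conv = tf.keras.layers.TimeDistributed(tf.keras.layers.MaxPool2D(2))(conv)
    flat = tf.keras.layers.TimeDistributed(tf.keras.layers.Flatten())(conv)
    # the LSTM keeps a memory h_t over previous frames; it replaces the first fully connected layer
    h = tf.keras.layers.LSTM(cell_size)(flat)
    # the final fully connected layer outputs one Q value per action, i.e. Q(h_t, a)
    q_values = tf.keras.layers.Dense(num_actions)(h)
    return tf.keras.Model(frames, q_values)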

  What about the experience replay buffer, then? In DQN, to avoid correlated experience, we used experience replay to store the game transitions and trained the network on random mini-batches of experience. In the case of DRQN, we store entire episodes in the experience buffer and randomly sample n time steps from a random batch of episodes. In this way, we accommodate randomization while still obtaining experience that actually follows one step after another.
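The sketch below illustrates what such an episode-level buffer could look like. It is a hypothetical helper (the class and method names are mine), not this article's code; the ExperienceReplay class implemented later in the article stores individual transitions instead.

import random

class EpisodicReplayBuffer:
    def __init__(self, buffer_size):
        self.buffer = []          # each entry is one full episode: a list of (s, a, r) tuples
        self.buffer_size = buffer_size

    def append_episode(self, episode):
        # drop the oldest episode once the buffer is full (queue behaviour)
        if len(self.buffer) >= self.buffer_size:
            self.buffer.pop(0)
        self.buffer.append(episode)

    def sample(self, n_steps):
        # pick a random episode, then a random contiguous window of n_steps transitions from it
        episode = random.choice(self.buffer)
        start = random.randint(0, max(0, len(episode) - n_steps))
        return episode[start:start + n_steps]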

Training an Agent to Play Doom
The Basic Doom Game

Load the necessary libraries:

from vizdoom import *
import random
import time

Create an instance of DoomGame:

game = DoomGame()

ViZDoom provides a large number of Doom scenarios; here we load a basic scenario:

game.load_config("basic.cfg")

Initialize the game with this scenario via the init() method:

game.init()

Define one-hot encoded actions:

shoot = [0, 0, 1]
left = [1, 0, 0]
right = [0, 1, 0]
actions = [shoot, left, right]

Start the game:

no_of_episodes = 10

for i in range(no_of_episodes):
    # for each episode, start the game
    game.new_episode()
    # loop until the episode is over
    while not game.is_episode_finished():
        # get the game state
        state = game.get_state()
        img = state.screen_buffer
        # get the game variables
        misc = state.game_variables
        # perform a random action and receive the reward
        reward = game.make_action(random.choice(actions))
        print(reward)
    # wait for some time before starting the next episode
    time.sleep(2)
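As a small optional variant of the loop above, the ViZDoom API also exposes the accumulated episode return via get_total_reward(), so you could print one number per episode instead of every step reward (this variant is mine, not part of the original walkthrough):

for i in range(no_of_episodes):
    game.new_episode()
    while not game.is_episode_finished():
        state = game.get_state()
        reward = game.make_action(random.choice(actions))
    # total reward accumulated over the whole episode
    print("Episode %d return: %.1f" % (i, game.get_total_reward()))
    time.sleep(2)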

Doom Game Using DRQN

Successfully killing a monster earns a positive reward, while losing life, committing suicide, and running out of ammo earn negative rewards.

First, let us import all the necessary libraries:

import tensorflow as tf
import numpy as np
import math
from vizdoom import *
import timeit
import os
import sys

Next, define the get_input_shape function to compute the final shape of the input image after it has been convolved through the convolutional layers:

def get_input_shape(Image, Filter, Stride):
    layer1 = math.ceil(((Image - Filter + 1) / Stride))
    o1 = math.ceil((layer1 / Stride))
    layer2 = math.ceil(((o1 - Filter + 1) / Stride))
    o2 = math.ceil((layer2 / Stride))
    layer3 = math.ceil(((o2 - Filter + 1) / Stride))
    o3 = math.ceil((layer3 / Stride))
    return int(o3)
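For example, with the 160 x 256 screen buffer, filter size 5, and stride 2 used later in the DRQN class, the helper evaluates as follows (worked through by hand from the function above):

# height: 160 -> 78 -> 39 -> 18 -> 9 -> 3 -> 2
print(get_input_shape(160, 5, 2))   # 2
# width: 256 -> 126 -> 63 -> 30 -> 15 -> 6 -> 3
print(get_input_shape(256, 5, 2))   # 3
# flattened convolutional feature size used by the recurrent layer: 2 * 3 * 64
print(2 * 3 * 64)                   # 384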

Now, define the DRQN class, which implements the DRQN algorithm:

class DRQN():
    def __init__(self, input_shape, num_actions, inital_learning_rate):
        # first, we initialize all the hyperparameters
        self.tfcast_type = tf.float32
        # shape of our input, which would be (length, width, channels)
        self.input_shape = input_shape
        # number of actions in the environment
        self.num_actions = num_actions
        # learning rate for the neural network
        self.learning_rate = inital_learning_rate

        # now we define the hyperparameters of the convolutional neural network
        # filter size
        self.filter_size = 5
        # number of filters
        self.num_filters = [16, 32, 64]
        # stride size
        self.stride = 2
        # pool size
        self.poolsize = 2
        # shape of our convolutional layer
        self.convolution_shape = get_input_shape(input_shape[0], self.filter_size, self.stride) * get_input_shape(input_shape[1], self.filter_size, self.stride) * self.num_filters[2]

        # now we define the hyperparameters of our recurrent neural network and the final feed forward layer
        # number of neurons
        self.cell_size = 100
        # number of hidden layers
        self.hidden_layer = 50
        # drop out probability
        self.dropout_probability = [0.3, 0.2]

        # hyperparameters for optimization
        self.loss_decay_rate = 0.96
        self.loss_decay_steps = 180

        # initialize all the variables for the CNN
        # we initialize the placeholder for the input, whose shape is (length, width, channel)
        self.input = tf.placeholder(shape = (self.input_shape[0], self.input_shape[1], self.input_shape[2]), dtype = self.tfcast_type)
        # we also initialize the target vector, whose shape is equal to the number of actions
        self.target_vector = tf.placeholder(shape = (self.num_actions, 1), dtype = self.tfcast_type)

        # initialize feature maps for our corresponding 3 filters
        self.features1 = tf.Variable(initial_value = np.random.rand(self.filter_size, self.filter_size, input_shape[2], self.num_filters[0]),
                                     dtype = self.tfcast_type)
        self.features2 = tf.Variable(initial_value = np.random.rand(self.filter_size, self.filter_size, self.num_filters[0], self.num_filters[1]),
                                     dtype = self.tfcast_type)
        self.features3 = tf.Variable(initial_value = np.random.rand(self.filter_size, self.filter_size, self.num_filters[1], self.num_filters[2]),
                                     dtype = self.tfcast_type)

        # initialize variables for the RNN
        # recall how an RNN works from chapter 7
        self.h = tf.Variable(initial_value = np.zeros((1, self.cell_size)), dtype = self.tfcast_type)
        # hidden to hidden weight matrix
        self.rW = tf.Variable(initial_value = np.random.uniform(
                              low = -np.sqrt(6. / (self.convolution_shape + self.cell_size)),
                              high = np.sqrt(6. / (self.convolution_shape + self.cell_size)),
                              size = (self.convolution_shape, self.cell_size)),
                              dtype = self.tfcast_type)
        # input to hidden weight matrix
        self.rU = tf.Variable(initial_value = np.random.uniform(
                              low = -np.sqrt(6. / (2 * self.cell_size)),
                              high = np.sqrt(6. / (2 * self.cell_size)),
                              size = (self.cell_size, self.cell_size)),
                              dtype = self.tfcast_type)
        # hidden to output weight matrix
        self.rV = tf.Variable(initial_value = np.random.uniform(
                              low = -np.sqrt(6. / (2 * self.cell_size)),
                              high = np.sqrt(6. / (2 * self.cell_size)),
                              size = (self.cell_size, self.cell_size)),
                              dtype = self.tfcast_type)
        # bias
        self.rb = tf.Variable(initial_value = np.zeros(self.cell_size), dtype = self.tfcast_type)
        self.rc = tf.Variable(initial_value = np.zeros(self.cell_size), dtype = self.tfcast_type)

        # initialize weights and bias of the feed forward network
        # weights
        self.fW = tf.Variable(initial_value = np.random.uniform(
                              low = -np.sqrt(6. / (self.cell_size + self.num_actions)),
                              high = np.sqrt(6. / (self.cell_size + self.num_actions)),
                              size = (self.cell_size, self.num_actions)),
                              dtype = self.tfcast_type)
        # bias
        self.fb = tf.Variable(initial_value = np.zeros(self.num_actions), dtype = self.tfcast_type)

        # learning rate with exponential decay
        self.step_count = tf.Variable(initial_value = 0, dtype = self.tfcast_type)
        self.learning_rate = tf.train.exponential_decay(self.learning_rate,
                                                        self.step_count,
                                                        self.loss_decay_steps,
                                                        self.loss_decay_rate,
                                                        staircase = False)

        # now let us build the network
        # first convolutional layer
        self.conv1 = tf.nn.conv2d(input = tf.reshape(self.input, shape = (1, self.input_shape[0], self.input_shape[1], self.input_shape[2])), filter = self.features1, strides = [1, self.stride, self.stride, 1], padding = "VALID")
        self.relu1 = tf.nn.relu(self.conv1)
        self.pool1 = tf.nn.max_pool(self.relu1, ksize = [1, self.poolsize, self.poolsize, 1], strides = [1, self.stride, self.stride, 1], padding = "SAME")

        # second convolutional layer
        self.conv2 = tf.nn.conv2d(input = self.pool1, filter = self.features2, strides = [1, self.stride, self.stride, 1], padding = "VALID")
        self.relu2 = tf.nn.relu(self.conv2)
        self.pool2 = tf.nn.max_pool(self.relu2, ksize = [1, self.poolsize, self.poolsize, 1], strides = [1, self.stride, self.stride, 1], padding = "SAME")

        # third convolutional layer
        self.conv3 = tf.nn.conv2d(input = self.pool2, filter = self.features3, strides = [1, self.stride, self.stride, 1], padding = "VALID")
        self.relu3 = tf.nn.relu(self.conv3)
        self.pool3 = tf.nn.max_pool(self.relu3, ksize = [1, self.poolsize, self.poolsize, 1], strides = [1, self.stride, self.stride, 1], padding = "SAME")

        # add dropout and reshape the input
        self.drop1 = tf.nn.dropout(self.pool3, self.dropout_probability[0])
        self.reshaped_input = tf.reshape(self.drop1, shape = [1, -1])

        # now we build the recurrent neural network, which takes its input from the last layer of the convolutional network
        self.h = tf.tanh(tf.matmul(self.reshaped_input, self.rW) + tf.matmul(self.h, self.rU) + self.rb)
        self.o = tf.nn.softmax(tf.matmul(self.h, self.rV) + self.rc)

        # add dropout to the RNN
        self.drop2 = tf.nn.dropout(self.o, self.dropout_probability[1])

        # we feed the result of the RNN to the feed forward layer
        self.output = tf.reshape(tf.matmul(self.drop2, self.fW) + self.fb, shape = [-1, 1])
        self.prediction = tf.argmax(self.output)

        # compute the loss
        self.loss = tf.reduce_mean(tf.square(self.target_vector - self.output))

        # we use the Adam optimizer for minimizing the error
        self.optimizer = tf.train.AdamOptimizer(self.learning_rate)

        # compute the gradients of the loss and apply the gradient update
        self.gradients = self.optimizer.compute_gradients(self.loss)
        self.update = self.optimizer.apply_gradients(self.gradients)

        self.parameters = (self.features1, self.features2, self.features3,
                           self.rW, self.rU, self.rV, self.rb, self.rc,
                           self.fW, self.fb)
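As a quick optional sanity check (my own snippet, not part of the original walkthrough), you can build one network for a 160 x 256 RGB screen with three actions and run a forward pass on a random frame; run it in a fresh session, or after tf.reset_default_graph(), so it does not clash with the training graph built later:

drqn = DRQN(input_shape = (160, 256, 3), num_actions = 3, inital_learning_rate = 0.01)

with tf.Session() as sess:
    sess.run(tf.global_variables_initializer())
    # a random fake frame with the same shape as the Doom screen buffer
    fake_frame = np.random.rand(160, 256, 3).astype(np.float32)
    # Q-value estimates for the 3 actions, shape (3, 1)
    print(sess.run(drqn.output, feed_dict = {drqn.input: fake_frame}))
    # index of the greedy action
    print(sess.run(drqn.prediction, feed_dict = {drqn.input: fake_frame}))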

Define the ExperienceReplay class to implement the experience replay buffer. We store all of the agent's experience, that is, the states, actions, and rewards, in the buffer, and then sample mini-batches of this experience to train the network:

class ExperienceReplay():
    def __init__(self, buffer_size):
        # buffer for holding the transitions
        self.buffer = []
        # size of the buffer
        self.buffer_size = buffer_size

    # we remove old transitions if the buffer has reached its size limit. Think of the buffer as a queue: when a new
    # one comes in, the old one goes out
    def appendToBuffer(self, memory_tuplet):
        if len(self.buffer) > self.buffer_size:
            for i in range(len(self.buffer) - self.buffer_size):
                self.buffer.remove(self.buffer[0])
        self.buffer.append(memory_tuplet)

    # define a function called sample for sampling n random transitions
    def sample(self, n):
        memories = []
        for i in range(n):
            memory_index = np.random.randint(0, len(self.buffer))
            memories.append(self.buffer[memory_index])
        return memories
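A brief usage example of the buffer, with purely illustrative values (the dummy state and action vectors are mine):

buffer = ExperienceReplay(buffer_size = 1000)

# store a few (state, action, reward) transitions
dummy_state = np.zeros((160, 256, 3))
buffer.appendToBuffer((dummy_state, [1, 0, 0], 95.0))
buffer.appendToBuffer((dummy_state, [0, 0, 1], -5.0))

# sample one random transition for a training step
state, action, reward = buffer.sample(1)[0]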

Define the train function to train the network:

def train(num_episodes, episode_length, learning_rate, scenario = "deathmatch.cfg", map_path = 'map02', render = False):
    # discount parameter for the Q-value computation
    discount_factor = .99
    # frequency for updating the experience in the buffer
    update_frequency = 5
    store_frequency = 50
    # for printing the output
    print_frequency = 1000

    # initialize variables for storing the total reward and total loss
    total_reward = 0
    total_loss = 0
    old_q_value = 0

    # initialize lists for storing the episodic rewards and losses
    rewards = []
    losses = []

    # okay, now let us get to the action!
    # first, we initialize our doom game environment
    game = DoomGame()

    # specify the path where our scenario file is located
    game.set_doom_scenario_path(scenario)

    # specify the path of the map file
    game.set_doom_map(map_path)

    # then we set the screen resolution and the screen format
    game.set_screen_resolution(ScreenResolution.RES_256X160)
    game.set_screen_format(ScreenFormat.RGB24)

    # we can add the particles and effects we need by simply setting them to true or false
    game.set_render_hud(False)
    game.set_render_minimal_hud(False)
    game.set_render_crosshair(False)
    game.set_render_weapon(True)
    game.set_render_decals(False)
    game.set_render_particles(False)
    game.set_render_effects_sprites(False)
    game.set_render_messages(False)
    game.set_render_corpses(False)
    game.set_render_screen_flashes(True)

    # now we specify the buttons that should be available to the agent
    game.add_available_button(Button.MOVE_LEFT)
    game.add_available_button(Button.MOVE_RIGHT)
    game.add_available_button(Button.TURN_LEFT)
    game.add_available_button(Button.TURN_RIGHT)
    game.add_available_button(Button.MOVE_FORWARD)
    game.add_available_button(Button.MOVE_BACKWARD)
    game.add_available_button(Button.ATTACK)

    # we also add two delta buttons. The buttons above act like keyboard keys and only take boolean values;
    # delta buttons emulate a mouse and take positive and negative values, which is useful when exploring the environment
    game.add_available_button(Button.TURN_LEFT_RIGHT_DELTA, 90)
    game.add_available_button(Button.LOOK_UP_DOWN_DELTA, 90)

    # initialize an array for the actions
    actions = np.zeros((game.get_available_buttons_size(), game.get_available_buttons_size()))
    count = 0
    for i in actions:
        i[count] = 1
        count += 1
    actions = actions.astype(int).tolist()

    # then we add the game variables: ammo, health, and kill count
    game.add_available_game_variable(GameVariable.AMMO0)
    game.add_available_game_variable(GameVariable.HEALTH)
    game.add_available_game_variable(GameVariable.KILLCOUNT)

    # set episode_timeout to terminate the episode after some time steps;
    # we also set episode_start_time, which is useful for skipping the initial events
    game.set_episode_timeout(6 * episode_length)
    game.set_episode_start_time(10)
    game.set_window_visible(render)

    # we can play the sound by setting set_sound_enabled to true
    game.set_sound_enabled(False)

    # we set the living reward to 0, which rewards the agent for each move it makes, even if the move is not useful
    game.set_living_reward(0)

    # doom has different modes, such as player, spectator, asynchronous player, and asynchronous spectator
    # in spectator mode, a human plays and the agent learns from it
    # in player mode, the agent actually plays the game, so we use player mode
    game.set_mode(Mode.PLAYER)

    # okay, now we initialize the game environment
    game.init()

    # create instances of the DRQN class for the action DRQN network and the target DRQN network
    actionDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)
    targetDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)

    # we also create an instance of the ExperienceReplay class with a buffer size of 1000
    experiences = ExperienceReplay(1000)

    # for saving the model
    saver = tf.train.Saver({v.name: v for v in actionDRQN.parameters}, max_to_keep = 1)

    # now let us start the training process
    # initialize variables for sampling transitions from, and storing transitions to, the experience buffer
    sample = 5
    store = 50

    # start the tensorflow session
    with tf.Session() as sess:
        # initialize all tensorflow variables
        sess.run(tf.global_variables_initializer())

        for episode in range(num_episodes):
            # start a new episode
            game.new_episode()

            # play the episode until it reaches the episode length
            for frame in range(episode_length):
                # get the game state
                state = game.get_state()
                s = state.screen_buffer

                # select the action
                a = actionDRQN.prediction.eval(feed_dict = {actionDRQN.input: s})[0]
                action = actions[a]

                # perform the action and store the reward
                reward = game.make_action(action)

                # update the total reward
                total_reward += reward

                # if the episode is over, then break
                if game.is_episode_finished():
                    break

                # store the transition in our experience buffer
                if (frame % store) == 0:
                    experiences.appendToBuffer((s, action, reward))

                # sample experience from the experience buffer
                if (frame % sample) == 0:
                    memory = experiences.sample(1)
                    mem_frame = memory[0][0]
                    mem_reward = memory[0][2]

                    # now, train the network
                    Q1 = actionDRQN.output.eval(feed_dict = {actionDRQN.input: mem_frame})
                    Q2 = targetDRQN.output.eval(feed_dict = {targetDRQN.input: mem_frame})

                    # set the learning rate
                    learning_rate = actionDRQN.learning_rate.eval()

                    # calculate the Q value
                    Qtarget = old_q_value + learning_rate * (mem_reward + discount_factor * Q2 - old_q_value)

                    # update the old Q value
                    old_q_value = Qtarget

                    # compute the loss
                    loss = actionDRQN.loss.eval(feed_dict = {actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})

                    # update the total loss
                    total_loss += loss

                    # update both networks
                    actionDRQN.update.run(feed_dict = {actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})
                    targetDRQN.update.run(feed_dict = {targetDRQN.target_vector: Qtarget, targetDRQN.input: mem_frame})

            rewards.append((episode, total_reward))
            losses.append((episode, total_loss))

            print("Episode %d - Reward = %.3f, Loss = %.3f." % (episode, total_reward, total_loss))

            total_reward = 0
            total_loss = 0
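To see numerically what the Qtarget line inside the training loop does, here is a worked toy example with illustrative scalar values in place of the network's output vectors (the numbers are mine and are for illustration only):

old_q_value = 0.0
learning_rate = 0.01
discount_factor = 0.99
mem_reward = 100.0      # e.g. a large positive reward for a kill (illustrative)
Q2 = 1.5                # the target network's Q estimate (illustrative)

# move the old estimate a small step towards reward + discounted future value
Qtarget = old_q_value + learning_rate * (mem_reward + discount_factor * Q2 - old_q_value)
print(Qtarget)          # 0.01 * (100.0 + 0.99 * 1.5) = 1.01485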

Train for 10,000 episodes, with an episode length of 300:

train(num_episodes = 10000, episode_length = 300, learning_rate = 0.01, render = False)
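Since the train function defined above takes a render flag, you can also run a short session with the game window visible to watch the agent play; the settings below are only an illustration (rendering slows things down, so keep num_episodes small):

# watch a short run with the game window visible
train(num_episodes = 10, episode_length = 300, learning_rate = 0.01, render = True)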

https://github.com/PacktPublishing/Hands-On-Reinforcement-Learning-with-Python/blob/master/Chapter09/9.5%20Doom%20Game%20Using%20DRQN.ipynb

That concludes this article on playing Doom with DRQN in deep reinforcement learning; I hope it is helpful.



