This article introduces deep reinforcement learning with a DRQN (Deep Recurrent Q-Network) applied to playing Doom, and walks through the architecture and a full training example.
DRQN
Why do we need DRQN when DQN can already play Atari games at a human level? To answer this question, we first need to understand what a Partially Observable Markov Decision Process (POMDP) is. An environment is called a POMDP when we only have limited information about it. So far we have dealt with fully observable MDPs, where all possible states and actions are known; although the agent may not know the transition and reward probabilities, it has complete information about the environment. For example, in the Frozen Lake environment we know all the states and actions, so we can easily model it as a fully observable MDP. But most real-world environments are only partially observable: we cannot observe all the states. Imagine an agent learning to walk in a real-world environment; clearly, it does not have complete knowledge of its surroundings. In a POMDP, the states provide only partial information, but keeping information from past states can help the agent better understand the nature of the environment and improve its policy. Thus, in a POMDP, we need to retain information about previous states in order to take the best action.
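To make the distinction concrete, here is a toy sketch (not from the original text; the environment and names are made up purely for illustration) showing how an observation function can hide part of the true state, which is exactly what turns an MDP into a POMDP:

# full (hidden) state of a toy corridor world: the agent's cell and a monster's cell
state = {"agent_pos": 0, "monster_pos": 7}

def observe(state, view_range=2):
    """Partial observation: the agent only learns whether the monster is
    within `view_range` cells, not where it actually is."""
    visible = abs(state["monster_pos"] - state["agent_pos"]) <= view_range
    return {"agent_pos": state["agent_pos"], "monster_visible": visible}

# an MDP policy could condition on the full state;
# a POMDP policy only gets the observation and must rely on memory of past observations
print(observe(state))   # {'agent_pos': 0, 'monster_visible': False}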
To retain this information, we improve the DQN architecture by adding an LSTM layer so that the network can make better use of previous states. In the DQN architecture, the first post-convolutional fully connected layer is replaced with an LSTM RNN. This also addresses the partial observability problem, because the agent now has the ability to remember past states and can improve its policy accordingly.
The DRQN architecture
The DRQN architecture is very similar to DQN, except that the first post-convolutional fully connected layer is replaced with an LSTM RNN.
Here, the game screen is fed as input to the convolutional layers. The convolutional layers convolve the image and produce feature maps. The resulting feature maps are then passed to the LSTM layer, which has memory for holding information: it retains important information about previous game states and updates its memory over time as needed. The Q values are produced after a final fully connected layer. Therefore, unlike DQN, we do not estimate $Q(s_t, a_t)$ directly; instead, we estimate $Q(h_t, a_t)$, where $h_t$ is the hidden state returned by the network at the previous time step, that is, $h_t = \mathrm{LSTM}(h_{t-1}, o_t)$ with $o_t$ the current observation. Since we are using an RNN, the network is trained with backpropagation through time.
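As a rough NumPy sketch of this recurrence (the shapes and names are illustrative and use a plain tanh cell, not the book's exact implementation), the Q values at each step depend on a hidden state carried over from the previous step:

import numpy as np

hidden_size, num_actions, feature_size = 100, 3, 384

# illustrative parameters of a recurrent Q-network
W_in  = np.random.randn(feature_size, hidden_size) * 0.01   # conv features -> hidden
W_rec = np.random.randn(hidden_size, hidden_size) * 0.01    # hidden -> hidden
W_out = np.random.randn(hidden_size, num_actions) * 0.01    # hidden -> Q values

def drqn_step(conv_features, h_prev):
    # h_t depends on the current observation and the previous hidden state,
    # so Q(h_t, a_t) implicitly summarizes the history of observations
    h_t = np.tanh(conv_features @ W_in + h_prev @ W_rec)
    q_values = h_t @ W_out            # one Q value per action
    return q_values, h_t

h = np.zeros((1, hidden_size))
for t in range(5):                    # unrolled over time; trained with BPTT in practice
    obs_features = np.random.randn(1, feature_size)   # stand-in for the conv output
    q, h = drqn_step(obs_features, h)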
What about the experience replay buffer? In DQN, to avoid correlated experiences, we used experience replay to store the game's state transitions and trained the network on random minibatches of experience. In the case of DRQN, we store entire episodes in the experience buffer and randomly sample $n$ consecutive time steps from a random batch of episodes. This way we both retain the randomization and obtain experiences that actually follow one another.
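A minimal sketch of this idea (a hypothetical helper, not the ExperienceReplay class used later in this article) stores whole episodes and samples a window of n consecutive steps from a randomly chosen episode:

import random

class EpisodicReplay:
    def __init__(self, capacity):
        self.episodes = []          # each entry is a list of (state, action, reward) tuples
        self.capacity = capacity

    def add_episode(self, episode):
        if len(self.episodes) >= self.capacity:
            self.episodes.pop(0)    # drop the oldest episode
        self.episodes.append(episode)

    def sample_sequence(self, n):
        episode = random.choice(self.episodes)             # pick a random episode
        start = random.randint(0, max(0, len(episode) - n))
        return episode[start:start + n]                    # n consecutive time steps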
Training an agent to play Doom
The basic Doom game
Load the necessary libraries:
from vizdoom import *
import random
import time
Create a DoomGame instance:
game = DoomGame()
ViZDoom provides a large number of Doom scenarios; here we load the basic scenario:
game.load_config("basic.cfg")
Initialize the game with this scenario using the init() method:
game.init()
Define one-hot encoded actions:
shoot = [0, 0, 1]
left = [1, 0, 0]
right = [0, 1, 0]
actions = [shoot, left, right]
Start the game:
no_of_episodes = 10

for i in range(no_of_episodes):
    # for each episode, start the game
    game.new_episode()

    # loop until the episode is over
    while not game.is_episode_finished():
        # get the game state
        state = game.get_state()
        img = state.screen_buffer

        # get the game variables
        misc = state.game_variables

        # perform some action randomly and receive the reward
        reward = game.make_action(random.choice(actions))
        print(reward)

    # wait some time before starting the next episode
    time.sleep(2)
Doom with DRQN
The agent gets a positive reward for successfully killing a monster, and negative rewards for losing life, committing suicide, and running out of ammo.
First, let us import all the necessary libraries:
import tensorflow as tf
import numpy as np
import math
from vizdoom import *
import timeit
import os
import sys
Next, define the get_input_shape function to compute the final shape of the input image after it has been convolved by the convolutional layers:
def get_input_shape(Image, Filter, Stride):
    layer1 = math.ceil(((Image - Filter + 1) / Stride))
    o1 = math.ceil((layer1 / Stride))
    layer2 = math.ceil(((o1 - Filter + 1) / Stride))
    o2 = math.ceil((layer2 / Stride))
    layer3 = math.ceil(((o2 - Filter + 1) / Stride))
    o3 = math.ceil((layer3 / Stride))
    return int(o3)
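For example, for the 160 x 256 screen used later with a 5 x 5 filter and stride 2, the function gives the spatial size after the three convolution/pooling stages; multiplying the two dimensions by the number of filters in the last layer yields the flattened size fed to the RNN (values below computed by hand from the same formula):

h = get_input_shape(160, 5, 2)   # -> 2
w = get_input_shape(256, 5, 2)   # -> 3
print(h * w * 64)                # -> 384, the flattened conv output size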
Now, define the DRQN class, which implements the DRQN algorithm:
class DRQN():
    def __init__(self, input_shape, num_actions, inital_learning_rate):
        # first, we initialize all the hyperparameters
        self.tfcast_type = tf.float32
        # shape of our input, which would be (length, width, channels)
        self.input_shape = input_shape
        # number of actions in the environment
        self.num_actions = num_actions
        # learning rate for the neural network
        self.learning_rate = inital_learning_rate

        # now we define the hyperparameters of the convolutional neural network
        # filter size
        self.filter_size = 5
        # number of filters
        self.num_filters = [16, 32, 64]
        # stride size
        self.stride = 2
        # pool size
        self.poolsize = 2
        # shape of our convolutional layer (flattened)
        self.convolution_shape = get_input_shape(input_shape[0], self.filter_size, self.stride) * get_input_shape(input_shape[1], self.filter_size, self.stride) * self.num_filters[2]

        # now we define the hyperparameters of our recurrent neural network and the final feed forward layer
        # number of neurons
        self.cell_size = 100
        # number of hidden layers
        self.hidden_layer = 50
        # dropout probability
        self.dropout_probability = [0.3, 0.2]

        # hyperparameters for optimization
        self.loss_decay_rate = 0.96
        self.loss_decay_steps = 180

        # initialize all the variables for the CNN
        # we initialize the placeholder for the input whose shape would be (length, width, channel)
        self.input = tf.placeholder(shape=(self.input_shape[0], self.input_shape[1], self.input_shape[2]), dtype=self.tfcast_type)
        # we also initialize the target vector whose shape is equal to the number of actions
        self.target_vector = tf.placeholder(shape=(self.num_actions, 1), dtype=self.tfcast_type)

        # initialize the feature maps for our corresponding 3 filters
        self.features1 = tf.Variable(initial_value=np.random.rand(self.filter_size, self.filter_size, input_shape[2], self.num_filters[0]),
                                     dtype=self.tfcast_type)
        self.features2 = tf.Variable(initial_value=np.random.rand(self.filter_size, self.filter_size, self.num_filters[0], self.num_filters[1]),
                                     dtype=self.tfcast_type)
        self.features3 = tf.Variable(initial_value=np.random.rand(self.filter_size, self.filter_size, self.num_filters[1], self.num_filters[2]),
                                     dtype=self.tfcast_type)

        # initialize variables for the RNN
        self.h = tf.Variable(initial_value=np.zeros((1, self.cell_size)), dtype=self.tfcast_type)
        # input (conv features) to hidden weight matrix
        self.rW = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (self.convolution_shape + self.cell_size)),
                                  high=np.sqrt(6. / (self.convolution_shape + self.cell_size)),
                                  size=(self.convolution_shape, self.cell_size)),
                              dtype=self.tfcast_type)
        # hidden to hidden weight matrix
        self.rU = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (2 * self.cell_size)),
                                  high=np.sqrt(6. / (2 * self.cell_size)),
                                  size=(self.cell_size, self.cell_size)),
                              dtype=self.tfcast_type)
        # hidden to output weight matrix
        self.rV = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (2 * self.cell_size)),
                                  high=np.sqrt(6. / (2 * self.cell_size)),
                                  size=(self.cell_size, self.cell_size)),
                              dtype=self.tfcast_type)
        # biases
        self.rb = tf.Variable(initial_value=np.zeros(self.cell_size), dtype=self.tfcast_type)
        self.rc = tf.Variable(initial_value=np.zeros(self.cell_size), dtype=self.tfcast_type)

        # initialize the weights and bias of the feed forward network
        # weights
        self.fW = tf.Variable(initial_value=np.random.uniform(
                                  low=-np.sqrt(6. / (self.cell_size + self.num_actions)),
                                  high=np.sqrt(6. / (self.cell_size + self.num_actions)),
                                  size=(self.cell_size, self.num_actions)),
                              dtype=self.tfcast_type)
        # bias
        self.fb = tf.Variable(initial_value=np.zeros(self.num_actions), dtype=self.tfcast_type)

        # decaying learning rate (the decay rate defined above is used here)
        self.step_count = tf.Variable(initial_value=0, dtype=self.tfcast_type)
        self.learning_rate = tf.train.exponential_decay(self.learning_rate,
                                                        self.step_count,
                                                        self.loss_decay_steps,
                                                        self.loss_decay_rate,
                                                        staircase=False)

        # now let us build the network
        # first convolutional layer
        self.conv1 = tf.nn.conv2d(input=tf.reshape(self.input, shape=(1, self.input_shape[0], self.input_shape[1], self.input_shape[2])),
                                  filter=self.features1, strides=[1, self.stride, self.stride, 1], padding="VALID")
        self.relu1 = tf.nn.relu(self.conv1)
        self.pool1 = tf.nn.max_pool(self.relu1, ksize=[1, self.poolsize, self.poolsize, 1], strides=[1, self.stride, self.stride, 1], padding="SAME")

        # second convolutional layer
        self.conv2 = tf.nn.conv2d(input=self.pool1, filter=self.features2, strides=[1, self.stride, self.stride, 1], padding="VALID")
        self.relu2 = tf.nn.relu(self.conv2)
        self.pool2 = tf.nn.max_pool(self.relu2, ksize=[1, self.poolsize, self.poolsize, 1], strides=[1, self.stride, self.stride, 1], padding="SAME")

        # third convolutional layer
        self.conv3 = tf.nn.conv2d(input=self.pool2, filter=self.features3, strides=[1, self.stride, self.stride, 1], padding="VALID")
        self.relu3 = tf.nn.relu(self.conv3)
        self.pool3 = tf.nn.max_pool(self.relu3, ksize=[1, self.poolsize, self.poolsize, 1], strides=[1, self.stride, self.stride, 1], padding="SAME")

        # add dropout and reshape the input
        self.drop1 = tf.nn.dropout(self.pool3, self.dropout_probability[0])
        self.reshaped_input = tf.reshape(self.drop1, shape=[1, -1])

        # now we build the recurrent neural network, which takes the input from the last layer of the convolutional network
        self.h = tf.tanh(tf.matmul(self.reshaped_input, self.rW) + tf.matmul(self.h, self.rU) + self.rb)
        self.o = tf.nn.softmax(tf.matmul(self.h, self.rV) + self.rc)

        # add dropout to the RNN output
        self.drop2 = tf.nn.dropout(self.o, self.dropout_probability[1])

        # we feed the result of the RNN to the feed forward layer
        self.output = tf.reshape(tf.matmul(self.drop2, self.fW) + self.fb, shape=[-1, 1])
        self.prediction = tf.argmax(self.output)

        # compute the loss
        self.loss = tf.reduce_mean(tf.square(self.target_vector - self.output))

        # we use the Adam optimizer for minimizing the error
        self.optimizer = tf.train.AdamOptimizer(self.learning_rate)

        # compute the gradients of the loss and apply them
        self.gradients = self.optimizer.compute_gradients(self.loss)
        self.update = self.optimizer.apply_gradients(self.gradients)

        self.parameters = (self.features1, self.features2, self.features3,
                           self.rW, self.rU, self.rV, self.rb, self.rc,
                           self.fW, self.fb)
Define the ExperienceReplay class to implement the experience replay buffer. We store all of the agent's experience, that is, states, actions, and rewards, in the buffer, and then sample minibatches of experience to train the network:
class ExperienceReplay():
    def __init__(self, buffer_size):
        # buffer for holding the transitions
        self.buffer = []
        # size of the buffer
        self.buffer_size = buffer_size

    # we remove old transitions if the buffer has reached its size limit;
    # think of the buffer as a queue: when a new one comes in, the old one goes out
    def appendToBuffer(self, memory_tuplet):
        if len(self.buffer) > self.buffer_size:
            for i in range(len(self.buffer) - self.buffer_size):
                self.buffer.remove(self.buffer[0])
        self.buffer.append(memory_tuplet)

    # define a function called sample for sampling n random transitions
    def sample(self, n):
        memories = []
        for i in range(n):
            memory_index = np.random.randint(0, len(self.buffer))
            memories.append(self.buffer[memory_index])
        return memories
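As a quick usage sketch (the screens and rewards below are dummy placeholders, not real game frames):

buffer = ExperienceReplay(buffer_size=1000)

# store a few dummy transitions of the form (screen, one-hot action, reward)
for _ in range(10):
    dummy_screen = np.zeros((160, 256, 3))
    buffer.appendToBuffer((dummy_screen, [1, 0, 0], 0.0))

batch = buffer.sample(5)    # five randomly chosen (state, action, reward) tuples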
Define the train function to train the network:
def train(num_episodes, episode_length, learning_rate, scenario="deathmatch.cfg", map_path='map02', render=False):
    # discount parameter for the Q-value computation
    discount_factor = .99
    # frequencies for updating and storing experiences in the buffer
    update_frequency = 5
    store_frequency = 50
    # for printing the output
    print_frequency = 1000

    # initialize variables for storing the total reward and total loss
    total_reward = 0
    total_loss = 0
    old_q_value = 0

    # initialize lists for storing the episodic rewards and losses
    rewards = []
    losses = []

    # okay, now let us get to the action!

    # first, we initialize our Doom game environment
    game = DoomGame()

    # specify the path where our scenario file is located
    game.set_doom_scenario_path(scenario)

    # specify the path of the map file
    game.set_doom_map(map_path)

    # then we set the screen resolution and screen format
    game.set_screen_resolution(ScreenResolution.RES_256X160)
    game.set_screen_format(ScreenFormat.RGB24)

    # we can add the particles and effects we need by simply setting them to true or false
    game.set_render_hud(False)
    game.set_render_minimal_hud(False)
    game.set_render_crosshair(False)
    game.set_render_weapon(True)
    game.set_render_decals(False)
    game.set_render_particles(False)
    game.set_render_effects_sprites(False)
    game.set_render_messages(False)
    game.set_render_corpses(False)
    game.set_render_screen_flashes(True)

    # now we specify the buttons that should be available to the agent
    game.add_available_button(Button.MOVE_LEFT)
    game.add_available_button(Button.MOVE_RIGHT)
    game.add_available_button(Button.TURN_LEFT)
    game.add_available_button(Button.TURN_RIGHT)
    game.add_available_button(Button.MOVE_FORWARD)
    game.add_available_button(Button.MOVE_BACKWARD)
    game.add_available_button(Button.ATTACK)

    # we also add two delta buttons. The buttons above act like keyboard keys and only take boolean values;
    # a delta button emulates a mouse and takes positive and negative values, which is useful for exploring the environment
    game.add_available_button(Button.TURN_LEFT_RIGHT_DELTA, 90)
    game.add_available_button(Button.LOOK_UP_DOWN_DELTA, 90)

    # initialize an array of one-hot actions
    actions = np.zeros((game.get_available_buttons_size(), game.get_available_buttons_size()))
    count = 0
    for i in actions:
        i[count] = 1
        count += 1
    actions = actions.astype(int).tolist()

    # then we add the game variables: ammo, health, and kill count
    game.add_available_game_variable(GameVariable.AMMO0)
    game.add_available_game_variable(GameVariable.HEALTH)
    game.add_available_game_variable(GameVariable.KILLCOUNT)

    # set episode_timeout to terminate the episode after some time steps;
    # we also set episode_start_time, which is useful for skipping the initial events
    game.set_episode_timeout(6 * episode_length)
    game.set_episode_start_time(10)
    game.set_window_visible(render)

    # sound can be played by setting set_sound_enabled to true
    game.set_sound_enabled(False)

    # we set the living reward to 0, which rewards the agent for each move it makes even if the move is not useful
    game.set_living_reward(0)

    # Doom has different modes, such as player, spectator, asynchronous player, and asynchronous spectator.
    # In spectator mode a human plays and the agent learns from it;
    # in player mode the agent actually plays the game, so we use player mode
    game.set_mode(Mode.PLAYER)

    # okay, now we initialize the game environment
    game.init()

    # create instances of the DRQN class for the action (online) DRQN network and the target DRQN network
    actionDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)
    targetDRQN = DRQN((160, 256, 3), game.get_available_buttons_size() - 2, learning_rate)

    # we also create an instance of the ExperienceReplay class with a buffer size of 1000
    experiences = ExperienceReplay(1000)

    # for saving the model
    saver = tf.train.Saver({v.name: v for v in actionDRQN.parameters}, max_to_keep=1)

    # now let us start the training process;
    # initialize the step intervals for sampling from and storing to the experience buffer
    sample = 5
    store = 50

    # start the TensorFlow session
    with tf.Session() as sess:
        # initialize all TensorFlow variables
        sess.run(tf.global_variables_initializer())

        for episode in range(num_episodes):
            # start a new episode
            game.new_episode()

            # play the episode until it reaches the episode length
            for frame in range(episode_length):
                # get the game state
                state = game.get_state()
                s = state.screen_buffer

                # select the action
                a = actionDRQN.prediction.eval(feed_dict={actionDRQN.input: s})[0]
                action = actions[a]

                # perform the action and store the reward
                reward = game.make_action(action)

                # update the total reward
                total_reward += reward

                # if the episode is over, then break
                if game.is_episode_finished():
                    break

                # store the transition in our experience buffer
                if (frame % store) == 0:
                    experiences.appendToBuffer((s, action, reward))

                # sample experience from the experience buffer
                if (frame % sample) == 0:
                    memory = experiences.sample(1)
                    mem_frame = memory[0][0]
                    mem_reward = memory[0][2]

                    # now, train the network
                    Q1 = actionDRQN.output.eval(feed_dict={actionDRQN.input: mem_frame})
                    Q2 = targetDRQN.output.eval(feed_dict={targetDRQN.input: mem_frame})

                    # get the current learning rate
                    learning_rate = actionDRQN.learning_rate.eval()

                    # calculate the target Q value
                    Qtarget = old_q_value + learning_rate * (mem_reward + discount_factor * Q2 - old_q_value)

                    # update the old Q value
                    old_q_value = Qtarget

                    # compute the loss
                    loss = actionDRQN.loss.eval(feed_dict={actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})

                    # update the total loss
                    total_loss += loss

                    # update both networks
                    actionDRQN.update.run(feed_dict={actionDRQN.target_vector: Qtarget, actionDRQN.input: mem_frame})
                    targetDRQN.update.run(feed_dict={targetDRQN.target_vector: Qtarget, targetDRQN.input: mem_frame})

            rewards.append((episode, total_reward))
            losses.append((episode, total_loss))

            print("Episode %d - Reward = %.3f, Loss = %.3f." % (episode, total_reward, total_loss))

            total_reward = 0
            total_loss = 0
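One thing worth noting: the loop above trains both networks against the same target. In the standard target-network scheme (a common variant, not what the code above does), the target network is kept frozen and its parameters are only periodically copied from the action network. A minimal sketch of such a sync step, assuming it runs inside the same TensorFlow session every few episodes:

# hypothetical sync step: copy the action network's parameters into the target network
copy_ops = [tf.assign(t, a) for t, a in zip(targetDRQN.parameters, actionDRQN.parameters)]
sess.run(copy_ops)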
Train for 10,000 episodes, where the length of each episode is 300:
train(num_episodes = 10000, episode_length = 300, learning_rate = 0.01, render = False)
The complete notebook is available at: https://github.com/PacktPublishing/Hands-On-Reinforcement-Learning-with-Python/blob/master/Chapter09/9.5%20Doom%20Game%20Using%20DRQN.ipynb