This article walks through running DQN on MountainCar-v0 with Paddle 2 and PARL. Hopefully it serves as a useful reference for developers tackling the same problem.
1.Agent
Agent is essentially the interface layer. sample() implements epsilon-greedy exploration: with probability e_greed it picks a random action, otherwise it exploits, which ultimately comes down to the self.alg.predict() call (via self.predict()).
Agent.learn(self, obs, act, reward, next_obs, terminal) takes the obs, act, reward, next_obs, terminal values obtained from the environment, converts them into tensors, and hands them to the algorithm's learn step, i.e. self.alg.learn(obs, act, reward, next_obs, terminal).
import parl
import paddle
import numpy as np


class Agent(parl.Agent):
    def __init__(self, algorithm, act_dim, e_greed=0.1, e_greed_decrement=0):
        super(Agent, self).__init__(algorithm)
        assert isinstance(act_dim, int)
        self.act_dim = act_dim

        self.global_step = 0
        self.update_target_steps = 200  # sync the target network every 200 learn steps

        self.e_greed = e_greed
        self.e_greed_decrement = e_greed_decrement

    def sample(self, obs):
        """Sample an action `for exploration` when given an observation

        Args:
            obs(np.float32): shape of (obs_dim,)

        Returns:
            act(int): action
        """
        sample = np.random.random()
        if sample < self.e_greed:
            act = np.random.randint(self.act_dim)  # explore: pick a random action
        else:
            if np.random.random() < 0.01:
                act = np.random.randint(self.act_dim)  # keep a small amount of extra exploration
            else:
                act = self.predict(obs)
        # decay epsilon over time, but never below 0.01
        self.e_greed = max(0.01, self.e_greed - self.e_greed_decrement)
        return act  # return the chosen action

    def predict(self, obs):
        """Predict an action when given an observation

        Args:
            obs(np.float32): shape of (obs_dim,)

        Returns:
            act(int): action
        """
        obs = paddle.to_tensor(obs, dtype='float32')  # convert the observation into a tensor
        pred_q = self.alg.predict(obs)  # call the algorithm's predict()
        act = pred_q.argmax().numpy()[0]  # take the action with the largest Q value
        return act

    def learn(self, obs, act, reward, next_obs, terminal):
        """Update model with an episode data

        Args:
            obs(np.float32): shape of (batch_size, obs_dim)
            act(np.int32): shape of (batch_size)
            reward(np.float32): shape of (batch_size)
            next_obs(np.float32): shape of (batch_size, obs_dim)
            terminal(np.float32): shape of (batch_size)

        Returns:
            loss(float)
        """
        if self.global_step % self.update_target_steps == 0:
            self.alg.sync_target()
        self.global_step += 1

        # expand dims: (batch_size,) -> (batch_size, 1)
        act = np.expand_dims(act, axis=-1)
        reward = np.expand_dims(reward, axis=-1)
        terminal = np.expand_dims(terminal, axis=-1)

        # convert the numpy arrays into tensors
        obs = paddle.to_tensor(obs, dtype='float32')
        act = paddle.to_tensor(act, dtype='int32')
        reward = paddle.to_tensor(reward, dtype='float32')
        next_obs = paddle.to_tensor(next_obs, dtype='float32')
        terminal = paddle.to_tensor(terminal, dtype='float32')

        # delegate to the algorithm's learn(), since self.alg holds the algorithm
        loss = self.alg.learn(obs, act, reward, next_obs, terminal)
        return loss.numpy()[0]
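Agent.learn() ultimately delegates to the algorithm's learn(). To make that delegation concrete, here is a rough, illustrative sketch of the standard DQN update (target network, TD target, MSE loss); the class name SketchDQN and all details below are my own illustration of the idea, not PARL's parl.algorithms.DQN source code. As a side note on exploration: with e_greed=0.1, e_greed_decrement=1e-6 and the 0.01 floor, epsilon only reaches its minimum after roughly (0.1 - 0.01) / 1e-6 = 90,000 calls to sample().

# Illustrative sketch of the standard DQN update (NOT PARL's implementation).
import copy
import paddle
import paddle.nn.functional as F


class SketchDQN:
    def __init__(self, model, gamma, lr):
        self.model = model                         # online Q network
        self.target_model = copy.deepcopy(model)   # target Q network
        self.gamma = gamma
        self.optimizer = paddle.optimizer.Adam(
            learning_rate=lr, parameters=self.model.parameters())

    def predict(self, obs):
        return self.model(obs)

    def sync_target(self):
        # copy the online weights into the target network
        self.target_model.set_state_dict(self.model.state_dict())

    def learn(self, obs, act, reward, next_obs, terminal):
        # Q(s, a) for the actions actually taken
        pred_q = self.model(obs)
        act_onehot = F.one_hot(act.squeeze(-1), num_classes=pred_q.shape[-1])
        pred_action_q = (pred_q * act_onehot).sum(axis=-1, keepdim=True)

        # TD target: r + gamma * max_a' Q_target(s', a'), zeroed at terminal states
        with paddle.no_grad():
            max_next_q = self.target_model(next_obs).max(axis=-1, keepdim=True)
            target = reward + self.gamma * (1.0 - terminal) * max_next_q

        loss = F.mse_loss(pred_action_q, target)
        self.optimizer.clear_grad()
        loss.backward()
        self.optimizer.step()
        return loss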
2.Model
Model defines the network structure with nn.Linear(input_dim, output_dim). The forward pass feeds the input through a fully connected layer followed by ReLU, then a second fully connected layer with ReLU, and finally a fully connected output layer whose dimension is act_dim.
import paddle
import paddle.nn as nn
import paddle.nn.functional as F
import parl


class Model(parl.Model):
    """ Fully connected network to solve the MountainCar problem.

    Args:
        obs_dim (int): Dimension of observation space.
        act_dim (int): Dimension of action space.
    """

    def __init__(self, obs_dim, act_dim):
        super(Model, self).__init__()
        hid1_size = 128
        hid2_size = 128
        self.fc1 = nn.Linear(obs_dim, hid1_size)
        self.fc2 = nn.Linear(hid1_size, hid2_size)
        self.fc3 = nn.Linear(hid2_size, act_dim)

    def forward(self, obs):
        h1 = F.relu(self.fc1(obs))
        h2 = F.relu(self.fc2(h1))
        Q = self.fc3(h2)
        return Q
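As a quick shape sanity check (illustrative only, with the Model class above in scope): MountainCar-v0 has a 2-dimensional observation (position, velocity) and 3 discrete actions, so the network should map a (batch, 2) input to (batch, 3) Q values.

import paddle

model = Model(obs_dim=2, act_dim=3)
obs = paddle.randn([1, 2], dtype='float32')  # one fake observation
q_values = model(obs)
print(q_values.shape)  # expected: [1, 3]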
3.Train
import gym
import numpy as np
import os

from parl.utils import logger, ReplayMemory

from Model import Model
from Agent import Agent
from parl.algorithms import DQN

LEARN_FREQ = 5  # training frequency: no need to learn on every step; accumulate some new experience first, which is more efficient
MEMORY_SIZE = 20000  # size of the replay memory; larger means more RAM
MEMORY_WARMUP_SIZE = 200  # pre-fill the replay memory with some experience before sampling batches for the agent to learn from
BATCH_SIZE = 32  # number of transitions per learn() call, sampled randomly from the replay memory
LEARNING_RATE = 0.001  # learning rate
GAMMA = 0.99  # reward discount factor, typically between 0.9 and 0.999
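The training loop relies on parl.utils.ReplayMemory for experience replay. To make the append / sample_batch flow used below easier to follow, here is a minimal, illustrative replay buffer sketch; it is not PARL's implementation, only the idea behind it.

import collections
import random
import numpy as np


class SimpleReplayMemory:
    def __init__(self, max_size):
        # oldest transitions are dropped automatically once the buffer is full
        self.buffer = collections.deque(maxlen=max_size)

    def append(self, obs, act, reward, next_obs, done):
        self.buffer.append((obs, act, reward, next_obs, done))

    def sample_batch(self, batch_size):
        batch = random.sample(self.buffer, batch_size)
        obs, act, reward, next_obs, done = zip(*batch)
        return (np.array(obs, dtype='float32'), np.array(act, dtype='int32'),
                np.array(reward, dtype='float32'),
                np.array(next_obs, dtype='float32'),
                np.array(done, dtype='float32'))

    def __len__(self):
        return len(self.buffer)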
# train an episode
def run_train_episode(agent, env, rpm):
    total_reward = 0
    obs = env.reset()
    step = 0
    while True:
        step += 1
        action = agent.sample(obs)  # use sample() (epsilon-greedy) during training
        next_obs, reward, done, _ = env.step(action)
        # store the transition (s, a, r, s', done) in the replay memory
        rpm.append(obs, action, reward, next_obs, done)

        # train the model
        if (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):
            # s, a, r, s', done
            (batch_obs, batch_action, batch_reward, batch_next_obs,
             batch_done) = rpm.sample_batch(BATCH_SIZE)
            train_loss = agent.learn(batch_obs, batch_action, batch_reward,
                                     batch_next_obs, batch_done)

        total_reward += reward
        obs = next_obs
        if done:
            break
    return total_reward
# evaluate 5 episodes
def run_evaluate_episodes(agent, env, eval_episodes=5, render=False):
    eval_reward = []
    for i in range(eval_episodes):
        obs = env.reset()
        episode_reward = 0
        while True:
            action = agent.predict(obs)  # interact with the environment using the trained model (greedy)
            obs, reward, done, _ = env.step(action)
            episode_reward += reward  # accumulate the score of one episode
            if render:
                env.render()
            if done:
                break
        eval_reward.append(episode_reward)  # collect per-episode scores, then average
    return np.mean(eval_reward)


def main():
    env = gym.make('MountainCar-v0')
    obs_dim = env.observation_space.shape[0]
    act_dim = env.action_space.n
    logger.info('obs_dim {}, act_dim {}'.format(obs_dim, act_dim))

    # set action_shape = 0 while in discrete control environment
    rpm = ReplayMemory(MEMORY_SIZE, obs_dim, 0)

    # build an agent
    model = Model(obs_dim=obs_dim, act_dim=act_dim)
    alg = DQN(model, gamma=GAMMA, lr=LEARNING_RATE)
    agent = Agent(
        alg, act_dim=act_dim, e_greed=0.1, e_greed_decrement=1e-6)
    # load a previously saved model (skipped on the first run, when no checkpoint exists yet)
    save_path = './model.ckpt'
    if os.path.exists(save_path):
        agent.restore(save_path)

    # warm up the replay memory
    while len(rpm) < MEMORY_WARMUP_SIZE:
        run_train_episode(agent, env, rpm)

    # total number of training episodes
    max_episode = 2000

    # start training
    episode = 0
    while episode < max_episode:
        # train part: 50 training episodes per round
        for i in range(50):
            total_reward = run_train_episode(agent, env, rpm)
            episode += 1

        # test part
        eval_reward = run_evaluate_episodes(agent, env, render=True)
        logger.info('episode:{}    e_greed:{}   Test reward:{}'.format(
            episode, agent.e_greed, eval_reward))

    # save the parameters to ./model.ckpt
    save_path = './model.ckpt'
    agent.save(save_path)


if __name__ == '__main__':
    main()
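Once training has produced ./model.ckpt, a short evaluation-only snippet like the following can load the checkpoint and watch the trained agent with rendering enabled. This is my own sketch: it assumes run_evaluate_episodes, Model and Agent from the files above are in scope, and it reuses the same DQN constructor arguments as the training script.

import gym
from parl.algorithms import DQN

env = gym.make('MountainCar-v0')
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.n

model = Model(obs_dim=obs_dim, act_dim=act_dim)
alg = DQN(model, gamma=0.99, lr=0.001)
agent = Agent(alg, act_dim=act_dim, e_greed=0.0, e_greed_decrement=0)  # no exploration needed for evaluation
agent.restore('./model.ckpt')  # assumes training has already produced this checkpoint

eval_reward = run_evaluate_episodes(agent, env, eval_episodes=5, render=True)
print('Test reward:', eval_reward)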
4.Results
That concludes this article on running DQN on MountainCar with Paddle 2; I hope it proves helpful.