本文主要是介绍百度七日入门强化学习训练营,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!
一直都有在参加百度7日训练营活动,这次是关于强化学习的~
强化学习一直都是我想学,但是总是遇到各种阻碍就放弃的,这次算是系统的学习了一遍,希望以后有时间可以继续深化~
科科老师牛皮~
强化学习(RL)初印象
Part1 什么是强化学习
- 强化学习(英语:
Reinforcement learning
,简称RL
)是机器学习中的一个领域,强调如何基于环境而行动,以取得最大化的预期利益。 - 核心思想:智能体
agent
在环境environment
中学习,根据环境的状态state
(或观测到的observation
),执行动作action
,并根据环境的反馈reward
(奖励)来指导更好的动作。
注意:从环境中获取的状态,有时候叫state
,有时候叫observation
,这两个其实一个代表全局状态,一个代表局部观测值,在多智能体环境里会有差别,但我们刚开始学习遇到的环境还没有那么复杂,可以先把这两个概念划上等号。
Part2 强化学习能做什么
- 游戏(马里奥、Atari、Alpha Go、星际争霸等)
- 机器人控制(机械臂、机器人、自动驾驶、四轴飞行器等)
- 用户交互(推荐、广告、NLP等)
- 交通(拥堵管理等)
- 资源调度(物流、带宽、功率等)
- 金融(投资组合、股票买卖等)
- 其他
Part3 强化学习与监督学习的区别
- 强化学习、监督学习、非监督学习是机器学习里的三个不同的领域,都跟深度学习有交集。
- 监督学习寻找输入到输出之间的映射,比如分类和回归问题。
- 非监督学习主要寻找数据之间的隐藏关系,比如聚类问题。
- 强化学习则需要在与环境的交互中学习和寻找最佳决策方案。
- 监督学习处理认知问题,强化学习处理决策问题。
Part4 强化学习的如何解决问题
- 强化学习通过不断的试错探索,吸取经验和教训,持续不断的优化策略,从环境中拿到更好的反馈。
- 强化学习有两种学习方案:基于价值(
value-based
)、基于策略(policy-based
)
Part5 强化学习的算法和环境
- 经典算法:
Q-learning
、Sarsa
、DQN
、Policy Gradient
、A3C
、DDPG
、PPO
- 环境分类:离散控制场景(输出动作可数)、连续控制场景(输出动作值不可数)
- 强化学习经典环境库
GYM
将环境交互接口规范化为:重置环境reset()
、交互step()
、渲染render()
- 强化学习框架库
PARL
将强化学习框架抽象为Model
、Algorithm
、Agent
三层,使得强化学习算法的实现和调试更方便和灵活。
Part6 实践 熟悉强化学习环境
-
GYM
是强化学习中经典的环境库,下节课我们会用到里面的CliffWalkingWapper
和FrozenLake
环境,为了使得环境可视化更有趣一些,直播课视频中演示的Demo对环境的渲染做了封装,感兴趣的同学可以在PARL
代码库中的examples/tutorials/lesson1
中下载gridworld.py
使用。 -
PARL
开源库地址:https://github.com/PaddlePaddle/PARL
Lesson2 表格型方法——Sarsa
1. Sarsa 简介
Sarsa
全称是state-action-reward-state'-action'
,目的是学习特定的state
下,特定action
的价值Q
,最终建立和优化一个Q
表格,以state
为行,action
为列,根据与环境交互得到的reward
来更新Q
表格,更新公式为:
Sarsa
在训练中为了更好的探索环境,采用ε-greedy
方式来训练,有一定概率随机选择动作输出。
2. Sarsa 实战
- 使用
Sarsa
解决悬崖问题,找到绕过悬崖通往终点的路径。
Step1 安装依赖
In[1]
!pip install gym
Step2 导入依赖
In[2]
import gym
import numpy as np
import time
Step3 Agent
Agent
是和环境environment
交互的主体。predict()
方法:输入观察值observation
(或者说状态state
),输出动作值sample()
方法:再predict()
方法基础上使用ε-greedy
增加探索learn()
方法:输入训练数据,完成一轮Q
表格的更新
In[3]
# agent.py
class SarsaAgent(object):def __init__(self, obs_n, act_n, learning_rate=0.01, gamma=0.9, e_greed=0.1):self.act_n = act_n # 动作维度,有几个动作可选self.lr = learning_rate # 学习率self.gamma = gamma # reward的衰减率self.epsilon = e_greed # 按一定概率随机选动作self.Q = np.zeros((obs_n, act_n))# 根据输入观察值,采样输出的动作值,带探索def sample(self, obs):if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作action = self.predict(obs)else:action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作return action# 根据输入观察值,预测输出的动作值def predict(self, obs):Q_list = self.Q[obs, :]maxQ = np.max(Q_list)action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个actionaction = np.random.choice(action_list)return action# 学习方法,也就是更新Q-table的方法def learn(self, obs, action, reward, next_obs, next_action, done):""" on-policyobs: 交互前的obs, s_taction: 本次交互选择的action, a_treward: 本次动作获得的奖励rnext_obs: 本次交互后的obs, s_t+1next_action: 根据当前Q表格, 针对next_obs会选择的动作, a_t+1done: episode是否结束"""predict_Q = self.Q[obs, action]if done:target_Q = reward # 没有下一个状态了else:target_Q = reward + self.gamma * self.Q[next_obs, next_action] # Sarsaself.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q# 保存Q表格数据到文件def save(self):npy_file = './q_table.npy'np.save(npy_file, self.Q)print(npy_file + ' saved.')# 从文件中读取Q值到Q表格中def restore(self, npy_file='./q_table.npy'):self.Q = np.load(npy_file)print(npy_file + ' loaded.')
Step4 Training && Test(训练&&测试)
run_episode()
:agent
在一个episode
中训练的过程,使用agent.sample()
与环境交互,使用agent.learn()
训练Q
表格。test_episode()
:agent
在一个episode
中测试效果,评估目前的agent
能在一个episode
中拿到多少总reward
。
In[4]
def run_episode(env, agent, render=False):total_steps = 0 # 记录每个episode走了多少steptotal_reward = 0obs = env.reset() # 重置环境, 重新开一局(即开始新的一个episode)action = agent.sample(obs) # 根据算法选择一个动作while True:next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互next_action = agent.sample(next_obs) # 根据算法选择一个动作# 训练 Sarsa 算法agent.learn(obs, action, reward, next_obs, next_action, done)action = next_actionobs = next_obs # 存储上一个观察值total_reward += rewardtotal_steps += 1 # 计算step数if render:env.render() #渲染新的一帧图形if done:breakreturn total_reward, total_stepsdef test_episode(env, agent):total_reward = 0obs = env.reset()while True:action = agent.predict(obs) # greedynext_obs, reward, done, _ = env.step(action)total_reward += rewardobs = next_obs# time.sleep(0.5)# env.render()if done:breakreturn total_reward
Step5 创建环境和Agent,启动训练
In[5]
# 使用gym创建悬崖环境
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left# 创建一个agent实例,输入超参数
agent = SarsaAgent(obs_n=env.observation_space.n,act_n=env.action_space.n,learning_rate=0.1,gamma=0.9,e_greed=0.1)# 训练500个episode,打印每个episode的分数
for episode in range(500):ep_reward, ep_steps = run_episode(env, agent, False)print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps, ep_reward))# 全部训练结束,查看算法效果
test_reward = test_episode(env, agent)
print('test reward = %.1f' % (test_reward))
Episode 0: steps = 545 , reward = -1931.0
Episode 1: steps = 519 , reward = -1113.0
Episode 2: steps = 523 , reward = -919.0
Episode 3: steps = 164 , reward = -164.0
Episode 4: steps = 302 , reward = -500.0
Episode 5: steps = 146 , reward = -146.0
Episode 6: steps = 177 , reward = -375.0
Episode 7: steps = 148 , reward = -148.0
Episode 8: steps = 160 , reward = -160.0
1. Q-learning简介
Q-learning
也是采用Q
表格的方式存储Q
值(状态动作价值),决策部分与Sarsa
是一样的,采用ε-greedy
方式增加探索。Q-learning
跟Sarsa
不一样的地方是更新Q
表格的方式。Sarsa
是on-policy
的更新方式,先做出动作再更新。Q-learning
是off-policy
的更新方式,更新learn()
时无需获取下一步实际做出的动作next_action
,并假设下一步动作是取最大Q
值的动作。
Q-learning
的更新公式为:
2. Q-learning实战
- 使用
Q-learning
解决悬崖问题,找到绕过悬崖通往终端的最短路径。
Step1 安装依赖
In[3]
!pip install gym
Step2 导入依赖
In[4]
import gym
import time
import numpy as np
Step3 Agent
Agent
是和环境environment
交互的主体。predict()
方法:输入观察值observation
(或者说状态state
),输出动作值sample()
方法:在predict()
方法基础上使用ε-greedy
增加探索learn()
方法:输入训练数据,完成一轮Q
表格的更新
In[5]
class QLearningAgent(object):def __init__(self, obs_n, act_n, learning_rate=0.01, gamma=0.9, e_greed=0.1):self.act_n = act_n # 动作维度,有几个动作可选self.lr = learning_rate # 学习率self.gamma = gamma # reward的衰减率self.epsilon = e_greed # 按一定概率随机选动作self.Q = np.zeros((obs_n, act_n))# 根据输入观察值,采样输出的动作值,带探索def sample(self, obs):if np.random.uniform(0, 1) < (1.0 - self.epsilon): #根据table的Q值选动作action = self.predict(obs)else:action = np.random.choice(self.act_n) #有一定概率随机探索选取一个动作return action# 根据输入观察值,预测输出的动作值def predict(self, obs):Q_list = self.Q[obs, :]maxQ = np.max(Q_list)action_list = np.where(Q_list == maxQ)[0] # maxQ可能对应多个actionaction = np.random.choice(action_list)return action# 学习方法,也就是更新Q-table的方法def learn(self, obs, action, reward, next_obs, done):""" off-policyobs: 交互前的obs, s_taction: 本次交互选择的action, a_treward: 本次动作获得的奖励rnext_obs: 本次交互后的obs, s_t+1done: episode是否结束"""predict_Q = self.Q[obs, action]if done:target_Q = reward # 没有下一个状态了else:target_Q = reward + self.gamma * np.max(self.Q[next_obs, :]) # Q-learningself.Q[obs, action] += self.lr * (target_Q - predict_Q) # 修正q# 把 Q表格 的数据保存到文件中def save(self):npy_file = './q_table.npy'np.save(npy_file, self.Q)print(npy_file + ' saved.')# 从文件中读取数据到 Q表格def restore(self, npy_file='./q_table.npy'):self.Q = np.load(npy_file)print(npy_file + ' loaded.')
Step4 Training && Test(训练&&测试)
run_episode()
:agent
在一个episode
中训练的过程,使用agent.sample()
与环境交互,使用agent.learn()
训练Q
表格。test_episode()
:agent
在一个episode
中测试效果,评估目前的agent
能在一个episode
中拿到多少总reward
。
In[9]
# train.pydef run_episode(env, agent, render=False):total_steps = 0 # 记录每个episode走了多少steptotal_reward = 0obs = env.reset() # 重置环境, 重新开一局(即开始新的一个episode)while True:action = agent.sample(obs) # 根据算法选择一个动作next_obs, reward, done, _ = env.step(action) # 与环境进行一个交互# 训练 Q-learning算法agent.learn(obs, action, reward, next_obs, done)obs = next_obs # 存储上一个观察值total_reward += rewardtotal_steps += 1 # 计算step数if render:env.render() #渲染新的一帧图形if done:breakreturn total_reward, total_stepsdef test_episode(env, agent):total_reward = 0obs = env.reset()while True:action = agent.predict(obs) # greedynext_obs, reward, done, _ = env.step(action)total_reward += rewardobs = next_obs# time.sleep(0.5)# env.render()if done:breakreturn total_reward
Step5 创建环境和Agent,启动训练
In[10]
# 使用gym创建悬崖环境
env = gym.make("CliffWalking-v0") # 0 up, 1 right, 2 down, 3 left# 创建一个agent实例,输入超参数
agent = QLearningAgent(obs_n=env.observation_space.n,act_n=env.action_space.n,learning_rate=0.1,gamma=0.9,e_greed=0.1)# 训练500个episode,打印每个episode的分数
for episode in range(500):ep_reward, ep_steps = run_episode(env, agent, False)print('Episode %s: steps = %s , reward = %.1f' % (episode, ep_steps, ep_reward))# 全部训练结束,查看算法效果
test_reward = test_episode(env, agent)
print('test reward = %.1f' % (test_reward))
Episode 0: steps = 262 , reward = -955.0
Episode 1: steps = 686 , reward = -1577.0
Episode 2: steps = 381 , reward = -777.0
Episode 3: steps = 352 , reward = -748.0
Episode 4: steps = 201 , reward = -201.0
Episode 5: steps = 279 , reward = -576.0
Lesson 3 神经网络方法求解RL——DQN
1. DQN简介
- 上节课介绍的表格型方法存储的状态数量有限,当面对围棋或机器人控制这类有数不清的状态的环境时,表格型方法在存储和查找效率上都受局限,
DQN
的提出解决了这一局限,使用神经网络来近似替代Q
表格。 - 本质上
DQN
还是一个Q-learning
算法,更新方式一致。为了更好的探索环境,同样的也采用ε-greedy
方法训练。 - 在
Q-learning
的基础上,DQN
提出了两个技巧使得Q
网络的更新迭代更稳定。- 经验回放
Experience Replay
:主要解决样本关联性和利用效率的问题。使用一个经验池存储多条经验s,a,r,s'
,再从中随机抽取一批数据送去训练。 - 固定Q目标
Fixed-Q-Target
:主要解决算法训练不稳定的问题。复制一个和原来Q
网络结构一样的Target Q
网络,用于计算Q
目标值。
- 经验回放
2. DQN实践
- 使用
DQN
解决CartPole问题,移动小车使得车上的摆杆倒立起来。
Step1 安装依赖
In[2]
!pip uninstall -y parl # 说明:AIStudio预装的parl版本太老,容易跟其他库产生兼容性冲突,建议先卸载
!pip uninstall -y pandas scikit-learn # 提示:在AIStudio中卸载这两个库再import parl可避免warning提示,不卸载也不影响parl的使用!pip install gym
!pip install paddlepaddle==1.6.3
!pip install parl==1.3.1# 说明:安装日志中出现两条红色的关于 paddlehub 和 visualdl 的 ERROR 与parl无关,可以忽略,不影响使用
Step2 导入依赖
In[4]
import parl
from parl import layers
import paddle.fluid as fluid
import copy
import numpy as np
import os
import gym
from parl.utils import logger
Step3 设置超参数
In[5]
LEARN_FREQ = 5 # 训练频率,不需要每一个step都learn,攒一些新增经验后再learn,提高效率
MEMORY_SIZE = 20000 # replay memory的大小,越大越占用内存
MEMORY_WARMUP_SIZE = 200 # replay_memory 里需要预存一些经验数据,再开启训练
BATCH_SIZE = 32 # 每次给agent learn的数据数量,从replay memory随机里sample一批数据出来
LEARNING_RATE = 0.001 # 学习率
GAMMA = 0.99 # reward 的衰减因子,一般取 0.9 到 0.999 不等
Step4 搭建Model、Algorithm、Agent架构
Agent
把产生的数据传给algorithm
,algorithm
根据model
的模型结构计算出Loss
,使用SGD
或者其他优化器不断的优化,PARL
这种架构可以很方便的应用在各类深度强化学习问题中。
(1)Model
Model
用来定义前向(Forward
)网络,用户可以自由的定制自己的网络结构。
In[6]
class Model(parl.Model):def __init__(self, act_dim):hid1_size = 128hid2_size = 128# 3层全连接网络self.fc1 = layers.fc(size=hid1_size, act='relu')self.fc2 = layers.fc(size=hid2_size, act='relu')self.fc3 = layers.fc(size=act_dim, act=None)def value(self, obs):# 定义网络# 输入state,输出所有action对应的Q,[Q(s,a1), Q(s,a2), Q(s,a3)...]h1 = self.fc1(obs)h2 = self.fc2(h1)Q = self.fc3(h2)return Q
(2)Algorithm
Algorithm
定义了具体的算法来更新前向网络(Model
),也就是通过定义损失函数来更新Model
,和算法相关的计算都放在algorithm
中。
In[7]
# from parl.algorithms import DQN # 也可以直接从parl库中导入DQN算法class DQN(parl.Algorithm):def __init__(self, model, act_dim=None, gamma=None, lr=None):""" DQN algorithmArgs:model (parl.Model): 定义Q函数的前向网络结构act_dim (int): action空间的维度,即有几个actiongamma (float): reward的衰减因子lr (float): learning rate 学习率."""self.model = modelself.target_model = copy.deepcopy(model)assert isinstance(act_dim, int)assert isinstance(gamma, float)assert isinstance(lr, float)self.act_dim = act_dimself.gamma = gammaself.lr = lrdef predict(self, obs):""" 使用self.model的value网络来获取 [Q(s,a1),Q(s,a2),...]"""return self.model.value(obs)def learn(self, obs, action, reward, next_obs, terminal):""" 使用DQN算法更新self.model的value网络"""# 从target_model中获取 max Q' 的值,用于计算target_Qnext_pred_value = self.target_model.value(next_obs)best_v = layers.reduce_max(next_pred_value, dim=1)best_v.stop_gradient = True # 阻止梯度传递terminal = layers.cast(terminal, dtype='float32')target = reward + (1.0 - terminal) * self.gamma * best_vpred_value = self.model.value(obs) # 获取Q预测值# 将action转onehot向量,比如:3 => [0,0,0,1,0]action_onehot = layers.one_hot(action, self.act_dim)action_onehot = layers.cast(action_onehot, dtype='float32')# 下面一行是逐元素相乘,拿到action对应的 Q(s,a)# 比如:pred_value = [[2.3, 5.7, 1.2, 3.9, 1.4]], action_onehot = [[0,0,0,1,0]]# ==> pred_action_value = [[3.9]]pred_action_value = layers.reduce_sum(layers.elementwise_mul(action_onehot, pred_value), dim=1)# 计算 Q(s,a) 与 target_Q的均方差,得到losscost = layers.square_error_cost(pred_action_value, target)cost = layers.reduce_mean(cost)optimizer = fluid.optimizer.Adam(learning_rate=self.lr) # 使用Adam优化器optimizer.minimize(cost)return costdef sync_target(self):""" 把 self.model 的模型参数值同步到 self.target_model"""self.model.sync_weights_to(self.target_model)
(3)Agent
Agent
负责算法与环境的交互,在交互过程中把生成的数据提供给Algorithm
来更新模型(Model
),数据的预处理流程也一般定义在这里。
In[8]
class Agent(parl.Agent):def __init__(self,algorithm,obs_dim,act_dim,e_greed=0.1,e_greed_decrement=0):assert isinstance(obs_dim, int)assert isinstance(act_dim, int)self.obs_dim = obs_dimself.act_dim = act_dimsuper(Agent, self).__init__(algorithm)self.global_step = 0self.update_target_steps = 200 # 每隔200个training steps再把model的参数复制到target_model中self.e_greed = e_greed # 有一定概率随机选取动作,探索self.e_greed_decrement = e_greed_decrement # 随着训练逐步收敛,探索的程度慢慢降低def build_program(self):self.pred_program = fluid.Program()self.learn_program = fluid.Program()with fluid.program_guard(self.pred_program): # 搭建计算图用于 预测动作,定义输入输出变量obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')self.value = self.alg.predict(obs)with fluid.program_guard(self.learn_program): # 搭建计算图用于 更新Q网络,定义输入输出变量obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')action = layers.data(name='act', shape=[1], dtype='int32')reward = layers.data(name='reward', shape=[], dtype='float32')next_obs = layers.data(name='next_obs', shape=[self.obs_dim], dtype='float32')terminal = layers.data(name='terminal', shape=[], dtype='bool')self.cost = self.alg.learn(obs, action, reward, next_obs, terminal)def sample(self, obs):sample = np.random.rand() # 产生0~1之间的小数if sample < self.e_greed:act = np.random.randint(self.act_dim) # 探索:每个动作都有概率被选择else:act = self.predict(obs) # 选择最优动作self.e_greed = max(0.01, self.e_greed - self.e_greed_decrement) # 随着训练逐步收敛,探索的程度慢慢降低return actdef predict(self, obs): # 选择最优动作obs = np.expand_dims(obs, axis=0)pred_Q = self.fluid_executor.run(self.pred_program,feed={'obs': obs.astype('float32')},fetch_list=[self.value])[0]pred_Q = np.squeeze(pred_Q, axis=0)act = np.argmax(pred_Q) # 选择Q最大的下标,即对应的动作return actdef learn(self, obs, act, reward, next_obs, terminal):# 每隔200个training steps同步一次model和target_model的参数if self.global_step % self.update_target_steps == 0:self.alg.sync_target()self.global_step += 1act = np.expand_dims(act, -1)feed = {'obs': obs.astype('float32'),'act': act.astype('int32'),'reward': reward,'next_obs': next_obs.astype('float32'),'terminal': terminal}cost = self.fluid_executor.run(self.learn_program, feed=feed, fetch_list=[self.cost])[0] # 训练一次网络return cost
Step5 ReplayMemory
- 经验池:用于存储多条经验,实现 经验回放。
In[10]
import random
import collections
import numpy as npclass ReplayMemory(object):def __init__(self, max_size):self.buffer = collections.deque(maxlen=max_size)# 增加一条经验到经验池中def append(self, exp):self.buffer.append(exp)# 从经验池中选取N条经验出来def sample(self, batch_size):mini_batch = random.sample(self.buffer, batch_size)obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []for experience in mini_batch:s, a, r, s_p, done = experienceobs_batch.append(s)action_batch.append(a)reward_batch.append(r)next_obs_batch.append(s_p)done_batch.append(done)return np.array(obs_batch).astype('float32'), \np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')def __len__(self):return len(self.buffer)
Step6 Training && Test(训练&&测试)
In[11]
# 训练一个episode
def run_episode(env, agent, rpm):total_reward = 0obs = env.reset()step = 0while True:step += 1action = agent.sample(obs) # 采样动作,所有动作都有概率被尝试到next_obs, reward, done, _ = env.step(action)rpm.append((obs, action, reward, next_obs, done))# train modelif (len(rpm) > MEMORY_WARMUP_SIZE) and (step % LEARN_FREQ == 0):(batch_obs, batch_action, batch_reward, batch_next_obs,batch_done) = rpm.sample(BATCH_SIZE)train_loss = agent.learn(batch_obs, batch_action, batch_reward,batch_next_obs,batch_done) # s,a,r,s',donetotal_reward += rewardobs = next_obsif done:breakreturn total_reward# 评估 agent, 跑 5 个episode,总reward求平均
def evaluate(env, agent, render=False):eval_reward = []for i in range(5):obs = env.reset()episode_reward = 0while True:action = agent.predict(obs) # 预测动作,只选最优动作obs, reward, done, _ = env.step(action)episode_reward += rewardif render:env.render()if done:breakeval_reward.append(episode_reward)return np.mean(eval_reward)
Step7 创建环境和Agent,创建经验池,启动训练,保存模型
In[14]
env = gym.make('CartPole-v0') # CartPole-v0: 预期最后一次评估总分 > 180(最大值是200)
action_dim = env.action_space.n # CartPole-v0: 2
obs_shape = env.observation_space.shape # CartPole-v0: (4,)rpm = ReplayMemory(MEMORY_SIZE) # DQN的经验回放池# 根据parl框架构建agent
model = Model(act_dim=action_dim)
algorithm = DQN(model, act_dim=action_dim, gamma=GAMMA, lr=LEARNING_RATE)
agent = Agent(algorithm,obs_dim=obs_shape[0],act_dim=action_dim,e_greed=0.1, # 有一定概率随机选取动作,探索e_greed_decrement=1e-6) # 随着训练逐步收敛,探索的程度慢慢降低# 加载模型
# save_path = './dqn_model.ckpt'
# agent.restore(save_path)# 先往经验池里存一些数据,避免最开始训练的时候样本丰富度不够
while len(rpm) < MEMORY_WARMUP_SIZE:run_episode(env, agent, rpm)max_episode = 2000# 开始训练
episode = 0
while episode < max_episode: # 训练max_episode个回合,test部分不计算入episode数量# train partfor i in range(0, 50):total_reward = run_episode(env, agent, rpm)episode += 1# test parteval_reward = evaluate(env, agent, render=False) # render=True 查看显示效果logger.info('episode:{} e_greed:{} test_reward:{}'.format(episode, agent.e_greed, eval_reward))# 训练结束,保存模型
save_path = './dqn_model.ckpt'
agent.save(save_path)
[06-09 16:10:33 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-09 16:10:33 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-09 16:10:33 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-09 16:10:35 MainThread @<ipython-input-14-1f01c261665f>:39] episode:50 e_greed:0.09926899999999927 test_reward:9.6
[06-09 16:10:37 MainThread @<ipython-input-14-1f01c261665f>:39] episode:100 e_greed:0.09872899999999873 test_reward:12.4
[06-09 16:10:40 MainThread @<ipython-input-14-1f01c261665f>:39] episode:150 e_greed:0.0981919999999982 test_reward:11.4
[06-09 16:10:42 MainThread @<ipython-input-14-1f01c261665f>:39] episode:200 e_greed:0.09762199999999763 test_reward:9.6
[06-09 16:10:44 MainThread @<ipython-input-14-1f01c261665f>:39] episode:250 e_greed:0.09710399999999711 test_reward:9.2
Lesson 4 策略梯度方法求解RL——Policy Gradient
1. Policy Gradient简介
-
在强化学习中,有两大类方法,一种基于值(
Value-based
),一种基于策略(Policy-based
)Value-based
的算法的典型代表为Q-learning
和SARSA
,将Q
函数优化到最优,再根据Q
函数取最优策略。Policy-based
的算法的典型代表为Policy Gradient
,直接优化策略函数。
-
采用神经网络拟合策略函数,需计算策略梯度用于优化策略网络。
- 优化的目标是在策略
π(s,a)
的期望回报:所有的轨迹获得的回报R
与对应的轨迹发生概率p
的加权和,当N足够大时,可通过采样N个Episode求平均的方式近似表达。
- 优化目标对参数
θ
求导后得到策略梯度:
- 优化的目标是在策略
2. Policy Gradient实践——REINFORCE算法
- 使用
REINFORCE
解决 连续控制版本的CartPole
问题,向小车提供推力使得车上的摆杆倒立起来。
Step1 安装依赖
In[1]
!pip uninstall -y parl # 说明:AIStudio预装的parl版本太老,容易跟其他库产生兼容性冲突,建议先卸载
!pip uninstall -y pandas scikit-learn # 提示:在AIStudio中卸载这两个库再import parl可避免warning提示,不卸载也不影响parl的使用!pip install gym
!pip install paddlepaddle==1.6.3
!pip install parl==1.3.1# 说明:安装日志中出现两条红色的关于 paddlehub 和 visualdl 的 ERROR 与parl无关,可以忽略,不影响使用
In[ ]
# 检查依赖包版本是否正确
!pip list | grep paddlepaddle
!pip list | grep parl
Step2 导入依赖
In[ ]
import os
import gym
import numpy as npimport paddle.fluid as fluid
import parl
from parl import layers
from parl.utils import logger
Step3 设置超参数
In[ ]
LEARNING_RATE = 1e-3
Step4 搭建Model、Algorithm、Agent架构
Agent
把产生的数据传给algorithm
,algorithm
根据model
的模型结构计算出Loss
,使用SGD
或者其他优化器不断的优化,PARL
这种架构可以很方便的应用在各类深度强化学习问题中。
(1)Model
Model
用来定义前向(Forward
)网络,用户可以自由的定制自己的网络结构。
In[ ]
class Model(parl.Model):def __init__(self, act_dim):act_dim = act_dimhid1_size = act_dim * 10self.fc1 = layers.fc(size=hid1_size, act='tanh')self.fc2 = layers.fc(size=act_dim, act='softmax')def forward(self, obs): # 可直接用 model = Model(5); model(obs)调用out = self.fc1(obs)out = self.fc2(out)return out
(2)Algorithm
Algorithm
定义了具体的算法来更新前向网络(Model
),也就是通过定义损失函数来更新Model
,和算法相关的计算都放在algorithm
中。
In[ ]
# from parl.algorithms import PolicyGradient # 也可以直接从parl库中导入PolicyGradient算法,无需重复写算法class PolicyGradient(parl.Algorithm):def __init__(self, model, lr=None):""" Policy Gradient algorithmArgs:model (parl.Model): policy的前向网络.lr (float): 学习率."""self.model = modelassert isinstance(lr, float)self.lr = lrdef predict(self, obs):""" 使用policy model预测输出的动作概率"""return self.model(obs)def learn(self, obs, action, reward):""" 用policy gradient 算法更新policy model"""act_prob = self.model(obs) # 获取输出动作概率# log_prob = layers.cross_entropy(act_prob, action) # 交叉熵log_prob = layers.reduce_sum(-1.0 * layers.log(act_prob) * layers.one_hot(action, act_prob.shape[1]),dim=1)cost = log_prob * rewardcost = layers.reduce_mean(cost)optimizer = fluid.optimizer.Adam(self.lr)optimizer.minimize(cost)return cost
(3)Agent
Agent
负责算法与环境的交互,在交互过程中把生成的数据提供给Algorithm
来更新模型(Model
),数据的预处理流程也一般定义在这里。
In[ ]
class Agent(parl.Agent):def __init__(self, algorithm, obs_dim, act_dim):self.obs_dim = obs_dimself.act_dim = act_dimsuper(Agent, self).__init__(algorithm)def build_program(self):self.pred_program = fluid.Program()self.learn_program = fluid.Program()with fluid.program_guard(self.pred_program): # 搭建计算图用于 预测动作,定义输入输出变量obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')self.act_prob = self.alg.predict(obs)with fluid.program_guard(self.learn_program): # 搭建计算图用于 更新policy网络,定义输入输出变量obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')act = layers.data(name='act', shape=[1], dtype='int64')reward = layers.data(name='reward', shape=[], dtype='float32')self.cost = self.alg.learn(obs, act, reward)def sample(self, obs):obs = np.expand_dims(obs, axis=0) # 增加一维维度act_prob = self.fluid_executor.run(self.pred_program,feed={'obs': obs.astype('float32')},fetch_list=[self.act_prob])[0]act_prob = np.squeeze(act_prob, axis=0) # 减少一维维度act = np.random.choice(range(self.act_dim), p=act_prob) # 根据动作概率选取动作return actdef predict(self, obs):obs = np.expand_dims(obs, axis=0)act_prob = self.fluid_executor.run(self.pred_program,feed={'obs': obs.astype('float32')},fetch_list=[self.act_prob])[0]act_prob = np.squeeze(act_prob, axis=0)act = np.argmax(act_prob) # 根据动作概率选择概率最高的动作return actdef learn(self, obs, act, reward):act = np.expand_dims(act, axis=-1)feed = {'obs': obs.astype('float32'),'act': act.astype('int64'),'reward': reward.astype('float32')}cost = self.fluid_executor.run(self.learn_program, feed=feed, fetch_list=[self.cost])[0]return cost
Step 5 Training && Test(训练&&测试)
In[ ]
def run_episode(env, agent):obs_list, action_list, reward_list = [], [], []obs = env.reset()while True:obs_list.append(obs)action = agent.sample(obs) # 采样动作action_list.append(action)obs, reward, done, info = env.step(action)reward_list.append(reward)if done:breakreturn obs_list, action_list, reward_list# 评估 agent, 跑 5 个episode,总reward求平均
def evaluate(env, agent, render=False):eval_reward = []for i in range(5):obs = env.reset()episode_reward = 0while True:action = agent.predict(obs) # 选取最优动作obs, reward, isOver, _ = env.step(action)episode_reward += rewardif render:env.render()if isOver:breakeval_reward.append(episode_reward)return np.mean(eval_reward)
Step6 创建环境和Agent,启动训练,保存模型
In[ ]
# 根据一个episode的每个step的reward列表,计算每一个Step的Gt
def calc_reward_to_go(reward_list, gamma=1.0):for i in range(len(reward_list) - 2, -1, -1):# G_t = r_t + γ·r_t+1 + ... = r_t + γ·G_t+1reward_list[i] += gamma * reward_list[i + 1] # Gtreturn np.array(reward_list)# 创建环境
env = gym.make('CartPole-v0')
obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.n
logger.info('obs_dim {}, act_dim {}'.format(obs_dim, act_dim))# 根据parl框架构建agent
model = Model(act_dim=act_dim)
alg = PolicyGradient(model, lr=LEARNING_RATE)
agent = Agent(alg, obs_dim=obs_dim, act_dim=act_dim)# 加载模型
# if os.path.exists('./model.ckpt'):
# agent.restore('./model.ckpt')
# run_episode(env, agent, train_or_test='test', render=True)
# exit()for i in range(1000):obs_list, action_list, reward_list = run_episode(env, agent)if i % 10 == 0:logger.info("Episode {}, Reward Sum {}.".format(i, sum(reward_list)))batch_obs = np.array(obs_list)batch_action = np.array(action_list)batch_reward = calc_reward_to_go(reward_list)agent.learn(batch_obs, batch_action, batch_reward)if (i + 1) % 100 == 0:total_reward = evaluate(env, agent, render=False) # render=True 查看渲染效果,需要在本地运行,AIStudio无法显示logger.info('Test reward: {}'.format(total_reward))# 保存模型到文件 ./model.ckpt
agent.save('./model.ckpt')
[06-09 23:30:14 MainThread @<ipython-input-9-67de6cabb2c8>:13] obs_dim 4, act_dim 2
[06-09 23:30:14 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-09 23:30:14 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-09 23:30:14 MainThread @<ipython-input-9-67de6cabb2c8>:30] Episode 0, Reward Sum 28.0.
Lesson 5 连续动作空间上求解RL——DDPG
1. DDPG简介
DDPG
的提出动机其实是为了让DQN
可以扩展到连续的动作空间。DDPG
借鉴了DQN
的两个技巧:经验回放 和 固定Q
网络。DDPG
使用策略网络直接输出确定性动作。DDPG
使用了Actor-Critic
的架构。
2. DDPG实践
- 使用
DDPG
解决连续控制版本的CartPole
问题,给小车一个力(连续量)使得车上的摆杆倒立起来。
Step1 安装依赖
In[1]
!pip uninstall -y parl # 说明:AIStudio预装的parl版本太老,容易跟其他库产生兼容性冲突,建议先卸载
!pip uninstall -y pandas scikit-learn # 提示:在AIStudio中卸载这两个库再import parl可避免warning提示,不卸载也不影响parl的使用!pip install gym
!pip install paddlepaddle==1.6.3
!pip install parl==1.3.1# 说明:安装日志中出现两条红色的关于 paddlehub 和 visualdl 的 ERROR 与parl无关,可以忽略,不影响使用
In[ ]
# 检查依赖包版本是否正确
!pip list | grep paddlepaddle
!pip list | grep parl
Step2 导入依赖
In[2]
import gym
import numpy as np
from copy import deepcopyimport paddle.fluid as fluid
import parl
from parl import layers
from parl.utils import logger
Step3 设置超参数
In[3]
ACTOR_LR = 1e-3 # Actor网络的 learning rate
CRITIC_LR = 1e-3 # Critic网络的 learning rateGAMMA = 0.99 # reward 的衰减因子
TAU = 0.001 # 软更新的系数
MEMORY_SIZE = int(1e6) # 经验池大小
MEMORY_WARMUP_SIZE = MEMORY_SIZE // 20 # 预存一部分经验之后再开始训练
BATCH_SIZE = 128
REWARD_SCALE = 0.1 # reward 缩放系数
NOISE = 0.05 # 动作噪声方差TRAIN_EPISODE = 6000 # 训练的总episode数
Step4 搭建Model、Algorithm、Agent架构
Agent
把产生的数据传给algorithm
,algorithm
根据model
的模型结构计算出Loss
,使用SGD
或者其他优化器不断的优化,PARL
这种架构可以很方便的应用在各类深度强化学习问题中。
(1)Model
Model
用来定义前向(Forward
)网络,用户可以自由的定制自己的网络结构
In[4]
class Model(parl.Model):def __init__(self, act_dim):self.actor_model = ActorModel(act_dim)self.critic_model = CriticModel()def policy(self, obs):return self.actor_model.policy(obs)def value(self, obs, act):return self.critic_model.value(obs, act)def get_actor_params(self):return self.actor_model.parameters()class ActorModel(parl.Model):def __init__(self, act_dim):hid_size = 100self.fc1 = layers.fc(size=hid_size, act='relu')self.fc2 = layers.fc(size=act_dim, act='tanh')def policy(self, obs):hid = self.fc1(obs)means = self.fc2(hid)return meansclass CriticModel(parl.Model):def __init__(self):hid_size = 100self.fc1 = layers.fc(size=hid_size, act='relu')self.fc2 = layers.fc(size=1, act=None)def value(self, obs, act):concat = layers.concat([obs, act], axis=1)hid = self.fc1(concat)Q = self.fc2(hid)Q = layers.squeeze(Q, axes=[1])return Q
(2)Algorithm
Algorithm
定义了具体的算法来更新前向网络(Model
),也就是通过定义损失函数来更新Model
,和算法相关的计算都放在algorithm
中。
In[5]
# from parl.algorithms import DDPG # 也可以直接从parl库中快速引入DDPG算法,无需自己重新写算法class DDPG(parl.Algorithm):def __init__(self,model,gamma=None,tau=None,actor_lr=None,critic_lr=None):""" DDPG algorithmArgs:model (parl.Model): actor and critic 的前向网络.model 必须实现 get_actor_params() 方法.gamma (float): reward的衰减因子.tau (float): self.target_model 跟 self.model 同步参数 的 软更新参数actor_lr (float): actor 的学习率critic_lr (float): critic 的学习率"""assert isinstance(gamma, float)assert isinstance(tau, float)assert isinstance(actor_lr, float)assert isinstance(critic_lr, float)self.gamma = gammaself.tau = tauself.actor_lr = actor_lrself.critic_lr = critic_lrself.model = modelself.target_model = deepcopy(model)def predict(self, obs):""" 使用 self.model 的 actor model 来预测动作"""return self.model.policy(obs)def learn(self, obs, action, reward, next_obs, terminal):""" 用DDPG算法更新 actor 和 critic"""actor_cost = self._actor_learn(obs)critic_cost = self._critic_learn(obs, action, reward, next_obs,terminal)return actor_cost, critic_costdef _actor_learn(self, obs):action = self.model.policy(obs)Q = self.model.value(obs, action)cost = layers.reduce_mean(-1.0 * Q)optimizer = fluid.optimizer.AdamOptimizer(self.actor_lr)optimizer.minimize(cost, parameter_list=self.model.get_actor_params())return costdef _critic_learn(self, obs, action, reward, next_obs, terminal):next_action = self.target_model.policy(next_obs)next_Q = self.target_model.value(next_obs, next_action)terminal = layers.cast(terminal, dtype='float32')target_Q = reward + (1.0 - terminal) * self.gamma * next_Qtarget_Q.stop_gradient = TrueQ = self.model.value(obs, action)cost = layers.square_error_cost(Q, target_Q)cost = layers.reduce_mean(cost)optimizer = fluid.optimizer.AdamOptimizer(self.critic_lr)optimizer.minimize(cost)return costdef sync_target(self, decay=None, share_vars_parallel_executor=None):""" self.target_model从self.model复制参数过来,可设置软更新参数"""if decay is None:decay = 1.0 - self.tauself.model.sync_weights_to(self.target_model,decay=decay,share_vars_parallel_executor=share_vars_parallel_executor)
(3)Agent
Agent
负责算法与环境的交互,在交互过程中把生成的数据提供给Algorithm
来更新模型(Model
),数据的预处理流程也一般定义在这里。
In[6]
class Agent(parl.Agent):def __init__(self, algorithm, obs_dim, act_dim):assert isinstance(obs_dim, int)assert isinstance(act_dim, int)self.obs_dim = obs_dimself.act_dim = act_dimsuper(Agent, self).__init__(algorithm)# 注意:最开始先同步self.model和self.target_model的参数.self.alg.sync_target(decay=0)def build_program(self):self.pred_program = fluid.Program()self.learn_program = fluid.Program()with fluid.program_guard(self.pred_program):obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')self.pred_act = self.alg.predict(obs)with fluid.program_guard(self.learn_program):obs = layers.data(name='obs', shape=[self.obs_dim], dtype='float32')act = layers.data(name='act', shape=[self.act_dim], dtype='float32')reward = layers.data(name='reward', shape=[], dtype='float32')next_obs = layers.data(name='next_obs', shape=[self.obs_dim], dtype='float32')terminal = layers.data(name='terminal', shape=[], dtype='bool')_, self.critic_cost = self.alg.learn(obs, act, reward, next_obs,terminal)def predict(self, obs):obs = np.expand_dims(obs, axis=0)act = self.fluid_executor.run(self.pred_program, feed={'obs': obs},fetch_list=[self.pred_act])[0]act = np.squeeze(act)return actdef learn(self, obs, act, reward, next_obs, terminal):feed = {'obs': obs,'act': act,'reward': reward,'next_obs': next_obs,'terminal': terminal}critic_cost = self.fluid_executor.run(self.learn_program, feed=feed, fetch_list=[self.critic_cost])[0]self.alg.sync_target()return critic_cost
env.py
连续控制版本的CartPole环境
- 该环境代码与算法无关,可忽略不看
In[7]
# env.py
# Continuous version of Cartpoleimport math
import gym
from gym import spaces
from gym.utils import seeding
import numpy as npclass ContinuousCartPoleEnv(gym.Env):metadata = {'render.modes': ['human', 'rgb_array'],'video.frames_per_second': 50}def __init__(self):self.gravity = 9.8self.masscart = 1.0self.masspole = 0.1self.total_mass = (self.masspole + self.masscart)self.length = 0.5 # actually half the pole's lengthself.polemass_length = (self.masspole * self.length)self.force_mag = 30.0self.tau = 0.02 # seconds between state updatesself.min_action = -1.0self.max_action = 1.0# Angle at which to fail the episodeself.theta_threshold_radians = 12 * 2 * math.pi / 360self.x_threshold = 2.4# Angle limit set to 2 * theta_threshold_radians so failing observation# is still within boundshigh = np.array([self.x_threshold * 2,np.finfo(np.float32).max,self.theta_threshold_radians * 2,np.finfo(np.float32).max])self.action_space = spaces.Box(low=self.min_action,high=self.max_action,shape=(1,))self.observation_space = spaces.Box(-high, high)self.seed()self.viewer = Noneself.state = Noneself.steps_beyond_done = Nonedef seed(self, seed=None):self.np_random, seed = seeding.np_random(seed)return [seed]def stepPhysics(self, force):x, x_dot, theta, theta_dot = self.statecostheta = math.cos(theta)sintheta = math.sin(theta)temp = (force + self.polemass_length * theta_dot * theta_dot * sintheta) / self.total_massthetaacc = (self.gravity * sintheta - costheta * temp) / \(self.length * (4.0/3.0 - self.masspole * costheta * costheta / self.total_mass))xacc = temp - self.polemass_length * thetaacc * costheta / self.total_massx = x + self.tau * x_dotx_dot = x_dot + self.tau * xacctheta = theta + self.tau * theta_dottheta_dot = theta_dot + self.tau * thetaaccreturn (x, x_dot, theta, theta_dot)def step(self, action):action = np.expand_dims(action, 0)assert self.action_space.contains(action), \"%r (%s) invalid" % (action, type(action))# Cast action to float to strip np trappingsforce = self.force_mag * float(action)self.state = self.stepPhysics(force)x, x_dot, theta, theta_dot = self.statedone = x < -self.x_threshold \or x > self.x_threshold \or theta < -self.theta_threshold_radians \or theta > self.theta_threshold_radiansdone = bool(done)if not done:reward = 1.0elif self.steps_beyond_done is None:# Pole just fell!self.steps_beyond_done = 0reward = 1.0else:if self.steps_beyond_done == 0:gym.logger.warn("""
You are calling 'step()' even though this environment has already returned
done = True. You should always call 'reset()' once you receive 'done = True'
Any further steps are undefined behavior.""")self.steps_beyond_done += 1reward = 0.0return np.array(self.state), reward, done, {}def reset(self):self.state = self.np_random.uniform(low=-0.05, high=0.05, size=(4,))self.steps_beyond_done = Nonereturn np.array(self.state)def render(self, mode='human'):screen_width = 600screen_height = 400world_width = self.x_threshold * 2scale = screen_width /world_widthcarty = 100 # TOP OF CARTpolewidth = 10.0polelen = scale * 1.0cartwidth = 50.0cartheight = 30.0if self.viewer is None:from gym.envs.classic_control import renderingself.viewer = rendering.Viewer(screen_width, screen_height)l, r, t, b = -cartwidth / 2, cartwidth / 2, cartheight / 2, -cartheight / 2axleoffset = cartheight / 4.0cart = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])self.carttrans = rendering.Transform()cart.add_attr(self.carttrans)self.viewer.add_geom(cart)l, r, t, b = -polewidth / 2, polewidth / 2, polelen-polewidth / 2, -polewidth / 2pole = rendering.FilledPolygon([(l, b), (l, t), (r, t), (r, b)])pole.set_color(.8, .6, .4)self.poletrans = rendering.Transform(translation=(0, axleoffset))pole.add_attr(self.poletrans)pole.add_attr(self.carttrans)self.viewer.add_geom(pole)self.axle = rendering.make_circle(polewidth / 2)self.axle.add_attr(self.poletrans)self.axle.add_attr(self.carttrans)self.axle.set_color(.5, .5, .8)self.viewer.add_geom(self.axle)self.track = rendering.Line((0, carty), (screen_width, carty))self.track.set_color(0, 0, 0)self.viewer.add_geom(self.track)if self.state is None:return Nonex = self.statecartx = x[0] * scale + screen_width / 2.0 # MIDDLE OF CARTself.carttrans.set_translation(cartx, carty)self.poletrans.set_rotation(-x[2])return self.viewer.render(return_rgb_array=(mode == 'rgb_array'))def close(self):if self.viewer:self.viewer.close()
replay_memory.py
经验池 ReplayMemory
- 与
DQN
的replay_mamory.py
代码一致
In[8]
# replay_memory.py
import random
import collections
import numpy as npclass ReplayMemory(object):def __init__(self, max_size):self.buffer = collections.deque(maxlen=max_size)def append(self, exp):self.buffer.append(exp)def sample(self, batch_size):mini_batch = random.sample(self.buffer, batch_size)obs_batch, action_batch, reward_batch, next_obs_batch, done_batch = [], [], [], [], []for experience in mini_batch:s, a, r, s_p, done = experienceobs_batch.append(s)action_batch.append(a)reward_batch.append(r)next_obs_batch.append(s_p)done_batch.append(done)return np.array(obs_batch).astype('float32'), \np.array(action_batch).astype('float32'), np.array(reward_batch).astype('float32'),\np.array(next_obs_batch).astype('float32'), np.array(done_batch).astype('float32')def __len__(self):return len(self.buffer)
Step5 Training && Test(训练&&测试)
In[9]
def run_episode(agent, env, rpm):obs = env.reset()total_reward = 0steps = 0while True:steps += 1batch_obs = np.expand_dims(obs, axis=0)action = agent.predict(batch_obs.astype('float32'))# 增加探索扰动, 输出限制在 [-1.0, 1.0] 范围内action = np.clip(np.random.normal(action, NOISE), -1.0, 1.0)next_obs, reward, done, info = env.step(action)action = [action] # 方便存入replaymemoryrpm.append((obs, action, REWARD_SCALE * reward, next_obs, done))if len(rpm) > MEMORY_WARMUP_SIZE and (steps % 5) == 0:(batch_obs, batch_action, batch_reward, batch_next_obs,batch_done) = rpm.sample(BATCH_SIZE)agent.learn(batch_obs, batch_action, batch_reward, batch_next_obs,batch_done)obs = next_obstotal_reward += rewardif done or steps >= 200:breakreturn total_rewarddef evaluate(env, agent, render=False):eval_reward = []for i in range(5):obs = env.reset()total_reward = 0steps = 0while True:batch_obs = np.expand_dims(obs, axis=0)action = agent.predict(batch_obs.astype('float32'))action = np.clip(action, -1.0, 1.0)steps += 1next_obs, reward, done, info = env.step(action)obs = next_obstotal_reward += rewardif render:env.render()if done or steps >= 200:breakeval_reward.append(total_reward)return np.mean(eval_reward)
Step6 创建环境和Agent,创建经验池,启动训练,保存模型
In[10]
# 创建环境
env = ContinuousCartPoleEnv()obs_dim = env.observation_space.shape[0]
act_dim = env.action_space.shape[0]# 使用PARL框架创建agent
model = Model(act_dim)
algorithm = DDPG(model, gamma=GAMMA, tau=TAU, actor_lr=ACTOR_LR, critic_lr=CRITIC_LR)
agent = Agent(algorithm, obs_dim, act_dim)# 创建经验池
rpm = ReplayMemory(MEMORY_SIZE)
# 往经验池中预存数据
while len(rpm) < MEMORY_WARMUP_SIZE:run_episode(agent, env, rpm)episode = 0
while episode < TRAIN_EPISODE:for i in range(50):total_reward = run_episode(agent, env, rpm)episode += 1eval_reward = evaluate(env, agent, render=False)logger.info('episode:{} test_reward:{}'.format(episode, eval_reward))
[06-11 16:26:59 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-11 16:26:59 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-11 16:26:59 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-11 16:28:44 MainThread @machine_info.py:84] Cannot find available GPU devices, using CPU now.
[06-11 16:28:46 MainThread @<ipython-input-10-f4e0a11a1638>:27] episode:50 test_reward:6.6
[06-11 16:28:48 MainThread @<ipython-input-10-f4e0a11a1638>:27] episode:100 test_reward:5.8
[06-11 16:28:49 MainThread @<ipython-input-10-f4e0a11a1638>:27] episode:150 test_reward:6.0
这篇关于百度七日入门强化学习训练营的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!