DDPG Reinforcement Learning PyTorch Code

2023-11-23 19:30

This article introduces PyTorch code for DDPG reinforcement learning, in the hope that it provides a useful reference for developers facing similar programming problems.


The TensorFlow code from Morvan Zhou's reinforcement learning tutorial has been rewritten in PyTorch.
The full code is listed below; it can also be downloaded from my GitHub.

'''
torch = 0.4.1
'''
import torch
import torch.nn as nn
import torch.nn.functional as F
import numpy as np
import gym
import time

#####################  hyper parameters  ####################

MAX_EPISODES = 200
MAX_EP_STEPS = 200
LR_A = 0.001    # learning rate for actor
LR_C = 0.002    # learning rate for critic
GAMMA = 0.9     # reward discount
TAU = 0.01      # soft replacement
MEMORY_CAPACITY = 10000
BATCH_SIZE = 32
RENDER = False
ENV_NAME = 'Pendulum-v0'

###############################  DDPG  ####################################

class ANet(nn.Module):   # actor network: a = ae(s)
    def __init__(self, s_dim, a_dim):
        super(ANet, self).__init__()
        self.fc1 = nn.Linear(s_dim, 30)
        self.fc1.weight.data.normal_(0, 0.1)   # initialization
        self.out = nn.Linear(30, a_dim)
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, x):
        x = self.fc1(x)
        x = F.relu(x)
        x = self.out(x)
        x = F.tanh(x)
        actions_value = x * 2                  # scale tanh output to the action bound [-2, 2]
        return actions_value


class CNet(nn.Module):   # critic network: q = ce(s, a)
    def __init__(self, s_dim, a_dim):
        super(CNet, self).__init__()
        self.fcs = nn.Linear(s_dim, 30)
        self.fcs.weight.data.normal_(0, 0.1)   # initialization
        self.fca = nn.Linear(a_dim, 30)
        self.fca.weight.data.normal_(0, 0.1)   # initialization
        self.out = nn.Linear(30, 1)
        self.out.weight.data.normal_(0, 0.1)   # initialization

    def forward(self, s, a):
        x = self.fcs(s)
        y = self.fca(a)
        net = F.relu(x + y)
        actions_value = self.out(net)
        return actions_value


class DDPG(object):
    def __init__(self, a_dim, s_dim, a_bound):
        self.a_dim, self.s_dim, self.a_bound = a_dim, s_dim, a_bound
        self.memory = np.zeros((MEMORY_CAPACITY, s_dim * 2 + a_dim + 1), dtype=np.float32)
        self.pointer = 0
        self.Actor_eval = ANet(s_dim, a_dim)
        self.Actor_target = ANet(s_dim, a_dim)
        self.Critic_eval = CNet(s_dim, a_dim)
        self.Critic_target = CNet(s_dim, a_dim)
        self.ctrain = torch.optim.Adam(self.Critic_eval.parameters(), lr=LR_C)
        self.atrain = torch.optim.Adam(self.Actor_eval.parameters(), lr=LR_A)
        self.loss_td = nn.MSELoss()

    def choose_action(self, s):
        s = torch.unsqueeze(torch.FloatTensor(s), 0)
        return self.Actor_eval(s)[0].detach()  # ae(s)

    def learn(self):
        # soft target replacement: use ae, ce to update at, ct
        for x in self.Actor_target.state_dict().keys():
            eval('self.Actor_target.' + x + '.data.mul_((1-TAU))')
            eval('self.Actor_target.' + x + '.data.add_(TAU*self.Actor_eval.' + x + '.data)')
        for x in self.Critic_target.state_dict().keys():
            eval('self.Critic_target.' + x + '.data.mul_((1-TAU))')
            eval('self.Critic_target.' + x + '.data.add_(TAU*self.Critic_eval.' + x + '.data)')

        # sample a mini-batch of transitions from the replay memory
        indices = np.random.choice(MEMORY_CAPACITY, size=BATCH_SIZE)
        bt = self.memory[indices, :]
        bs = torch.FloatTensor(bt[:, :self.s_dim])
        ba = torch.FloatTensor(bt[:, self.s_dim: self.s_dim + self.a_dim])
        br = torch.FloatTensor(bt[:, -self.s_dim - 1: -self.s_dim])
        bs_ = torch.FloatTensor(bt[:, -self.s_dim:])

        # actor update: loss = -q = -ce(s, ae(s)) updates ae;  ae(s)=a, ae(s_)=a_
        # if a is a good action, its Q value should be closer to 0
        a = self.Actor_eval(bs)
        q = self.Critic_eval(bs, a)
        loss_a = -torch.mean(q)
        self.atrain.zero_grad()
        loss_a.backward()
        self.atrain.step()

        # critic update
        a_ = self.Actor_target(bs_)       # target actor lags behind; it predicts the action used in the critic's Q_target
        q_ = self.Critic_target(bs_, a_)  # target critic lags behind; it supplies the bootstrapped value for Q_target
        q_target = br + GAMMA * q_        # q_target is negative (Pendulum rewards are <= 0)
        q_v = self.Critic_eval(bs, ba)
        # td_error = R + GAMMA * ct(bs_, at(bs_)) - ce(s, ba) updates ce; the action here is the stored ba,
        # pushing ce's Q toward Q_target so the critic's evaluation becomes more accurate
        td_error = self.loss_td(q_target, q_v)
        self.ctrain.zero_grad()
        td_error.backward()
        self.ctrain.step()

    def store_transition(self, s, a, r, s_):
        transition = np.hstack((s, a, [r], s_))
        index = self.pointer % MEMORY_CAPACITY  # replace the old memory with new memory
        self.memory[index, :] = transition
        self.pointer += 1

###############################  training  ####################################
env = gym.make(ENV_NAME)
env = env.unwrapped
env.seed(1)
s_dim = env.observation_space.shape[0]
a_dim = env.action_space.shape[0]
a_bound = env.action_space.high

ddpg = DDPG(a_dim, s_dim, a_bound)
var = 3  # control exploration
t1 = time.time()
for i in range(MAX_EPISODES):
    s = env.reset()
    ep_reward = 0
    for j in range(MAX_EP_STEPS):
        if RENDER:
            env.render()

        # Add exploration noise
        a = ddpg.choose_action(s)
        a = np.clip(np.random.normal(a, var), -2, 2)    # add randomness to action selection for exploration
        s_, r, done, info = env.step(a)

        ddpg.store_transition(s, a, r / 10, s_)

        if ddpg.pointer > MEMORY_CAPACITY:
            var *= .9995    # decay the action randomness
            ddpg.learn()

        s = s_
        ep_reward += r
        if j == MAX_EP_STEPS - 1:
            print('Episode:', i, ' Reward: %i' % int(ep_reward), 'Explore: %.2f' % var, )
            if ep_reward > -300:
                RENDER = True
            break
print('Running time: ', time.time() - t1)
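A side note on the soft target replacement in learn(): it is built from eval() strings. The same Polyak update, theta_target <- (1 - TAU) * theta_target + TAU * theta_eval, can also be written directly over the module parameters. The sketch below is only an illustration that assumes the same ANet/CNet modules and the TAU constant defined above; the helper name soft_update is hypothetical.

import torch

def soft_update(target_net, eval_net, tau):
    # Polyak averaging: theta_target <- (1 - tau) * theta_target + tau * theta_eval
    # (hypothetical helper, not part of the original script)
    with torch.no_grad():
        for t_param, e_param in zip(target_net.parameters(), eval_net.parameters()):
            t_param.data.mul_(1.0 - tau)
            t_param.data.add_(tau * e_param.data)

# inside DDPG.learn() this would replace the two eval()-based loops:
# soft_update(self.Actor_target, self.Actor_eval, TAU)
# soft_update(self.Critic_target, self.Critic_eval, TAU)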

The results on the inverted pendulum (Pendulum-v0) are as shown in the figure:

[figure: training results]

Reinforcement learning based on an evolutionary algorithm:

"""
According to https://morvanzhou.github.io/tutorials/
required pytorch=0.41"""
import numpy as np
import gym
import multiprocessing as mp
import time
import torch
import torch.nn as nn
import torch.nn.functional as F

N_KID = 10                  # half of the training population
N_GENERATION = 5000         # training step
LR = .05                    # learning rate
SIGMA = .05                 # mutation strength or step size
N_CORE = mp.cpu_count()-1
CONFIG = [
    dict(game="CartPole-v0",
         n_feature=4, n_action=2, continuous_a=[False], ep_max_step=700, eval_threshold=500),
    dict(game="MountainCar-v0",
         n_feature=2, n_action=3, continuous_a=[False], ep_max_step=200, eval_threshold=-120),
    dict(game="Pendulum-v0",
         n_feature=3, n_action=1, continuous_a=[True, 2.], ep_max_step=200, eval_threshold=-180)
][0]    # choose your game


class net(nn.Module):
    def __init__(self, input_dim, output_dim):
        super(net, self).__init__()
        self.fc1 = nn.Linear(input_dim, 30)
        self.fc1.weight.data.normal_(0, 1)
        self.fc2 = nn.Linear(30, 20)
        self.fc2.weight.data.normal_(0, 1)
        self.fc3 = nn.Linear(20, output_dim)
        self.fc3.weight.data.normal_(0, 1)

    def forward(self, x):
        x = F.tanh(self.fc1(x))
        x = F.tanh(self.fc2(x))
        out = self.fc3(x)
        return out


def sign(k_id): return -1. if k_id % 2 == 0 else 1.  # mirrored sampling


class SGD(object):                      # optimizer with momentum
    def __init__(self, params, learning_rate, momentum=0.9):
        self.v = np.zeros(params).astype(np.float32)
        self.lr, self.momentum = learning_rate, momentum

    def get_gradients(self, gradients):
        self.v = self.momentum * self.v + (1. - self.momentum) * gradients
        return self.lr * self.v


def get_reward(network_param, num_p, env, ep_max_step, continuous_a, seed_and_id=None):
    # perturb parameters using the seed, so only the seed has to be sent to the worker
    if seed_and_id is not None:
        seed, k_id = seed_and_id
        np.random.seed(seed)
        params = torch.FloatTensor(sign(k_id) * SIGMA * np.random.randn(num_p))
        Net = net(CONFIG['n_feature'], CONFIG['n_action'])
        Net.load_state_dict(network_param)
        for layer in Net.children():
            layer.weight.data += params[:layer.weight.shape[0]*layer.weight.shape[1]].view(layer.weight.shape[0], layer.weight.shape[1])
            layer.bias.data += params[layer.weight.shape[0]*layer.weight.shape[1]: layer.bias.shape[0]+layer.weight.shape[0]*layer.weight.shape[1]]
            params = params[layer.bias.shape[0]+layer.weight.shape[0]*layer.weight.shape[1]:]
    else:
        Net = net(CONFIG['n_feature'], CONFIG['n_action'])
        Net.load_state_dict(network_param)
    # run episode
    s = env.reset()
    ep_r = 0.
    for step in range(ep_max_step):
        a = get_action(Net, s, continuous_a)  # continuous_a: whether the action space is continuous
        s, r, done, _ = env.step(a)
        # mountain car's reward can be tricky
        if env.spec._env_name == 'MountainCar' and s[0] > -0.1: r = 0.
        ep_r += r
        if done: break
    return ep_r


def get_action(network, x, continuous_a):
    x = torch.unsqueeze(torch.FloatTensor(x), 0)
    x = network.forward(x)
    if not continuous_a[0]: return np.argmax(x.detach().numpy(), axis=1)[0]      # for discrete action
    else: return continuous_a[1] * np.tanh(x.detach().numpy())[0]                # for continuous action


def train(network_param, num_p, optimizer, utility, pool):
    # pass a seed instead of the whole noise matrix to the workers to save time:
    # one mirrored noise seed per population member
    noise_seed = np.random.randint(0, 2 ** 32 - 1, size=N_KID, dtype=np.uint32).repeat(2)    # mirrored sampling

    # distribute training in parallel; apply_async is asynchronous and non-blocking,
    # so jobs are submitted without waiting for the previous worker to finish
    jobs = [pool.apply_async(get_reward, (network_param, num_p, env, CONFIG['ep_max_step'], CONFIG['continuous_a'],
                                          [noise_seed[k_id], k_id],)) for k_id in range(N_KID * 2)]
    # 2 * N_KID evaluations are submitted
    rewards = np.array([j.get() for j in jobs])
    kids_rank = np.argsort(rewards)[::-1]               # rank kid id by reward

    # utility is the rank transformation of the rewards: the kid with the largest reward
    # is matched with the first (largest) utility value, the smallest reward with the last one
    All_data = 0
    for ui, k_id in enumerate(kids_rank):
        np.random.seed(noise_seed[k_id])  # reconstruct noise using the seed
        All_data += utility[ui] * sign(k_id) * np.random.randn(num_p)  # larger reward -> larger utility weight
    gradients = optimizer.get_gradients(All_data / (2 * N_KID * SIGMA))
    gradients = torch.FloatTensor(gradients)
    for layer in network_param.keys():
        if 'weight' in layer:
            network_param[layer] += gradients[:network_param[layer].shape[0]*network_param[layer].shape[1]].view(network_param[layer].shape[0], network_param[layer].shape[1])
            gradients = gradients[network_param[layer].shape[0] * network_param[layer].shape[1]:]
        if 'bias' in layer:
            network_param[layer] += gradients[:network_param[layer].shape[0]]
            gradients = gradients[network_param[layer].shape[0]:]
    return network_param, rewards


if __name__ == "__main__":
    # utility instead of raw reward for updating parameters (rank transformation)
    base = N_KID * 2    # *2 for mirrored sampling (population size)
    rank = np.arange(1, base + 1)
    util_ = np.maximum(0, np.log(base / 2 + 1) - np.log(rank))
    utility = util_ / util_.sum() - 1 / base

    # training
    Net_org = net(CONFIG['n_feature'], CONFIG['n_action']).state_dict()
    num_params = 0
    for r in list(Net_org):
        num_params += Net_org[r].numel()
    env = gym.make(CONFIG['game']).unwrapped
    optimizer = SGD(num_params, LR)
    pool = mp.Pool(processes=N_CORE)  # process pool for parallel rollouts
    mar = None      # moving average reward
    for g in range(N_GENERATION):
        t0 = time.time()
        Net_org, kid_rewards = train(Net_org, num_params, optimizer, utility, pool)  # update the parameters

        # test the trained net without noise
        net_r = get_reward(Net_org, num_params, env, CONFIG['ep_max_step'], CONFIG['continuous_a'], None)
        mar = net_r if mar is None else 0.9 * mar + 0.1 * net_r       # moving average reward
        print('Gen: ', g,
              '| Net_R: %.1f' % mar,
              '| Kid_avg_R: %.1f' % kid_rewards.mean(),
              '| Gen_T: %.2f' % (time.time() - t0))
        if mar >= CONFIG['eval_threshold']: break

    # test
    print("\nTESTING....")
    while True:
        s = env.reset()
        for _ in range(CONFIG['ep_max_step']):
            env.render()
            net_test = net(CONFIG['n_feature'], CONFIG['n_action'])
            net_test.load_state_dict(Net_org)
            a = get_action(net_test, s, CONFIG['continuous_a'])
            s, _, done, _ = env.step(a)
            if done: break
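The train() function above is an OpenAI-ES style estimator: mirrored noise is sampled per kid, the rewards are rank-transformed into the utility weights, and the parameter step is the utility-weighted sum of the noise divided by 2 * N_KID * SIGMA. As a minimal standalone sketch of that estimator, the following toy example uses the same formulas on a hypothetical quadratic objective f (the function toy_es_step and the objective are only for illustration):

import numpy as np

def toy_es_step(theta, f, n_kid=10, sigma=0.05, lr=0.05):
    # mirrored sampling: each noise vector is used with both + and - sign
    noise = np.random.randn(n_kid, theta.size)
    noise = np.concatenate([noise, -noise])
    rewards = np.array([f(theta + sigma * eps) for eps in noise])

    # rank transformation (fitness shaping), same formula as `utility` above
    base = 2 * n_kid
    rank = np.arange(1, base + 1)
    util_ = np.maximum(0, np.log(base / 2 + 1) - np.log(rank))
    utility = util_ / util_.sum() - 1 / base

    order = np.argsort(rewards)[::-1]                        # best kid gets the largest utility
    grad = (utility[:, None] * noise[order]).sum(0) / (base * sigma)
    return theta + lr * grad                                 # gradient ascent on expected reward

theta = np.zeros(3)
for _ in range(300):
    theta = toy_es_step(theta, lambda w: -np.sum((w - 1.0) ** 2))
print(theta)    # moves toward the optimum [1, 1, 1]

The full script only differs in that the noise is reconstructed from per-worker seeds (so only seeds travel through multiprocessing) and the step goes through the momentum SGD optimizer before being added back into the state dict.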

That is all for this article on PyTorch code for DDPG reinforcement learning; hopefully it is helpful to other programmers.



