动手学强化学习reinforcement_learning-chapter-twenty-多智能体强化学习入门

本文主要是介绍动手学强化学习reinforcement_learning-chapter-twenty-多智能体强化学习入门,希望对大家解决编程问题提供一定的参考价值,需要的开发者们随着小编来一起学习吧!

动手学强化学习reinforcement learning-chapter-twenty-多智能体强化学习入门

动手学强化学习​hrl.boyuai.com/

《动手学强化学习》(张伟楠 沈键 俞勇)【简介_书评_在线阅读】 - 当当图书 (dangdang.com)​product.dangdang.com/29391150.html

ZouJiu1/Hands-on-RL: https://hrl.boyuai.com/ (github.com)​github.com/ZouJiu1/Hands-on-RL/tree/main

简介

在这里插入图片描述

multi-agent reinforcement learning,MARL

单智能体强化学习假设动态环境是 稳态stationary 的,状态转移函数和奖励函数不变,多智能体更加复杂,不仅和环境交互,还直接或者间接的和其他智能体交互。难点:1、环境是non-stationary,2、多个智能体可能多目标,使自己利益最大化,3、评估更难的,需要分布式train

问题modeling

多智能体环境用数组表示: ( N , S , A , R , P ) (N,S,A,R,P) (N,S,A,R,P) ,分别是智能体个数N,所有智能体状态集合 S = ( s 1 , s 2 , . . . ) S=(s_1,s_2,...) S=(s1,s2,...) ,所有智能体的动作集合 A = ( a 1 , a 2 , . . . ) A=(a_1,a_2,...) A=(a1,a2,...) ,R则是所有智能体的奖励函数,P是状态转移概率函数

多智能体强化学习的基本求解范式

完全中心化(full centralized):将多个智能体决策当作一个超级智能体在决策,将状态拼在一起,动作拼在一起,用一个策略网络就可以。

完全去中心化(full decentralized):每个智能体单独train,互不干扰的,每个智能体一个策略网络的。

IPPO algorithm

independent ppo,每个智能体都使用单智能体PPO

IPPO program

首先git clone ma-gym库到本地的

git clone https://github.com/boyu-ai/ma-gym.git

运行的时候报错

Traceback (most recent call last):File "c:\Users\10696\Desktop\access\Hands-on-RL\chapter20.py", line 19, in <module>from ma_gym.envs.combat.combat import CombatFile "C:\Users\10696\Desktop\access\ma-gym\ma_gym\__init__.py", line 10, in <module>env_specs = [env_spec for env_spec in envs.registry.all() if 'gym.envs' in env_spec.entry_point]
AttributeError: 'dict' object has no attribute 'all'

修改的方式是

env_specs = [env_spec for env_spec in envs.registry.all() if 'gym.envs' in env_spec.entry_point]
修改到
env_specs = [env_spec for key, env_spec in envs.registry.items() if 'gym.envs' in env_spec.entry_point]

然后又报错的

Traceback (most recent call last):File "c:\Users\10696\Desktop\access\Hands-on-RL\chapter20.py", line 19, in <module>from ma_gym.envs.combat.combat import CombatFile "C:\Users\10696\Desktop\access\ma-gym\ma_gym\__init__.py", line 15, in <module>kwargs={'name': spec.id, **spec._kwargs}
AttributeError: 'EnvSpec' object has no attribute '_kwargs'. Did you mean: 'kwargs'?

修改的方式是

kwargs={'name': spec.id, **spec._kwargs}
修改到
kwargs={'name': spec.id, **spec.kwargs}

fix two errors by ZouJiu1 · Pull Request #1 · boyu-ai/ma-gym (github.com)

动作的个数是(上、下、左、右、保持不动的),已经存在了共 five 个动作,其他的动作则是 是否攻击敌对目标,program内配置了2个智能体,所以也存在2个敌人,所以共 five + 2 = seven 个动作的呢。

也就是(上、下、左、右、保持不动的、攻击1号敌人,攻击2号敌人)共7个动作,移动和攻击不能同时发生。

状态则不相同的,每个智能体的状态会被初始化到 _agent_i_obs = np.zeros((6, 5, 5)),也就是遍历每个智能体,具体见下面的注释

也就是record 5x5的格子区间内的状态,每个智能体所在的 5x5 格子区间内容。也就是保存了以智能体$中心的5x5格子内的其他智能体或者敌人的内容,包括ID、类型、坐标、cool、健康状况的等。

ma-gym\ma_gym\envs\combat\

combat.py

    def get_agent_obs(self):"""When input to a model, each agent is represented by a set of one-hot binary vectors {i, t, l, h, c}encoding its unique ID, team ID, location, health points and cooldown.A model controlling an agent also sees other agents in its visual range (5 × 5 surrounding area).:return:"""_obs = []for agent_i in range(self.n_agents):  ## 遍历每个智能体pos = self.agent_pos[agent_i]     ## 拿到该智能体的 (x, y) 坐标 # _agent_i_obs = self._one_hot_encoding(agent_i, self.n_agents)# _agent_i_obs += [pos[0] / self._grid_shape[0], pos[1] / (self._grid_shape[1] - 1)]  # coordinates# _agent_i_obs += [self.agent_health[agent_i]]# _agent_i_obs += [1 if self._agent_cool else 0]  # flag if agent is cooling down# team id, unique id, location, health, cooldown_agent_i_obs = np.zeros((6, 5, 5))  ## 初始化状态到全 0, 5x5的格子区间for row in range(0, 5):     ## 行5for col in range(0, 5):  ## 列5## 5x5的起始点是【(pos[0] - 2),(pos[1] - 2)】,满足 is_valid 且 这个格子包含智能体,不是空格子的if self.is_valid([row + (pos[0] - 2), col + (pos[1] - 2)]) and (PRE_IDS['empty'] not in self._full_obs[row + (pos[0] - 2)][col + (pos[1] - 2)]):x = self._full_obs[row + pos[0] - 2][col + pos[1] - 2] ## 智能体的名称 A2_type = 1 if PRE_IDS['agent'] in x else -1   ## 智能体或者敌人_id = int(x[1:]) - 1  # id         A2内的 2 - 1_agent_i_obs[0][row][col] = _type   ## 智能体或者敌人,标识符_agent_i_obs[1][row][col] = _id     ## id
#                         print('type', type, '_type', _type)  ## 智能体或者敌人,的健康状况 <= 3_agent_i_obs[2][row][col] = self.agent_health[_id] if _type == 1 else self.opp_health[_id]## 智能体或者敌人是否已经 game over_agent_i_obs[3][row][col] = self._agent_cool[_id] if _type == 1 else self._opp_cool[_id]## game over or not_agent_i_obs[3][row][col] = 1 if _agent_i_obs[3][row][col] else -1  # cool/uncool## 归一化以后的坐标_agent_i_obs[4][row][col] = pos[0] / self._grid_shape[0]  # x-coordinate_agent_i_obs[5][row][col] = pos[1] / self._grid_shape[1]  # y-coordinate_agent_i_obs = _agent_i_obs.flatten().tolist()_obs.append(_agent_i_obs)return _obs

Program

因 for _ in range(self.epochs): 内的epochs=1,所以重要性采样的数值一直都是1,不变

两个智能体同质所以共享了策略,若状态空间或动作空间不行,那么就不行的呢

## 构造智能体 agent 的大脑,也就是输入状态,返回该状态下,选择每个动作的概率
## 输入是状态的,也就是 (车子center-point的坐标,车子的速度,杆的竖直角度,杆的角速度)
## 返回值应该是2 dim
class PolicyNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim, action_dim):super(PolicyNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)self.fc3 = torch.nn.Linear(hidden_dim, action_dim)def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x))))return F.softmax(self.fc3(x), dim=1)  ## 返回该状态下,选择的动作的概率## 构造智能体 agent 的大脑,也就是输入状态,返回该状态下,每个动作的动作价值
## 输入是状态的,也就是 (车子center-point的坐标,车子的速度,杆的竖直角度,杆的角速度)
## 返回值应该是2 dim
class ValueNet(torch.nn.Module):def __init__(self, state_dim, hidden_dim):super(ValueNet, self).__init__()self.fc1 = torch.nn.Linear(state_dim, hidden_dim)self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)self.fc3 = torch.nn.Linear(hidden_dim, 1)def forward(self, x):x = F.relu(self.fc2(F.relu(self.fc1(x))))return self.fc3(x)class PPO:''' PPO算法,采用截断方式 '''def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,lmbda, epochs, eps, gamma, device):self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)   ##  策略网络的self.critic = ValueNet(state_dim, hidden_dim).to(device)       ##  价值网络self.actor_optimizer = torch.optim.Adam(self.actor.parameters(),lr=actor_lr)           ##  函数配置优化器self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),lr=critic_lr)         ##  价值函数配置优化器self.gamma = gamma          ## 衰减因子的呢self.lmbda = lmbdaself.eps = eps  # PPO中截断范围的参数self.device = deviceself.epochs = epochsdef take_action(self, state):state = torch.tensor([state], dtype=torch.float).to(self.device)probs = self.actor(state)       ## 拿到该状态下,每个动作的选择概率action_dist = torch.distributions.Categorical(probs)    ##   配置 好采样的概率action = action_dist.sample()        ## 对该状态下,所有的动作采样,采样的概率是probsreturn action.item()                 ## 返回依概率采样得到的动作def update(self, transition_dict):## 拿到这条序列内的 状态、动作和奖励,下一个状态、是否完成的states = torch.tensor(transition_dict['states'],dtype=torch.float).to(self.device)actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)rewards = torch.tensor(transition_dict['rewards'],dtype=torch.float).view(-1, 1).to(self.device)next_states = torch.tensor(transition_dict['next_states'],dtype=torch.float).to(self.device)dones = torch.tensor(transition_dict['dones'],dtype=torch.float).view(-1, 1).to(self.device)## 用下个状态求下一个状态的状态动作价值,然后间接求出当前状态的状态动作价值td_target = rewards + self.gamma * self.critic(next_states) * (1 -dones)## 间接求出的价值 - 直接求出的当前状态的状态动作价值,也就是 TD-error,或者是优势函数 Atd_delta = td_target - self.critic(states)##  算出优势函数,广义优势估计,也就是每一步优势的均值advantage = rl_utils.compute_advantage(self.gamma, self.lmbda,td_delta.cpu()).to(self.device)## 选择的旧动作概率的log值,不反向传播求梯度,detachold_log_probs = torch.log(self.actor(states).gather(1,actions)).detach()for _ in range(self.epochs):  # 实现是包括了epoch数量的#ret = self.actor(states).gather(1, actions)log_probs = torch.log(ret)ratio = torch.exp(log_probs - old_log_probs)    ## 算重要性采样surr1 = ratio * advantage  ## 重要性采样和优势估计相乘的surr2 = torch.clamp(ratio, 1 - self.eps,1 + self.eps) * advantage  # 截断## 算出来的重要性采样,求出两者间的最小值,然后加负号,也就是最大化目标函数,不加负号的话是最小化目标函数# kk = torch.min(surr1, surr2)# kkk = -torch.sum(kk)actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO损失函数# kl_grad = torch.autograd.grad(actor_loss, surr1, create_graph=True, retain_graph=True)[0]# kl_grad2 = torch.autograd.grad(actor_loss, surr2, create_graph=True, retain_graph=True)[0]# kll = (kl_grad + kl_grad2 ) * advantage# kl_gra = torch.autograd.grad(actor_loss, ratio, create_graph=True, retain_graph=True)[0]# su = torch.sum(kl_gra!=kll)# kl_g = torch.autograd.grad(torch.sum(log_probs), ret, create_graph=True, retain_graph=True)[0]## 直接求出当前状态的状态动作价值,和 间接求出的价值,使用 MSE 来算损失函数的,td_target不反向传播求梯度,detachcritic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))self.actor_optimizer.zero_grad()self.critic_optimizer.zero_grad()      ## 价值网络的参数梯度置零的actor_loss.backward()critic_loss.backward()                 ## 价值网络的损失loss反向传播梯度self.actor_optimizer.step() self.critic_optimizer.step()           ## update 价值网络的参数team_size = 2
grid_size = (15, 15)
#创建Combat环境,格子世界的大小$15x15,己方智能体和敌方智能体数量都$2
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n
#两个智能体共享同一个策略
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)win_list = []
epoch = 10
allimage = []
for i in range(epoch):with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:for i_episode in range(int(num_episodes / 10)):s = env.reset()terminal = Falsewhile not terminal:if int(num_episodes / 10)-1 == i_episode and i == epoch - 1:img = env.render(mode = r'rgb_array')allimage.append(img)a_1 = agent.take_action(s[0])                      ## 智能体1采取了动作的a_2 = agent.take_action(s[1])                      ## 智能体2采取了动作的next_s, r, done, info = env.step([a_1, a_2])       ## 环境执行动作的transition_dict_1['states'].append(s[0])           ## 加入智能体1的当前状态transition_dict_1['actions'].append(a_1)           ## 加入智能体1采取的动作transition_dict_1['next_states'].append(next_s[0]) ## 加入智能体1的下一个状态transition_dict_1['rewards'].append(               ## 加入智能体1的奖励,胜+100,负-0.1,胜的奖励值多了1000倍r[0] + 100 if info['win'] else r[0] - 0.1)  transition_dict_1['dones'].append(False)           ## 没有完成的呢transition_dict_2['states'].append(s[1])           ## 加入智能体2的当前状态transition_dict_2['actions'].append(a_2)           ## 加入智能体2采取的动作transition_dict_2['next_states'].append(next_s[1]) ## 加入智能体2的下一个状态transition_dict_2['rewards'].append(               ## 加入智能体2的奖励,胜+100,负-0.1r[1] + 100 if info['win'] else r[1] - 0.1)transition_dict_2['dones'].append(False)           ## 没有完成的呢s = next_sterminal = all(done)                               ## 是否都已经完成的win_list.append(1 if info["win"] else 0)## 两个智能体默认 共享 同一个策略网络和价值网络agent.update(transition_dict_1)  ## 使用智能体1的数据来train策略网络和价值网络agent.update(transition_dict_2)  ## 使用智能体2的数据来train策略网络和价值网络if (i_episode + 1) % 100 == 0:pbar.set_postfix({'episode':'%d' % (num_episodes / 10 * i + i_episode + 1),'return':'%.3f' % np.mean(win_list[-100:])})pbar.update(1)

https://zhuanlan.zhihu.com/p/659219840

这篇关于动手学强化学习reinforcement_learning-chapter-twenty-多智能体强化学习入门的文章就介绍到这儿,希望我们推荐的文章对编程师们有所帮助!



http://www.chinasem.cn/article/332686

相关文章

HarmonyOS学习(七)——UI(五)常用布局总结

自适应布局 1.1、线性布局(LinearLayout) 通过线性容器Row和Column实现线性布局。Column容器内的子组件按照垂直方向排列,Row组件中的子组件按照水平方向排列。 属性说明space通过space参数设置主轴上子组件的间距,达到各子组件在排列上的等间距效果alignItems设置子组件在交叉轴上的对齐方式,且在各类尺寸屏幕上表现一致,其中交叉轴为垂直时,取值为Vert

Ilya-AI分享的他在OpenAI学习到的15个提示工程技巧

Ilya(不是本人,claude AI)在社交媒体上分享了他在OpenAI学习到的15个Prompt撰写技巧。 以下是详细的内容: 提示精确化:在编写提示时,力求表达清晰准确。清楚地阐述任务需求和概念定义至关重要。例:不用"分析文本",而用"判断这段话的情感倾向:积极、消极还是中性"。 快速迭代:善于快速连续调整提示。熟练的提示工程师能够灵活地进行多轮优化。例:从"总结文章"到"用

Spring Security 从入门到进阶系列教程

Spring Security 入门系列 《保护 Web 应用的安全》 《Spring-Security-入门(一):登录与退出》 《Spring-Security-入门(二):基于数据库验证》 《Spring-Security-入门(三):密码加密》 《Spring-Security-入门(四):自定义-Filter》 《Spring-Security-入门(五):在 Sprin

【前端学习】AntV G6-08 深入图形与图形分组、自定义节点、节点动画(下)

【课程链接】 AntV G6:深入图形与图形分组、自定义节点、节点动画(下)_哔哩哔哩_bilibili 本章十吾老师讲解了一个复杂的自定义节点中,应该怎样去计算和绘制图形,如何给一个图形制作不间断的动画,以及在鼠标事件之后产生动画。(有点难,需要好好理解) <!DOCTYPE html><html><head><meta charset="UTF-8"><title>06

学习hash总结

2014/1/29/   最近刚开始学hash,名字很陌生,但是hash的思想却很熟悉,以前早就做过此类的题,但是不知道这就是hash思想而已,说白了hash就是一个映射,往往灵活利用数组的下标来实现算法,hash的作用:1、判重;2、统计次数;

嵌入式QT开发:构建高效智能的嵌入式系统

摘要: 本文深入探讨了嵌入式 QT 相关的各个方面。从 QT 框架的基础架构和核心概念出发,详细阐述了其在嵌入式环境中的优势与特点。文中分析了嵌入式 QT 的开发环境搭建过程,包括交叉编译工具链的配置等关键步骤。进一步探讨了嵌入式 QT 的界面设计与开发,涵盖了从基本控件的使用到复杂界面布局的构建。同时也深入研究了信号与槽机制在嵌入式系统中的应用,以及嵌入式 QT 与硬件设备的交互,包括输入输出设

让树莓派智能语音助手实现定时提醒功能

最初的时候是想直接在rasa 的chatbot上实现,因为rasa本身是带有remindschedule模块的。不过经过一番折腾后,忽然发现,chatbot上实现的定时,语音助手不一定会有响应。因为,我目前语音助手的代码设置了长时间无应答会结束对话,这样一来,chatbot定时提醒的触发就不会被语音助手获悉。那怎么让语音助手也具有定时提醒功能呢? 我最后选择的方法是用threading.Time

零基础学习Redis(10) -- zset类型命令使用

zset是有序集合,内部除了存储元素外,还会存储一个score,存储在zset中的元素会按照score的大小升序排列,不同元素的score可以重复,score相同的元素会按照元素的字典序排列。 1. zset常用命令 1.1 zadd  zadd key [NX | XX] [GT | LT]   [CH] [INCR] score member [score member ...]

【机器学习】高斯过程的基本概念和应用领域以及在python中的实例

引言 高斯过程(Gaussian Process,简称GP)是一种概率模型,用于描述一组随机变量的联合概率分布,其中任何一个有限维度的子集都具有高斯分布 文章目录 引言一、高斯过程1.1 基本定义1.1.1 随机过程1.1.2 高斯分布 1.2 高斯过程的特性1.2.1 联合高斯性1.2.2 均值函数1.2.3 协方差函数(或核函数) 1.3 核函数1.4 高斯过程回归(Gauss

数论入门整理(updating)

一、gcd lcm 基础中的基础,一般用来处理计算第一步什么的,分数化简之类。 LL gcd(LL a, LL b) { return b ? gcd(b, a % b) : a; } <pre name="code" class="cpp">LL lcm(LL a, LL b){LL c = gcd(a, b);return a / c * b;} 例题: