Hands-on Reinforcement Learning (动手学强化学习), Chapter 20: Introduction to Multi-Agent Reinforcement Learning
Hands-on RL (动手学强化学习): https://hrl.boyuai.com/
《动手学强化学习》(张伟楠, 沈键, 俞勇), book page on Dangdang: https://product.dangdang.com/29391150.html
Code used here: ZouJiu1/Hands-on-RL, https://github.com/ZouJiu1/Hands-on-RL/tree/main
Introduction
Multi-agent reinforcement learning (MARL)
Single-agent RL assumes the environment is stationary: the state-transition function and the reward function do not change. The multi-agent setting is more complex, because each agent interacts not only with the environment but also, directly or indirectly, with the other agents. The main difficulties: (1) the environment is non-stationary from any single agent's point of view; (2) the agents may pursue different objectives, each maximizing its own interest; (3) evaluation is harder, and training may have to be distributed.
Problem modeling
A multi-agent environment is described by the tuple (N, S, A, R, P): the number of agents N, the set of all agents' states S = (s_1, s_2, ...), the set of all agents' actions A = (a_1, a_2, ...), the reward functions R of all agents, and the state-transition probability function P.
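As a minimal sketch of this tuple (my own illustration, not code from the book or from ma-gym; the class name and its internals are made up), a multi-agent environment can be written with list-valued reset/step, one entry per agent:

import random

class MultiAgentEnvSketch:
    """Toy stand-in for the tuple (N, S, A, R, P)."""
    def __init__(self, n_agents):
        self.n_agents = n_agents                       # N: number of agents
        self.states = [0.0] * n_agents                 # current joint state, an element of S

    def reset(self):
        self.states = [0.0] * self.n_agents            # one state per agent: (s_1, s_2, ...)
        return list(self.states)

    def step(self, actions):                           # joint action (a_1, a_2, ...) from A
        assert len(actions) == self.n_agents
        # P: stochastic transition of every agent's state
        self.states = [s + a + random.gauss(0, 0.1) for s, a in zip(self.states, actions)]
        rewards = [-abs(s) for s in self.states]       # R: one reward per agent
        dones = [False] * self.n_agents
        return list(self.states), rewards, dones, {}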
Basic solution paradigms of multi-agent RL
Fully centralized: treat the decisions of all agents as those of a single super-agent: concatenate the states, concatenate the actions, and a single policy network is enough.
Fully decentralized: each agent is trained separately and does not interfere with the others; every agent has its own policy network (a short code sketch of both setups follows).
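To make the two paradigms concrete, here is a short sketch of how the policy networks would be organized in each case. This is my own illustration, not the book's code; the observation and action sizes (150 and 7) simply match the Combat environment used later.

import torch

n_agents, obs_dim, act_dim, hidden = 2, 150, 7, 64

# Fully centralized: concatenate all observations into one "super" input; one network
# outputs a distribution over the joint action space (act_dim ** n_agents combinations).
central_policy = torch.nn.Sequential(
    torch.nn.Linear(obs_dim * n_agents, hidden),
    torch.nn.ReLU(),
    torch.nn.Linear(hidden, act_dim ** n_agents),
    torch.nn.Softmax(dim=-1),
)

# Fully decentralized: one independent network per agent, each trained on its own data.
decentral_policies = [
    torch.nn.Sequential(
        torch.nn.Linear(obs_dim, hidden),
        torch.nn.ReLU(),
        torch.nn.Linear(hidden, act_dim),
        torch.nn.Softmax(dim=-1),
    )
    for _ in range(n_agents)
]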
IPPO algorithm
Independent PPO (IPPO): every agent simply runs single-agent PPO on its own.
IPPO program
First, git clone the ma-gym repository to your machine:
git clone https://github.com/boyu-ai/ma-gym.git
Running the chapter code raises an error:
Traceback (most recent call last):
  File "c:\Users\10696\Desktop\access\Hands-on-RL\chapter20.py", line 19, in <module>
    from ma_gym.envs.combat.combat import Combat
  File "C:\Users\10696\Desktop\access\ma-gym\ma_gym\__init__.py", line 10, in <module>
    env_specs = [env_spec for env_spec in envs.registry.all() if 'gym.envs' in env_spec.entry_point]
AttributeError: 'dict' object has no attribute 'all'
The fix is to change
env_specs = [env_spec for env_spec in envs.registry.all() if 'gym.envs' in env_spec.entry_point]
to
env_specs = [env_spec for key, env_spec in envs.registry.items() if 'gym.envs' in env_spec.entry_point]
Then another error appears:
Traceback (most recent call last):
  File "c:\Users\10696\Desktop\access\Hands-on-RL\chapter20.py", line 19, in <module>
    from ma_gym.envs.combat.combat import Combat
  File "C:\Users\10696\Desktop\access\ma-gym\ma_gym\__init__.py", line 15, in <module>
    kwargs={'name': spec.id, **spec._kwargs}
AttributeError: 'EnvSpec' object has no attribute '_kwargs'. Did you mean: 'kwargs'?
The fix is to change
kwargs={'name': spec.id, **spec._kwargs}
to
kwargs={'name': spec.id, **spec.kwargs}
fix two errors by ZouJiu1 · Pull Request #1 · boyu-ai/ma-gym (github.com)
The movement actions are up, down, left, right and stay, which already gives five actions. The remaining actions choose which opposing target to attack; the program configures 2 agents per side, so there are 2 opponents and therefore 5 + 2 = 7 actions in total.
That is, the 7 actions are (up, down, left, right, stay, attack opponent 1, attack opponent 2); moving and attacking cannot happen in the same step.
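A quick sanity check of the action count, using the same constructor call as the program further below:

from ma_gym.envs.combat.combat import Combat

env = Combat(grid_shape=(15, 15), n_agents=2, n_opponents=2)
print(env.action_space[0].n)  # 7 = 4 moves + stay + 2 attack targets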
The observations, by contrast, differ between agents. Each agent's observation is initialized to _agent_i_obs = np.zeros((6, 5, 5)); the code loops over every agent and fills in the contents of the 5x5 grid window centered on that agent. In other words, for every agent or opponent inside that 5x5 window it records the type, ID, coordinates, cooldown, health and so on. See the annotated source below.
ma-gym\ma_gym\envs\combat\combat.py
def get_agent_obs(self):
    """
    When input to a model, each agent is represented by a set of one-hot binary vectors {i, t, l, h, c}
    encoding its unique ID, team ID, location, health points and cooldown.
    A model controlling an agent also sees other agents in its visual range (5 x 5 surrounding area).
    :return:
    """
    _obs = []
    for agent_i in range(self.n_agents):  ## loop over every agent
        pos = self.agent_pos[agent_i]  ## the (x, y) coordinates of this agent

        # _agent_i_obs = self._one_hot_encoding(agent_i, self.n_agents)
        # _agent_i_obs += [pos[0] / self._grid_shape[0], pos[1] / (self._grid_shape[1] - 1)]  # coordinates
        # _agent_i_obs += [self.agent_health[agent_i]]
        # _agent_i_obs += [1 if self._agent_cool else 0]  # flag if agent is cooling down

        # team id, unique id, location, health, cooldown
        _agent_i_obs = np.zeros((6, 5, 5))  ## initialize the observation to all zeros: a 5x5 window with 6 channels
        for row in range(0, 5):  ## 5 rows
            for col in range(0, 5):  ## 5 columns
                ## the 5x5 window starts at (pos[0] - 2, pos[1] - 2); the cell must be valid and not empty
                if self.is_valid([row + (pos[0] - 2), col + (pos[1] - 2)]) and (
                        PRE_IDS['empty'] not in self._full_obs[row + (pos[0] - 2)][col + (pos[1] - 2)]):
                    x = self._full_obs[row + pos[0] - 2][col + pos[1] - 2]  ## the cell label, e.g. 'A2'
                    _type = 1 if PRE_IDS['agent'] in x else -1  ## own agent (+1) or opponent (-1)
                    _id = int(x[1:]) - 1  ## numeric id, e.g. the 2 in 'A2' minus 1
                    _agent_i_obs[0][row][col] = _type  ## channel 0: agent/opponent flag
                    _agent_i_obs[1][row][col] = _id  ## channel 1: id
                    ## channel 2: health of the agent or opponent (<= 3)
                    _agent_i_obs[2][row][col] = self.agent_health[_id] if _type == 1 else self.opp_health[_id]
                    ## channel 3: whether the agent's or opponent's attack is cooling down
                    _agent_i_obs[3][row][col] = self._agent_cool[_id] if _type == 1 else self._opp_cool[_id]
                    _agent_i_obs[3][row][col] = 1 if _agent_i_obs[3][row][col] else -1  # cool/uncool
                    ## channels 4 and 5: normalized coordinates
                    _agent_i_obs[4][row][col] = pos[0] / self._grid_shape[0]  # x-coordinate
                    _agent_i_obs[5][row][col] = pos[1] / self._grid_shape[1]  # y-coordinate

        _agent_i_obs = _agent_i_obs.flatten().tolist()
        _obs.append(_agent_i_obs)
    return _obs
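Since the 6 x 5 x 5 window is flattened, each agent's observation is a 150-dimensional vector, which is exactly the state_dim used by the program below. A quick check, reusing the env built in the snippet above:

obs = env.reset()                       # reset() returns get_agent_obs()
print(len(obs), len(obs[0]))            # expected: 2 agents, 150 = 6 * 5 * 5 features each
print(env.observation_space[0].shape)   # expected: (150,), used as state_dim below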
Program
Because epochs=1 inside for _ in range(self.epochs):, the importance-sampling ratio is always 1 and never changes during an update.
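The reason: old_log_probs is computed from the current actor and detached, and with a single epoch no gradient step happens between that computation and the ratio, so exp(log_probs - old_log_probs) = exp(0) = 1. A tiny standalone check of this fact:

import torch

probs = torch.softmax(torch.randn(4, 7), dim=1)           # stand-in for actor(states)
actions = torch.randint(0, 7, (4, 1))
old_log_probs = torch.log(probs.gather(1, actions)).detach()
log_probs = torch.log(probs.gather(1, actions))            # same policy, no update in between
print(torch.exp(log_probs - old_log_probs))                # all ones; the ratio only moves away
                                                           # from 1 on the 2nd and later epochs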
The two agents are homogeneous, so they share one policy; if their state spaces or action spaces were not the same, sharing a single network would not work.
## Imports needed to run the excerpted code (rl_utils is the utility module from the Hands-on-RL repository)
import torch
import torch.nn.functional as F
import numpy as np
from tqdm import tqdm
from ma_gym.envs.combat.combat import Combat
import rl_utils

## The policy network, i.e. the agent's "brain": given a state, it returns the probability of choosing each action in that state.
## The input is the agent's flattened observation vector; the output has one entry per action.
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc2(F.relu(self.fc1(x))))
        return F.softmax(self.fc3(x), dim=1)  ## probability of each action in this state

## The value network, i.e. the agent's critic: given a state, it returns the estimated value of that state.
## The input is the agent's flattened observation vector; the output is a single scalar (the state value).
class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, hidden_dim)
        self.fc3 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc2(F.relu(self.fc1(x))))
        return self.fc3(x)

class PPO:
    ''' PPO algorithm with the clipped objective '''
    def __init__(self, state_dim, hidden_dim, action_dim, actor_lr, critic_lr,
                 lmbda, epochs, eps, gamma, device):
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)  ## policy network
        self.critic = ValueNet(state_dim, hidden_dim).to(device)  ## value network
        self.actor_optimizer = torch.optim.Adam(self.actor.parameters(), lr=actor_lr)  ## optimizer of the policy network
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(), lr=critic_lr)  ## optimizer of the value network
        self.gamma = gamma  ## discount factor
        self.lmbda = lmbda
        self.eps = eps  # clipping range of PPO
        self.device = device
        self.epochs = epochs

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)  ## probability of each action in this state
        action_dist = torch.distributions.Categorical(probs)  ## build the sampling distribution
        action = action_dist.sample()  ## sample an action with probabilities probs
        return action.item()  ## return the sampled action

    def update(self, transition_dict):
        ## states, actions, rewards, next states and done flags of this trajectory
        states = torch.tensor(transition_dict['states'], dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'], dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'], dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'], dtype=torch.float).view(-1, 1).to(self.device)
        ## bootstrap the value of the current state from the value of the next state
        td_target = rewards + self.gamma * self.critic(next_states) * (1 - dones)
        ## bootstrapped value minus the directly estimated value, i.e. the TD error / advantage signal
        td_delta = td_target - self.critic(states)
        ## generalized advantage estimation (GAE)
        advantage = rl_utils.compute_advantage(self.gamma, self.lmbda, td_delta.cpu()).to(self.device)
        ## log-probability of the chosen actions under the old policy, detached so no gradient flows through it
        old_log_probs = torch.log(self.actor(states).gather(1, actions)).detach()
        for _ in range(self.epochs):  # loop over the configured number of epochs (1 here)
            ret = self.actor(states).gather(1, actions)
            log_probs = torch.log(ret)
            ratio = torch.exp(log_probs - old_log_probs)  ## importance-sampling ratio
            surr1 = ratio * advantage  ## ratio times the estimated advantage
            surr2 = torch.clamp(ratio, 1 - self.eps, 1 + self.eps) * advantage  # clipped surrogate
            ## take the minimum of the two surrogates and negate it, so minimizing the loss maximizes the objective
            actor_loss = torch.mean(-torch.min(surr1, surr2))  # PPO loss
            ## MSE between the critic's direct estimate and the bootstrapped TD target; td_target is detached
            critic_loss = torch.mean(F.mse_loss(self.critic(states), td_target.detach()))
            self.actor_optimizer.zero_grad()
            self.critic_optimizer.zero_grad()  ## zero the gradients of the value network
            actor_loss.backward()
            critic_loss.backward()  ## backpropagate the value-network loss
            self.actor_optimizer.step()
            self.critic_optimizer.step()  ## update the value-network parameters

team_size = 2
grid_size = (15, 15)
# create the Combat environment: a 15x15 grid world with 2 agents on our side and 2 opponents
env = Combat(grid_shape=grid_size, n_agents=team_size, n_opponents=team_size)
state_dim = env.observation_space[0].shape[0]
action_dim = env.action_space[0].n
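The script uses several hyperparameters (actor_lr, critic_lr, num_episodes, hidden_dim, gamma, lmbda, eps, epochs, device) that are defined earlier in the full program but not shown in this excerpt. The values below are what I believe the chapter uses; treat them as assumptions and adjust if your copy of the code differs:

actor_lr = 3e-4      # learning rate of the policy network
critic_lr = 1e-3     # learning rate of the value network
num_episodes = 100000
hidden_dim = 64
gamma = 0.99         # discount factor
lmbda = 0.97         # GAE parameter
eps = 0.2            # PPO clipping range
epochs = 1           # a single pass per batch, hence the constant ratio discussed above
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")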
# the two agents share the same policy
agent = PPO(state_dim, hidden_dim, action_dim, actor_lr, critic_lr, lmbda, epochs, eps, gamma, device)
win_list = []
epoch = 10
allimage = []
for i in range(epoch):
    with tqdm(total=int(num_episodes / 10), desc='Iteration %d' % i) as pbar:
        for i_episode in range(int(num_episodes / 10)):
            ## one transition buffer per agent, rebuilt every episode (not shown in the
            ## original excerpt, but required because the buffers are appended to below)
            transition_dict_1 = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
            transition_dict_2 = {'states': [], 'actions': [], 'next_states': [], 'rewards': [], 'dones': []}
            s = env.reset()
            terminal = False
            while not terminal:
                if int(num_episodes / 10) - 1 == i_episode and i == epoch - 1:
                    img = env.render(mode=r'rgb_array')
                    allimage.append(img)
                a_1 = agent.take_action(s[0])  ## agent 1 picks an action
                a_2 = agent.take_action(s[1])  ## agent 2 picks an action
                next_s, r, done, info = env.step([a_1, a_2])  ## the environment executes both actions
                transition_dict_1['states'].append(s[0])  ## current state of agent 1
                transition_dict_1['actions'].append(a_1)  ## action taken by agent 1
                transition_dict_1['next_states'].append(next_s[0])  ## next state of agent 1
                ## reward of agent 1: +100 on a win, -0.1 otherwise, so the win bonus is 1000x the step penalty
                transition_dict_1['rewards'].append(r[0] + 100 if info['win'] else r[0] - 0.1)
                transition_dict_1['dones'].append(False)  ## not done yet
                transition_dict_2['states'].append(s[1])  ## current state of agent 2
                transition_dict_2['actions'].append(a_2)  ## action taken by agent 2
                transition_dict_2['next_states'].append(next_s[1])  ## next state of agent 2
                ## reward of agent 2: +100 on a win, -0.1 otherwise
                transition_dict_2['rewards'].append(r[1] + 100 if info['win'] else r[1] - 0.1)
                transition_dict_2['dones'].append(False)  ## not done yet
                s = next_s
                terminal = all(done)  ## stop once every agent is done
            win_list.append(1 if info["win"] else 0)
            ## the two agents share the same policy network and value network by default
            agent.update(transition_dict_1)  ## train the shared networks on agent 1's data
            agent.update(transition_dict_2)  ## train the shared networks on agent 2's data
            if (i_episode + 1) % 100 == 0:
                pbar.set_postfix({
                    'episode': '%d' % (num_episodes / 10 * i + i_episode + 1),
                    'return': '%.3f' % np.mean(win_list[-100:])
                })
            pbar.update(1)
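The frames collected in allimage during the very last episode are not used in this excerpt. If you want to keep them, one option (my addition, not part of the original program) is to write them to a GIF with imageio:

import imageio

if allimage:
    imageio.mimsave('combat_ippo.gif', allimage, duration=0.1)  # filename is arbitrary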
https://zhuanlan.zhihu.com/p/659219840