The policy-based methods introduced earlier in this book include the policy gradient algorithm and the Actor-Critic algorithm. Although these methods are simple and intuitive, they can suffer from unstable training in practice. Recall the idea of policy-based methods: parameterize the agent's policy, design an objective function that measures how good the policy is, and maximize this objective by gradient ascent so that the policy becomes optimal. Specifically, let $\theta$ denote the parameters of the policy $\pi_\theta$ and define the objective $J(\theta) = \mathbb{E}_{s_0}\left[V^{\pi_\theta}(s_0)\right]$; policy gradient methods update $\theta$ along the direction of $\nabla_\theta J(\theta)$. The drawback is that when the policy is represented by a deep network, a single update with too large a step size can make the policy suddenly and significantly worse, which in turn harms the rest of training.
To address this problem, we look for a trust region around the current policy: as long as the update stays inside this region, we have some guarantee on the safety of the policy's performance. This is the main idea of trust region policy optimization (TRPO). The TRPO algorithm, proposed in 2015, comes with a theoretical guarantee of monotonic improvement in policy performance, and in practice it achieves better results than the vanilla policy gradient algorithm.
Suppose the current policy is $\pi_\theta$ with parameters $\theta$. We want to use the current $\theta$ to find better parameters $\theta'$ such that $J(\theta') \ge J(\theta)$. Since the distribution of the initial state $s_0$ does not depend on the policy, the objective of the old policy $\pi_\theta$ can be written as an expectation under the new policy $\pi_{\theta'}$:

$$J(\theta) = \mathbb{E}_{s_0}\left[V^{\pi_\theta}(s_0)\right] = \mathbb{E}_{\pi_{\theta'}}\left[\sum_{t=0}^{\infty}\gamma^t V^{\pi_\theta}(s_t) - \sum_{t=1}^{\infty}\gamma^t V^{\pi_\theta}(s_t)\right]$$
Based on this identity, we can derive the gap between the objectives of the new and old policies:

$$\begin{aligned} J(\theta') - J(\theta) &= \mathbb{E}_{s_0}\left[V^{\pi_{\theta'}}(s_0)\right] - \mathbb{E}_{s_0}\left[V^{\pi_\theta}(s_0)\right] \\ &= \mathbb{E}_{\pi_{\theta'}}\left[\sum_{t=0}^{\infty}\gamma^t r(s_t,a_t)\right] + \mathbb{E}_{\pi_{\theta'}}\left[\sum_{t=0}^{\infty}\gamma^t\left(\gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)\right)\right] \\ &= \mathbb{E}_{\pi_{\theta'}}\left[\sum_{t=0}^{\infty}\gamma^t\left(r(s_t,a_t) + \gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)\right)\right] \end{aligned}$$
Defining the temporal-difference residual as the advantage function $A^{\pi_\theta}(s_t,a_t) = r(s_t,a_t) + \gamma V^{\pi_\theta}(s_{t+1}) - V^{\pi_\theta}(s_t)$, we obtain

$$\begin{aligned} J(\theta') - J(\theta) &= \mathbb{E}_{\pi_{\theta'}}\left[\sum_{t=0}^{\infty}\gamma^t A^{\pi_\theta}(s_t,a_t)\right] \\ &= \sum_{t=0}^{\infty}\gamma^t\,\mathbb{E}_{s_t\sim P_t^{\pi_{\theta'}}}\,\mathbb{E}_{a_t\sim\pi_{\theta'}(\cdot|s_t)}\left[A^{\pi_\theta}(s_t,a_t)\right] \\ &= \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_{\theta'}}}\,\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\left[A^{\pi_\theta}(s,a)\right] \end{aligned}$$
The last equality uses the definition of the state visitation distribution, $\nu^\pi(s) = (1-\gamma)\sum_{t=0}^{\infty}\gamma^t P_t^\pi(s)$. Therefore, as long as we can find a new policy such that $\mathbb{E}_{s\sim\nu^{\pi_{\theta'}}}\,\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\left[A^{\pi_\theta}(s,a)\right] \ge 0$, the policy performance is guaranteed not to decrease, i.e. $J(\theta') \ge J(\theta)$.
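To make the last step explicit, write $f(s) = \mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\left[A^{\pi_\theta}(s,a)\right]$; the definition above then gives

$$\sum_{t=0}^{\infty}\gamma^t\,\mathbb{E}_{s_t\sim P_t^{\pi_{\theta'}}}\!\left[f(s_t)\right] = \sum_{s}\left(\sum_{t=0}^{\infty}\gamma^t P_t^{\pi_{\theta'}}(s)\right) f(s) = \frac{1}{1-\gamma}\sum_{s}\nu^{\pi_{\theta'}}(s)\,f(s) = \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_{\theta'}}}\!\left[f(s)\right]$$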
However, optimizing this expression directly is very difficult, because $\pi_{\theta'}$ is exactly the policy we are trying to find, yet the expression requires sampling states from it. Enumerating every candidate new policy, collecting data with each, and checking which one satisfies the condition above is clearly infeasible. TRPO therefore approximates the state visitation distribution: it ignores the change in state visitation between the two policies and simply uses the state distribution of the old policy $\pi_\theta$, defining the following surrogate objective:

$$L_\theta(\theta') = J(\theta) + \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_\theta}}\,\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\left[A^{\pi_\theta}(s,a)\right]$$
When the new and old policies are very close, the state visitation distributions change little, so this approximation is reasonable. Note that the actions are still sampled from the new policy $\pi_{\theta'}$; we can handle the action distribution with importance sampling:

$$L_\theta(\theta') = J(\theta) + \frac{1}{1-\gamma}\,\mathbb{E}_{s\sim\nu^{\pi_\theta}}\,\mathbb{E}_{a\sim\pi_\theta(\cdot|s)}\left[\frac{\pi_{\theta'}(a|s)}{\pi_\theta(a|s)}A^{\pi_\theta}(s,a)\right]$$
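The importance-sampling step simply re-weights the inner expectation over actions. For a discrete action space:

$$\mathbb{E}_{a\sim\pi_{\theta'}(\cdot|s)}\left[A^{\pi_\theta}(s,a)\right] = \sum_a \pi_{\theta'}(a|s)\,A^{\pi_\theta}(s,a) = \sum_a \pi_\theta(a|s)\,\frac{\pi_{\theta'}(a|s)}{\pi_\theta(a|s)}\,A^{\pi_\theta}(s,a) = \mathbb{E}_{a\sim\pi_\theta(\cdot|s)}\left[\frac{\pi_{\theta'}(a|s)}{\pi_\theta(a|s)}A^{\pi_\theta}(s,a)\right]$$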
In this way, we can estimate and optimize the new policy $\pi_{\theta'}$ using only data already sampled by the old policy $\pi_\theta$. To ensure the new policy stays close to the old one, TRPO measures the distance between policies with the Kullback-Leibler (KL) divergence, giving the overall optimization problem:

$$\max_{\theta'}\; L_\theta(\theta') \quad \text{s.t.}\quad \mathbb{E}_{s\sim\nu^{\pi_\theta}}\left[D_{KL}\!\left(\pi_\theta(\cdot|s),\,\pi_{\theta'}(\cdot|s)\right)\right] \le \delta$$
The inequality constraint defines a KL ball in policy space, called the trust region. Inside this region, the state distribution encountered by the current learning policy can be treated as identical to the state distribution sampled by the previous policy, so the one-step importance sampling above lets the learning policy improve reliably. The intuition behind TRPO is illustrated in Figure 11-1.
Figure 11-1. Left: without a trust region, a policy gradient update can cause a sudden drop in policy performance. Right: with a trust region, every policy gradient update is guaranteed to improve performance.
Directly solving this constrained optimization problem is cumbersome, so in its concrete implementation TRPO applies a further approximation to obtain a fast solution. For convenience, in the following we write $\theta_k$ for the policy parameters after the $k$-th iteration. We Taylor-expand the objective and the constraint around $\theta_k$, keeping first-order and second-order terms respectively:

$$\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\,\mathbb{E}_{a\sim\pi_{\theta_k}(\cdot|s)}\left[\frac{\pi_{\theta'}(a|s)}{\pi_{\theta_k}(a|s)}A^{\pi_{\theta_k}}(s,a)\right] \approx g^\top(\theta'-\theta_k)$$

$$\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\left[D_{KL}\!\left(\pi_{\theta_k}(\cdot|s),\,\pi_{\theta'}(\cdot|s)\right)\right] \approx \frac{1}{2}(\theta'-\theta_k)^\top H\,(\theta'-\theta_k)$$
Here $g$ denotes the gradient of the objective function and $H$ the Hessian matrix of the average KL divergence between the two policies, both evaluated at $\theta_k$.
The optimization problem then becomes:

$$\theta_{k+1} = \arg\max_{\theta'}\; g^\top(\theta'-\theta_k) \quad \text{s.t.}\quad \frac{1}{2}(\theta'-\theta_k)^\top H\,(\theta'-\theta_k) \le \delta$$
At this point, the Karush-Kuhn-Tucker (KKT) conditions give the solution in closed form:

$$\theta_{k+1} = \theta_k + \sqrt{\frac{2\delta}{g^\top H^{-1} g}}\, H^{-1} g$$
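One way to see where this expression comes from is to work with the Lagrangian of the approximate problem:

$$\mathcal{L}(\theta',\lambda) = g^\top(\theta'-\theta_k) - \lambda\left(\tfrac{1}{2}(\theta'-\theta_k)^\top H\,(\theta'-\theta_k) - \delta\right)$$

Setting $\nabla_{\theta'}\mathcal{L}=0$ gives $\theta'-\theta_k=\tfrac{1}{\lambda}H^{-1}g$; substituting this into the active constraint $\tfrac{1}{2}(\theta'-\theta_k)^\top H\,(\theta'-\theta_k)=\delta$ yields $\tfrac{1}{2\lambda^2}\,g^\top H^{-1}g=\delta$, hence $\tfrac{1}{\lambda}=\sqrt{2\delta/(g^\top H^{-1}g)}$, which recovers the update above.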
In general, a policy represented by a neural network has tens of thousands of parameters or more, so computing and storing the inverse of the Hessian matrix $H$ would cost an enormous amount of memory and time. TRPO sidesteps this problem with the conjugate gradient method: its core idea is to directly compute $x = H^{-1}g$, which is exactly the parameter update direction. Let $\beta$ be the maximum step length along $x$ that still satisfies the KL constraint. The constraint then gives $\frac{1}{2}(\beta x)^\top H (\beta x) = \delta$, and solving for $\beta$ yields $\beta = \sqrt{\frac{2\delta}{x^\top H x}}$. The parameter update therefore becomes

$$\theta_{k+1} = \theta_k + \sqrt{\frac{2\delta}{x^\top H x}}\, x$$
Thus, as long as we can compute $x = H^{-1}g$, we can update the parameters with the formula above; the problem reduces to solving the linear system $Hx = g$. Since $H$ is a symmetric positive-definite matrix, this system can be solved with the conjugate gradient method, a sketch of which is given below.
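The following is a minimal standalone sketch of the conjugate gradient iteration, written here with NumPy and an explicit matrix purely for illustration (in TRPO itself, H is never formed explicitly; the product Hp is computed as described in the next paragraph):

import numpy as np

def conjugate_gradient(matvec, g, max_iter=10, tol=1e-10):
    """Solve H x = g given only a function that computes H @ p."""
    x = np.zeros_like(g)
    r = g.copy()          # residual r = g - H x (x starts at 0)
    p = g.copy()          # search direction
    rdotr = r @ r
    for _ in range(max_iter):
        Hp = matvec(p)
        alpha = rdotr / (p @ Hp)
        x += alpha * p
        r -= alpha * Hp
        new_rdotr = r @ r
        if new_rdotr < tol:
            break
        p = r + (new_rdotr / rdotr) * p
        rdotr = new_rdotr
    return x

# Toy usage with a small symmetric positive-definite H
H = np.array([[4.0, 1.0], [1.0, 3.0]])
g = np.array([1.0, 2.0])
x = conjugate_gradient(lambda p: H @ p, g)
print(x, H @ x)  # H @ x should be close to g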
During the conjugate gradient iterations, computing the step size $\alpha_k$ and the new residual $r_{k+1}$ appears to require computing and storing the Hessian matrix $H$. To avoid materializing such a large matrix, we only ever compute Hessian-vector products $Hv$ rather than $H$ itself. This is easy, because for any column vector $v$ one can verify that

$$Hv = \nabla_\theta\!\left(\left(\nabla_\theta\,\mathbb{E}_{s\sim\nu^{\pi_{\theta_k}}}\!\left[D_{KL}\!\left(\pi_{\theta_k}(\cdot|s),\,\pi_{\theta'}(\cdot|s)\right)\right]\right)^{\!\top} v\right)$$
That is, we first take the dot product of the KL gradient with the vector $v$, and then differentiate the resulting scalar once more.
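A minimal PyTorch sketch of this double-backprop trick on a toy scalar function, for illustration only (the TRPO implementation later in this chapter applies the same idea to the average KL divergence):

import torch

def hessian_vector_product(f, params, v):
    """Compute (d^2 f / d params^2) @ v without forming the Hessian."""
    grad = torch.autograd.grad(f, params, create_graph=True)
    flat_grad = torch.cat([g.view(-1) for g in grad])
    grad_v = torch.dot(flat_grad, v)           # scalar: (df/dparams)^T v
    hvp = torch.autograd.grad(grad_v, params)  # gradient of that scalar is H v
    return torch.cat([g.contiguous().view(-1) for g in hvp])

# Toy usage: f(theta) = theta^T A theta has Hessian 2A (A symmetric), so Hv = 2 A v
theta = torch.randn(3, requires_grad=True)
A = torch.diag(torch.tensor([2.0, 3.0, 4.0]))
f = theta @ A @ theta
v = torch.ones(3)
print(hessian_vector_product(f, [theta], v))  # equals 2 * A @ v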
Because TRPO relies on first-order and second-order Taylor approximations, the solution above is not exact: the new parameters may fail to improve on $\theta_k$, or may violate the KL constraint. TRPO therefore performs a line search at the end of each iteration. Concretely, it looks for the smallest non-negative integer $i$ such that the parameters computed by

$$\theta_{k+1} = \theta_k + \alpha^i\sqrt{\frac{2\delta}{x^\top H x}}\, x$$

still satisfy the original KL constraint and genuinely improve the surrogate objective $L_{\theta_k}$, where $\alpha\in(0,1)$ is a hyperparameter that controls the line search step.
At this point the overall flow of the TRPO algorithm should be clear. Its procedure can be summarized as follows:

- Initialize the policy network parameters $\theta$ and the value network parameters $\omega$.
- For each iteration:
  - sample trajectories with the current policy $\pi_\theta$;
  - use the value network to estimate the advantage $A(s_t,a_t)$ of every state-action pair;
  - compute the gradient $g$ of the surrogate policy objective;
  - solve $x = H^{-1}g$ with the conjugate gradient method;
  - run the line search to find the smallest feasible $i$ and update the policy parameters to $\theta_{k+1} = \theta_k + \alpha^i\sqrt{2\delta/(x^\top H x)}\,x$;
  - update the value network in the same way as in the Actor-Critic algorithm.
Up to Section 11.5 we have not yet said how to estimate the advantage function $A$. A commonly used approach is generalized advantage estimation (GAE), which we briefly introduce here. First, write the temporal-difference error as $\delta_t = r_t + \gamma V(s_{t+1}) - V(s_t)$, where $V$ is a learned state value function. Following the idea of multi-step temporal-difference learning, the advantage can be estimated over different numbers of steps:

$$\begin{aligned} A_t^{(1)} &= \delta_t = -V(s_t) + r_t + \gamma V(s_{t+1}) \\ A_t^{(2)} &= \delta_t + \gamma\delta_{t+1} = -V(s_t) + r_t + \gamma r_{t+1} + \gamma^2 V(s_{t+2}) \\ &\;\;\vdots \\ A_t^{(k)} &= \sum_{l=0}^{k-1}\gamma^l\delta_{t+l} = -V(s_t) + r_t + \gamma r_{t+1} + \cdots + \gamma^{k-1} r_{t+k-1} + \gamma^k V(s_{t+k}) \end{aligned}$$
GAE then takes an exponentially weighted average of these multi-step advantage estimates:

$$\begin{aligned} A_t^{GAE} &= (1-\lambda)\left(A_t^{(1)} + \lambda A_t^{(2)} + \lambda^2 A_t^{(3)} + \cdots\right) \\ &= (1-\lambda)\left(\delta_t\frac{1}{1-\lambda} + \gamma\delta_{t+1}\frac{\lambda}{1-\lambda} + \gamma^2\delta_{t+2}\frac{\lambda^2}{1-\lambda} + \cdots\right) \\ &= \sum_{l=0}^{\infty}(\gamma\lambda)^l\delta_{t+l} \end{aligned}$$
Here $\lambda\in[0,1]$ is an additional hyperparameter introduced by GAE. When $\lambda=0$, $A_t^{GAE} = \delta_t$, i.e. the advantage is estimated from a single one-step TD error; when $\lambda=1$, $A_t^{GAE} = \sum_{l=0}^{\infty}\gamma^l\delta_{t+l} = \sum_{l=0}^{\infty}\gamma^l r_{t+l} - V(s_t)$, i.e. the advantage is estimated from the full discounted return of the remaining trajectory.
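The $\lambda=1$ case follows from a telescoping sum (assuming the discounted value terms vanish in the limit):

$$\sum_{l=0}^{\infty}\gamma^l\delta_{t+l} = \sum_{l=0}^{\infty}\gamma^l\left(r_{t+l} + \gamma V(s_{t+l+1}) - V(s_{t+l})\right) = \sum_{l=0}^{\infty}\gamma^l r_{t+l} - V(s_t)$$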
The code below implements GAE: given $\gamma$, $\lambda$, and the TD error $\delta_t$ at every time step, it computes the advantage estimates directly from the formula above.
def compute_advantage(gamma, lmbda, td_delta):
    td_delta = td_delta.detach().numpy()
    advantage_list = []
    advantage = 0.0
    for delta in td_delta[::-1]:
        advantage = gamma * lmbda * advantage + delta
        advantage_list.append(advantage)
    advantage_list.reverse()
    return torch.tensor(advantage_list, dtype=torch.float)
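As a quick check, the helper can be called on a toy tensor of TD errors (the values here are made up for illustration):

# Hypothetical TD errors for a 4-step trajectory
td_delta = torch.tensor([0.5, -0.2, 0.1, 0.3])
adv = compute_advantage(gamma=0.98, lmbda=0.95, td_delta=td_delta)
print(adv)  # each entry accumulates the (gamma * lambda)-discounted future TD errors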
In this section we run TRPO experiments in environments with both discrete and continuous action spaces. The first environment is CartPole and the second is Inverted Pendulum.
First, import the necessary libraries.
import torch
import numpy as np
import gym
import matplotlib.pyplot as plt
import torch.nn.functional as F
import rl_utils
import copy
Then define the policy network and the value network (the same as in the Actor-Critic algorithm).
class PolicyNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return F.softmax(self.fc2(x), dim=1)


class ValueNet(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim):
        super(ValueNet, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc2 = torch.nn.Linear(hidden_dim, 1)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        return self.fc2(x)


class TRPO:
    """ TRPO algorithm """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.n
        # The policy (actor) parameters are not updated by an optimizer
        self.actor = PolicyNet(state_dim, hidden_dim, action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda  # GAE parameter
        self.kl_constraint = kl_constraint  # maximum allowed KL divergence
        self.alpha = alpha  # line search parameter
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        probs = self.actor(state)
        action_dist = torch.distributions.Categorical(probs)
        action = action_dist.sample()
        return action.item()

    def hessian_matrix_vector_product(self, states, old_action_dists, vector):
        # Compute the product of the Hessian matrix and a vector
        new_action_dists = torch.distributions.Categorical(self.actor(states))
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))  # average KL divergence
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        # First take the dot product of the KL gradient with the vector
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat([grad.view(-1) for grad in grad2])
        return grad2_vector

    def conjugate_gradient(self, grad, states, old_action_dists):  # solve Hx = g with conjugate gradients
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):  # main conjugate gradient loop
            Hp = self.hessian_matrix_vector_product(states, old_action_dists,
                                                    p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):  # compute the surrogate policy objective
        log_probs = torch.log(actor(states).gather(1, actions))
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):  # line search
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):  # main line search loop
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            new_action_dists = torch.distributions.Categorical(
                new_actor(states))
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):  # update the policy
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        # Compute x = H^(-1)g with the conjugate gradient method
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)  # line search
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())  # set the parameters found by line search

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions']).view(-1, 1).to(
            self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        old_log_probs = torch.log(self.actor(states).gather(1,
                                                            actions)).detach()
        old_action_dists = torch.distributions.Categorical(
            self.actor(states).detach())
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()  # update the value function
        # Update the policy
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)
Next, we train TRPO in the CartPole environment and visualize the results.
num_episodes = 500
hidden_dim = 128
gamma = 0.98
lmbda = 0.95
critic_lr = 1e-2
kl_constraint = 0.0005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device(
    "cpu")

env_name = 'CartPole-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPO(hidden_dim, env.observation_space, env.action_space, lmbda,
             kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
Iteration 0:   0%|          | 0/50 [00:00<?, ?it/s]/usr/local/lib/python3.7/dist-packages/ipykernel_launcher.py:38: UserWarning: Creating a tensor from a list of numpy.ndarrays is extremely slow. Please consider converting the list to a single numpy.ndarray with numpy.array() before converting to a tensor. (Triggered internally at ../torch/csrc/utils/tensor_new.cpp:201.)
Iteration 0: 100%|██████████| 50/50 [00:03<00:00, 15.71it/s, episode=50, return=139.200]
Iteration 1: 100%|██████████| 50/50 [00:03<00:00, 13.08it/s, episode=100, return=150.500]
Iteration 2: 100%|██████████| 50/50 [00:04<00:00, 11.57it/s, episode=150, return=184.000]
Iteration 3: 100%|██████████| 50/50 [00:06<00:00, 7.60it/s, episode=200, return=183.600]
Iteration 4: 100%|██████████| 50/50 [00:06<00:00, 7.17it/s, episode=250, return=183.500]
Iteration 5: 100%|██████████| 50/50 [00:04<00:00, 10.91it/s, episode=300, return=193.700]
Iteration 6: 100%|██████████| 50/50 [00:04<00:00, 10.70it/s, episode=350, return=199.500]
Iteration 7: 100%|██████████| 50/50 [00:04<00:00, 10.89it/s, episode=400, return=200.000]
Iteration 8: 100%|██████████| 50/50 [00:04<00:00, 10.80it/s, episode=450, return=200.000]
Iteration 9: 100%|██████████| 50/50 [00:04<00:00, 11.09it/s, episode=500, return=200.000]
TRPO converges quickly in the CartPole environment and achieves excellent performance.
Next we try the Inverted Pendulum environment. Since it has a continuous action space, the code above needs some modifications. Because the actions are continuous, the policy network now outputs the mean and standard deviation of a Gaussian distribution over actions.
class PolicyNetContinuous(torch.nn.Module):
    def __init__(self, state_dim, hidden_dim, action_dim):
        super(PolicyNetContinuous, self).__init__()
        self.fc1 = torch.nn.Linear(state_dim, hidden_dim)
        self.fc_mu = torch.nn.Linear(hidden_dim, action_dim)
        self.fc_std = torch.nn.Linear(hidden_dim, action_dim)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        mu = 2.0 * torch.tanh(self.fc_mu(x))
        std = F.softplus(self.fc_std(x))
        return mu, std  # mean and standard deviation of the Gaussian


class TRPOContinuous:
    """ TRPO for continuous actions """
    def __init__(self, hidden_dim, state_space, action_space, lmbda,
                 kl_constraint, alpha, critic_lr, gamma, device):
        state_dim = state_space.shape[0]
        action_dim = action_space.shape[0]
        self.actor = PolicyNetContinuous(state_dim, hidden_dim,
                                         action_dim).to(device)
        self.critic = ValueNet(state_dim, hidden_dim).to(device)
        self.critic_optimizer = torch.optim.Adam(self.critic.parameters(),
                                                 lr=critic_lr)
        self.gamma = gamma
        self.lmbda = lmbda
        self.kl_constraint = kl_constraint
        self.alpha = alpha
        self.device = device

    def take_action(self, state):
        state = torch.tensor([state], dtype=torch.float).to(self.device)
        mu, std = self.actor(state)
        action_dist = torch.distributions.Normal(mu, std)
        action = action_dist.sample()
        return [action.item()]

    def hessian_matrix_vector_product(self,
                                      states,
                                      old_action_dists,
                                      vector,
                                      damping=0.1):
        mu, std = self.actor(states)
        new_action_dists = torch.distributions.Normal(mu, std)
        kl = torch.mean(
            torch.distributions.kl.kl_divergence(old_action_dists,
                                                 new_action_dists))
        kl_grad = torch.autograd.grad(kl,
                                      self.actor.parameters(),
                                      create_graph=True)
        kl_grad_vector = torch.cat([grad.view(-1) for grad in kl_grad])
        kl_grad_vector_product = torch.dot(kl_grad_vector, vector)
        grad2 = torch.autograd.grad(kl_grad_vector_product,
                                    self.actor.parameters())
        grad2_vector = torch.cat(
            [grad.contiguous().view(-1) for grad in grad2])
        return grad2_vector + damping * vector

    def conjugate_gradient(self, grad, states, old_action_dists):
        x = torch.zeros_like(grad)
        r = grad.clone()
        p = grad.clone()
        rdotr = torch.dot(r, r)
        for i in range(10):
            Hp = self.hessian_matrix_vector_product(states, old_action_dists,
                                                    p)
            alpha = rdotr / torch.dot(p, Hp)
            x += alpha * p
            r -= alpha * Hp
            new_rdotr = torch.dot(r, r)
            if new_rdotr < 1e-10:
                break
            beta = new_rdotr / rdotr
            p = r + beta * p
            rdotr = new_rdotr
        return x

    def compute_surrogate_obj(self, states, actions, advantage, old_log_probs,
                              actor):
        mu, std = actor(states)
        action_dists = torch.distributions.Normal(mu, std)
        log_probs = action_dists.log_prob(actions)
        ratio = torch.exp(log_probs - old_log_probs)
        return torch.mean(ratio * advantage)

    def line_search(self, states, actions, advantage, old_log_probs,
                    old_action_dists, max_vec):
        old_para = torch.nn.utils.convert_parameters.parameters_to_vector(
            self.actor.parameters())
        old_obj = self.compute_surrogate_obj(states, actions, advantage,
                                             old_log_probs, self.actor)
        for i in range(15):
            coef = self.alpha**i
            new_para = old_para + coef * max_vec
            new_actor = copy.deepcopy(self.actor)
            torch.nn.utils.convert_parameters.vector_to_parameters(
                new_para, new_actor.parameters())
            mu, std = new_actor(states)
            new_action_dists = torch.distributions.Normal(mu, std)
            kl_div = torch.mean(
                torch.distributions.kl.kl_divergence(old_action_dists,
                                                     new_action_dists))
            new_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                 old_log_probs, new_actor)
            if new_obj > old_obj and kl_div < self.kl_constraint:
                return new_para
        return old_para

    def policy_learn(self, states, actions, old_action_dists, old_log_probs,
                     advantage):
        surrogate_obj = self.compute_surrogate_obj(states, actions, advantage,
                                                   old_log_probs, self.actor)
        grads = torch.autograd.grad(surrogate_obj, self.actor.parameters())
        obj_grad = torch.cat([grad.view(-1) for grad in grads]).detach()
        descent_direction = self.conjugate_gradient(obj_grad, states,
                                                    old_action_dists)
        Hd = self.hessian_matrix_vector_product(states, old_action_dists,
                                                descent_direction)
        max_coef = torch.sqrt(2 * self.kl_constraint /
                              (torch.dot(descent_direction, Hd) + 1e-8))
        new_para = self.line_search(states, actions, advantage, old_log_probs,
                                    old_action_dists,
                                    descent_direction * max_coef)
        torch.nn.utils.convert_parameters.vector_to_parameters(
            new_para, self.actor.parameters())

    def update(self, transition_dict):
        states = torch.tensor(transition_dict['states'],
                              dtype=torch.float).to(self.device)
        actions = torch.tensor(transition_dict['actions'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        rewards = torch.tensor(transition_dict['rewards'],
                               dtype=torch.float).view(-1, 1).to(self.device)
        next_states = torch.tensor(transition_dict['next_states'],
                                   dtype=torch.float).to(self.device)
        dones = torch.tensor(transition_dict['dones'],
                             dtype=torch.float).view(-1, 1).to(self.device)
        rewards = (rewards + 8.0) / 8.0  # rescale rewards to ease training
        td_target = rewards + self.gamma * self.critic(next_states) * (1 -
                                                                       dones)
        td_delta = td_target - self.critic(states)
        advantage = compute_advantage(self.gamma, self.lmbda,
                                      td_delta.cpu()).to(self.device)
        mu, std = self.actor(states)
        old_action_dists = torch.distributions.Normal(mu.detach(),
                                                      std.detach())
        old_log_probs = old_action_dists.log_prob(actions)
        critic_loss = torch.mean(
            F.mse_loss(self.critic(states), td_target.detach()))
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        self.policy_learn(states, actions, old_action_dists, old_log_probs,
                          advantage)
Next, we train the continuous-action version of TRPO in the Inverted Pendulum environment and observe its training curve. Running this code to completion takes a fair amount of time.
num_episodes = 2000
hidden_dim = 128
gamma = 0.9
lmbda = 0.9
critic_lr = 1e-2
kl_constraint = 0.00005
alpha = 0.5
device = torch.device("cuda") if torch.cuda.is_available() else torch.device(
    "cpu")

env_name = 'Pendulum-v0'
env = gym.make(env_name)
env.seed(0)
torch.manual_seed(0)
agent = TRPOContinuous(hidden_dim, env.observation_space, env.action_space,
                       lmbda, kl_constraint, alpha, critic_lr, gamma, device)
return_list = rl_utils.train_on_policy_agent(env, agent, num_episodes)

episodes_list = list(range(len(return_list)))
plt.plot(episodes_list, return_list)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()

mv_return = rl_utils.moving_average(return_list, 9)
plt.plot(episodes_list, mv_return)
plt.xlabel('Episodes')
plt.ylabel('Returns')
plt.title('TRPO on {}'.format(env_name))
plt.show()
Iteration 0: 100%|██████████| 200/200 [00:23<00:00, 8.63it/s, episode=200, return=-1181.390]
Iteration 1: 100%|██████████| 200/200 [00:23<00:00, 8.68it/s, episode=400, return=-994.876]
Iteration 2: 100%|██████████| 200/200 [00:23<00:00, 8.39it/s, episode=600, return=-888.498]
Iteration 3: 100%|██████████| 200/200 [00:23<00:00, 8.69it/s, episode=800, return=-848.329]
Iteration 4: 100%|██████████| 200/200 [00:23<00:00, 8.68it/s, episode=1000, return=-772.392]
Iteration 5: 100%|██████████| 200/200 [00:22<00:00, 8.72it/s, episode=1200, return=-611.870]
Iteration 6: 100%|██████████| 200/200 [00:23<00:00, 8.62it/s, episode=1400, return=-397.705]
Iteration 7: 100%|██████████| 200/200 [00:23<00:00, 8.68it/s, episode=1600, return=-268.498]
Iteration 8: 100%|██████████| 200/200 [00:23<00:00, 8.61it/s, episode=1800, return=-408.976]
Iteration 9: 100%|██████████| 200/200 [00:23<00:00, 8.49it/s, episode=2000, return=-296.363]
TRPO also achieves very good results in the continuous-action Inverted Pendulum environment, which shows that its trust region optimization works effectively in both discrete and continuous action spaces.
This chapter introduced the TRPO algorithm and ran experiments in environments with both discrete and continuous action spaces. TRPO is an on-policy method: each round of training uses only the data sampled by the previous round's policy. It is one of the most representative policy-based deep reinforcement learning algorithms. Intuitively, TRPO's argument is this: a change in the policy changes the data distribution, which can severely disrupt the learning of a deep policy network, so by restricting each update to a trusted region of policy space we keep policy learning stable and effective.
TRPO is one of the harder reinforcement learning algorithms to master and requires a solid mathematical background; readers who get stuck are encouraged to consult additional references. TRPO has several follow-up works, the most famous of which is PPO, introduced in Chapter 12.
[1] SCHULMAN J, LEVINE S, ABBEEL P, et al. Trust region policy optimization [C]// International Conference on Machine Learning, PMLR, 2015: 1889-1897.
[2] KAKADE S M. A natural policy gradient [C]// Advances in Neural Information Processing Systems 14, 2001.
[3] SCHULMAN J, MORITZ P, LEVINE S, et al. High-dimensional continuous control using generalized advantage estimation [C]// International Conference on Learning Representations, 2016.