第 4 章 动态规划算法

4.1 简介

动态规划(dynamic programming)是程序设计算法中非常重要的内容,能够高效解决一些经典问题,例如背包问题和最短路径规划。动态规划的基本思想是将待求解问题分解成若干个子问题,先求解子问题,然后从这些子问题的解得到目标问题的解。动态规划会保存已解决的子问题的答案,在求解目标问题的过程中,需要这些子问题答案时就可以直接利用,避免重复计算。本章介绍如何用动态规划的思想来求解在马尔可夫决策过程中的最优策略。

基于动态规划的强化学习算法主要有两种:一是策略迭代(policy iteration),二是价值迭代(value iteration)。其中,策略迭代由两部分组成:策略评估(policy evaluation)和策略提升(policy improvement)。具体来说,策略迭代中的策略评估使用贝尔曼期望方程来得到一个策略的状态价值函数,这是一个动态规划的过程;而价值迭代直接使用贝尔曼最优方程来进行动态规划,得到最终的最优状态价值。

不同于 3.5 节介绍的蒙特卡洛方法和第 5 章将要介绍的时序差分算法,基于动态规划的这两种强化学习算法要求事先知道环境的状态转移函数和奖励函数,也就是需要知道整个马尔可夫决策过程。在这样一个白盒环境中,不需要通过智能体和环境的大量交互来学习,可以直接用动态规划求解状态价值函数。但是,现实中的白盒环境很少,这也是动态规划算法的局限之处,我们无法将其运用到很多实际场景中。另外,策略迭代和价值迭代通常只适用于有限马尔可夫决策过程,即状态空间和动作空间是离散且有限的。

4.2 悬崖漫步环境

本节使用策略迭代和价值迭代来求解悬崖漫步(Cliff Walking)这个环境中的最优策略。接下来先简单介绍一下该环境。

悬崖漫步是一个非常经典的强化学习环境,它要求一个智能体从起点出发,避开悬崖行走,最终到达目标位置。如图 4-1 所示,有一个 4×12 的网格世界,每一个网格表示一个状态。智能体的起点是左下角的状态,目标是右下角的状态,智能体在每一个状态都可以采取 4 种动作:上、下、左、右。如果智能体采取动作后触碰到边界墙壁则状态不发生改变,否则就会相应到达下一个状态。环境中有一段悬崖,智能体掉入悬崖或到达目标状态都会结束动作并回到起点,也就是说掉入悬崖或者达到目标状态是终止状态。智能体每走一步的奖励是 −1,掉入悬崖的奖励是 −100。

图4-1 Cliff Walking环境示意图

接下来一起来看一看 Cliff Walking 环境的代码吧。

import copy
class CliffWalkingEnv:
""" 悬崖漫步环境"""
def __init__(self, ncol=12, nrow=4):
self.ncol = ncol # 定义网格世界的列
self.nrow = nrow # 定义网格世界的行
# 转移矩阵P[state][action] = [(p, next_state, reward, done)]包含下一个状态和奖励
self.P = self.createP()
def createP(self):
# 初始化
P = [[[] for j in range(4)] for i in range(self.nrow * self.ncol)]
# 4种动作, change[0]:上,change[1]:下, change[2]:左, change[3]:右。坐标系原点(0,0)
# 定义在左上角
change = [[0, -1], [0, 1], [-1, 0], [1, 0]]
for i in range(self.nrow):
for j in range(self.ncol):
for a in range(4):
# 位置在悬崖或者目标状态,因为无法继续交互,任何动作奖励都为0
if i == self.nrow - 1 and j > 0:
P[i * self.ncol + j][a] = [(1, i * self.ncol + j, 0,
True)]
continue
# 其他位置
next_x = min(self.ncol - 1, max(0, j + change[a][0]))
next_y = min(self.nrow - 1, max(0, i + change[a][1]))
next_state = next_y * self.ncol + next_x
reward = -1
done = False
# 下一个位置在悬崖或者终点
if next_y == self.nrow - 1 and next_x > 0:
done = True
if next_x != self.ncol - 1: # 下一个位置在悬崖
reward = -100
P[i * self.ncol + j][a] = [(1, next_state, reward, done)]
return P

4.3 策略迭代算法

策略迭代是策略评估和策略提升不断循环交替,直至最后得到最优策略的过程。本节分别对这两个过程进行详细介绍。

4.3.1 策略评估

策略评估这一过程用来计算一个策略的状态价值函数。回顾一下之前学习的贝尔曼期望方程:

其中,是策略在状态下采取动作的概率。可以看到,当知道奖励函数和状态转移函数时,我们可以根据下一个状态的价值来计算当前状态的价值。因此,根据动态规划的思想,可以把计算下一个可能状态的价值当成一个子问题,把计算当前状态的价值看作当前问题。在得知子问题的解后,就可以求解当前问题。更一般的,考虑所有的状态,就变成了用上一轮的状态价值函数来计算当前这一轮的状态价值函数,即

我们可以选定任意初始值。根据贝尔曼期望方程,可以得知是以上更新公式的一个不动点(fixed point)。事实上,可以证明当时,序列会收敛到,所以可以据此来计算得到一个策略的状态价值函数。可以看到,由于需要不断做贝尔曼期望方程迭代,策略评估其实会耗费很大的计算代价。在实际的实现过程中,如果某一轮的值非常小,可以提前结束策略评估。这样做可以提升效率,并且得到的价值也非常接近真实的价值。

4.3.2 策略提升

使用策略评估计算得到当前策略的状态价值函数之后,我们可以据此来改进该策略。假设此时对于策略,我们已经知道其价值,也就是知道了在策略下从每一个状态出发最终得到的期望回报。我们要如何改变策略来获得在状态下更高的期望回报呢?假设智能体在状态下采取动作,之后的动作依旧遵循策略,此时得到的期望回报其实就是动作价值。如果我们有,则说明在状态下采取动作会比原来的策略得到更高的期望回报。以上假设只是针对一个状态,现在假设存在一个确定性策略,在任意一个状态下,都满足

于是在任意状态下,我们有

这便是策略提升定理(policy improvement theorem)。于是我们可以直接贪心地在每一个状态选择动作价值最大的动作,也就是

我们发现构造的贪心策略满足策略提升定理的条件,所以策略能够比策略更好或者至少与其一样好。这个根据贪心法选取动作从而得到新的策略的过程称为策略提升。当策略提升之后得到的策略和之前的策略一样时,说明策略迭代达到了收敛,此时就是最优策略。

策略提升定理的证明通过以下推导过程可以证明,使用上述提升公式得到的新策略在每个状态的价值不低于原策略在该状态的价值。

可以看到,推导过程中的每一个时间步都用到局部动作价值优势,累积到无穷步或者终止状态时,我们就得到了整个策略价值提升的不等式。

4.3.3 策略迭代算法

总体来说,策略迭代算法的过程如下:对当前的策略进行策略评估,得到其状态价值函数,然后根据该状态价值函数进行策略提升以得到一个更好的新策略,接着继续评估新策略、提升策略……直至最后收敛到最优策略(收敛性证明参见 4.7 节):

结合策略评估和策略提升,我们得到以下策略迭代算法:

  • 随机初始化策略 和价值函数
  • while do : (策略评估循环)
  • 对于每一个状态 :
  • end while
  • 对于每一个状态 :
  • , 则停止算法并返回 ; 否则转到策略评估循环

我们现在来看一下策略迭代算法的代码实现过程。

class PolicyIteration:
""" 策略迭代算法 """
def __init__(self, env, theta, gamma):
self.env = env
self.v = [0] * self.env.ncol * self.env.nrow # 初始化价值为0
self.pi = [[0.25, 0.25, 0.25, 0.25]
for i in range(self.env.ncol * self.env.nrow)] # 初始化为均匀随机策略
self.theta = theta # 策略评估收敛阈值
self.gamma = gamma # 折扣因子
def policy_evaluation(self): # 策略评估
cnt = 1 # 计数器
while 1:
max_diff = 0
new_v = [0] * self.env.ncol * self.env.nrow
for s in range(self.env.ncol * self.env.nrow):
qsa_list = [] # 开始计算状态s下的所有Q(s,a)价值
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, r, done = res
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
# 本章环境比较特殊,奖励和下一个状态有关,所以需要和状态转移概率相乘
qsa_list.append(self.pi[s][a] * qsa)
new_v[s] = sum(qsa_list) # 状态价值函数和动作价值函数之间的关系
max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
self.v = new_v
if max_diff < self.theta: break # 满足收敛条件,退出评估迭代
cnt += 1
print("策略评估进行%d轮后完成" % cnt)
def policy_improvement(self): # 策略提升
for s in range(self.env.nrow * self.env.ncol):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, r, done = res
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
qsa_list.append(qsa)
maxq = max(qsa_list)
cntq = qsa_list.count(maxq) # 计算有几个动作得到了最大的Q值
# 让这些动作均分概率
self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
print("策略提升完成")
return self.pi
def policy_iteration(self): # 策略迭代
while 1:
self.policy_evaluation()
old_pi = copy.deepcopy(self.pi) # 将列表进行深拷贝,方便接下来进行比较
new_pi = self.policy_improvement()
if old_pi == new_pi: break

现在我们已经写好了环境代码和策略迭代代码。为了更好地展现最终的策略,接下来增加一个打印策略的函数,用于打印当前策略在每个状态下的价值以及智能体会采取的动作。对于打印出来的动作,我们用^o<o表示等概率采取向左和向上两种动作,ooo>表示在当前状态只采取向右动作。

def print_agent(agent, action_meaning, disaster=[], end=[]):
print("状态价值:")
for i in range(agent.env.nrow):
for j in range(agent.env.ncol):
# 为了输出美观,保持输出6个字符
print('%6.6s' % ('%.3f' % agent.v[i * agent.env.ncol + j]), end=' ')
print()
print("策略:")
for i in range(agent.env.nrow):
for j in range(agent.env.ncol):
# 一些特殊的状态,例如悬崖漫步中的悬崖
if (i * agent.env.ncol + j) in disaster:
print('****', end=' ')
elif (i * agent.env.ncol + j) in end: # 目标状态
print('EEEE', end=' ')
else:
a = agent.pi[i * agent.env.ncol + j]
pi_str = ''
for k in range(len(action_meaning)):
pi_str += action_meaning[k] if a[k] > 0 else 'o'
print(pi_str, end=' ')
print()
env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])
策略评估进行60轮后完成
策略提升完成
策略评估进行72轮后完成
策略提升完成
策略评估进行44轮后完成
策略提升完成
策略评估进行12轮后完成
策略提升完成
策略评估进行1轮后完成
策略提升完成
状态价值:
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000
-7.458 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000
策略:
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE

经过 5 次策略评估和策略提升的循环迭代,策略收敛了,此时将获得的策略打印出来。用贝尔曼最优方程去检验其中每一个状态的价值,可以发现最终输出的策略的确是最优策略。

4.4 价值迭代算法

从上面的代码运行结果中我们能发现,策略迭代中的策略评估需要进行很多轮才能收敛得到某一策略的状态函数,这需要很大的计算量,尤其是在状态和动作空间比较大的情况下。我们是否必须要完全等到策略评估完成后再进行策略提升呢?试想一下,可能出现这样的情况:虽然状态价值函数还没有收敛,但是不论接下来怎么更新状态价值,策略提升得到的都是同一个策略。如果只在策略评估中进行一轮价值更新,然后直接根据更新后的价值进行策略提升,这样是否可以呢?答案是肯定的,这其实就是本节将要讲解的价值迭代算法,它可以被认为是一种策略评估只进行了一轮更新的策略迭代算法。需要注意的是,价值迭代中不存在显式的策略,我们只维护一个状态价值函数。

确切来说,价值迭代可以看成一种动态规划过程,它利用的是贝尔曼最优方程:

将其写成迭代更新的方式为

价值迭代便是按照以上更新方式进行的。等到相同时,它就是贝尔曼最优方程的不动点,此时对应着最优状态价值函数。然后我们利用},从中恢复出最优策略即可。

价值迭代算法流程如下:

  • 随机初始化
  • while do :
  • 对于每一个状态 :
  • end while
  • 返回一个确定性策略

我们现在来编写价值迭代的代码。

class ValueIteration:
""" 价值迭代算法 """
def __init__(self, env, theta, gamma):
self.env = env
self.v = [0] * self.env.ncol * self.env.nrow # 初始化价值为0
self.theta = theta # 价值收敛阈值
self.gamma = gamma
# 价值迭代结束后得到的策略
self.pi = [None for i in range(self.env.ncol * self.env.nrow)]
def value_iteration(self):
cnt = 0
while 1:
max_diff = 0
new_v = [0] * self.env.ncol * self.env.nrow
for s in range(self.env.ncol * self.env.nrow):
qsa_list = [] # 开始计算状态s下的所有Q(s,a)价值
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, r, done = res
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
qsa_list.append(qsa) # 这一行和下一行代码是价值迭代和策略迭代的主要区别
new_v[s] = max(qsa_list)
max_diff = max(max_diff, abs(new_v[s] - self.v[s]))
self.v = new_v
if max_diff < self.theta: break # 满足收敛条件,退出评估迭代
cnt += 1
print("价值迭代一共进行%d轮" % cnt)
self.get_policy()
def get_policy(self): # 根据价值函数导出一个贪婪策略
for s in range(self.env.nrow * self.env.ncol):
qsa_list = []
for a in range(4):
qsa = 0
for res in self.env.P[s][a]:
p, next_state, r, done = res
qsa += p * (r + self.gamma * self.v[next_state] * (1 - done))
qsa_list.append(qsa)
maxq = max(qsa_list)
cntq = qsa_list.count(maxq) # 计算有几个动作得到了最大的Q值
# 让这些动作均分概率
self.pi[s] = [1 / cntq if q == maxq else 0 for q in qsa_list]
env = CliffWalkingEnv()
action_meaning = ['^', 'v', '<', '>']
theta = 0.001
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, list(range(37, 47)), [47])
价值迭代一共进行14轮
状态价值:
-7.712 -7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710
-7.458 -7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900
-7.176 -6.862 -6.513 -6.126 -5.695 -5.217 -4.686 -4.095 -3.439 -2.710 -1.900 -1.000
-7.458 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000 0.000
策略:
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovo> ovoo
ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ooo> ovoo
^ooo **** **** **** **** **** **** **** **** **** **** EEEE

可以看到,解决同样的训练任务,价值迭代总共进行了数十轮,而策略迭代中的策略评估总共进行了数百轮,价值迭代中的循环次数远少于策略迭代。

4.5 冰湖环境

除了悬崖漫步环境,本章还准备了另一个环境——冰湖(Frozen Lake)。冰湖环境的状态空间和动作空间是有限的,我们在该环境中也尝试一下策略迭代算法和价值迭代算法,以便更好地理解这两个算法。

冰湖是 OpenAI Gym 库中的一个环境。OpenAI Gym 库中包含了很多有名的环境,例如 Atari 和 MuJoCo,并且支持我们定制自己的环境。在之后的章节中,我们还会使用到更多来自 OpenAI Gym 库的环境。如图 4-2 所示,冰湖环境和悬崖漫步环境相似,也是一个网格世界,大小为。每一个方格是一个状态,智能体起点状态在左上角,目标状态在右下角,中间还有若干冰洞。在每一个状态都可以采取上、下、左、右 4 个动作。由于智能体在冰面行走,因此每次行走都有一定的概率滑行到附近的其它状态,并且到达冰洞或目标状态时行走会提前结束。每一步行走的奖励是 0,到达目标的奖励是 1。

图4-2 Frozen Lake环境示意图

我们先创建 OpenAI Gym 中的 FrozenLake-v0 环境,并简单查看环境信息,然后找出冰洞和目标状态。

import gym
env = gym.make("FrozenLake-v0") # 创建环境
env = env.unwrapped # 解封装才能访问状态转移矩阵P
env.render() # 环境渲染,通常是弹窗显示或打印出可视化的环境
holes = set()
ends = set()
for s in env.P:
for a in env.P[s]:
for s_ in env.P[s][a]:
if s_[2] == 1.0: # 获得奖励为1,代表是目标
ends.add(s_[1])
if s_[3] == True:
holes.add(s_[1])
holes = holes - ends
print("冰洞的索引:", holes)
print("目标的索引:", ends)
for a in env.P[14]: # 查看目标左边一格的状态转移信息
print(env.P[14][a])
SFFF
FHFH
FFFH
HFFG
冰洞的索引: {11, 12, 5, 7}
目标的索引: {15}
[(0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False)]
[(0.3333333333333333, 13, 0.0, False), (0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True)]
[(0.3333333333333333, 14, 0.0, False), (0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False)]
[(0.3333333333333333, 15, 1.0, True), (0.3333333333333333, 10, 0.0, False), (0.3333333333333333, 13, 0.0, False)]

首先,我们发现冰洞的索引是(集合 set 的索引是无序的),起点状态(索引为 0)在左上角,和悬崖漫步环境一样。其次,根据第 15 个状态(即目标左边一格,数组下标索引为 14)的信息,我们可以看到每个动作都会等概率“滑行”到 3 种可能的结果,这一点和悬崖漫步环境是不一样的。我们接下来先在冰湖环境中尝试一下策略迭代算法。

# 这个动作意义是Gym库针对冰湖环境事先规定好的
action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = PolicyIteration(env, theta, gamma)
agent.policy_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])
策略评估进行25轮后完成
策略提升完成
策略评估进行58轮后完成
策略提升完成
状态价值:
0.069 0.061 0.074 0.056
0.092 0.000 0.112 0.000
0.145 0.247 0.300 0.000
0.000 0.380 0.639 0.000
策略:
<ooo ooo^ <ooo ooo^
<ooo **** <o>o ****
ooo^ ovoo <ooo ****
**** oo>o ovoo EEEE

这个最优策略很看上去比较反直觉,其原因是这是一个智能体会随机滑向其他状态的冰冻湖面。例如,在目标左边一格的状态,采取向右的动作时,它有可能会滑到目标左上角的位置,从该位置再次到达目标会更加困难,所以此时采取向下的动作是更为保险的,并且有一定概率能够滑到目标。我们再来尝试一下价值迭代算法。

action_meaning = ['<', 'v', '>', '^']
theta = 1e-5
gamma = 0.9
agent = ValueIteration(env, theta, gamma)
agent.value_iteration()
print_agent(agent, action_meaning, [5, 7, 11, 12], [15])
价值迭代一共进行60轮
状态价值:
0.069 0.061 0.074 0.056
0.092 0.000 0.112 0.000
0.145 0.247 0.300 0.000
0.000 0.380 0.639 0.000
策略:
<ooo ooo^ <ooo ooo^
<ooo **** <o>o ****
ooo^ ovoo <ooo ****
**** oo>o ovoo EEEE

可以发现价值迭代算法的结果和策略迭代算法的结果完全一致,这也互相验证了各自的结果。

4.6 小结

本章讲解了强化学习中两个经典的动态规划算法:策略迭代算法和价值迭代算法,它们都能用于求解最优价值和最优策略。动态规划的主要思想是利用贝尔曼方程对所有状态进行更新。需要注意的是,在利用贝尔曼方程进行状态更新时,我们会用到马尔可夫决策过程中的奖励函数和状态转移函数。如果智能体无法事先得知奖励函数和状态转移函数,就只能通过和环境进行交互来采样(状态-动作-奖励-下一状态)这样的数据,我们将在之后的章节中讲解如何求解这种情况下的最优策略。

4.7 扩展阅读:收敛性证明

4.7.1 策略迭代

策略迭代的过程如下:

根据策略提升定理,我们知道更新后的策略的价值函数满足单调性,即。所以只要所有可能的策略个数是有限的,策略迭代就能收敛到最优策略。假设 MDP 的状态空间大小为,动作空间大小为,此时所有可能的策略个数为,是有限个,所以策略迭代能够在有限步找到其中的最优策略。

还有另一种类似的证明思路。在有限马尔可夫决策过程中,如果,那么很显然存在一个上界(这里的为最大单步奖励值),使得对于任意策略和状态,其价值。因此,对于每个状态,我们可以将策略迭代得到的价值写成数列。根据实数列的单调有界收敛定理,该数列一定收敛,也即是策略迭代算法一定收敛。

4.7.2 价值迭代

价值迭代的更新公式为:

我们将其定义为一个贝尔曼最优算子:

然后我们引入压缩算子(contraction operator):若是一个算子,如果满足条件,则我们称是一个压缩算子。其中表示范数,包括我们将会用到的无穷范数

我们接下来证明当时,贝尔曼最优算子是一个-压缩算子。

设为最优价值函数,于是有:

这意味着,在的情况下,随着迭代次数越来越大,会越来越接近,即。至此,价值迭代的收敛性得到证明。

4.8 参考文献

[1] GERAMIFARD A, WALSN T J, TELLEX S, et al. A tutorial on linear function approximators for dynamic programming and reinforcement learning [J]. Foundations and Trends®in Machine Learning, 2013, 6 (4): 375-451.

[2] SZEPESVÁRI C, LITTMAN M L. Generalized markov decision processes: dynamic-programming and reinforcement-learning algorithms [C]//International Conference of Machine Learning, 1996: 96.