Dynamic Programming


The homepage of this blog series and related posts can be found here.
Content source


Part 0: FrozenLake

The environment is a 4x4 grid, and the agent has 4 actions: LEFT = 0, DOWN = 1, RIGHT = 2, UP = 3. We write the state and action spaces as $\mathcal{S}^+ = \{0, 1, \ldots, 15\}$ and $\mathcal{A} = \{0, 1, 2, 3\}$.

import copy
import numpy as np
# Assumes the classic gym FrozenLakeEnv (or the course-provided frozenlake.py);
# plot_values is a plotting helper that ships with the original notebook.
from gym.envs.toy_text.frozen_lake import FrozenLakeEnv

env = FrozenLakeEnv()

# print the state space and action space
print(env.observation_space) # Discrete(16)
print(env.action_space) # Discrete(4)

# print the total number of states and actions
print(env.nS) # 16
print(env.nA) # 4

Dynamic programming assumes the agent knows the Markov decision process. env.P[1][0] returns all possible next states and rewards when the agent is in state 1 and moves left:

env.P[1][0]

# output:
# [(0.3333333333333333, 1, 0.0, False),
#  (0.3333333333333333, 0, 0.0, False),
#  (0.3333333333333333, 5, 0.0, True)]

The four columns are prob (probability), next_state, reward, and done (whether the episode has ended), so the result can be interpreted as:

\[\mathbb{P}(S_{t+1}=s',R_{t+1}=r|S_t=1,A_t=0) = \begin{cases} \frac{1}{3} \text{ if } s'=1, r=0\\ \frac{1}{3} \text{ if } s'=0, r=0\\ \frac{1}{3} \text{ if } s'=5, r=0\\ 0 \text{ else} \end{cases}\]
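
As a quick sanity check on the transition model (a minimal sketch that only uses env from above), the probabilities returned by env.P should sum to 1 for every state-action pair:

for s in range(env.nS):
    for a in range(env.nA):
        total = sum(prob for prob, next_state, reward, done in env.P[s][a])
        assert np.isclose(total, 1.0)  # transition probabilities form a distribution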

Part 1: Iterative Policy Evaluation

Input: the environment env and a policy, a 2D numpy array where policy[s][a] gives the probability of taking action a in state s (plus the discount gamma and the stopping tolerance theta).

Output: V, a 1D numpy array of length env.nS holding the state-value function $v_\pi$.

def policy_evaluation(env, policy, gamma=1, theta=1e-8):
    V = np.zeros(env.nS)
    while True:
        delta = 0
        # sweep over all states
        for s in range(env.nS):
            Vs = 0
            # iterate over the actions available in state s
            for a, action_prob in enumerate(policy[s]):
                for prob, next_state, reward, done in env.P[s][a]:
                    Vs += action_prob * prob * (reward + gamma * V[next_state])
            delta = max(delta, np.abs(V[s]-Vs))
            V[s] = Vs
        # stopping condition
        if delta < theta:
            break
    return V

Test it on the equiprobable random policy, i.e. $\pi(a \vert s) = \frac{1}{\vert \mathcal{A}(s) \vert}$:

random_policy = np.ones([env.nS, env.nA]) / env.nA
# evaluate the policy 
V = policy_evaluation(env, random_policy)

plot_values(V)
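
Since policy_evaluation stops only when a full sweep changes no state by more than theta, the returned V should (approximately) satisfy the Bellman expectation equation. A small check, assuming env, V and random_policy from above:

V_backup = np.zeros(env.nS)
for s in range(env.nS):
    for a, action_prob in enumerate(random_policy[s]):
        for prob, next_state, reward, done in env.P[s][a]:
            # one synchronous Bellman expectation backup (gamma = 1, as above)
            V_backup[s] += action_prob * prob * (reward + V[next_state])
print(np.max(np.abs(V_backup - V)))  # should be tiny, on the order of theta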

Part 2: Obtain $q_\pi$ from $v_\pi$

Input: the environment env, the state-value function V, a state s, and the discount gamma.

Output: q, a 1D numpy array of length env.nA whose entry q[a] is the action value $q_\pi(s,a)$.

def q_from_v(env, V, s, gamma=1):
    q = np.zeros(env.nA)
    # iterate over actions
    for a in range(env.nA):
        for prob, next_state, reward, done in env.P[s][a]:
            q[a] += prob * (reward + gamma * V[next_state])
    return q

Compute the action values q for every state:

Q = np.zeros([env.nS, env.nA])
# compute the action values for each state
for s in range(env.nS):
    Q[s] = q_from_v(env, V, s)
print(Q)

# output
# [[0.0147094  0.01393978 0.01393978 0.01317015]
#  [0.00852356 0.01163091 0.0108613  0.01550788]
#  [0.02444514 0.02095298 0.02406033 0.01435346]
#  [0.01047649 0.01047649 0.00698432 0.01396865]
#  [0.02166487 0.01701828 0.01624865 0.01006281]
#  [0.         0.         0.         0.        ]
#  [0.05433538 0.04735105 0.05433538 0.00698432]
#  [0.         0.         0.         0.        ]
#  [0.01701828 0.04099204 0.03480619 0.04640826]
#  [0.07020885 0.11755991 0.10595784 0.05895312]
#  [0.18940421 0.17582037 0.16001424 0.04297382]
#  [0.         0.         0.         0.        ]
#  [0.         0.         0.         0.        ]
#  [0.08799677 0.20503718 0.23442716 0.17582037]
#  [0.25238823 0.53837051 0.52711478 0.43929118]
#  [0.         0.         0.         0.        ]]
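
These action values are consistent with the state values from Part 1: for any policy, $v_\pi(s) = \sum_a \pi(a \vert s) q_\pi(s,a)$. A one-line check, assuming V, Q and random_policy from above:

print(np.max(np.abs(V - np.sum(random_policy * Q, axis=1))))  # should be very small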

Part 3: Policy Improvement

Next we improve the policy, using the q-values as the criterion.

Input: the environment env, the state-value function V, and the discount gamma.

Output: policy, a 2D numpy array where policy[s][a] is the probability of taking action a in state s under the policy that is greedy with respect to V.

def policy_improvement(env, V, gamma=1):
    policy = np.zeros([env.nS, env.nA])
    # iterate over states
    for s in range(env.nS):
        q = q_from_v(env, V, s, gamma)
        
        # Option 1: deterministic policy, pick the action with the largest value
        # policy[s][np.argmax(q)] = 1
        
        # Option 2: stochastic policy, equal probability on every maximizing action
        best_a = np.argwhere(q==np.max(q)).flatten() # indices of all maximizing actions
        policy[s] = np.sum([np.eye(env.nA)[i] for i in best_a], axis=0)/len(best_a) # assign equal probabilities
        
    return policy
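
By the policy improvement theorem, the greedy policy built from $v_\pi$ should be at least as good as $\pi$ in every state. A small check, assuming env, V, random_policy and policy_evaluation from above:

improved_policy = policy_improvement(env, V)
V_improved = policy_evaluation(env, improved_policy)
print(np.min(V_improved - V))  # should be >= 0 (up to numerical error)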

Part 4: Policy Iteration

Input: the environment env, the discount gamma, and the stopping tolerance theta.

Output: the pair (policy, V): the optimal policy and its state-value function.

def policy_iteration(env, gamma=1, theta=1e-8):
    # start from the equiprobable random policy
    policy = np.ones([env.nS, env.nA]) / env.nA
    while True:
        # evaluate the current policy
        V = policy_evaluation(env, policy, gamma, theta)
        # improve the policy greedily with respect to V
        new_policy = policy_improvement(env, V, gamma)
        
        # Option 1: stop when the policy no longer changes
        if (new_policy == policy).all():
            break
        
        # Option 2: stop when the change in the value function falls below a threshold
        # if np.max(abs(policy_evaluation(env, policy) - policy_evaluation(env, new_policy))) < theta*1e2:
        #    break
        
        policy = copy.copy(new_policy)
    return policy, V

Check the result:

policy_pi, V_pi = policy_iteration(env)
print(policy_pi,"\n")
plot_values(V_pi)

# output
# [[1.   0.   0.   0.  ]
#  [0.   0.   0.   1.  ]
#  [0.   0.   0.   1.  ]
#  [0.   0.   0.   1.  ]
#  [1.   0.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.5  0.   0.5  0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.   0.   0.   1.  ]
#  [0.   1.   0.   0.  ]
#  [1.   0.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.25 0.25 0.25 0.25]
#  [0.   0.   1.   0.  ]
#  [0.   1.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]] 
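
At convergence the value function should satisfy the Bellman optimality equation, i.e. each state value equals the largest action value. A quick check, assuming V_pi and q_from_v from above:

residual = max(abs(V_pi[s] - np.max(q_from_v(env, V_pi, s))) for s in range(env.nS))
print(residual)  # should be very small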

Part 5: Truncated Policy Iteration

Input: the environment env, a policy, an initial value estimate V, the number of sweeps max_it, and the discount gamma.

Output: V after max_it sweeps of the Bellman expectation backup.

def truncated_policy_evaluation(env, policy, V, max_it=1, gamma=1):
    num_it = 0
    while num_it < max_it:
        # sweep over all states
        for s in range(env.nS):
            v = 0
            # action values in state s under the current V
            q = q_from_v(env, V, s, gamma)
            # iterate over actions and weight q by the action probabilities
            for a, action_prob in enumerate(policy[s]):
                v += action_prob * q[a]
            V[s] = v
        num_it += 1
    return V
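
With max_it=1 this is a single sweep of the Bellman expectation backup; with more sweeps it approaches the full iterative evaluation from Part 1. A rough check, assuming env and random_policy from above (the sweep counts are arbitrary choices):

V_full = policy_evaluation(env, random_policy)
for m in (1, 10, 100):
    V_trunc = truncated_policy_evaluation(env, random_policy, np.zeros(env.nS), max_it=m)
    print(m, np.max(np.abs(V_full - V_trunc)))  # the gap shrinks as max_it grows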

Input: the environment env, the number of evaluation sweeps per iteration max_it, the discount gamma, and the stopping tolerance theta.

Output: the pair (policy, V).

def truncated_policy_iteration(env, max_it=1, gamma=1, theta=1e-8):
    # initialize the state values and the policy
    V = np.zeros(env.nS)
    policy = np.zeros([env.nS, env.nA])
    while True:
        # improve the policy greedily with respect to V
        policy = policy_improvement(env, V, gamma)
        old_V = copy.copy(V)
        # run a fixed number of evaluation sweeps
        V = truncated_policy_evaluation(env, policy, V, max_it, gamma)
        # stop when the value update falls below the threshold
        if max(abs(V-old_V)) < theta:
            break
    return policy, V

Update the policy using a fixed number of evaluation sweeps per iteration:

policy_tpi, V_tpi = truncated_policy_iteration(env, max_it=2)
print(policy_tpi,"\n")
# plot the optimal state-value function
plot_values(V_tpi)

# output
# [[1.   0.   0.   0.  ]
#  [0.   0.   0.   1.  ]
#  [0.   0.   0.   1.  ]
#  [0.   0.   0.   1.  ]
#  [1.   0.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.5  0.   0.5  0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.   0.   0.   1.  ]
#  [0.   1.   0.   0.  ]
#  [1.   0.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.25 0.25 0.25 0.25]
#  [0.   0.   1.   0.  ]
#  [0.   1.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]] 
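
Truncated policy iteration should converge to the same optimal value function as full policy iteration. A quick comparison, assuming the results of Part 4 are still in scope:

print(np.max(np.abs(V_pi - V_tpi)))  # should be very small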

Part 6: Value Iteration

Input: the environment env, the discount gamma, and the stopping tolerance theta.

Output: the pair (policy, V): the optimal policy and the optimal state-value function.

def value_iteration(env, gamma=1, theta=1e-8):
    V = np.zeros(env.nS)
    while True:
        delta = 0
        # sweep over all states until V stabilizes
        for s in range(env.nS):
            v = V[s]
            V[s] = max(q_from_v(env, V, s, gamma))
            delta = max(delta, abs(V[s]-v))
        if delta < theta:
            break
    # extract the greedy policy from V
    policy = policy_improvement(env, V, gamma)
    return policy, V

policy_vi, V_vi = value_iteration(env)
print(policy_vi,"\n")
# plot the optimal state-value function
plot_values(V_vi)

# output
# [[1.   0.   0.   0.  ]
#  [0.   0.   0.   1.  ]
#  [0.   0.   0.   1.  ]
#  [0.   0.   0.   1.  ]
#  [1.   0.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.5  0.   0.5  0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.   0.   0.   1.  ]
#  [0.   1.   0.   0.  ]
#  [1.   0.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]
#  [0.25 0.25 0.25 0.25]
#  [0.   0.   1.   0.  ]
#  [0.   1.   0.   0.  ]
#  [0.25 0.25 0.25 0.25]]
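
All three algorithms solve the same MDP, so their value functions should agree. A final comparison, assuming the results of Parts 4-6 are still in scope:

print(np.max(np.abs(V_vi - V_pi)))   # should be very small
print(np.max(np.abs(V_vi - V_tpi)))  # should be very small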