In the last article, we implemented the iterative policy evaluation algorithm to evaluate a given policy. In this article, our goal is to implement the Policy Iteration algorithm, which improves a policy step by step until it converges to an optimal policy, the one that collects the most reward. Policy Iteration uses a dynamic programming (DP) approach, so it assumes complete knowledge of the environment, that is, all of the MDP's transition probabilities and rewards.
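Before diving into the code, it helps to keep the two updates the algorithm alternates between in view. Policy evaluation repeatedly applies the Bellman expectation backup until the values stop changing by more than a small threshold, and policy improvement then makes the policy greedy with respect to those values (written here in standard notation, where P and r are the MDP's transition probabilities and rewards):

$$V(s) \leftarrow \sum_{a} \pi(a \mid s) \sum_{s'} P(s' \mid s, a)\,\big[r(s, a, s') + \gamma V(s')\big]$$

$$\pi'(s) = \arg\max_{a} \sum_{s'} P(s' \mid s, a)\,\big[r(s, a, s') + \gamma V(s')\big]$$

For a finite MDP, alternating these two steps converges to an optimal policy, and that is exactly what the code below does.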
import numpy as np

def policy_evaluation(env, policy, gamma, theta, state_values):
    """
    policy_evaluation: evaluate a policy by estimating its state values
    @param env: OpenAI Gym environment
    @param policy: policy matrix containing the probability of each action in each state
    @param gamma: discount factor
    @param theta: threshold; evaluation stops once the value change for every state is below it
    @param state_values: initial state values
    @return: state values of the given policy
    """
    delta = theta * 2
    state_len = env.nS
    action_len = env.nA
    while delta > theta:
        delta = 0
        # for all states
        for s in range(state_len):
            # we will compute a new value for this state
            new_s = 0
            # for all actions
            for a in range(action_len):
                # transitions for taking action a in state s: (prob, next_state, reward, done) tuples
                transitions_list = env.P[s][a]
                # for all transitions from the current state-action pair
                for i in transitions_list:
                    transition_prob, next_state, reward, done = i
                    # if the transition ends the episode (goal or hole)
                    if done:
                        new_s += policy[s, a] * transition_prob * reward
                    else:
                        # otherwise bootstrap from the next state's value
                        new_s += policy[s, a] * transition_prob * (reward + gamma * state_values[next_state])
            delta = max(delta, np.abs(new_s - state_values[s]))
            state_values[s] = new_s
    return state_values
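If you want to sanity-check this function on its own, you could evaluate the uniform random policy on FrozenLake before moving on. This is only an illustrative sketch (the env, random_policy and values names exist just for this example); the full pipeline follows below.

# illustrative only: evaluate the uniform random policy on non-slippery FrozenLake
import gym
env = gym.make('FrozenLake-v0', is_slippery=False)
random_policy = np.ones((env.nS, env.nA)) / env.nA
values = policy_evaluation(env, random_policy, gamma=0.99, theta=0.0001,
                           state_values=np.zeros(env.nS))
print(values.reshape(4, 4))  # values of the random policy, not of the optimal one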
def policy_improvement(env, policy, state_values, gamma):
    """
    policy_improvement: make the policy greedy with respect to the state values
    @param env: OpenAI Gym environment
    @param policy: policy matrix containing the probability of each action in each state
    @param state_values: state values obtained from policy evaluation
    @param gamma: discount factor
    @return policy_stable: flag indicating whether the policy did not change
    @return policy: improved policy
    """
    policy_stable = True
    state_len = env.nS
    action_len = env.nA
    # for all states
    for s in range(state_len):
        # current greedy action in this state
        old_action = np.argmax(policy[s])
        temp_array = np.zeros(action_len)
        # for all actions from the current state
        for a in range(action_len):
            # transitions for taking action a in state s: (prob, next_state, reward, done) tuples
            transitions_list = env.P[s][a]
            # for all transitions from the current state-action pair
            for i in transitions_list:
                # accumulate the action value
                transition_prob, next_state, reward, done = i
                if done:
                    temp_array[a] += transition_prob * reward
                else:
                    temp_array[a] += transition_prob * (reward + gamma * state_values[next_state])
        # make the policy greedy: all probability mass on the best action
        policy[s] = np.zeros(action_len)
        policy[s, np.argmax(temp_array)] = 1.
        if old_action != np.argmax(policy[s]):
            policy_stable = False
    return policy_stable, policy
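With evaluation and improvement in hand, a single cycle already shows the mechanics: evaluate the random policy, then greedify it. Again this is just a sketch reusing the example names from above, and note that policy_improvement updates the policy array in place.

# illustrative only: one evaluation + improvement cycle
values = policy_evaluation(env, random_policy, gamma=0.99, theta=0.0001,
                           state_values=np.zeros(env.nS))
stable, random_policy = policy_improvement(env, random_policy, values, gamma=0.99)
print(stable)         # expect False: the random policy is not yet greedy w.r.t. its own values
print(random_policy)  # every row is now a one-hot vector over the four actions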
def policy_iteration(env, gamma, theta):
    """
    policy_iteration: alternate policy evaluation and policy improvement until the policy is stable
    @param env: OpenAI Gym environment
    @param gamma: discount factor
    @param theta: convergence threshold for policy evaluation
    @return: state values and policy of the optimal policy
    """
    # initial state values
    state_values = np.zeros(env.nS)
    # create a uniform random policy
    policy = np.ones((env.nS, env.nA)) / env.nA
    policy_stable = False
    while not policy_stable:
        # policy evaluation
        state_values = policy_evaluation(env, policy, gamma, theta, state_values)
        # policy improvement
        policy_stable, policy = policy_improvement(env, policy, state_values, gamma)
    return state_values, policy
import gym
# use the FrozenLake environment (non-slippery, i.e. deterministic)
env = gym.make('FrozenLake-v0', is_slippery=False)
# discount factor gamma
gamma = 0.99
# convergence threshold
theta = 0.0001
# run policy iteration (it starts from a uniform random policy internally,
# where the agent takes every action with equal probability)
states, policy = policy_iteration(env, gamma, theta)
# print the state values as the 4x4 lake grid (rounded) and the resulting policy
print(states.reshape(4, 4).round(3))
print(policy)
[[0.951 0.961 0.97 0.961]
[0.961 0. 0.98 0. ]
[0.97 0.98 0.99 0. ]
[0. 0.99 1. 0. ]]
[[0. 1. 0. 0.]
[0. 0. 1. 0.]
[0. 1. 0. 0.]
[1. 0. 0. 0.]
[0. 1. 0. 0.]
[1. 0. 0. 0.]
[0. 1. 0. 0.]
[1. 0. 0. 0.]
[0. 0. 1. 0.]
[0. 1. 0. 0.]
[0. 1. 0. 0.]
[1. 0. 0. 0.]
[1. 0. 0. 0.]
[0. 0. 1. 0.]
[0. 0. 1. 0.]
[1. 0. 0. 0.]]
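As a final sanity check, you could roll out the greedy policy for one episode and confirm that it reaches the goal. The sketch below assumes the classic gym v0 API, where env.reset() returns the initial state and env.step() returns (observation, reward, done, info).

# illustrative only: run one episode with the learned greedy policy
state = env.reset()
done = False
total_reward = 0
while not done:
    action = np.argmax(policy[state])          # pick the action with probability 1 in this state
    state, reward, done, _ = env.step(action)  # classic gym v0 step signature
    total_reward += reward
print(total_reward)  # 1.0 means the agent reached the goal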