 Copyright Â© Sorbonne University.

 This source code is licensed under the MIT license found in the
 LICENSE file in the root directory of this source tree.

# Outlook

In this notebook we study model-based reinforcement learning algorithms in the tabular case.
We investigate different things.

First, we compare two ways of learning a model of the transition function,
using either a stochastic or a deterministic model.

Next, we compare the sample efficiency of two approaches:
- learning a model of the transition function using random actions then learning the $Q$ function without new samples,
- using the Dyna-Q algorithm, which simultaneously learns a model and uses it "in imagination" to improve the policy of the agent.
We do it for various numbers of updates "in imagination" in the simultaneous learning case.

## Install libraries

In [None]:
# Installs the necessary Python and system libraries

try:
    from easypip import easyimport
except ModuleNotFoundError:
    from subprocess import run
    assert run(["pip", "install", "easypip"]).returncode == 0, "Could not install easypip"
    from easypip import easyimport

import os
from pathlib import Path
from typing import List, Tuple

import random
import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from bbrl_gymnasium.envs.maze_mdp import MazeMDPEnv
from mazemdp.mdp import Mdp
from mazemdp.toolbox import egreedy, egreedy_loc, sample_categorical, softmax
from mazemdp import random_policy
import bbrl_gymnasium  # noqa: F401

# Model-Based Reinforcement Learning

Model-Based Reinforcement Learning (MBRL) is an approach to RL where the agent learns a model 
of the transition function, the reward function and the termination function 
and uses them to update its value function and/or its policy. 
To learn these models, the agent needs to interact with the environment. 
It can do so either with a fixed exploration policy or with the policy it is learning 
simultaneously with the models. 

To improve the policy of the agent, there are several MBRL approaches:

- the agent can apply dynamic programming (value iteration or policy iteration) 
algorithms using the learned models.

- the agent can draw random transitions from the learned models and perform Bellman backups 
using these samples to update a critic model. 
This is called learning in imagination, and this corresponds to Dyna-Q when the agent uses 
the Q-learning update rule to perform Bellman backups.

- the agent can do the same as above, but drawing transitions efficiently instead of randomly. 
This corresponds to Prioritized sweeping and its variants.

## Environment setup

As for model-free reinforcement learning, we first create a maze-like MDP. 

In [None]:
import gymnasium as gym
import bbrl_gymnasium
from bbrl_gymnasium.envs.maze_mdp import MazeMDPEnv

## Utilities: IQM and percentiles

### Calculate IQM and percentiles

Calculate the interquartile mean using the percentiles between min_bound and max_bound

In [None]:
def calculate_iqm(all_steps, min_bound, max_bound):
      """
      Calculate the Interquartile Mean (IQM) for the given data.
      :param all_steps: Array of steps from multiple runs
      :return: IQM array
      """
      # Calculate the min and max percentiles
      steps_min = np.percentile(all_steps, min_bound, axis=0)
      steps_max = np.percentile(all_steps, max_bound, axis=0)

      # IQM calculation: mean of values between min and max percentiles
      steps_iqm = np.mean(np.clip(all_steps, steps_min, steps_max), axis=0)

      return steps_iqm, steps_min, steps_max

###  Plot function: IQM and percentiles

Plot the Interquartile mean and corresponding percentiles with a label

In [None]:
def plot_quartiles(q_iqm, q_low, q_high, label):
    plt.plot(range(len(q_iqm)), q_iqm, label=label)
    plt.fill_between(range(len(q_iqm)),
                     q_low,
                     q_high,
                     alpha=0.2)

## Learning models

In this part we code classes to learn models of the transition, the reward function and the termination function.
All these model are grouped together into a FullModel class that is the unique interface to interact with all these models.

### Transition models

We create two TransitionModel classes that contain the learned model of the transition function.
Both models inherit from a more generic TransitionModel class.

In [None]:
class TransitionModel():
   def __init__(self, nb_states, nb_actions):
      self.nb_states = nb_states
      self.nb_actions = nb_actions

   def predict_next_state(self, state, action) -> int:
      pass

   def add_transition(self, state, action, next_state) -> None:
      pass

   # To monitor the accuracy of the model of the transition function, we build an evaluate function.
   # This function draws sample_size random transitions from the real maze and checks whether the model outputs the same next state.
   # This way to proceed is only adequate if the real maze is deterministic.
   # Otherwise, we should tell the distance between two probability distributions.

   def evaluate_random(self, sample_size: int=100) -> int:
      success = 0
      for _ in range(sample_size):
         state, action, next_state = env.sample_transition()
         next_state_model = self.predict_next_state(state, action)
         if next_state == next_state_model:
            success = success + 1
      return success / 100

   def is_accurate(self) -> bool:
      pass

   def display(self) -> None:
      pass

The first model is stochastic, it is initialized with uniform probabilities and updates the probabilities when receiving new evidence.

In [None]:
class StochasticTransitionModel(TransitionModel):
   def __init__(self, nb_states, nb_actions, threshold=0.01):
      super().__init__(nb_states, nb_actions)
      self.probas = np.ones(
            (self.nb_states, self.nb_actions, self.nb_states)
         ) / self.nb_states
      self.I = np.array(range(self.nb_states))
      self.count = np.zeros((self.nb_states, self.nb_actions))
      self.threshold = threshold

   def predict_next_state(self, state, action) -> int:
      # To be completed...

      assert False, 'Not implemented yet'


   def add_transition(self, state, action, next_state) -> None:
      self.count[state, action] = self.count[state, action] + 1
      self.probas[state, action, :] = (1-1/self.count[state, action]) * (self.probas[state, action, :]).reshape(self.nb_states) + 1/self.count[state, action] * np.transpose((self.I==next_state).astype(int))

   # In the probabilistic case, sampling the right next state is not enough, as it may happen by chance
   # we compare the probabilities

   def is_accurate(self) -> bool:
      """
      This function assumes access to the true probabilities of the mdp,
      it should not be used by an agent learning from interactions with
      the environment. To be used only for evaluation purposes
      """
      # To be completed...

      assert False, 'Not implemented yet'

   
   # Useful for debug
   
   def display(self) -> None:
      print("probas :", self.probas)

The second model is determistic, it is initialized as empty and learns new deterministic transitions as they come.

In [None]:
class DeterministicTransitionModel(TransitionModel):
   def __init__(self, nb_states, nb_actions):
      super().__init__(nb_states, nb_actions)
      self.count = np.zeros(
            (self.nb_states, self.nb_actions, self.nb_states)
         )

   def predict_next_state(self, state, action) -> int:
      # To be completed...

      assert False, 'Not implemented yet'


   def add_transition(self, state, action, next_state) -> None:
      self.count[state, action, next_state] = self.count[state, action, next_state] + 1

   # This function draws all transitions from the real maze and checks whether the model outputs the same next state
   # This way to proceed is only adequate if the real maze is deterministic.

   def is_accurate(self) -> bool:
      """
      This function assumes access to the true probabilities of the mdp,
      it should not be used by an agent learning from interactions with
      the environment. To be used only for evaluation purposes
      """
      # To be completed...

      assert False, 'Not implemented yet'


   # Useful for debug
   
   def display(self) -> None:
      print("count :", self.count)

### Reward model

This class is used to learn a model of the reward function.
We consider that the reward function is deterministic.

In [None]:
class RewardModel():
   def __init__(self, nb_states, nb_actions):
      self.nb_states = nb_states
      self.nb_actions = nb_actions
      self.reward_model = np.zeros((self.nb_states, self.nb_actions))

   def predict_reward(self, state, action) -> float:
      return self.reward_model[state, action]

   def add_reward(self, state, action, reward) -> None:
      self.reward_model[state, action] = reward

   # This function draws all transitions from the real maze and checks whether the model outputs the same reward.
   # This way to proceed is only adequate if the real maze is deterministic.

   def is_accurate(self) -> bool:
      """
      This function assumes access to the true probabilities of the mdp,
      it should not be used by an agent learning from interactions with
      the environment. To be used only for evaluation purposes
      """
      # To be completed...

      assert False, 'Not implemented yet'


   # Useful for debug
   
   def display_reward(self) -> None:
      print("reward model", self.reward_model)

### Termination model

This class is used to learn a model of the termination function.
We consider that termination is deterministic.

In [None]:
class TerminationModel():
   def __init__(self, nb_states, nb_actions):
      self.nb_states = nb_states
      self.nb_actions = nb_actions
      self.termination_model = np.zeros((self.nb_states, self.nb_actions))
   
   def predict_termination(self, state, action) -> bool:
      return self.termination_model[state, action]

   def add_termination(self, state, action, termination) -> None:
      self.termination_model[state, action] = termination


   # This function draws all transitions from the real maze and checks whether the model outputs the same termination
   # This way to proceed is only adequate if the real maze is deterministic.

   def is_accurate(self) -> bool:
      """
      This function assumes access to the true probabilities of the mdp,
      it should not be used by an agent learning from interactions with
      the environment. To be used only for evaluation purposes
      """
      # To be completed...

      assert False, 'Not implemented yet'


   # Useful for debug

   def display_termination(self) -> None:
      print("terminated model", self.termination_model)

### Full model

This class is the unique interface that packs all the learned models together 

In [None]:
class FullModel():
   def __init__(self, deterministic=True, threshold=0.01):
      self.nb_states = env.nb_states 
      self.nb_actions = env.action_space.n
      if deterministic:
         self.transition_model = DeterministicTransitionModel(self.nb_states, self.nb_actions)
      else:
         self.transition_model = StochasticTransitionModel(self.nb_states, self.nb_actions, threshold=threshold)
      self.reward_model = RewardModel(self.nb_states, self.nb_actions)
      self.termination_model = TerminationModel(self.nb_states, self.nb_actions)

   def is_accurate(self) -> bool:
      """
      This function assumes access to the true probabilities of the mdp,
      it should not be used by an agent learning from interactions with
      the environment. To be used only for evaluation purposes
      """
      trans_ok = self.transition_model.is_accurate()
      rew_ok = self.reward_model.is_accurate()
      term_ok = self.termination_model.is_accurate()
      return trans_ok and rew_ok and term_ok

   def add_sample(self, state, action, reward, next_state, terminated) -> None:
      self.transition_model.add_transition(state, action, next_state)
      self.reward_model.add_reward(state, action, reward)
      self.termination_model.add_termination(state, action, terminated)

   def sample_state_action(self) -> Tuple[int, int]:
      """
      This function draws a random state, action pair from the model.
      """
      state = random.randint(0, self.nb_states - 1)
      action = random.randint(0, self.nb_actions - 1)
      return state, action

   def predict_full_sample(self, state, action)-> Tuple[int, int, float, int, bool]: 
      """
      This function draws a full sample from the models.
      in BBRL, this is renamed into forward
      """
      # To be completed...

      assert False, 'Not implemented yet'

      return state, action, reward, next_state, terminated
   
   def display_all(self) -> None:
      self.transition_model.display()
      self.reward_model.display_reward()
      self.termination_model.display_termination()

## Q-learning agents ##

We reuse the Q-learning algorithm coded in the previous lab.
But this time, we write it into a more object oriented way.

We start with a general QAgent class and then derive it into SoftmaxQAgent and EgreedyQAgent to account for two exploration methods

In the code below:
- fill the ```updateQ``` function,
- write the ```update_Q_from_models``` function that updates the $Q$ function from models

In [None]:
class QAgent():
   def __init__(self, alpha):
      self.nb_states = env.nb_states
      self.nb_actions = env.action_space.n
      self.gamma = env.gamma
      self.alpha = alpha
      self.init_Q()

   def choose_action(self, state):
      pass

   def init_Q(self):
       self.Q = np.zeros((self.nb_states, self.nb_actions))
       
   # Beware that the efficiency is highly dependent on the error threshold
   def is_accurate(self, q_func, threshold=0.01) -> bool:
      """
      This function assumes access to the ground truth optimal $Q$ function
      it should not be used by an agent learning from interactions with
      the environment. To be used only for evaluation purposes
      """
      error = np.linalg.norm(self.Q - q_func)
      if error > threshold:
         return False
      return True

   # Do not forget to deal with the case where the episode is terminated
   def updateQ(self, state, action, reward, next_state, terminated) -> None:
      """
      Performs a Bellman back-up over the $Q$ function of the agent
      :return: nothing
      """
      # To be completed...

      assert False, 'Not implemented yet'


   # The function below should use the above function
   def update_Q_from_model(self, full_model:FullModel, nb_updates: int) -> None:
      """
      Updates the $Q$ function of the agent using randomly sampled transitions (i.e. the agent is learning in imagination)
      It does so nb_updates times
      :param full_model: the model used to sample updates
      :param nb_updates: the number of performed updates
      :return: nothing
      """
      # To be completed...

      assert False, 'Not implemented yet'


   # Learn the Q function in imagination from a model of the MDP
   def learn_Q_from_model(self, full_model: FullModel, nb_steps: int, nb_repeats: int) -> Tuple[int, np.array]:

       # To be completed...

       assert False, 'Not implemented yet'


Fill the ```choose_action``` function of both classes below

In [None]:
class SoftmaxQAgent(QAgent):
   def __init__(self, alpha: float = 0.5, beta: float = 6.0):
      super().__init__(alpha)
      self.beta = beta

   def choose_action(self, state) -> int:
      # To be completed...

      assert False, 'Not implemented yet'

      return action

In [None]:
class EgreedyQAgent(QAgent):
   def __init__(self, alpha: float = 0.5, epsilon: float = 0.02):
      super().__init__(alpha)
      self.epsilon = epsilon 

   def choose_action(self, state) -> int:
      # To be completed...

      assert False, 'Not implemented yet'

      return action

## The Dyna-Q approach: learning models and the policy simultaneously

This time, we learn the models and the Q function simultaneously
We perform `nb_updates` steps "in imagination" for each step in the real environment

In [None]:
def dyna_q_soft(
    agent: QAgent,
    nb_steps: int,
    nb_updates: int = 5,
) -> int:
   # Run learning cycle
   steps = 0

   while steps < nb_steps:
   # To be completed...

   assert False, 'Not implemented yet'

   return q_norm

## Experiments

### Setting up the environment

In [None]:
def make_mdp():
    """
    Return the MazeMDP and an unwrapped version, used to access the attributes and function without raising a warning
    We also need the not unwrapped env for calling steps otherwise truncation beyond time limit will not apply
    """
    # Environment with 20% of walls and no negative reward when hitting a wall
    mdp = gym.make(
        "MazeMDP-v0",
        kwargs={"width": 5, "height": 5, "ratio": 0.2, "hit": 0.0, "start_states": [0]},
        render_mode="human",
    )
    env = mdp.unwrapped  # the .unwrapped removes a warning from gymnasium when accessing an attribute or function
    return mdp, env

mdp, env = make_mdp() # the mdp is made once and for all, this is an ugly global variable
mdp.reset()
env.init_draw("The maze")

### Learning models first with a random policy, then learning an agent

We first learn models from a random policy

In the function below, we train the models from an agent performing random actions
We stop once the models are accurate enough
to play a random action, use: action = random.randint(0, env.action_space.n - 1)
The rest corresponds to a standard interaction loop between an agent and its environment

In [None]:
def learn_models_from_random_actions(
    nb_steps: int,
) -> Tuple[np.ndarray, List[float]]:
   """
   Train the models from an agent performing random actions
   """

   full_model = FullModel()

   # Run learning cycle
   steps = 0 # number of steps before convergence

   while steps < nb_steps:
      # To be completed...

      assert False, 'Not implemented yet'


   return full_model

###  Putting everything together

Run the model learning algorithm then get an optimal $Q$ function from that model
We do it several times to obtain an average number of steps

In [None]:
nb_repeats = 30
nb_steps_model = [100, 120, 150, 200, 300, 400]
nb_steps_rl = 1500
perf_seq = np.zeros(len(nb_steps_model))
index = 0

for i in nb_steps_model:
    all_q = []
    for j in range(nb_repeats):
        agent = SoftmaxQAgent() # Here you may try an Egreedy agent
        full_model = learn_models_from_random_actions(i)
        q_list = agent.learn_Q_from_model(full_model, nb_steps_rl, nb_repeats)
        all_q.append(q_list)
    all_q = np.array(all_q)
    q_iqm, q_low, q_high = calculate_iqm(all_q, 25, 75)
    plot_quartiles(q_iqm, q_low, q_high, f" {i} model learning steps")
    perf_seq[index] = q_iqm[-1]
    index += 1

print(perf_seq)
plt.xlabel("Number of steps")
plt.ylabel("Norm of Q values")
plt.legend(loc="lower right")
# plt.savefig("q_norms.png")
plt.title("Learning in imagination as a function of the number of model learning steps")
plt.show()

Run the Dyna-Q algorithm for different values of nb_updates

In [None]:
nb_repeats = 10

plt.plot(nb_steps_model, perf_seq, label=f"sequential learning")
for nb_updates in range(1, 5):
    index = 0

    perf = np.zeros(len(nb_steps_model))
    for steps in nb_steps_model:
        nb_steps_array = np.zeros(nb_repeats)
        q_norm_array = np.zeros(nb_repeats)
        for i in range(nb_repeats):
            agent = SoftmaxQAgent() # Here you may try an Egreedy agent
            full_model = FullModel()
            q_norm_array[i] = dyna_q_soft(agent, steps, nb_updates)
        q_iqm, _, _ = calculate_iqm(q_norm_array, 25, 75)
        perf[index] = q_iqm
        index += 1
    plt.plot(nb_steps_model, perf, label=f"Dyna-Q with {nb_updates} updates")

plt.xlabel("Number of samples used")
plt.ylabel("Norm of Q values")
plt.legend(loc="lower right")
plt.title("final comparison")
plt.show()

## Further studies

Here are suggestions for going further

### Basic studies

- You may extend the above study working with mazes of different sizes
- You may try it using the Deterministic model and the Stochastic model, and using Softmax Exploration versus Egreedy exploration.

### Advanced studies

- Instead of Q-learning, use the policy iteration algorithm to improve the policy while learning the model. 
This corresponds to the [Dyna-PI](https://citeseerx.ist.psu.edu/document?repid=rep1&type=pdf&doi=642db624b5b33a02a435ee1415d7c9f9cef36e1d) algorithm

### More advanced study (difficult)

Code the [prioritized sweeping](https://link.springer.com/content/pdf/10.1007/BF00993104.pdf) algorithm and compare to the above methods