 Copyright Â© Sorbonne University.

 This source code is licensed under the MIT license found in the
 LICENSE file in the root directory of this source tree.

# Outlook

In this notebook, you will code a naive actor-critic algorithm in the tabular case. 
Then you will tune it using grid search and Bayesian optimization, 
potentially using the [optuna](https://optuna.readthedocs.io/en/stable/) library.
Finally, you will get the best hyper-parameters obtained with both methods and perform a statistical test to see 
if there is a statistically significant difference between these methods and with respect to naive hyper-parameter values.

## Install libraries

Installs the necessary Python and system libraries

In [None]:
try:
    from easypip import easyimport
except ModuleNotFoundError:
    from subprocess import run
    assert run(["pip", "install", "easypip"]).returncode == 0, "Could not install easypip"
    from easypip import easyimport

import os
from pathlib import Path
from typing import List, Tuple

import gymnasium as gym
import matplotlib.pyplot as plt
import numpy as np
from bbrl_gymnasium.envs.maze_mdp import MazeMDPEnv
from mazemdp.mdp import Mdp
from mazemdp.toolbox import egreedy, egreedy_loc, sample_categorical, softmax
from mazemdp import random_policy
import bbrl_gymnasium  # noqa: F401

# matplotlib.use("TkAgg")

from functools import partial
import optuna
import yaml
import hydra
from omegaconf import OmegaConf, DictConfig

import seaborn as sns
sns.set_theme()
import logging
optuna.logging.set_verbosity(optuna.logging.WARNING)

# Step 1: Coding the naive Actor-critic algorithm

We consider the naive actor-critic algorithm with a categorical policy.
The algorithm learns a critic with the standard temporal difference mechanism
using a learning rate $\alpha_{critic}$.

We consider a value-based critic $V(s)$.

To update the critic, the algorithm computes the temporal difference error:

$$\delta_t = r(s_t, a_t) + \gamma V^{(n)}(s_{t+1})-V^{(n)}(s_t).$$

Then it applies it to the critic:

$$V^{(n+1)}(s_t) = V^{(n)}(s_t) + \alpha_{critic} \delta_t.$$

To update the actor, the general idea is the same, using the temporal difference error with another learning rate $\alpha_{actor}$.

However, naively applying the same learning rule would not ensure that the probabilities of all actions in a state sum to 1.
Besides, when the temporal difference error $\delta_t$ is negative, it may happen that the probability of an action gets negative or null, which raises an issue when applying renormalization.

So, instead of applying the naive rule, we apply the following one:
$$ 
\pi_{temp}(a_t|s_t) =  \begin{cases}
\pi^{(i)}(a_t|s_t) + \alpha_{actor} \delta_t & \mathrm{if } \pi^{(i)}(a_t|s_t) + \alpha_{actor} \delta_t > 10^{-8}\\
10^{-8} & \mathrm{otherwise.} \\
\end{cases}
$$

Then we can apply renormalization so that the probabilities of actions still sum to 1, with
$$
\forall a, \pi^{(i+1)}(a|s_t) = \frac{\pi_{temp}^{(i+1)}(a|s_t)} {\sum_{a'} \pi_{temp}^{(i+1)}(a'|s_t)}
$$ with
$$ 
\pi_{temp}^{(i+1)}(a|s_t) =  \begin{cases}
\pi_{temp}(a|s_t) & \mathrm{if } a = a_t\\
\pi^{(i)}(a|s_t) & \mathrm{otherwise.} \\
\end{cases}
$$

## Code the naive actor-critic algorithm as specified above.

A good idea to build this code it to take inspiration from the code of Q-learning, to add an actor (a categorical policy), both learning rates,
and to take care about the renormalization function.

We provide some code structure below. Following this structure is not mandatory, but you should at least ensure that the signature of the ```actor_critic_v(...)``` function is respected so that the code of the next exercises can be run appropriately.

In [None]:
def renormalize(
    policy: np.array,
    state: np.array,
) -> None:
    """
    Renormalize the probability of actions so that the sum of probabilities over actions is always 1.
    We made sure in the calling function that the probabilities of action never get negative when they are decreased.
    :param policy: the current policy, before normalization
    :param state: the state where the actions need to be renormalized
    :return: nothing
    """

    # To be completed...

    assert False, 'Not implemented yet'


In [None]:
def perform_episode(
    policy: np.array,
    v: np.array,
    alpha_critic: float,
    alpha_actor: float,
    render: bool = True,
) -> int:
    """
    Perform an episode on the given mdp with the given policy
    :param policy: the current policy
    :param v: the current critic
    :param alpha_critic: the learning rate of the critic
    :param alpha_actor: the learning rate of the actor
    :return: the number of steps before it stops
    """
    # Draw the first state of the episode using a uniform distribution over all the states
    state, _ = mdp.reset(uniform=True)
    terminated = False
    truncated = False
    steps = 0

    # To be completed...

    assert False, 'Not implemented yet'

    return steps

Here comes the main actor-critic function. It should take the shown parameters as input and output a list of value norms and a list of number of steps,
corresponding to the evolution of these quantities through learning

In [None]:
def actor_critic_v(
    nb_episodes: int,
    alpha_critic: float,
    alpha_actor: float,
    render: bool = True,
) -> np.array:
    """
    Perform actor-critic training over a number of episodes
    :param nb_episodes: the number of episodes
    :param alpha_critic: the learning rate of the critic
    :param alpha_actor: the learning rate of the actor
    :param render: whether training should be rendered
    :return: a list of norm of V values and a list of number of steps of episodes
    """

    # To be completed...

    assert False, 'Not implemented yet'


## Provide a plot function

Your plot function should show the evolution through time of number of steps the agent takes to find the reward in the maze.
If your algorithm works, this number of steps should decrease through time.

Your plot function should also show a mean and a standard deviation (or some more advanced statistics) over a collection of learning runs.
Make sure that your figure complies with [The figure checklist](https://master-dac.isir.upmc.fr/The_figure_checklist.pdf).

In [None]:
# To be completed...

assert False, 'Not implemented yet'


## Actor-critic hyper-parameters

To represent the hyper-parameters of the experiments performed in this notebook, we suggest using the dictionary below.
This dictionary can be read using omegaconf.
Using it is not mandatory.
You can also change the value of hyper-parameters or environment parameters at will.

In [None]:
ac_params = {
    "save_curves": False,
    "save_heatmap": True,
    "maze": {
        "name": "MazeMDP-v0",
        "width": 5,
        "height": 5,
        "ratio": 0.2,
        "render_mode": "human",
        },
        
    "log_dir": "./tmp",
    "video_dir": "./tmp/videos",

    "nb_episodes": 100,
    "render": False, # True, # 
    "nb_repeats": 5,

    "alpha_critic": 0.5,
    "alpha_actor": 0.5,
    }

## Test your code

The code for testing what you did so far is provided below.

In [None]:
def make_mdp(cfg):
    """
    Return the MazeMDP and an unwrapped version, used to access the attributes and function without raising a warning
    We also need the not unwrapped env for calling steps otherwise truncation beyond time limit will not apply
    """
    # Environment with 20% of walls and no negative reward when hitting a wall
    mdp = gym.make(
        "MazeMDP-v0",
        kwargs={"width": cfg.maze.width, "height": cfg.maze.height, "ratio": cfg.maze.ratio, "hit": 0.0, "start_states": [0]},
        render_mode=cfg.maze.render_mode,
    )
    env = mdp.unwrapped  # the .unwrapped removes a warning from gymnasium when accessing an attribute or function
    return mdp, env

cfg=OmegaConf.create(ac_params)
mdp, env = make_mdp(cfg) # the mdp is made once and for all, this is an ugly global variable
mdp.reset()
env.init_draw("The maze")
plot_steps(nb_repeats=100)
# print("average v value:", v_vals.mean())

# Step 2: Tuning hyper-parameters

In this part, you have to optimize two hyper-parameters of the actor-critic algorithm, namely the actor and critic learning rates.
You have to do so using a simple grid search method and some Bayesian optimization method.
For that, we suggest using [optuna](https://optuna.readthedocs.io/en/stable/).
Follow the above link to understand how optuna works.
The code to run optuna is provided.

You should make sure that the hyper-parameters tuning algorithms that you compare benefit from the same training budget
We suggest 400 training runs overall for each method,
which means 20 values each for the actor and the critic learning rates in the case of grid search.

By running the code below, you will do the following:

1. Perform hyper-parameters tuning with two algorithms as suggested above.

2. Provide a "heatmap" of the norm of the value function given the hyper-parameters, after training for each pair of hyper-parameters.

3. Collect the value of the best hyper-parameters found with each algorithm. You will need them for Step 3.

In [None]:
def objective(trial):
    # Sample values of alpha_critic and alpha_actor
    alpha_critic = trial.suggest_float('alpha_critic', 0.01, 1.0)
    alpha_actor = trial.suggest_float('alpha_actor', 0.01, 1.0)
    
    # Run the actor-critic algorithm with sampled hyperparameters
    val_list, steps_list = actor_critic_v(
        alpha_critic=alpha_critic,
        alpha_actor=alpha_actor,
        nb_episodes=cfg.nb_episodes,  
        render=False,  # Turn off rendering for faster runs
    )

    # We want to maximize the norm of the final value function
    final_norm = val_list[-1]
    return final_norm

In [None]:
# Bayesian optimization 
study_Bayes = optuna.create_study(direction='maximize')
study_Bayes.optimize(objective, n_trials=400)

# Data frame contains the params and the corresponding norm of the final value function
study_Bayes_analyse = study_Bayes.trials_dataframe(attrs=('params', 'value')) 

best_Bayes = study_Bayes.best_params
print ('The best parameters founded using Bayesian optimization are: ', best_Bayes, '\n\n')

# Heat map
plt.figure(figsize=(8, 4))
plt.scatter(study_Bayes_analyse['params_alpha_critic'], 
             study_Bayes_analyse['params_alpha_actor'], 
             c=study_Bayes_analyse['value'], cmap="RdYlGn_r")
plt.title('Value function norms (Bayesian optimization)')
plt.xlabel(r'$\alpha_{critic}$')
plt.ylabel(r'$\alpha_{actor}$')
plt.colorbar()
plt.savefig("bayes_opt.pdf")
# plt.show()

In [None]:
# Grid search
search_space = {'alpha_critic': np.linspace(0.01, 1.0, 20), 'alpha_actor': np.linspace(0.01, 1.0, 20)}
study_grid = optuna.create_study(direction='maximize', sampler=optuna.samplers.GridSampler(search_space))
study_grid.optimize(objective)

# Data frame contains the params and the corresponding norm of the final value function
study_grid_analyse = study_grid.trials_dataframe(attrs=('params', 'value')) 

best_grid = study_grid.best_params
print ('The best parameters founded using Grid search are: ', best_grid, '\n\n')

# Heat map
plt.figure(figsize=(8, 4))
plt.scatter(study_grid_analyse['params_alpha_critic'], 
             study_grid_analyse['params_alpha_actor'], 
             c=study_grid_analyse['value'], cmap="RdYlGn_r")
plt.title('Value function norms (Grid search)')
plt.xlabel(r'$\alpha_{critic}$')
plt.ylabel(r'$\alpha_{actor}$')
plt.grid(False)
plt.colorbar()
plt.savefig("grid_search.pdf")
# plt.show()

# Step 3: Statistical tests

Now you have to compare the performance of the actor-critic algorithm tuned
with all the best hyper-parameters you found before, using statistical tests.

The functions below are provided to run Welch's T-test over learning curves.
They have been adapted from a github repository: https://github.com/flowersteam/rl_stats
You don't need to understand them in detail (though it is always a good idea to try to understand more code).

In [None]:
from scipy.stats import ttest_ind
import bootstrapped.bootstrap as bs
import bootstrapped.compare_functions as bs_compare
import bootstrapped.stats_functions as bs_stats

In [None]:
def compute_central_tendency_and_error(id_central, id_error, sample):

    try:
        id_error = int(id_error)
    except:
        pass

    if id_central == "mean":
        central = np.nanmean(sample, axis=1)
    elif id_central == "median":
        central = np.nanmedian(sample, axis=1)
    else:
        raise NotImplementedError

    if isinstance(id_error, int):
        low = np.nanpercentile(sample, q=int((100 - id_error) / 2), axis=1)
        high = np.nanpercentile(sample, q=int(100 - (100 - id_error) / 2), axis=1)
    elif id_error == "std":
        low = central - np.nanstd(sample, axis=1)
        high = central + np.nanstd(sample, axis=1)
    elif id_error == "sem":
        low = central - np.nanstd(sample, axis=1) / np.sqrt(sample.shape[0])
        high = central + np.nanstd(sample, axis=1) / np.sqrt(sample.shape[0])
    else:
        raise NotImplementedError

    return central, low, high

def run_test(data1, data2, alpha=0.05):
    """
    Compute tests comparing data1 and data2 with confidence level alpha
    :param data1: (np.ndarray) sample 1
    :param data2: (np.ndarray) sample 2
    :param alpha: (float) confidence level of the test
    :return: (bool) if True, the null hypothesis is rejected
    """
    data1 = data1.squeeze()
    data2 = data2.squeeze()

    # perform Welch t-test":
    _, p = ttest_ind(data1, data2, equal_var=False)
    return p < alpha

This last function was adapted for the lab.

In [None]:
def perform_test(perf1, perf2, name1, name2, sample_size=20, downsampling_fact=5, confidence_level=0.01):

    perf1 = perf1.transpose()
    perf2 = perf2.transpose()
    nb_datapoints = perf1.shape[1]
    nb_steps = perf1.shape[0]

    legend = [name1, name2]

    # what do you want to plot ?
    id_central = 'mean' # "median"  # 
    id_error = 80  # (percentiles), also: 'std', 'sem'

    sample1 = perf1[:, np.random.randint(0, nb_datapoints, sample_size)]
    sample2 = perf2[:, np.random.randint(0, nb_datapoints, sample_size)]

    steps = np.arange(0, nb_steps, downsampling_fact)
    sample1 = sample1[steps, :]
    sample2 = sample2[steps, :]

    # test
    sign_diff = np.zeros([len(steps)])
    for i in range(len(steps)):
        sign_diff[i] = run_test(
            sample1[i, :], sample2[i, :], alpha=confidence_level
        )

    central1, low1, high1 = compute_central_tendency_and_error(
        id_central, id_error, sample1
    )
    central2, low2, high2 = compute_central_tendency_and_error(
        id_central, id_error, sample2
    )

    # plot
    _, ax = plt.subplots(1, 1, figsize=(20, 10))
    lab1 = plt.xlabel("training steps")
    lab2 = plt.ylabel("performance")

    plt.plot(steps*downsampling_fact, central1, linewidth=10)
    plt.plot(steps*downsampling_fact, central2, linewidth=10)
    plt.fill_between(steps*downsampling_fact, low1, high1, alpha=0.3)
    plt.fill_between(steps*downsampling_fact, low2, high2, alpha=0.3)
    leg = ax.legend(legend, frameon=False)

    # plot significative difference as dots
    idx = np.argwhere(sign_diff == 1)
    y = max(np.nanmax(high1), np.nanmax(high2))
    # Plot the stars where there is a statistically significant difference
    plt.scatter(steps[idx]*downsampling_fact, y * 1.05 * np.ones([idx.size]), s=100, c="k", marker="*")

    # style
    for line in leg.get_lines():
        line.set_linewidth(10.0)
    ax.spines["top"].set_linewidth(5)
    ax.spines["right"].set_linewidth(5)
    ax.spines["bottom"].set_linewidth(5)
    ax.spines["left"].set_linewidth(5)

    plt.savefig(
        f"./{name1}_{name2}.png", bbox_extra_artists=(leg, lab1, lab2), bbox_inches="tight", dpi=100
    )
    # plt.show()

It is now time to compare the performance of you actor-critic algorithm, using different hyper-parameters.
As hyper-parameters, you will use:

- naive tuning, that is a pair (0.5, 0.5) for the actor and critic learning rates,
- the best hyper-parameters you found with the different tuning algorithms you used before.

Below, we provide the code for comparing the Bayesian optimization approach and grid search.
You have to extend it to also compare these two approaches to using naive tuning.

The code below does the following:

1. For each set of hyper-parameters, collect a large dataset of learning curves (we suggest using 150 training episodes)

2. Perform statistical comparisons

- Take two datasets of learning curves obtained with the hyper-parameters sets that you found with different tuning algorithms.
- Use the ``` perform_test(...)``` function to compare each possible pair of sets.

You should obtain an image for each pair you have tried.
In this image, black dots signal the time step where there is a statistically significant difference between two learning curves.

In [None]:
nb_runs = 150
# Collect dataset
perf_naive = [] # Dataset for naive tuning 
perf_Bayes = [] # Dataset for Bayesian optimization
perf_grid = [] # Dataset for Grid search

for _ in range(nb_runs):
    v_Bayes, _ = actor_critic_v(nb_episodes=cfg.nb_episodes,
                                   alpha_critic=best_Bayes['alpha_critic'], alpha_actor=best_Bayes['alpha_actor'],
                                   render=False)
    v_grid, _ = actor_critic_v(nb_episodes=cfg.nb_episodes,
                                   alpha_critic=best_grid['alpha_critic'], alpha_actor=best_grid['alpha_actor'],
                                   render=False)

    perf_Bayes.append(v_Bayes)
    perf_grid.append(v_grid)

perf_Bayes = np.array(perf_Bayes)
perf_grid = np.array(perf_grid)
    
# Perform statistical comparisons 
perform_test(perf_Bayes, perf_grid, 'Bayes optimization', 'Grid search')
    
    
# To be completed...

assert False, 'Not implemented yet'


# Once you have completed the code, conclude