TP3: Neural Ordinary Differential Equations (NODE)¶
Apprentissage d'une ODE.¶
Dans ce TP nous verrons comment apprendre une équation différentielle, de la forme
$$ u'(t) = f(t,u(t)) \qquad u(t_0) = u_0, $$
avec un réseau de neurones à partir d'observations de la solution à différents temps.
Dans cette illustration, on considérera une équation de la forme
\begin{align*} x'(t) & = -\alpha \, x(t) - \beta \, y(t), \\ y'(t) & = \beta \, x(t) - \alpha \, y(t), \end{align*}
mais vous pourrez tester différents champs de vecteurs.
Commençons par illustrer les solutions d'une telle équation sur une grille de temps donnée. Ces solutions, et la grille de temps, nous servirons de données d'entrainement.
1) Entrainement¶
import numpy as np
import matplotlib.pyplot as plt
from scipy.integrate import solve_ivp
# champs de vecteur
def spiral_dynamics(t, u, a, b):
x, y = u
dxdt = a * x - b * y
dydt = b * x + a * y
return [dxdt, dydt]
# Parametres pour l'edo
a = 0.1
b = 1.0
t_start, t_stop = 0., 10. # bornes pour l'intervalle de temps
n_obs = 50 # nombre de points sur l'intervalle de temps
# grille de temps d'entrainement
t = np.linspace(t_start, t_stop, n_obs)
# conditions initiales pour l'entrainement
n_trajectories = 10
init_conditions = np.random.uniform(-2, 2, size=(n_trajectories, 2))
#liste pour stocker les différentes trajectoires d'entrainement
trajectories = np.zeros((n_obs, n_trajectories, 2)) # format qui va nous être retourné par torchdiffeq.odeint
# génère les trajectoires d'entrainement
for j, ic in enumerate(init_conditions):
sol = solve_ivp(
spiral_dynamics,
(t_start, t_stop),
ic,
t_eval=t,
args=(a, b),
method="RK45"
)
trajectories[:,j,:] = sol.y.T # on prend la transposé des trajectoires pour avoir deux colones
# et préparer la mise en forme des données.
# trajectoires/points d'entrainement
plt.figure(figsize=(8, 6))
for j in range(n_trajectories):
plt.plot(trajectories[:, j, 0], trajectories[:, j, 1], 'o-')
plt.title("Trajectoires en spirales", fontsize=18)
plt.xlabel("x", fontsize=18)
plt.ylabel("y", fontsize=18)
plt.grid()
plt.show()
Pour bien apprendre, il faut que les trajectoires du jeu d'entrainement recouvre bien le domaine dans lequel les trajectoires évoluent.
On va maintenant convertir nos données en tenseurs torch.
import torch
import torch.nn as nn
import torch.optim as optim
from torchdiffeq import odeint
# !pip install torchdiffeq # a executer dans une cellule pour google colab
Ici odeint
est un solver d'EDO implémenté via pytorch afin d'être compatible avec la backpropagation pour le calcul des gradients
# mise en forme des données d'entrainement
times_data = torch.tensor(t, dtype=torch.float32)
init_conds_data = torch.tensor(init_conditions, dtype=torch.float32)
trajectories_data = torch.tensor(trajectories, dtype=torch.float32)
On reprend notre classe de réseau de neurones vu au TP précédent.
# on reprend notre classe de réseau de neurones vu au tp précédent
class MLP(nn.Module):
def __init__(self, layers, activations):
"""
Parameters:
- layers: List of integer, the number of neurons in each layer (including input and output layers).
- activations: List of the activation functions for each layer.
The length of activations should be len(layers) - 2 (one for each hidden layer).
"""
super().__init__()
if len(activations) != len(layers) - 2:
raise ValueError("The number of activation functions must match the number of hidden layers.")
# Build the network
self.layers = nn.Sequential()
for i in range(len(layers) - 1):
self.layers.add_module(f"linear_{i}", nn.Linear(layers[i], layers[i + 1]))
if i < len(activations): # Apply activation to all but the last layer
self.layers.add_module(f"activation_{i}", activations[i])
def forward(self, t, x):
return self.layers(x)
On définit une fonction d'entrainement qui prend en entré un réseau de neurone et va l'entrainer.
def train(model, optimizer, loss_fct, times_data, init_conds_data, trajectories_data, n_epoch):
"""
Trains a given model on specific data for a defined number of epochs.
Parameters:
- model : neural network model to be trained.
- optimizer : optimization method to update the model's weights.
- loss_fct : the loss function to measure the error between the model's predictions and the target data.
- times_data : time grid
- init_conds_data : initial conditions for each training trajectories.
- trajectories_data : training trajectories
- epochs : inumber of epochs for training.
"""
for t in range(n_epoch):
pred_traj = odeint(model, init_conds_data, times_data)
loss = loss_fct(pred_traj, trajectories_data)
if t%100 == 0:
print("iteration=%s, loss=%s" % (t,loss.item()))
# On met à zéro les gradients avant de les calculer.
optimizer.zero_grad()
# On calcul les gradients
loss.backward()
# On met à jour les paramètres
optimizer.step()
print("iteration=%s, loss=%s" % (t,loss.item()))
Définissons une architecture pour notre réseau. Le problème de spirale est très simple, il n'est donc pas nécessaire d'avoir une architecture trop compliquée. De plus les NeuralODE ont un coût computationnel assez lourd. On va commencer par faire simple pour avoir un entrainement pas trop long.
# on définit l'architecture du réseau
layers = [2, 32, 2]
activations = [nn.Tanh()]
# définit le réseau de neurones
model = MLP(layers, activations)
# fonction de coût
loss_fct = nn.MSELoss()
n_epoch = 2000
learning_rate = 0.001
# définit la méthode d'optimisation
optimizer = optim.Adam(model.parameters(), lr=learning_rate)
# entrainement du réseau
train(model, optimizer, loss_fct, times_data, init_conds_data, trajectories_data, n_epoch)
iteration=0, loss=13.464597702026367 iteration=100, loss=3.4281327724456787 iteration=200, loss=3.349064350128174 iteration=300, loss=3.2874343395233154 iteration=400, loss=3.244019031524658 iteration=500, loss=3.2049717903137207 iteration=600, loss=3.1509957313537598 iteration=700, loss=2.9907209873199463 iteration=800, loss=1.440382957458496 iteration=900, loss=0.9623568058013916 iteration=1000, loss=0.6588707566261292 iteration=1100, loss=0.43824121356010437 iteration=1200, loss=0.28637823462486267 iteration=1300, loss=0.19047017395496368 iteration=1400, loss=0.13178901374340057 iteration=1500, loss=0.09554387629032135 iteration=1600, loss=0.07348066568374634 iteration=1700, loss=0.05957542359828949 iteration=1800, loss=0.04991249740123749 iteration=1900, loss=0.04260001704096794 iteration=1999, loss=0.03684285655617714
Question: Comparer graphiquement les prédictions du réseau aux trajectoires d'entrainement.¶
pred_trajectories = odeint(model, init_conds_data, times_data)
plt.figure(figsize=(10, 6))
idx = np.random.randint(low=0, high=n_trajectories, size=2)
for j in idx:
plt.plot(trajectories_data[:,j,0], trajectories_data[:,j,1], 'o-', label="true traj")
plt.plot(pred_trajectories[:, j, 0].detach(), pred_trajectories[:, j, 1].detach(), 'o-', label="predictions")
plt.legend(fontsize=12)
plt.title("True vs predicted spiral trajectories on a longer time grid than the training one")
plt.xlabel("x")
plt.ylabel("y")
plt.show()
Dans nos modestes conditions, le réseau n'est pas bien entrainé pour des trajectoires très excentriques.
2) Question: tester la capacité de prédiction du réseau pour de nouvelles trajectoires partant de nouvelles conditions initiales qui n'ont pas servi à l'entrainement. On choisira ces nouvelles conditions initiales aléatoirement.¶
Regardons comment notre réseau prédit de nouvelles trajectoires à partir de nouvelles conditions initiales.
# on génère de nouvelles conditions initiales
n_new_trajectories = 2
new_init_conditions = np.random.uniform(-2, 2, size=(n_new_trajectories, 2))
new_trajectories = np.zeros((n_obs, n_new_trajectories, 2)) # format correspondant à la sortie de torchdiffeq.odeint
# comparaisons des prédictions du réseau avec de nouvelles
for j, ic in enumerate(new_init_conditions):
sol = solve_ivp(
spiral_dynamics,
(t_start, t_stop),
ic,
t_eval=t,
args=(a, b),
method="RK45"
)
new_trajectories[:,j,:] = sol.y.T
new_init_conditions_torch = torch.tensor(new_init_conditions, dtype=torch.float32)
pred_trajectories = odeint(model, new_init_conditions_torch, times_data)
plt.figure(figsize=(10, 6))
for j in range(n_new_trajectories):
plt.plot(new_trajectories[:,j,0], new_trajectories[:,j,1], 'o-', label="true traj")
plt.plot(pred_trajectories[:, j, 0].detach(), pred_trajectories[:, j, 1].detach(), 'o-', label="predictions")
plt.legend(loc='upper right', fontsize=12)
plt.title("True vs predicted spiral trajectories on the training time grid", fontsize=18)
plt.xlabel("x", fontsize=18)
plt.ylabel("y", fontsize=18)
plt.show()
On peut observer des trajectoires bien prédites par le réseau, et d'autres moins bien prédites. Outre le fait qu'on a peu de trajectoires pour l'entrainement, et que le réseau n'est pas bien entrainé pour des trajectoires très excentriques, une courbe qui va sortir du domaine couvert par les trajectoires d'entrainement aura du mal à être prédite correctement.
3) Question: Sur une grille de temps plus fine, tester les prédictions du réseau par rapport aux trajectoires partant des nouvelles conditions initiales de la section précédente.¶
Prenons un nombre de points plus élevé que précédemment pour la grille de temps.
# nombre de points sur l'intervalle de temps
n_obs_test = 120
# nouvelle grille de temps de test
t_grid = torch.linspace(t_start, t_stop, n_obs_test)
new_trajectories = np.zeros((n_obs_test, n_new_trajectories, 2)) # format correspondant à la sortie de torchdiffeq.odeint
# on génère les nouvelles trajectoires
for j, ic in enumerate(new_init_conditions):
sol = solve_ivp(
spiral_dynamics,
(t_start, t_stop),
ic,
t_eval=t_grid,
args=(a, b),
method="RK45"
)
new_trajectories[:,j,:] = sol.y.T
# on génère les prédictions du réseau sur la grille de temps plus fine
pred_trajectories = odeint(model, new_init_conditions_torch, t_grid)
plt.figure(figsize=(10, 6))
for j in range(n_new_trajectories):
plt.plot(new_trajectories[:,j,0], new_trajectories[:,j,1], 'o-', label="true traj")
plt.plot(pred_trajectories[:, j, 0].detach(), pred_trajectories[:, j, 1].detach(), 'o-', label="predictions")
plt.legend()
plt.title("True vs predicted spiral trajectories on a finer time grid than the training one")
plt.xlabel("x")
plt.ylabel("y")
plt.show()
Le réseau semble bien généraliser sur une grille de temps plus fine.
4) Question: Sur une grille de temps plus longue, tester les prédictions du réseau par rapport aux trajectoires partant des nouvelles conditions initiales de la section 2.¶
# on génère la nouvelle grille de temps
t_stop_extended = t_stop + 5
n_obs_test = 150
# nouvelle grille de temps de test
t_grid_extended = torch.linspace(t_start, t_stop_extended , n_obs_test)
new_extended_trajectories = np.zeros((n_obs_test, n_new_trajectories, 2)) # format correspondant à la sortie de torchdiffeq.odeint
# on génère les nouvelles trajectoires
for j, ic in enumerate(new_init_conditions):
sol = solve_ivp(
spiral_dynamics,
(t_start, t_stop_extended),
ic,
t_eval=t_grid_extended.numpy(),
args=(a, b),
method="RK45"
)
new_extended_trajectories[:,j,:] = sol.y.T
# on génère les nouvelles prédictions
pred_extended_trajectories = odeint(model, new_init_conditions_torch, t_grid_extended)
plt.figure(figsize=(10, 6))
for j in range(n_new_trajectories):
plt.plot(new_extended_trajectories[:,j,0], new_extended_trajectories[:,j,1], 'o-', label="true traj")
plt.plot(pred_extended_trajectories[:, j, 0].detach(), pred_extended_trajectories[:, j, 1].detach(), 'o-', label="predictions")
plt.legend()
plt.title("True vs predicted spiral trajectories on a longer time grid than the training one")
plt.xlabel("x")
plt.ylabel("y")
plt.show()
Comme précédemment, on peut observer des trajectoires bien prédites par le réseau sur un temps plus que celui d'entrainement, et d'autres moins bien prédites. Sur un temps plus long, si la courbe ne sort pas du domaine couvert par les trajectoires d'entrainement, et qu'elle n'est pas trop excentrique,celle-ci pourra être prédite correctement. Mais sur un temps plus long, si la courbe sort du domaine couvert par les trajectoires d'entrainement, ou est trop excentrique, on aura du mal à être prédite correctement.