"""LSTM model definition and tools for LSTM model training."""
from __future__ import annotations
import math
from collections.abc import Callable
import numpy as np
import tensorflow as tf
import tensorflow.keras.backend as k
from tensorflow.keras.models import load_model
from .create_datasets import create_lstm_dataset, create_lstm_dataset_local
# Public API of this module: the four batch generators, the model-builder lookup and the two simulation runners.
__all__ = [
"TestingGenerator",
"TestingGeneratorLocal",
"TrainingGenerator",
"TrainingGeneratorLocal",
"get_list_of_LSTM_models",
"run_trained_model",
"run_trained_model_local",
]
[docs]
def get_list_of_LSTM_models(model_structure) -> Callable:  # noqa: N802
    """
    Return the model-building function registered under a given name.

    Parameters
    ----------
    model_structure : str
        The name of the LSTM model to use for training. Must be one of the keys of `model_structure_dict` below
        ("simple_local_lstm", "simple_regional_lstm", "dummy_local_lstm" or "dummy_regional_lstm").
        The `model_structure_dict` must be updated when new models are added.

    Returns
    -------
    Callable
        Handle to the LSTM model function.

    Raises
    ------
    ValueError
        If `model_structure` does not name one of the available models.
    """
    # Registry of the available model builders, keyed by their public name.
    model_structure_dict = {
        "simple_local_lstm": _simple_local_lstm,
        "simple_regional_lstm": _simple_regional_lstm,
        "dummy_local_lstm": _dummy_local_lstm,
        "dummy_regional_lstm": _dummy_regional_lstm,
    }

    # Only the lookup can fail, so it is the only statement that is guarded. An unknown (KeyError) or unhashable
    # (TypeError) name is reported with the module's own error message instead of a bare lookup error.
    try:
        return model_structure_dict[model_structure]
    except (KeyError, TypeError) as err:
        raise ValueError("The LSTM model structure desired is not present in the available model dictionary.") from err
[docs]
class TrainingGenerator(tf.keras.utils.Sequence):
    """
    Create a training generator to manage the GPU memory during training.

    The generator keeps the complete training arrays and hands them to Keras one batch at a time. The sample order
    is reshuffled at the end of every epoch.

    Parameters
    ----------
    x_set : numpy.ndarray
        Tensor of size [n_samples x window_size x n_dynamic_variables] that contains the dynamic (i.e. timeseries)
        variables that will be used during training.
    x_set_static : numpy.ndarray
        Tensor of size [n_samples x n_static_variables] that contains the static (i.e. catchment descriptors)
        variables that will be used during training.
    x_set_q_stds : numpy.ndarray
        Tensor of size [n_samples] that contains the standard deviation of scaled streamflow values for the catchment
        associated to the data in x_set and x_set_static. Each data point could come from any catchment and this
        q_std variable helps scale the objective function.
    y_set : numpy.ndarray
        Tensor of size [n_samples] containing the target variable for the same time point as in x_set, x_set_static
        and x_set_q_stds. Usually the streamflow for the day associated to each of the training points.
    batch_size : int
        Number of data points per batch. Datasets are often way too big to train in a single batch on a single GPU
        or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
        performance and final model skill, and should be handled accordingly.
    """

    def __init__(self, x_set, x_set_static, x_set_q_stds, y_set, batch_size):
        # Initialise the Keras base class first; recent Keras versions warn when a Sequence subclass skips this.
        super().__init__()
        self.x = x_set
        self.x_static = x_set_static
        self.x_q_stds = x_set_q_stds
        self.y = y_set
        self.batch_size = batch_size
        # Sample order used to build the batches; reshuffled in place by `on_epoch_end`.
        self.indices = np.arange(self.x.shape[0])

    def __len__(self):
        """Get the total number of batches to generate (the last batch may be smaller than `batch_size`)."""
        # Cast to a built-in int so that `__len__` honours the Python protocol instead of returning a NumPy scalar.
        return int(np.ceil(len(self.y) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Get batch number `idx` as ``((x_dynamic, x_static), targets)`` following the current sample order."""
        inds = self.indices[idx * self.batch_size : (idx + 1) * self.batch_size]
        batch_x = self.x[inds]
        batch_x_static = self.x_static[inds]
        batch_x_q_stds = self.x_q_stds[inds]
        batch_y = self.y[inds]
        # The target carries two columns, [y, q_std], so that the loss function receives both.
        return (np.array(batch_x), np.array(batch_x_static)), np.vstack((np.array(batch_y), np.array(batch_x_q_stds))).T

    def on_epoch_end(self):
        """Shuffle the dataset before the next batch sampling to ensure randomness, helping convergence."""
        np.random.shuffle(self.indices)
[docs]
class TrainingGeneratorLocal(tf.keras.utils.Sequence):
    """
    Create a training generator to manage the GPU memory during training of a local model.

    The generator keeps the complete training arrays and hands them to Keras one batch at a time. The sample order
    is reshuffled at the end of every epoch.

    Parameters
    ----------
    x_set : numpy.ndarray
        Tensor of size [n_samples x window_size x n_dynamic_variables] that contains the dynamic (i.e. timeseries)
        variables that will be used during training.
    y_set : numpy.ndarray
        Tensor of size [n_samples] containing the target variable for the same time point as in x_set. Usually the
        streamflow for the day associated to each of the training points.
    batch_size : int
        Number of data points per batch. Datasets are often way too big to train in a single batch on a single GPU
        or CPU, meaning that the dataset must be divided into smaller batches. This has an impact on the training
        performance and final model skill, and should be handled accordingly.
    """

    def __init__(self, x_set, y_set, batch_size):
        # Initialise the Keras base class first; recent Keras versions warn when a Sequence subclass skips this.
        super().__init__()
        self.x = x_set
        self.y = y_set
        self.batch_size = batch_size
        # Sample order used to build the batches; reshuffled in place by `on_epoch_end`.
        self.indices = np.arange(self.x.shape[0])

    def __len__(self):
        """Get the total number of batches to generate (the last batch may be smaller than `batch_size`)."""
        # Cast to a built-in int so that `__len__` honours the Python protocol instead of returning a NumPy scalar.
        return int(np.ceil(len(self.y) / float(self.batch_size)))

    def __getitem__(self, idx):
        """Get batch number `idx` as ``(x_dynamic, y)`` following the current sample order."""
        inds = self.indices[idx * self.batch_size : (idx + 1) * self.batch_size]
        batch_x = self.x[inds]
        batch_y = self.y[inds]
        return (np.array(batch_x)), (np.array(batch_y))

    def on_epoch_end(self):
        """Shuffle the dataset before the next batch sampling to ensure randomness, helping convergence."""
        np.random.shuffle(self.indices)
[docs]
class TestingGenerator(tf.keras.utils.Sequence):
    """
    Create a testing generator to manage the GPU memory while simulating with a regional model.

    Batches are served in the original sample order (no shuffling).

    Parameters
    ----------
    x_set : numpy.ndarray
        Tensor of size [n_samples x window_size x n_dynamic_variables] that contains the dynamic (i.e. timeseries)
        variables that will be fed to the model.
    x_set_static : numpy.ndarray
        Tensor of size [n_samples x n_static_variables] that contains the static (i.e. catchment descriptors)
        variables that will be fed to the model.
    batch_size : int
        Number of data points per batch. Datasets are often way too big to process in a single batch on a single GPU
        or CPU, meaning that the dataset must be divided into smaller batches.
    """

    def __init__(self, x_set, x_set_static, batch_size):
        # Initialise the Keras base class first; recent Keras versions warn when a Sequence subclass skips this.
        super().__init__()
        self.x = x_set
        self.x_static = x_set_static
        self.batch_size = batch_size

    def __len__(self):
        """Get the total number of batches to generate (the last batch may be smaller than `batch_size`)."""
        # Cast to a built-in int so that `__len__` honours the Python protocol instead of returning a NumPy scalar.
        return int(np.ceil(self.x.shape[0] / float(self.batch_size)))

    def __getitem__(self, idx):
        """Get batch number `idx` as ``((x_dynamic, x_static), placeholder_target)``."""
        batch_x = self.x[idx * self.batch_size : (idx + 1) * self.batch_size]
        batch_x_static = self.x_static[idx * self.batch_size : (idx + 1) * self.batch_size]
        # No observed target is stored in this generator: the static batch is returned a second time as a
        # placeholder in the target position.
        return (np.array(batch_x), np.array(batch_x_static)), np.array(batch_x_static)
[docs]
class TestingGeneratorLocal(tf.keras.utils.Sequence):
    """
    Create a testing generator to manage the GPU memory while simulating with a local model.

    Batches are served in the original sample order (no shuffling).

    Parameters
    ----------
    x_set : numpy.ndarray
        Tensor of size [n_samples x window_size x n_dynamic_variables] that contains the dynamic (i.e. timeseries)
        variables that will be fed to the model.
    batch_size : int
        Number of data points per batch. Datasets are often way too big to process in a single batch on a single GPU
        or CPU, meaning that the dataset must be divided into smaller batches.
    """

    def __init__(self, x_set, batch_size):
        # Initialise the Keras base class first; recent Keras versions warn when a Sequence subclass skips this.
        super().__init__()
        self.x = x_set
        self.batch_size = batch_size

    def __len__(self):
        """Get the total number of batches to generate (the last batch may be smaller than `batch_size`)."""
        # Cast to a built-in int so that `__len__` honours the Python protocol instead of returning a NumPy scalar.
        return int(np.ceil(self.x.shape[0] / float(self.batch_size)))

    def __getitem__(self, idx):
        """Get batch number `idx` as a one-element list holding the dynamic inputs of that batch."""
        batch_x = self.x[idx * self.batch_size : (idx + 1) * self.batch_size]
        return [np.array(batch_x)]
def _kge_loss(data, y_pred):
    """
    Compute a Kling-Gupta Efficiency (KGE) based loss under Keras for Tensorflow training.

    Needs to be separate from the regular objective function calculations because it uses Keras/tensor arithmetic for
    GPU.

    Parameters
    ----------
    data : tensor
        Training target. Only column 0 is read; it holds the observed streamflow.
    y_pred : tensor
        Simulated streamflow generated by the LSTM during training. Only column 0 is read.

    Returns
    -------
    kge
        The value ``1 - (1 - ED)``, where ED is the Euclidean distance of (r, beta, gamma) from (1, 1, 1). This is
        1 - KGE, so a smaller value means a better fit.
    """
    observed = data[:, 0]
    simulated = y_pred[:, 0]

    # Means and anomalies are reused by every component below.
    observed_mean = k.mean(observed)
    simulated_mean = k.mean(simulated)
    observed_anomaly = observed - observed_mean
    simulated_anomaly = simulated - simulated_mean

    # Dimensionless correlation coefficient
    numerator = k.sum(observed_anomaly * simulated_anomaly)
    denominator = k.sqrt(k.sum(observed_anomaly**2)) * k.sqrt(k.sum(simulated_anomaly**2))
    r = numerator / denominator

    # Dimensionless bias ratio (beta)
    b = simulated_mean / observed_mean

    # Dimensionless variability ratio (gamma): ratio of the coefficients of variation
    g = (k.std(simulated) / simulated_mean) / (k.std(observed) / observed_mean)

    # Distance of (r, beta, gamma) from the ideal point (1, 1, 1)
    distance = k.sqrt((r - 1) ** 2 + (b - 1) ** 2 + (g - 1) ** 2)

    # 1 - KGE, where KGE = 1 - distance
    kge = 1 - (1 - distance)
    return kge
def _nse_scaled_loss(data, y_pred):
"""
Compute the modified NSE loss for regional training.
Applies a random noise element for robustness, as well scales the flows according to their standard deviations to be
able to compute the nse with data from multiple catchments all scaled and normalized.
Parameters
----------
data : numpy.ndarray
Tensor of the target variable (column 1) and observed streamflow standard deviations (column 2).
y_pred : numpy.ndarray
Tensor of the predicted variable from the LSTM model.
Returns
-------
scaled_loss
Scaled modified NSE metric for training on regional models.
"""
y_true = data[:, 0]
q_stds = data[:, 1]
y_pred = y_pred[:, 0]
eps = 0.1
squared_error = (y_pred - y_true) ** 2
weights = 1 / (q_stds + eps) ** 2
scaled_loss = weights * squared_error
return scaled_loss
def _simple_regional_lstm(
window_size: int,
n_dynamic_features: int,
n_static_features: int,
training_func: str,
checkpoint_path: str = "tmp.keras",
):
"""
Define the LSTM model structure and hyperparameters to use. Must be updated by users to modify model structures.
Parameters
----------
window_size : int
Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
can improve results.
n_dynamic_features : int
Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
n_static_features : int
Number of static (i.e. catchment descriptor) variables that are used to define the regional LSTM model and allow
it to modulate simulations according to catchment properties.
training_func : str
Name of the objective function used for training. For a regional model, it is highly recommended to use the
scaled nse_loss variable that uses the standard deviation of streamflow as inputs.
checkpoint_path : str
String containing the path of the file where the trained model will be saved. Default is `tmp.keras`.
Returns
-------
model_lstm : Tensorflow model
The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
callback : list
A list of tensorflow objects that allow performing operations after each training epoch.
"""
x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
x_in_static = tf.keras.layers.Input(shape=(n_static_features,))
# LSTM 365 day
x_365 = tf.keras.layers.LSTM(128, return_sequences=True)(x_in_365)
x_365 = tf.keras.layers.LSTM(128, return_sequences=False)(x_365)
x_365 = tf.keras.layers.Dropout(0.1)(x_365)
# Dense statics
x_static = tf.keras.layers.Dense(24, activation="relu")(x_in_static)
x_static = tf.keras.layers.Dropout(0.2)(x_static)
# Concatenate the model
x = tf.keras.layers.Concatenate()([x_365, x_static])
x = tf.keras.layers.Dense(12, activation="relu")(x)
x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
x_out = tf.keras.layers.Dense(1, activation="relu")(x)
model_lstm = tf.keras.models.Model([x_in_365, x_in_static], [x_out])
if training_func == "nse_scaled":
model_lstm.compile(loss=_nse_scaled_loss, optimizer=tf.keras.optimizers.AdamW(clipnorm=0.1))
elif training_func == "kge":
model_lstm.compile(loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW(clipnorm=0.1))
callback = [
tf.keras.callbacks.ModelCheckpoint(
checkpoint_path,
save_freq="epoch",
save_best_only=True,
monitor="val_loss",
mode="min",
verbose=1,
),
tf.keras.callbacks.EarlyStopping(monitor="val_loss", mode="min", verbose=1, patience=15),
tf.keras.callbacks.LearningRateScheduler(_step_decay),
]
return model_lstm, callback
def _simple_local_lstm(
window_size: int,
n_dynamic_features: int,
training_func: str,
checkpoint_path: str = "tmp.keras",
):
"""
Define the local LSTM model structure and hyperparameters to use.
Must be updated by users to modify model structures.
Parameters
----------
window_size : int
Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
can improve results.
n_dynamic_features : int
Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
training_func : str
Name of the objective function used for training. For a regional model, it is highly recommended to use the
scaled nse_loss variable that uses the standard deviation of streamflow as inputs.
checkpoint_path : str
String containing the path of the file where the trained model will be saved.
Returns
-------
model_lstm : Tensorflow model
The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
callback : list
A list of tensorflow objects that allow performing operations after each training epoch.
"""
x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
# Single LSTM layer
x_365 = tf.keras.layers.LSTM(32, return_sequences=True)(x_in_365)
x_365 = tf.keras.layers.LSTM(32, return_sequences=False)(x_365)
x_365 = tf.keras.layers.Dropout(0.1)(x_365) # Add dropout layer for robustness
# Pass to a simple Dense layer with relu activation and LeakyReLU activation
x = tf.keras.layers.Dense(12, activation="relu")(x_365)
x = tf.keras.layers.LeakyReLU(alpha=0.1)(x)
# pass to a 1-unit dense layer representing output flow.
x_out = tf.keras.layers.Dense(1, activation="relu")(x)
# Build model
model_lstm = tf.keras.models.Model([x_in_365], [x_out])
if training_func == "kge":
model_lstm.compile(loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW(clipnorm=0.1))
else:
raise ValueError("training_func can only be kge for the local training model.")
callback = [
tf.keras.callbacks.ModelCheckpoint(
checkpoint_path,
save_freq="epoch",
save_best_only=True,
monitor="val_loss",
mode="min",
verbose=1,
),
tf.keras.callbacks.EarlyStopping(monitor="val_loss", mode="min", verbose=1, patience=15),
tf.keras.callbacks.LearningRateScheduler(_step_decay),
]
return model_lstm, callback
def _dummy_regional_lstm(
window_size: int,
n_dynamic_features: int,
n_static_features: int,
training_func: str,
checkpoint_path: str = "tmp.keras",
):
"""
Define the LSTM model structure and hyperparameters to use. Must be updated by users to modify model structures.
Parameters
----------
window_size : int
Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
can improve results.
n_dynamic_features : int
Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
n_static_features : int
Number of static (i.e. catchment descriptor) variables that are used to define the regional LSTM model and allow
it to modulate simulations according to catchment properties.
training_func : str
Name of the objective function used for training. For a regional model, it is highly recommended to use the
scaled nse_loss variable that uses the standard deviation of streamflow as inputs.
checkpoint_path : str
String containing the path of the file where the trained model will be saved.
Returns
-------
model_lstm : Tensorflow model
The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
callback : list
A list of tensorflow objects that allow performing operations after each training epoch.
"""
x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
x_in_static = tf.keras.layers.Input(shape=(n_static_features,))
# LSTM 365 day
x_365 = tf.keras.layers.LSTM(64, return_sequences=False)(x_in_365)
x_365 = tf.keras.layers.Dropout(0.2)(x_365)
# Dense statics
x_static = tf.keras.layers.Dense(24, activation="relu")(x_in_static)
x_static = tf.keras.layers.Dropout(0.2)(x_static)
# Concatenate the model
x = tf.keras.layers.Concatenate()([x_365, x_static])
x = tf.keras.layers.Dense(8, activation="relu")(x)
x_out = tf.keras.layers.Dense(1, activation="relu")(x)
model_lstm = tf.keras.models.Model([x_in_365, x_in_static], [x_out])
if training_func == "nse_scaled":
model_lstm.compile(loss=_nse_scaled_loss, optimizer=tf.keras.optimizers.AdamW())
elif training_func == "kge":
model_lstm.compile(loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW())
callback = [
tf.keras.callbacks.ModelCheckpoint(
checkpoint_path,
save_freq="epoch",
save_best_only=True,
monitor="val_loss",
mode="min",
verbose=1,
),
tf.keras.callbacks.EarlyStopping(monitor="val_loss", mode="min", verbose=1, patience=15),
tf.keras.callbacks.LearningRateScheduler(_step_decay),
]
return model_lstm, callback
def _dummy_local_lstm(
window_size: int,
n_dynamic_features: int,
training_func: str,
checkpoint_path: str = "tmp.keras",
):
"""
Define the local LSTM model structure and hyperparameters to use.
Must be updated by users to modify model structures.
Parameters
----------
window_size : int
Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window to
allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train but
can improve results.
n_dynamic_features : int
Number of dynamic (i.e. time-series) variables that are used for training and simulating the model.
training_func : str
Name of the objective function used for training. For a regional model, it is highly recommended to use the
scaled nse_loss variable that uses the standard deviation of streamflow as inputs.
checkpoint_path : str
String containing the path of the file where the trained model will be saved.
Returns
-------
model_lstm : Tensorflow model
The tensorflow model that will be trained, along with all of its default hyperparameters and training options.
callback : list
A list of tensorflow objects that allow performing operations after each training epoch.
"""
x_in_365 = tf.keras.layers.Input(shape=(window_size, n_dynamic_features))
# LSTM 365 day
x_365 = tf.keras.layers.LSTM(64, return_sequences=False)(x_in_365) # Single LSTM layer
x_365 = tf.keras.layers.Dropout(0.2)(x_365) # Add dropout layer for robustness
x = tf.keras.layers.Dense(8, activation="relu")(x_365) # Pass to a simple Dense layer with relu activation
x_out = tf.keras.layers.Dense(1, activation="relu")(x) # pass to a 1-unit dense layer representing output flow.
model_lstm = tf.keras.models.Model([x_in_365], [x_out])
if training_func == "kge":
model_lstm.compile(loss=_kge_loss, optimizer=tf.keras.optimizers.AdamW())
else:
raise ValueError("training_func can only be kge for the local training model.")
callback = [
tf.keras.callbacks.ModelCheckpoint(
checkpoint_path,
save_freq="epoch",
save_best_only=True,
monitor="val_loss",
mode="min",
verbose=1,
),
tf.keras.callbacks.EarlyStopping(monitor="val_loss", mode="min", verbose=1, patience=15),
tf.keras.callbacks.LearningRateScheduler(_step_decay),
]
return model_lstm, callback
def _step_decay(epoch: int) -> float:
"""
Callback for learning rate tuning during LSTM model training.
Parameters
----------
epoch : int
Current epoch number during training.
Used to adapt the learning rate after a certain number of epochs to aid in model training convergence.
Returns
-------
lrate : float
The updated learning rate.
"""
initial_lrate = 0.0005
drop = 0.5
epochs_drop = 10.0
lrate = initial_lrate * math.pow(drop, math.floor((1 + epoch) / epochs_drop))
return lrate
[docs]
def run_trained_model(
    arr_dynamic: np.ndarray,
    arr_static: np.ndarray,
    q_stds: np.ndarray,
    window_size: int,
    w: int,
    idx_scenario: np.ndarray,
    batch_size: int,
    watershed_areas: np.ndarray,
    name_of_saved_model: str,
    remove_nans: bool,
):
    """
    Run the trained regional LSTM model on a single catchment from a larger set.

    Parameters
    ----------
    arr_dynamic : numpy.ndarray
        Tensor of size [time_steps x window_size x (n_dynamic_variables+1)] that contains the dynamic (i.e.
        time-series) variables that will be used for the simulation. The first element in axis=2 is the observed flow.
    arr_static : numpy.ndarray
        Tensor of size [time_steps x n_static_variables] that contains the static (i.e. catchment descriptors)
        variables that will be used for the simulation.
    q_stds : numpy.ndarray
        Tensor of size [time_steps] that contains the standard deviation of scaled streamflow values for the catchment
        associated to the data in arr_dynamic and arr_static.
    window_size : int
        Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window
        to allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
        Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train
        but can improve results.
    w : int
        Number of the watershed from the list of catchments that will be simulated. Both built-in and NumPy integers
        are accepted.
    idx_scenario : numpy.ndarray
        2-element array of indices of the beginning and end of the desired period for which the LSTM model should be
        simulated.
    batch_size : int
        Number of data points per prediction batch. Datasets are often way too big to process in a single batch on a
        single GPU or CPU, meaning that the dataset must be divided into smaller batches.
    watershed_areas : np.ndarray
        Area of the watersheds, in square kilometers, as taken from the training dataset initial input ncfile. It is
        indexed with `w`.
    name_of_saved_model : str
        Path to the model that has been pre-trained if required for simulations.
    remove_nans : bool
        Remove the periods for which observed streamflow is NaN for both observed and simulated flows.

    Returns
    -------
    kge : float
        KGE value between observed and simulated flows computed for the watershed of interest and for a specified
        period.
    flows : np.ndarray
        Observed and simulated streamflows computed for the watershed of interest and for a specified period.
    """
    # Delete and reload the model to free the memory
    k.clear_session()
    model_lstm = load_model(name_of_saved_model, compile=False, custom_objects={"loss": _nse_scaled_loss})

    # Simulation dataset restricted to the requested watershed and period. `np.asarray` turns `w` into a 0-d array
    # first, so that adding the new axis also works for a plain Python int (which cannot be indexed).
    x, x_static, _, y = create_lstm_dataset(
        arr_dynamic=arr_dynamic[:, :, :],
        arr_static=arr_static,
        q_stds=q_stds,
        window_size=window_size,
        watershed_list=np.asarray(w)[np.newaxis],
        idx=idx_scenario,
        remove_nans=remove_nans,
    )

    y_pred = model_lstm.predict(TestingGenerator(x, x_static, batch_size=batch_size))
    y_pred = np.squeeze(y_pred)

    # Rescale observed and simulated streamflow from mm/d to m^3/s
    drainage_area = watershed_areas[w]
    y_pred = y_pred * drainage_area / 86.4
    y = y * drainage_area / 86.4

    # Compute the Kling-Gupta Efficiency (KGE) for the current watershed
    kge = obj_fun_kge(qobs=y, qsim=y_pred)
    flows = np.array([y, y_pred])
    return kge, flows
def obj_fun_kge(qobs: np.ndarray, qsim: np.ndarray) -> float:
    """
    Compute the Kling-Gupta Efficiency (KGE) criterion.

    Time steps where the observed flow is NaN are discarded from both series before the computation.

    Parameters
    ----------
    qobs : numpy.ndarray
        Observed streamflow.
    qsim : numpy.ndarray
        Simulated streamflow.

    Returns
    -------
    float
        KGE criterion value, or ``-inf`` when the criterion evaluates to NaN.
    """
    # Keep only the time steps that have an observation
    has_obs = ~np.isnan(qobs)
    obs, sim = qobs[has_obs], qsim[has_obs]

    obs_mean = np.mean(obs)
    sim_mean = np.mean(sim)

    # Dimensionless correlation coefficient
    r = np.corrcoef(sim, obs)[0, 1]
    # Dimensionless bias ratio (beta)
    b = sim_mean / obs_mean
    # Dimensionless variability ratio (gamma): ratio of the coefficients of variation
    g = (np.std(sim) / sim_mean) / (np.std(obs) / obs_mean)

    # Kling-Gupta Efficiency (KGE) modified criterion
    kge = 1 - np.sqrt((r - 1) ** 2 + (b - 1) ** 2 + (g - 1) ** 2)

    # In some cases, the KGE can return nan values which will force some optimization algorithm to crash.
    # Force the worst value possible instead.
    return -np.inf if np.isnan(kge) else kge
[docs]
def run_trained_model_local(
    arr_dynamic: np.ndarray,
    window_size: int,
    idx_scenario: np.ndarray,
    batch_size: int,
    name_of_saved_model: str,
    remove_nans: bool,
):
    """
    Run the trained local LSTM model on a single catchment.

    Parameters
    ----------
    arr_dynamic : numpy.ndarray
        Tensor of size [time_steps x window_size x (n_dynamic_variables+1)] that contains the dynamic (i.e.
        time-series) variables that will be used for the simulation. The first element in axis=2 is the observed flow.
    window_size : int
        Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking window
        to allow the model to learn from long-term weather patterns and history to predict the next day's streamflow.
        Usually set to 365 days to get one year of previous data. This makes the model heavier and longer to train
        but can improve results.
    idx_scenario : np.ndarray
        2-element array of indices of the beginning and end of the desired period for which the LSTM model should be
        simulated.
    batch_size : int
        Number of data points per prediction batch. Datasets are often way too big to process in a single batch on a
        single GPU or CPU, meaning that the dataset must be divided into smaller batches.
    name_of_saved_model : str
        Path to the model that has been pre-trained if required for simulations.
    remove_nans : bool
        Remove the periods for which observed streamflow is NaN for both observed and simulated flows.

    Returns
    -------
    kge : float
        KGE value between observed and simulated flows computed for the watershed of interest and for a specified
        period.
    flows : np.ndarray
        Observed and simulated streamflows computed for the watershed of interest and for a specified period.
    """
    # Clear the Keras session before loading the saved model, to free the memory
    k.clear_session()
    trained_model = load_model(name_of_saved_model, compile=False, custom_objects={"loss": _kge_loss})

    # Inputs and observed flows for the requested period
    dataset_options = {
        "arr_dynamic": arr_dynamic,
        "window_size": window_size,
        "idx": idx_scenario,
        "remove_nans": remove_nans,
    }
    x, y = create_lstm_dataset_local(**dataset_options)

    # Simulate batch by batch and drop the singleton dimensions of the prediction
    batches = TestingGeneratorLocal(x, batch_size=batch_size)
    y_pred = np.squeeze(trained_model.predict(batches))

    # Compute the Kling-Gupta Efficiency (KGE) for the watershed
    kge = obj_fun_kge(qobs=y, qsim=y_pred)
    return kge, np.array([y, y_pred])