# Source code for xhydro_lstm.create_datasets

"""Tools to create the datasets to be used in LSTM model training and simulation."""

from __future__ import annotations
import os
from pathlib import Path

import numpy as np
import xarray as xr


# Public API of this module: the regional and local dataset builders, the look-back
# window dataset creators, and the NaN-filtering helpers for the target variable.
__all__ = [
    "create_dataset_flexible",
    "create_dataset_flexible_local",
    "create_lstm_dataset",
    "create_lstm_dataset_local",
    "remove_nans_func",
    "remove_nans_func_local",
]


def create_dataset_flexible(
    filename: str | os.PathLike,
    dynamic_var_tags: list,
    qsim_pos: list,
    static_var_tags: list,
):
    """
    Prepare the arrays of dynamic, static and observed flow variables.

    A few things are absolutely required:
        1. a "watershed" coordinate that contains the ID of watersheds, such that we can preallocate the size
           of the matrices.
        2. a "qobs" variable that contains observed flows for the catchments.
        3. The size of the catchments in the "drainage_area" variable. This is used to compute scaled
           streamflow values for regionalization applications.

    Parameters
    ----------
    filename : str or os.PathLike
        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must
        contain a dataset named "qobs" and "drainage_area" for the code to work, as these are required as
        target and scaling for training, respectively.
    dynamic_var_tags : list of str
        List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename
        ncfile. Each variable may be stored either as (watershed, time) or as (time, watershed); its own
        dimension order is used to orient it.
    qsim_pos : list of bool
        List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags
        refer to flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should
        be set to True, and are scaled by the drainage area in the same way as "qobs".
    static_var_tags : list of str
        List of the catchment descriptor names in the input_data_filename ncfile. They need to be present in
        the ncfile and will be used as inputs to the regional model, to help the flow regionalization process.

    Returns
    -------
    arr_dynamic : np.ndarray
        Tensor of size [watersheds x timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e.
        time-series) variables that will be used during training. The first element in axis=2 is the scaled
        observed flow.
    arr_static : np.ndarray
        Tensor of size [watersheds x n_static_variables] that contains the static (i.e. catchment descriptors)
        variables that will be used during training.
    arr_qobs : np.ndarray
        Array of size [watersheds x timestep] containing the observed flow divided by the drainage area and
        multiplied by 86.4 (the same values stored in ``arr_dynamic[:, :, 0]``).
    """
    # Use a context manager so the netCDF file handle is released once all the
    # values have been copied into numpy arrays.
    with xr.open_dataset(Path(filename)) as ds:
        n_watersheds = len(ds.watershed)
        n_days = len(ds.time)

        # Column vector so it broadcasts over the time axis of [watershed, time] arrays.
        drainage_area = ds.drainage_area.values[:, np.newaxis]

        # Observed flow first, oriented as [watershed, time].
        arr_qobs = ds.qobs.values.astype(np.float32)
        if ds.qobs.dims[0] == "time":
            arr_qobs = arr_qobs.T
        # NOTE(review): 86.4 presumably converts a flow in m3/s over an area in km2
        # into mm/day — confirm the units of qobs and drainage_area.
        arr_qobs = arr_qobs / drainage_area * 86.4

        # Dynamic tensor pre-filled with NaN; slot 0 of the last axis holds qobs.
        arr_dynamic = np.full(
            (n_watersheds, n_days, len(dynamic_var_tags) + 1),
            np.nan,
            dtype=np.float32,
        )
        arr_dynamic[:, :, 0] = arr_qobs

        for i, tag in enumerate(dynamic_var_tags):
            tmp = ds[tag].values.astype(np.float32)
            # Orient each variable by its OWN dimension order: relying on qobs's
            # layout fails (or silently misaligns) when the layouts differ.
            if ds[tag].dims[0] == "time":
                tmp = tmp.T
            # Simulated flows are scaled exactly like the observed flow.
            if qsim_pos[i]:
                tmp = tmp / drainage_area * 86.4
            arr_dynamic[:, :, i + 1] = tmp

        # Static catchment descriptors, one column per descriptor.
        arr_static = np.empty([n_watersheds, len(static_var_tags)], dtype=np.float32)
        for i, tag in enumerate(static_var_tags):
            arr_static[:, i] = ds[tag].values

    return arr_dynamic, arr_static, arr_qobs
def create_dataset_flexible_local(
    filename: str | os.PathLike,
    dynamic_var_tags: list,
    qsim_pos: list,
):
    """
    Prepare the arrays of dynamic and observed flow variables for a single catchment.

    A few things are absolutely required:
        1. a "time" dimension, used to preallocate the size of the matrices.
        2. a "qobs" variable that contains the observed flow of the catchment. After ``np.squeeze`` it must
           be a vector of length ``len(time)``.

    Parameters
    ----------
    filename : str or os.PathLike
        Path to the netcdf file containing the required input and target data for the LSTM. The ncfile must
        contain a variable named "qobs" (the training target). A "drainage_area" variable is also required
        whenever any entry of qsim_pos is True, as it is used to scale the simulated flows.
    dynamic_var_tags : list of str
        List of dataset variables to use in the LSTM model training. Must be part of the input_data_filename
        ncfile.
    qsim_pos : list of bool
        List of same length as dynamic_var_tags. Should be set to all False EXCEPT where the dynamic_var_tags
        refer to flow simulations (ex: simulations from a hydrological model such as HYDROTEL). Those should
        be set to True.

    Returns
    -------
    arr_dynamic : np.ndarray
        Array of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
        variables that will be used during training. The first element in axis=1 is the observed flow.
    arr_qobs : np.ndarray
        The observed flow vector (squeezed, float32). Unlike the regional version, it is NOT scaled by the
        drainage area.
    """
    # Use a context manager so the netCDF file handle is released once all the
    # values have been copied into numpy arrays.
    with xr.open_dataset(Path(filename)) as ds:
        n_days = len(ds.time)

        arr_qobs = ds.qobs.values.astype(np.float32)

        # Dynamic array pre-filled with NaN; column 0 holds the observed flow.
        arr_dynamic = np.full(
            (n_days, len(dynamic_var_tags) + 1), np.nan, dtype=np.float32
        )
        arr_dynamic[:, 0] = np.squeeze(arr_qobs)

        for i, tag in enumerate(dynamic_var_tags):
            tmp = ds[tag].values.astype(np.float32)
            # NOTE(review): simulated flows are scaled by the drainage area here
            # while qobs (column 0) is not — confirm this asymmetry is intended.
            # drainage_area is only read when needed, so files without it still
            # work when no qsim variable is requested.
            if qsim_pos[i]:
                tmp = tmp / ds.drainage_area.values * 86.4
            arr_dynamic[:, i + 1] = np.squeeze(tmp)

    return arr_dynamic, np.squeeze(arr_qobs)
def create_lstm_dataset(
    arr_dynamic: np.ndarray,
    arr_static: np.ndarray,
    q_stds: np.ndarray,
    window_size: int,
    watershed_list: list,
    idx: np.ndarray,
    remove_nans: bool = True,
):
    """
    Create the LSTM dataset and shape the data using look-back windows and preparing all data for training.

    Parameters
    ----------
    arr_dynamic : np.ndarray
        Tensor of size [watersheds x timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e.
        time-series) variables that will be used during training. The first element in axis=2 is the observed
        flow.
    arr_static : np.ndarray
        Tensor of size [watersheds x n_static_variables] that contains the static (i.e. catchment descriptors)
        variables that will be used during training.
    q_stds : np.ndarray
        Tensor of size [watersheds] that contains the standard deviation of scaled streamflow values for the
        catchment associated to the data in arr_dynamic and arr_static. Indexed as ``q_stds[w]``.
    window_size : int
        Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking
        window to allow the model to learn from long-term weather patterns and history to predict the next
        day's streamflow. Usually set to 365 days to get one year of previous data. This makes the model
        heavier and longer to train but can improve results.
    watershed_list : list
        Indices (along axis 0 of arr_dynamic, arr_static, q_stds and idx) of the watersheds to include in the
        dataset.
    idx : np.ndarray
        Per-watershed period bounds: ``idx[w]`` is a 2-element sequence [start, end) of time indices
        delimiting the period used for watershed ``w``.
    remove_nans : bool
        Flag indicating that the NaN values associated to the observed streamflow should be removed. Required
        for training but can be kept to False for simulation to ensure simulation on the entire period.

    Returns
    -------
    x : np.ndarray
        Tensor of size [(timesteps * watersheds) x window_size x n_dynamic_variables] that contains the dynamic
        (i.e. timeseries) variables that will be used during training.
    x_static : np.ndarray
        Tensor of size [(timesteps * watersheds) x n_static_variables] that contains the static (i.e. catchment
        descriptors) variables that will be used during training.
    x_q_stds : np.ndarray
        Tensor of size [(timesteps * watersheds)] that contains the standard deviation of scaled streamflow
        values for the catchment associated to the data in x and x_static. Each data point could come from any
        catchment and this q_std variable helps scale the objective function.
    y : np.ndarray
        Tensor of size [(timesteps * watersheds)] containing the target variable for the same time point as in
        x, x_static and x_q_stds. Usually the observed streamflow for the day associated to each of the
        training points.
    """
    # Number of input features (qobs, stored in slot 0, is the target, not a feature).
    ndata = arr_dynamic.shape[2] - 1

    # Each list is seeded with an empty float64 array (np.empty's default dtype) so
    # the final stack has the same shape and dtype as incremental stacking would,
    # including when watershed_list is empty.
    x_parts = [np.empty((0, window_size, ndata))]
    x_static_parts = [np.empty((0, arr_static.shape[1]))]
    x_q_stds_parts = [np.empty(0)]
    y_parts = [np.empty(0)]

    for w in watershed_list:
        idx_w = idx[w]
        print("Currently working on watershed no: " + str(w))
        x_w, x_w_static, x_w_q_std, y_w = _extract_watershed_block(
            arr_w_dynamic=arr_dynamic[w, idx_w[0] : idx_w[1], :],
            arr_w_static=arr_static[w, :],
            q_std_w=q_stds[w],
            window_size=window_size,
        )

        # Drop windows whose target is NaN (needed for training).
        if remove_nans:
            y_w, x_w, x_w_q_std, x_w_static = remove_nans_func(
                y_w, x_w, x_w_q_std, x_w_static
            )

        x_parts.append(x_w)
        x_static_parts.append(x_w_static)
        x_q_stds_parts.append(x_w_q_std)
        y_parts.append(y_w)

    # Stack once at the end: stacking inside the loop re-copies the growing
    # arrays for every watershed, which is quadratic in the number of watersheds.
    x = np.vstack(x_parts)
    x_static = np.vstack(x_static_parts)
    x_q_stds = np.hstack(x_q_stds_parts)
    y = np.hstack(y_parts)

    return x, x_static, x_q_stds, y
def create_lstm_dataset_local(
    arr_dynamic: np.ndarray,
    window_size: int,
    idx: np.ndarray,
    remove_nans: bool = True,
):
    """
    Create the local LSTM dataset and shape the data using look-back windows and preparing all data for training.

    Parameters
    ----------
    arr_dynamic : np.ndarray
        Array of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
        variables of a single catchment that will be used during training. The first element in axis=1 is the
        observed flow (the training target).
    window_size : int
        Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking
        window to allow the model to learn from long-term weather patterns and history to predict the next
        day's streamflow. Usually set to 365 days to get one year of previous data. This makes the model
        heavier and longer to train but can improve results.
    idx : np.ndarray
        2-element array [start, end) of the time indices of the beginning and end of the desired period for
        which the LSTM model should be simulated.
    remove_nans : bool
        Flag indicating that the NaN values associated to the observed streamflow should be removed. Required
        for training but can be kept to False for simulation to ensure simulation on the entire period.

    Returns
    -------
    x : np.ndarray
        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
        timeseries) variables that will be used during training.
    y : np.ndarray
        Tensor of size [timesteps] containing the target variable for the same time point as in x. Usually the
        observed streamflow for the day associated to each of the training points.
    """
    start, end = idx[0], idx[1]

    # Restrict to the requested period, then cut it into look-back windows.
    x, y = _extract_watershed_block_local(
        arr_dynamic=arr_dynamic[start:end, :],
        window_size=window_size,
    )

    # Drop windows whose target is NaN (needed for training).
    if remove_nans:
        y, x = remove_nans_func_local(y, x)

    return x, y
def _extract_windows_vectorized(
    array: np.ndarray,
    sub_window_size: int,
):
    """
    Create the array where each day contains data from a previous period (look-back period).

    Parameters
    ----------
    array : np.ndarray
        The array of dynamic variables for a single catchment, with time along axis 0.
    sub_window_size : int
        Size of the look-back window.

    Returns
    -------
    data_array
        Array of size [n_windows x sub_window_size x ...] where window ``k`` holds ``array[k : k +
        sub_window_size]``, with ``n_windows = max(array.shape[0] - sub_window_size, 0)``.
    """
    max_time = array.shape[0]

    # Row k of the index matrix is [k, k + 1, ..., k + sub_window_size - 1].
    offsets = np.arange(sub_window_size)[np.newaxis, :]
    starts = np.arange(max_time - sub_window_size)[:, np.newaxis]
    sub_windows = offsets + starts

    # NOTE(review): np.arange(max_time - sub_window_size) never produces the window
    # starting at max_time - sub_window_size, so the last time step is never a
    # target. Kept as-is because callers may rely on this sample count — confirm
    # whether the final day should be included (that would be "+ 1").
    return array[sub_windows]


def _extract_watershed_block(
    arr_w_dynamic: np.ndarray,
    arr_w_static: np.ndarray,
    q_std_w: np.ndarray,
    window_size: int,
):
    """
    Extract all series of the desired window length over all features for a given watershed.

    Create the LSTM tensor format of data from the regular input arrays. Both dynamic and static variables are
    extracted.

    Parameters
    ----------
    arr_w_dynamic : np.ndarray
        Tensor of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
        variables that will be used during training for the current catchment. The first element in axis=1 is
        the observed flow.
    arr_w_static : np.ndarray
        Tensor of size [n_static_variables] that contains the static (i.e. catchment descriptors) variables
        that will be used during training for the current catchment.
    q_std_w : np.ndarray or float
        Scalar (or size-1 array) standard deviation of scaled streamflow values for the current catchment.
    window_size : int
        Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking
        window to allow the model to learn from long-term weather patterns and history to predict the next
        day's streamflow. Usually set to 365 days to get one year of previous data. This makes the model
        heavier and longer to train but can improve results.

    Returns
    -------
    x_w : np.ndarray
        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
        time-series) variables that will be used during training for a single processed catchment.
    x_w_static : np.ndarray
        Tensor of size [timesteps x n_static_variables] that contains the static (i.e. catchment descriptors)
        variables that will be used during training for a single processed catchment.
    x_w_q_stds : np.ndarray
        Tensor of size [timesteps] (always at least 1-D) that contains the standard deviation of scaled
        streamflow values for the catchment associated to the data in x and x_static for a single processed
        catchment.
    y_w : np.ndarray
        Tensor of size [timesteps] containing the target variable for the same time point as in x_w,
        x_w_static and x_w_q_stds. Usually the observed streamflow for the day associated to each of the
        training points for the currently processed catchment.
    """
    # Extract all series of the desired window length for all features.
    x_w = _extract_windows_vectorized(array=arr_w_dynamic, sub_window_size=window_size)
    n_windows = x_w.shape[0]

    # Repeat the catchment descriptors once per window.
    x_w_static = np.repeat(arr_w_static.reshape(-1, 1), n_windows, axis=1).T

    # np.reshape also accepts a plain Python float. np.atleast_1d keeps a 1-D
    # result when there is exactly one window; a bare np.squeeze returns a 0-d
    # array there, which cannot be indexed by the 1-D NaN mask of remove_nans_func.
    x_w_q_std = np.atleast_1d(
        np.squeeze(np.repeat(np.reshape(q_std_w, (-1, 1)), n_windows, axis=1).T)
    )

    # The target is qobs (feature 0) on the last day of each window.
    y_w = x_w[:, -1, 0]

    # qobs is the target, so it is removed from the input features.
    x_w = np.delete(x_w, 0, axis=2)

    return x_w, x_w_static, x_w_q_std, y_w


def _extract_watershed_block_local(arr_dynamic: np.ndarray, window_size: int):
    """
    Extract all series of the desired window length over all features for a given watershed.

    Create the LSTM tensor format of data from the regular input arrays. Only dynamic variables are extracted
    (local models have no static descriptors).

    Parameters
    ----------
    arr_dynamic : np.ndarray
        Tensor of size [timestep x (n_dynamic_variables+1)] that contains the dynamic (i.e. time-series)
        variables that will be used during training for the current catchment. The first element in axis=1 is
        the observed flow.
    window_size : int
        Number of days of look-back for training and model simulation. LSTM requires a large backwards-looking
        window to allow the model to learn from long-term weather patterns and history to predict the next
        day's streamflow. Usually set to 365 days to get one year of previous data. This makes the model
        heavier and longer to train but can improve results.

    Returns
    -------
    x : np.ndarray
        Tensor of size [timesteps x window_size x n_dynamic_variables] that contains the dynamic (i.e.
        time-series) variables that will be used during training for the catchment.
    y : np.ndarray
        Tensor of size [timesteps] containing the target variable for the same time point as in 'x'. Usually
        the observed streamflow for the day associated to each of the training points.
    """
    # Extract all series of the desired window length for all features.
    x = _extract_windows_vectorized(array=arr_dynamic, sub_window_size=window_size)

    # The target is qobs (feature 0) on the last day of each window.
    y = x[:, -1, 0]

    # qobs is the target, so it is removed from the input features.
    x = np.delete(x, 0, axis=2)

    return x, y
def remove_nans_func(y: np.ndarray, x: np.ndarray, x_q_std: np.ndarray, x_static: np.ndarray):
    """
    Drop every sample whose target "y" is NaN, consistently across all the datasets.

    Parameters
    ----------
    y : np.ndarray
        Array of target variables for training, that might contain NaNs. Its mask is applied along axis 0 of
        every other array.
    x : np.ndarray
        Array of dynamic variables for LSTM model training and simulation.
    x_q_std : np.ndarray
        Array of observed streamflow standard deviations for catchments in regional LSTM models.
    x_static : np.ndarray
        Array of static variables for LSTM model training and simulation, specifically for regional LSTM models.

    Returns
    -------
    y : np.ndarray
        Array of target variables for training, with all NaNs removed.
    x : np.ndarray
        Array of dynamic variables for LSTM model training and simulation, with values associated to NaN "y"
        values removed.
    x_q_std : np.ndarray
        Array of observed streamflow standard deviations for catchments in regional LSTM models, with values
        associated to NaN "y" values removed.
    x_static : np.ndarray
        Array of static variables for LSTM model training and simulation, specifically for regional LSTM models,
        with values associated to NaN "y" values removed.
    """
    # True for every sample whose target is a real number.
    keep = np.logical_not(np.isnan(y))

    return (
        y[keep],
        x[keep, :, :],
        x_q_std[keep],
        x_static[keep, :],
    )
def remove_nans_func_local(y: np.ndarray, x: np.ndarray):
    """
    Drop every sample whose target "y" is NaN from both the targets and the dynamic inputs.

    Parameters
    ----------
    y : np.ndarray
        Array of target variables for training, that might contain NaNs. Its mask is applied along axis 0 of x.
    x : np.ndarray
        Array of dynamic variables for LSTM model training and simulation.

    Returns
    -------
    y : np.ndarray
        Array of target variables for training, with all NaNs removed.
    x : np.ndarray
        Array of dynamic variables for LSTM model training and simulation, with values associated to NaN "y"
        values removed.
    """
    # True for every sample whose target is a real number.
    valid = np.logical_not(np.isnan(y))
    return y[valid], x[valid, :, :]