"""
This module contains the implementation of the ParallelReservoirsBase class, the three corresponding subclasses ParallelReservoirs, ParallelReservoirsFFT and ParallelReservoirsPCA, and the ParallelReservoirsArguments dataclass.
The subclasses predict high dimensional (spatially extended) systems using multiple Reservoir Computers in parallel:
1. The ParallelReservoirs class is used for high dimensional systems without a dimension reduction.
2. The ParallelReservoirsFFT class is used for high dimensional systems, using only a selection of FFT modes as a dimension reduction.
3. The ParallelReservoirsPCA class is used for high dimensional systems, using only a selection of PCA modes as a dimension reduction.
The base class :code:`ParallelReservoirsBase` contains everything needed for parallel reservoir applications that predict high dimensional (spatially extended) systems using multiple Reservoir Computers in parallel.
The classes are configured through an instance of :code:`ParallelReservoirsArguments`, a dataclass that holds the parameters of the (parallel) reservoirs.
Author: Luk Fleddermann, Gerrit Wellecke
Date: 13.06.2024
"""
import logging as log
import os.path as paths
import pickle
from abc import ABC, abstractmethod
from dataclasses import dataclass, fields
import numpy as np
from sklearn.decomposition import PCA
from drrc.config import Config
from drrc.reservoircomputer import ReservoirComputer
from drrc.tools.logger_config import drrc_logger as logger
# NOTE: what ParallelReservoirsX are supposed to do:
# - Implement the use of multi-reservoirs
# - Contain a list of those reservoirs
# - Train a reservoir for each of the subdomains or alternatively train one reservoir on
# all subdomains
# - Handle the training & output data w.r.t. linear transformations
# (currently PCA & FFT) and boundary conditions (through ghost cells)
# Coding
# TODO: JIT _boundary_condition and _evaluate_one_step; both are used in every prediction step
# TODO: Maybe add tqdm to the training loop of the reservoirs; decide what we want there, since the number of reservoirs ranges from 1-4 up to at least 2000
# Testing and documentation
# TODO: Write a check for invalid parameter usage. For instance, ParallelReservoirs should not have dimension reduction
# TODO: Insert wrong-argument checking for all non-internal functions (init, train, iterative_predict, reservoir_transient)
# TODO: remove class variables from documentation
# get logging levels
loglevel_debug_parallelreservoirs = log.getLevelName("DEBUG_PRC")
loglevel_debug_DRRC = log.getLevelName("DEBUG_DRRC")
loglevel_info_single_run = log.getLevelName("INFO_1RUN")
loglevel_info_multiple_run = log.getLevelName("INFO_nRUN")
@dataclass
class ParallelReservoirsArguments:
"""Parallel-reservoir parameters.
Args:
adjacency_degree (int):
average degree of the adjacency matrix
adjacency_dense (bool):
whether the adjacency matrix is sparse (False) or dense (True)
adjacency_spectralradius (float):
spectral radius, largest eigenvalue of the adjacency matrix
reservoir_leakage (float):
leakage, i.e. the strength with which a reservoir state remembers old
excitations (0.0: only driven by new data, no memory; 1.0: no update
of the reservoir state)
reservoir_nodes (int):
number of nodes in each of the parallel reservoirs
input_scaling (float):
input scaling, the maximal absolute value of entries in the input matrix
input_bias (float):
scaling of the bias strength, i.e. double the maximal absolute value of the
bias input to a reservoir node; None defaults to the input scaling
spatial_shape (tuple[int, ...]):
shape of the input data, without boundary condition (for ex. :code:`(128,)` in 1D case, :code:`(128,128)` in 2D case)
system_variables (int):
number of variables in the system (data for one time step is of shape :code:`(system_variables, *spatial_shape)`).
boundary_condition (str):
the type of boundary condition to apply to the input data
parallelreservoirs_grid_shape (tuple[int, ...]):
the number of reservoirs per dimension that should be used together as a
multi-reservoir (for ex. :code:`(2, 1)` in the 2D case for 2 reservoirs in
x direction)
parallelreservoirs_ghosts (int):
number of ghost cells that a reservoir sees from outside the region where
it is predicting, used for synchronization with the neighbouring domains
dimensionreduction_fraction (float):
fraction of variables that actually enters the reservoir after
dimension reduction
training_includeinput (bool):
whether to also fit the input signal for predicting the next timestep
training_regularization (float):
regularization strength for the ridge regression
training_output_bias (float):
scaling of the output bias
identical_inputmatrix (bool):
whether the same input matrix is used for all reservoirs (True) or a separate one per reservoir (False)
identical_adjacencymatrix (bool):
whether the same adjacency matrix is used for all reservoirs (True) or a separate one per reservoir (False)
identical_outputmatrix (bool | str | tuple[int, ...]):
whether each domain is trained with a separate reservoir (False), one
reservoir is trained on all domains ('combine_data'), or one reservoir is trained on a single domain (tuple of indices)
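Example:
A minimal sketch of constructing the arguments directly; the values below are illustrative assumptions, not recommended settings::

    args = ParallelReservoirsArguments(
        adjacency_degree=3,
        adjacency_dense=False,
        adjacency_spectralradius=0.9,
        reservoir_leakage=1.0,
        reservoir_nodes=500,
        input_scaling=1.0,
        input_bias=1.0,
        spatial_shape=(128,),
        system_variables=1,
        boundary_condition="Periodic",
        parallelreservoirs_grid_shape=(2,),
        parallelreservoirs_ghosts=4,
        dimensionreduction_fraction=1.0,
        training_includeinput=False,
        training_regularization=1e-6,
        training_output_bias=1.0,
        identical_inputmatrix=False,
        identical_adjacencymatrix=False,
        identical_outputmatrix=False,
    )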
"""
adjacency_degree: int
adjacency_dense: bool
adjacency_spectralradius: float
reservoir_leakage: float
reservoir_nodes: int
input_scaling: float
input_bias: float
spatial_shape: tuple[int, ...]
system_variables: int
boundary_condition: str
parallelreservoirs_grid_shape: tuple[int, ...]
parallelreservoirs_ghosts: int
dimensionreduction_fraction: float
training_includeinput: bool
training_regularization: float
training_output_bias: float
identical_inputmatrix: bool
identical_adjacencymatrix: bool
identical_outputmatrix: bool | str | tuple[int, ...]
# maybe deprecated, not used yet
# training_overwrite: bool
# dtype: str
# deprecated_trainonly: bool
@classmethod
def from_config(cls, conf: Config, job_idx: int, sub_idx: int):
"""Make a ParallelReservoirsArguments object from a Config object.
This method generates a ParallelReservoirsArguments object based on the provided Config object.
The Config object should have keys that match the names defined in this class.
Args:
conf (Config): The Config object corresponding to the YAML for the parameter scan.
job_idx (int): The index of the current job in the parameter scan.
sub_idx (int): The index of the parameter set within that job.
Returns:
ParallelReservoirsArguments: The generated ParallelReservoirsArguments object.
"""
parameters = conf.param_scan_list()[job_idx][sub_idx]
if isinstance(parameters["identical_outputmatrix"], list):
parameters["identical_outputmatrix"] = tuple(
parameters["identical_outputmatrix"]
)
parameters["spatial_shape"] = tuple(parameters["spatial_shape"])
parameters["parallelreservoirs_grid_shape"] = tuple(
parameters["parallelreservoirs_grid_shape"]
)
del parameters["training_data_index"]
return cls(**parameters)
@classmethod
def from_dict(cls, input_dict: dict):
"""Generate parameters from dictionary
This function supports passing of a dictionary that contains more than the
needed keys. In this case any additional information is simply ignored.
Args:
input_dict:
dictionary containing at least all the needed keys for the
initialiser.
"""
class_fields = {f.name for f in fields(cls)}
if isinstance(input_dict["identical_outputmatrix"], list):
input_dict["identical_outputmatrix"] = tuple(
input_dict["identical_outputmatrix"]
)
input_dict["spatial_shape"] = tuple(input_dict["spatial_shape"])
if isinstance(input_dict["parallelreservoirs_grid_shape"], int):
input_dict["parallelreservoirs_grid_shape"] = tuple(
[input_dict["parallelreservoirs_grid_shape"]]
)
else:
input_dict["parallelreservoirs_grid_shape"] = tuple(
input_dict["parallelreservoirs_grid_shape"]
)
return cls(**{k: v for k, v in input_dict.items() if k in class_fields})
class ParallelReservoirsBase(ABC):
"""Base class for parallel reservoir applications. Implements the use of multiple Reservoir Computers in parallel to predict high dimensional (spatially extended) systems.
Parallel is to be understood in terms of domain splitting of the input data."""
def __init__(
self,
args: ParallelReservoirsArguments,
**kwargs,
):
"""
- Base class: Set up reservoir and parallel-reservoir parameters, initialize reservoirs and slices.
Args:
args (ParallelReservoirsArguments):
All needed arguments in a dataclass object.
"""
logger.log(loglevel_info_multiple_run, "Initializing ParallelReservoirsBase.")
# set data input parameters
self.spatial_shape = args.spatial_shape
self.system_variables = args.system_variables
self.boundary_condition = args.boundary_condition
# set reservoir parameters
self.adj_degree = args.adjacency_degree
self.adj_dense = args.adjacency_dense
self.adj_spectralrad = args.adjacency_spectralradius
self.res_leakage = args.reservoir_leakage
self.res_nodes = args.reservoir_nodes
self.input_scaling = args.input_scaling
self.input_bias = args.input_bias
self.output_bias = args.training_output_bias
# set multi-reservoir parameters
self.rc_grid_shape = args.parallelreservoirs_grid_shape
self.rc_ghosts = args.parallelreservoirs_ghosts
# set dimensionality reduction parameters
self.dr_fraction = args.dimensionreduction_fraction
# set training parameters
self.train_includeinput = args.training_includeinput
self.train_regularization = args.training_regularization
# how many matrices are used
self.identical_input = args.identical_inputmatrix
self.identical_adjacencymatrix = args.identical_adjacencymatrix
self.identical_output = args.identical_outputmatrix
# check if identical matrices are set correctly
if isinstance(self.identical_output, (tuple, str)):
if not self.identical_adjacencymatrix:
self.identical_adjacencymatrix = True
logger.warning(
"Identical output matrix requires identical adjacency matrix. Identical adjacency matrix is set to True."
)
if not self.identical_input:
self.identical_input = True
logger.warning(
"Identical output matrix requires identical input matrix. Identical input matrix is set to True."
)
# deprecated parameters (just in case)
# self.dtype = args.dtype
# self.deprecated_trainonly = args.deprecated_trainonly
# self.train_overwrite = args.training_overwrite
# Calculate the shape of the domain on which each reservoir predicts. The reservoir's output will be reshaped to this shape; data of this shape is transformed and dimension-reduced to create the input.
self.res_domain_size = np.append(
[self.system_variables],
np.array(self.spatial_shape) // np.array(self.rc_grid_shape)
+ 2 * self.rc_ghosts,
)
# res_input_length depends on the dimension reduction; _get_input_length is implemented in the derived classes
self.res_output_length = np.prod(self.res_domain_size)
self.res_input_length = self._get_input_length()
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.__init__\t\t\tReservoir output shape: {self.res_domain_size}, Reservoir input length: {self.res_input_length}",
)
# TODO write seed somewhere!
self.seed = np.random.randint(0, 2**32 - 1)
self.reservoirs = self._initialize_reservoirs(seed=self.seed)
# set up slices
## one prediction slice for all reservoirs
if self.rc_ghosts == 0:
self.remove_ghosts = tuple(
slice(None) for _ in range(len(self.rc_grid_shape) + 2)
)
else:
self.remove_ghosts = (slice(None), slice(None)) + tuple(
slice(self.rc_ghosts, -self.rc_ghosts)
for _ in range(len(self.rc_grid_shape))
)
## one input slice for each reservoir: list of slicing objects
self.reservoir_slices = self._initialize_reservoir_slices()
logger.log(loglevel_info_multiple_run, "Done.")
def train(
self,
input_training_data: np.ndarray,
output_training_data: np.ndarray,
transient_steps: int = 0,
) -> None:
"""Train the output matrix of all parallel reservoirs using ridge regression.
Hereby it is differentiated between different methods:
1. Each reservoir is trained individually. Works for arbitrary systems.
2. One reservoir (used for the predictions of all domains) is trained on one domain only. Works only for homogeneous systems.
3. One reservoir (used for the predictions of all domains) is trained on all domains by combining the data. Works only for homogeneous systems.
Args:
input_training_data: np.ndarray[float]
input data for training, without boundary condition ghost cells. Data needs to be of shape :code:`(time_steps, variables, *spatial_shape)`.
output_training_data: np.ndarray[float]
output data for training, without boundary condition ghost cells. Data needs to be of shape :code:`(time_steps - transient_steps, variables, *spatial_shape)`.
transient_steps: int, optional
number of transient steps to be used for training (default: 0)
Notes:
The training method includes a transient phase, where the reservoirs are driven by the input data without using the results for the training.
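Example:
A minimal sketch with random data (the instance :code:`prc` and the shapes are illustrative assumptions)::

    import numpy as np

    transient = 100
    data = np.random.rand(2001, 1, 128)             # (time_steps, variables, *spatial_shape)
    prc.train(
        input_training_data=data[:-1],              # 2000 driving steps (includes the transient)
        output_training_data=data[transient + 1:],  # 1900 one-step-ahead targets
        transient_steps=transient,
    )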
"""
logger.log(loglevel_info_multiple_run, f"Training parallel reservoirs.")
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.train\t\t\t\tTraining input, without ghostcells shape: {input_training_data.shape}",
)
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.train\t\t\t\tTraining output, without ghostcells shape: {output_training_data.shape}",
)
input_training_data = self._boundary_condition(
input_training_data,
add_ghostcells=True,
)
output_training_data = self._boundary_condition(
output_training_data,
add_ghostcells=True,
)
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.train\t\t\t\tTraining input, with ghost cells shape: {input_training_data.shape}",
)
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.train\t\t\t\tTraining output, with ghosts cells shape: {output_training_data.shape}",
)
# Test data shapes
if len(input_training_data.shape) != 2 + len(self.spatial_shape) or len(
output_training_data.shape
) != 2 + len(self.spatial_shape):
raise ValueError(
f"Data must have shape (time_steps, variables, *spatial_shape), but have shape input_shape = {input_training_data.shape} and output_shape = {output_training_data.shape}."
)
if (
input_training_data.shape[0]
!= output_training_data.shape[0] + transient_steps
):
raise ValueError(
f"Input and output training data must have the same length (only input is supposed to include transient). But input is of shape {input_training_data.shape}, output is of shape {output_training_data.shape} and transient_steps is {transient_steps}."
)
# Test training specifying variables:
if isinstance(self.identical_output, tuple):
if np.any(np.array(self.identical_output) >= np.array(self.rc_grid_shape)):
raise ValueError(
f"Identical output reservoir index {self.identical_output} is out of bounds. Number of reservoirs is {np.prod(self.rc_grid_shape)}."
)
# distinguish between different trainings
if self.identical_output is False:
# train all reservoirs individually
for i, res in enumerate(self.reservoirs):
res.train(
input=self._transform_data(
input_training_data[self.reservoir_slices[i]],
fraction=self.dr_fraction,
),
output=self._transform_data(
output_training_data[self.reservoir_slices[i]], fraction=1
),
transient_steps=transient_steps,
)
elif isinstance(self.identical_output, tuple):
# train one reservoir on one domain
idx = np.ravel_multi_index(self.identical_output, self.rc_grid_shape)
self.reservoirs[0].train(
input=self._transform_data(
input_training_data[self.reservoir_slices[idx]],
fraction=self.dr_fraction,
),
output=self._transform_data(
output_training_data[self.reservoir_slices[idx]], fraction=1
),
transient_steps=transient_steps,
)
elif self.identical_output == "combine_data":
# train one reservoir on all domains
## stack spatial data to a list of training data sets
input_data = [
self._transform_data(
input_training_data[self.reservoir_slices[i]],
fraction=self.dr_fraction,
)
for i, _ in enumerate(self.reservoirs)
]
output_data = [
self._transform_data(
output_training_data[self.reservoir_slices[i]], fraction=1
)
for i, _ in enumerate(self.reservoirs)
]
# train the first reservoir on all datasets
self.reservoirs[0].train_on_multiple_datasets(
inputs=input_data, outputs=output_data, transient_steps=transient_steps
)
else:
raise ValueError(
f"Identical output matrix needs to be False, tuple or 'combine_data', but is {self.identical_output}."
)
logger.log(loglevel_info_multiple_run, f"Done.")
def iterative_predict(
self, initial, max_steps, supervision_data=None, **kwargs
) -> tuple[np.ndarray, np.ndarray | None, int]:
"""Iteratively predict a time series.
Args:
initial (np.ndarray[float]):
Initial condition for the time series prediction.
max_steps (int):
Maximum number of steps to predict.
supervision_data (np.ndarray[float], optional):
Supervision data for evaluating the prediction.
Defaults to None.
**kwargs:
Additional keyword arguments for evaluating the prediction:
including an error function :code:`error_function` of {'NRMSE'},
the (temporal) mean of the norm of the data :code:`mean_norm`,
a threshold value for the error function :code:`error_stop`,
and the number of extra steps to predict after the error threshold is exceeded :code:`extra_steps`.
Returns:
tuple[np.ndarray, np.ndarray | None, int]:
A tuple containing the predicted time series,
the prediction errors (if supervision data is provided, else None),
and the number of steps predicted.
Notes: Before the prediction the parallel reservoir states need to be adjusted to the state of the system to be predicted by using :code:`reservoir_transient`.
Warning: This function with supervision data is not tested nor fully implemented yet.
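Example:
A minimal sketch (:code:`prc` and 1D test data :code:`test_data` of shape :code:`(time_steps, 1, 128)` are illustrative assumptions; for the supervised variant note the warning above)::

    # free-running prediction
    prediction, _, steps = prc.iterative_predict(initial=test_data[:1], max_steps=500)

    # supervised prediction that stops once the NRMSE exceeds error_stop
    mean_norm = float(np.sqrt((test_data ** 2).sum(axis=(1, 2)).mean()))
    prediction, errors, steps = prc.iterative_predict(
        initial=test_data[:1],
        max_steps=500,
        supervision_data=test_data[1:501],
        error_function="NRMSE",
        mean_norm=mean_norm,
        error_stop=1.0,
        extra_steps=10,
    )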
"""
logger.log(loglevel_info_single_run, f"Iteratively predicting time series.")
# add ghost cells to the initial data to fulfill the boundary condition, and create the prediction placeholder
initial = self._boundary_condition(initial, add_ghostcells=True)
prediction = np.empty((max_steps, *initial.shape[1:]))
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.iterative_predict\t\tInitial shape: {initial.shape}",
)
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.iterative_predict\t\tPrediction shape: {prediction.shape}",
)
# Decide whether to predict with supervision data. Without supervision data, max_steps time steps are predicted; otherwise _evaluate_one_step is used to stop the prediction early.
if supervision_data is None:
for t in range(max_steps):
for i, res in enumerate(self.reservoirs):
prediction[t : t + 1][self.reservoir_slices[i]][
self.remove_ghosts
] = self._inv_transform_data(
res.predict_single_step(
self._transform_data(
initial[self.reservoir_slices[i]],
fraction=self.dr_fraction,
)
)
)[
self.remove_ghosts
]
initial = self._boundary_condition(prediction[t : t + 1])
# return inner prediction/ neglect ghostcells
logger.log(loglevel_info_single_run, f"Done.")
return (prediction[self.remove_ghosts], None, max_steps)
else:
# TODO: This could be modified. All arguments needed for the evaluation of a prediction are checked here.
# TODO: The parameters are passed as kwargs, because they don't belong to the class, but to this prediction
# Checking arguments for iterative prediction
try:
error_function = kwargs["error_function"]
error_stop = kwargs["error_stop"]
except KeyError:
raise ValueError(
"Error function 'error_function' and threshold value 'error_stop' need to be set to evaluate predictions."
)
# setting number of steps after threshold is exceeded
if "extra_steps" in kwargs:
extra_steps = kwargs["extra_steps"]
else:
extra_steps = 0  # default: no extra steps after the threshold is exceeded
# setting error function
if error_function == "NRMSE":
try:
mean_norm = kwargs["mean_norm"]
except KeyError:
raise ValueError(
"The mean norm of the data ('mean_norm') needs to be set as a keyword argument to evaluate predictions."
)
else:
raise NotImplementedError(
"Only Normalized Root Mean Square Error 'NRMSE' is implemented so far."
)
# create variables
errors = np.zeros(max_steps)
good_prediction = True
# iteratively predict and measure error
t = 0
while t < max_steps and good_prediction:
for i, res in enumerate(self.reservoirs):
prediction[t : t + 1][self.reservoir_slices[i]][
self.remove_ghosts
] = self._inv_transform_data(
res.predict_single_step(
self._transform_data(
initial[self.reservoir_slices[i]],
fraction=self.dr_fraction,
)
)
)[
self.remove_ghosts
]
initial = self._boundary_condition(prediction[t : t + 1])
# measure error, prediction slice is used to compare only the inner part of the data/ neglect ghost cells
good_prediction, errors[t] = self._evaluate_one_step(
prediction[t : t + 1][self.remove_ghosts],
supervision_data[t],
errorfunction=error_function,
mean_norm=mean_norm,
error_stop=error_stop,
)
# TODO: Parameters of _evaluate_one_step that do not need to live in the base class still have to be set here: kwargs?
# TODO: There is also the problem that the error should actually only be evaluated on the interior, i.e. without ghost cells
t += 1
t2 = t
while t2 < max_steps and t2 < t + extra_steps:
for i, res in enumerate(self.reservoirs):
prediction[t2 : t2 + 1][self.reservoir_slices[i]][
self.remove_ghosts
] = self._inv_transform_data(
res.predict_single_step(
self._transform_data(
initial[self.reservoir_slices[i]],
fraction=self.dr_fraction,
)
)
)[
self.remove_ghosts
]
initial = self._boundary_condition(prediction[t2 : t2 + 1])
# measure error, prediction slice is used to compare only the inner part of the data/ neglect ghost cells
_, errors[t2] = self._evaluate_one_step(
prediction[t2 : t2 + 1][self.remove_ghosts],
supervision_data[t2 : t2 + 1],
errorfunction=error_function,
mean_norm=mean_norm,
error_stop=error_stop,
)
# TODO: Parameters of _evaluate_one_step that do not need to live in the base class still have to be set here: kwargs?
# TODO: There is also the problem that the error should actually only be evaluated on the interior, i.e. without ghost cells
t2 += 1
logger.log(loglevel_info_single_run, f"Done.")
return (prediction[self.remove_ghosts][:t2], errors[:t2], t)
def reservoir_transient(
self, input, predict_on_transient=False
) -> None | np.ndarray:
"""Transient dynamics of the reservoirs. Updates the reservoir states and optionally predicts one step ahead on the transient dynamics, without reusing the predictions.
Args:
input: np.ndarray[float]
input data for the transient dynamics
predict_on_transient: bool
if True, one step-ahead predictions are performed on transient data and returned, else only the reservoir states are updated
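Example:
A minimal sketch (:code:`prc` and :code:`sync_data` of shape :code:`(time_steps, variables, *spatial_shape)` are illustrative assumptions)::

    prc.reservoir_transient(sync_data)   # only synchronize the reservoir states
    one_step_predictions = prc.reservoir_transient(sync_data, predict_on_transient=True)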
"""
logger.log(
loglevel_info_single_run, f"Updating reservoir nodes on transient dynamics."
)
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.reservoir_transient\t\tInput shape: {input.shape}",
)
# add ghost cells to the input data to fulfill the boundary condition
input = self._boundary_condition(
input,
add_ghostcells=True,
)
# distinguish whether to predict on the transient or not
if predict_on_transient:
prediction = np.empty(input.shape)
logger.log(
loglevel_debug_parallelreservoirs,
f"ParallelReservoir.reservoir_transient\t\tPrediction shape: {prediction.shape}",
)
# predict one step ahead on transient data, without reusing the predictions
## the order of the loops is irrelevant, because the reservoirs are not interacting, but all time steps and all reservoirs need to be used
for t in range(input.shape[0]):
for i, res in enumerate(self.reservoirs):
prediction[t : t + 1][self.reservoir_slices[i]][
self.remove_ghosts
] = self._inv_transform_data(
res.predict_single_step(
self._transform_data(
input[t : t + 1][self.reservoir_slices[i]],
fraction=self.dr_fraction,
)
)
)[
self.remove_ghosts
]
logger.log(loglevel_info_single_run, f"Done.")
return prediction[
self.remove_ghosts
] # return prediction without ghostcells
else:
# Update the reservoir states on the transient data
# All time steps can be used at once, because no interaction of the reservoirs is used
for i, res in enumerate(self.reservoirs):
for input_t in self._transform_data(
input[self.reservoir_slices[i]], fraction=self.dr_fraction
):
res.propagate_reservoir_state(input_t)
logger.log(loglevel_info_single_run, f"Done.")
return None
def _initialize_reservoirs(self, seed: int) -> list[ReservoirComputer]:
"""Initialize the reservoirs for the parallel reservoirs using :code:`rc_grid_shape`, which specifies how many reservoirs in each dimension are used.
Args:
seed (int): Seed for the random number generator, used for initializing the reservoirs.
Returns:
list[ReservoirComputer]: The list of initialized reservoirs.
"""
np.random.seed(seed=seed)
return [
ReservoirComputer(
nodes=self.res_nodes,
degree=self.adj_degree,
leakage=self.res_leakage,
spectral_radius=self.adj_spectralrad,
input_scaling=self.input_scaling,
input_length=self.res_input_length,
input_bias=self.input_bias,
output_bias=self.output_bias,
regularization=self.train_regularization,
training_includeinput=self.train_includeinput,
identical_inputmatrix=self.identical_input,
identical_adjacencymatrix=self.identical_adjacencymatrix,
identical_outputmatrix=self.identical_output,
reset_matrices=(i == 0),
)
for i in range(np.prod(self.rc_grid_shape))
]
def _initialize_reservoir_slices(self) -> list[tuple[slice]]:
r"""Prepare the reservoir slices for different reserovirs. First two dimensions are ignored, because they are :code:`(time_steps, variables)`/ not spatial.
In each spatial dimension of length :math:`dim`, the slices for reservoir number :math:`i` (w.r.t. this dimension) of total number of reservoirs :math:`n` is chosen to be :code:`slice(start : stop)`, where
.. math::
\text{start} = i \cdot \left\lfloor\frac{dim}{n}\right\rfloor\quad\text{and}\quad\text{stop} = (i+1) \cdot \left\lfloor\frac{dim}{n}\right\rfloor + 2 \cdot \text{ghosts}.
Returns:
list[tuple[slice]]: List of slices for each reservoir
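Example:
An illustrative 1D case with :code:`spatial_shape=(8,)`, :code:`parallelreservoirs_grid_shape=(2,)` and :code:`parallelreservoirs_ghosts=1` (the data with ghost cells has spatial length 10)::

    (slice(None), slice(system_variables), slice(0, 6))   # reservoir 0
    (slice(None), slice(system_variables), slice(4, 10))  # reservoir 1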
"""
# TODO change name 'data_slices?'
# NOTE This only cares about spatial dimensions so far. Temporal and variable dimensions should be added here or need to be added in the application?
slices = []
# loop over all reservoirs
for i, res in enumerate(self.reservoirs):
mult_index = np.unravel_index(i, self.rc_grid_shape)
# create slice
slices.append(
(slice(None), slice(self.system_variables))
+ tuple(
slice(
mult_index[dim]
* (self.spatial_shape[dim] // self.rc_grid_shape[dim]),
(mult_index[dim] + 1)
* (self.spatial_shape[dim] // self.rc_grid_shape[dim])
+ 2 * self.rc_ghosts,
)
for dim in range(len(self.spatial_shape))
)
)
return slices
# TODO: JIT
def _boundary_condition(
self,
data: np.ndarray,
add_ghostcells: bool = False,
) -> np.ndarray:
"""The function enforces boundary conditions on all spatial dimensions and returns the result.
Either boundary cells are used as ghost cells or ghost cells are added.
Ghost cell values depend on the boundary condition.
Args:
data (np.ndarray): Input data in region to be predicted. Shape is :code:`(time_steps, variables, *spatial_shape)`.
add_ghostcells (bool): If True, the boundary condition is fulfilled by adding ghost cells; otherwise the outer cells of the array are updated to fulfill the boundary condition.
Return:
np.ndarray: The array of shape :code:`data.shape`, or with :code:`2 * parallelreservoirs_ghosts` added to each spatial dimension, depending on :code:`add_ghostcells`.
Notes:
The boundary condition is applied to the spatial dimensions only. Therefore, the first and second dimension of the input :code:`data` are ignored.
Attention:
This has not been tested for arbitrary dimensions, only up to 2D.
Might be useful to precompile (Numba JIT), as it runs in every time step.
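Example:
An illustrative 1D periodic case with :code:`parallelreservoirs_ghosts=2` (the instance :code:`prc` and the values are assumptions for demonstration)::

    data = np.arange(8).reshape(1, 1, 8)                      # (time, variables, space)
    extended = prc._boundary_condition(data, add_ghostcells=True)
    # extended[0, 0] -> [6, 7, 0, 1, 2, 3, 4, 5, 6, 7, 0, 1]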
"""
shape = np.array(data.shape)
if add_ghostcells:
shape[2:] += 2 * self.rc_ghosts
increased_data = np.empty(shape)
# interior
# ind = [slice(self.rc_ghosts, -self.rc_ghosts) for _ in range(len(shape))]
# ind[0], ind[1] = slice(shape[0]), slice(shape[1])
increased_data[self.remove_ghosts] = data
else:
increased_data = data
# boundary
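# Each pass fills the ghost cells of the spatial axis currently at position 2 and then
# rotates that axis to the end, so every spatial dimension is handled in turn and the
# original axis order is restored after len(shape) - 2 passes.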
if self.boundary_condition == "Periodic":
dim = 0
while dim < len(shape) - 2:
increased_data[:, :, : self.rc_ghosts] = increased_data[
:, :, -2 * self.rc_ghosts : -self.rc_ghosts
]
increased_data[
:, :, increased_data.shape[2] - self.rc_ghosts :
] = increased_data[:, :, self.rc_ghosts : 2 * self.rc_ghosts]
increased_data = np.moveaxis(increased_data, 2, -1)
dim += 1
elif self.boundary_condition == "NoFlux":
dim = 0
while dim < len(shape) - 2:
increased_data[:, :, : self.rc_ghosts] = increased_data[
:, :, 2 * self.rc_ghosts - 1 : self.rc_ghosts - 1 : -1
]
increased_data[
:, :, increased_data.shape[2] - self.rc_ghosts :
] = increased_data[
:, :, -self.rc_ghosts - 1 : -2 * self.rc_ghosts - 1 : -1
]
increased_data = np.moveaxis(increased_data, 2, -1)
dim += 1
return increased_data
# TODO: JIT
@staticmethod
def _evaluate_one_step(
prediction: np.ndarray,
supervision_data: np.ndarray,
errrofunction="NRMSE",
mean_norm: float = 1,
error_stop: float = 1,
) -> tuple[bool, float]:
r"""Evaluate the prediction of one time step using the error function :code:`errorfunction`.
If :code:`errorfunction ="NRMSE"`, the normalized root mean square error
.. math::
\frac{\|\vec{u}(t)-\vec{u}^{\mathrm{true}}(t)\|_2}{\langle\|\vec{u}^{\mathrm{true}}(t)\|^2\rangle_{\mathrm{t}}^{1/2}}
is used.
Args:
prediction (np.ndarray):
Prediction of the system at one time step. Shape is :code:`(variables, *spatial_shape)`.
supervision_data (np.ndarray):
Supervision data for the prediction. Shape is :code:`(variables, *spatial_shape)`.
errorfunction (str):
Error function to evaluate the prediction. Only the normalized root mean square error :code:`"NRMSE"` is implemented so far.
mean_norm (float):
Mean norm of the supervision data, used to normalize the root mean square error.
error_stop (float):
Threshold for the error function. Iterative predictions are stopped if the error is above the threshold.
Returns:
tuple[bool, float]:
True if the error is below the threshold :code:`error_stop` (else False), and the error value.
Warning: This method is not tested and might have errors!
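Example:
A minimal numeric sketch (the values are illustrative)::

    ok, err = ParallelReservoirsBase._evaluate_one_step(
        prediction=np.ones((1, 4)),
        supervision_data=np.zeros((1, 4)),
        mean_norm=2.0,
        error_stop=1.5,
    )
    # err == 1.0 (= ||1 - 0||_2 / 2.0) and ok is True, since 1.0 < 1.5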
"""
# print(prediction.shape, supervision_data.shape)
if errorfunction == "NRMSE":
error = float(np.linalg.norm(prediction - supervision_data) / mean_norm)
return bool(error_stop > error), error
else:
raise NotImplementedError(
"Only Normalized Root Mean Square Error 'NRMSE' is implemented so far."
)
def save(self, filename: str) -> None:
"""Save a ParallelReservoir to pkl file
A possible use case of this is to conserve a trained multi-reservoir for later.
Args:
filename (str):
name of file to generate
Warning: This function might be deprecated. Otherwise, it might need to be moved to the derived classes.
"""
with open(filename, "wb") as file:
pickle.dump(self, file)
def load(self, filename: str) -> None:
"""Load a ParallelReservoir from pkl file
A possible use case of this is to load a trained model and perform further
predictions without having to do the training again.
Args:
filename (str):
name of file to read in
Warning: This function might be deprecated. Otherwise, it might need to be moved to the derived classes.
"""
# rebinding `self` would have no effect; copy the loaded attributes onto this instance instead
with open(filename, "rb") as file:
self.__dict__.update(pickle.load(file).__dict__)
class ParallelReservoirs(ParallelReservoirsBase):
"""Use multiple Reservoir Computers in parallel to predict high dimensional systems without dimensionality reduction.
Parallel is to be understood in terms of domain splitting of the input data.
This class uses the base class :code:`ParallelReservoirsBase`, which initiates Reservoirs of the class :code:`ReservoirComputer` and handles the training and prediction of the parallel reservoirs.
**Initialization**:
- Only base class setup is done. No further initialization is needed.
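Example:
A minimal end-to-end sketch (:code:`args` is a :code:`ParallelReservoirsArguments` instance with :code:`dimensionreduction_fraction=1.0`; the random data is an illustrative assumption)::

    import numpy as np

    prc = ParallelReservoirs(Parameter=args)
    data = np.random.rand(2001, args.system_variables, *args.spatial_shape)
    prc.train(data[:-1], data[101:], transient_steps=100)
    prc.reservoir_transient(data[-200:-100])             # synchronize the reservoir states
    prediction, _, steps = prc.iterative_predict(initial=data[-100:-99], max_steps=50)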
"""
def __init__(
self,
*,
Parameter: ParallelReservoirsArguments,
**kwargs,
):
logger.log(loglevel_info_multiple_run, f"Setting up parallel reservoirs.")
# init base class
super().__init__(Parameter, **kwargs)
if self.dr_fraction != 1:
raise ValueError(
"Class ParallelReservoirs should not have dimension reduction, but got dr_fraction != 1."
)
logger.log(loglevel_info_multiple_run, f"Done.")
class ParallelReservoirsFFT(ParallelReservoirsBase):
"""Use multiple Reservoir Computers in parallel to predict high dimensional systems using largest modes of an FFT for dimensionality reduction.
Parallel is to be understood in terms of domain splitting of the input data.
This class uses the base class :code:`ParallelReservoirsBase`, which initiates Reservoirs of the class :code:`ReservoirComputer` and handles the training and prediction of the parallel reservoirs.
"""
def __init__(
self,
*,
Parameter: ParallelReservoirsArguments,
prediction_model: str,
**kwargs,
):
"""
**Initialization**:
- Calculates or loads the largest FFT modes for the given datatype, i.e. model and parameters, and input and output dimension for each parallel Reservoir Computer.
- Base class: Set up reservoir and parallel-reservoir parameters, initialize reservoirs and slices.
Args:
args (ParallelReservoirsArguments):
All needed arguments in a dataclass object.
prediction_model (str):
Name of the model to be predicted. Used to load or choose FFT modes with largest amplitude.
**kwargs : dict
nr_fft_datasets (int):
The number of training data sets used to select the FFT modes. If not given, all 10 training data sets are used.
This is useful for computations without enough memory for the full selection.
Warning:
The same modes are used everywhere and are chosen on all training data sets and all domains.
Hence, the class only works if two conditions are met:
1. The predicted systems are homogeneous.
2. All data sets are sampled from the same attractor.
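Example:
A rough sketch of the mode-ranking idea (:code:`domain_data` is a hypothetical array of per-domain snapshots; the actual class uses its :code:`_transform_data` method instead of the plain FFT shown here)::

    amplitudes = np.abs(np.fft.rfft(domain_data.reshape(domain_data.shape[0], -1), axis=1))
    largest_modes = np.argsort(np.amax(amplitudes, axis=0))[::-1]   # strongest modes first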
"""
super().__init__(Parameter, **kwargs)
# precompute tuple of spatial axes
self.spatial_axes = tuple(2 + np.arange(len(self.spatial_shape)))
# init list of indices of largest modes (will be overwritten with ordered list, decreasing)
## The initialization here is needed to run the transform function, to calculate the modes
self.largest_modes = tuple(range(self.res_output_length))
filename = prediction_model + "_" + str(self.res_domain_size)
fft_path = str(Config.get_git_root()) + "/Data/FFT/"
if "nr_fft_datasets" in kwargs:
if kwargs["nr_fft_datasets"] > 10:
raise ValueError(
"Number of training data sets for the FFT mode selection can be 10 at most."
)
nr_trainings = kwargs["nr_fft_datasets"]
filename += f"_trainedon{nr_trainings}"
else:
nr_trainings = 10
temp_skip = 2
if paths.isfile(fft_path + filename + ".pkl"):
logger.log(
loglevel_info_multiple_run,
f"Loading FFT modes for {prediction_model} with domain size {self.res_domain_size} from {fft_path+filename}.",
)
self.largest_modes = pickle.load(open(fft_path + filename + ".pkl", "rb"))
else:
logger.log(
loglevel_info_multiple_run,
f"Choosing FFT modes for {prediction_model} with domain size {self.res_domain_size}.",
)
if prediction_model == "1D_KuramotoSivashinsky":
max_len = 80000 // (len(self.reservoirs))
logger.log(
loglevel_info_multiple_run,
f"Gather triningdata.",
)
fft_data = np.concatenate(
[
self._boundary_condition(
data=np.load(
str(Config.get_git_root())
+ f"/Data/{prediction_model}/TrainingData{i}.npy"
)[:max_len:temp_skip, np.newaxis, :],
add_ghostcells=True,
)
for i in range(nr_trainings)
],
axis=0,
)
fft_data = np.concatenate(
[
fft_data[self.reservoir_slices[i]]
for i in range(len(self.reservoir_slices))
],
axis=0,
)
elif prediction_model == "2D_AlievPanfilov":
max_len = 20000 // (len(self.reservoirs))
logger.log(
loglevel_info_multiple_run,
f"Gather triningdata.",
)
fft_data = np.concatenate(
[
self._boundary_condition(
data=np.load(
str(Config.get_git_root())
+ f"/Data/{prediction_model}/TrainingData{i}.npz"
)["vars"][:max_len:temp_skip, : self.system_variables],
add_ghostcells=True,
)
for i in range(nr_trainings)
],
axis=0,
)
fft_data = np.concatenate(
[
fft_data[self.reservoir_slices[i]]
for i in range(len(self.reservoir_slices))
],
axis=0,
)
else:
raise FileNotFoundError(
f"Training data for {prediction_model} not found."
)
logger.log(
loglevel_info_multiple_run,
f"Choosing FFT modes for domain size {self.res_domain_size} with {fft_data.shape[0]} Samples/ timesteps.",
)
# rank the modes by their temporal maximum amplitude (largest first)
self.largest_modes = np.argsort(
np.amax(self._transform_data(fft_data, 1), axis=0)
)[::-1]
logger.log(
loglevel_info_multiple_run,
f"Saving FFT modes for {prediction_model} with domain size {self.res_domain_size} at {fft_path+filename}.",
)
pickle.dump(self.largest_modes, open(fft_path + filename + ".pkl", "wb"))
# create object to invert the ordering of the largest modes
self.invert_largest_modes = np.arange(len(self.largest_modes))
np.put(
self.invert_largest_modes,
self.largest_modes,
np.arange(self.largest_modes.size),
)
self.inv_largest_modes = np.argsort(self.largest_modes)
# save an ordered list of indices of the largest modes
# take only self.dimensionreduction_fraction of this ordered list --> cookie cutter for transform
class ParallelReservoirsPCA(ParallelReservoirsBase):
"""Use multiple Reservoir Computers in parallel to predict high dimensional systems with using a PCA for dimensionality reduction.
Parallel is to be understood in terms of domain splitting of the input data.
This class uses the base class :code:`ParallelReservoirsBase`, which initiates Reservoirs of the class :code:`ReservoirComputer` and handles the training and prediction of the parallel reservoirs.
"""
def __init__(
self,
*,
Parameter: ParallelReservoirsArguments,
prediction_model: str,
**kwargs,
):
r"""
**Initialization**:
- Trains or loads the PCA object for the given data type, i.e. model and parameters, and output dimension for each parallel Reservoir Computer.
Due to memory constraints, the PCA training data is taken from 10 training data sets and only every second time step is used. In addition, for multiple parallel reservoirs the training data is sampled from all domains; in this case, the number of time steps per domain is shortened (divided by the number of domains) to keep the total number of time steps independent of the number of parallel reservoirs.
Computational Complexity: :math:`\mathcal{O}(n\cdot d^2)`, where :math:`n` is the number of samples and :math:`d` is the number of dimensions.
- Base class: Set up reservoir and parallel-reservoir parameters, initialize reservoirs and slices.
Args:
args (ParallelReservoirsArguments):
All needed arguments in a dataclass object.
prediction_model (str):
Name of the model to be predicted. Used to load or train the PCA object.
**kwargs : dict
nr_pca_trainings (int):
The number of training data sets used to train the PCA. If not given, all 10 training data sets are used.
This is useful for computations without enough memory for the full training.
Warning:
The same PCA is used everywhere and trained on all training data sets and all domains.
Hence, the class only works if two conditions are met:
1. The predicted systems are homogeneous.
2. All data sets are sampled from the same attractor.
Warning:
Training PCAs with many parallel reservoirs and large ghost cells can be memory intensive.
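Example:
A rough sketch of the PCA fit performed in this initializer (:code:`domain_data` is a hypothetical array of per-domain snapshots)::

    from sklearn.decomposition import PCA

    pca = PCA()
    pca.fit(domain_data.reshape(domain_data.shape[0], -1))   # one flattened sample per time step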
"""
# init base class
super().__init__(args=Parameter)
filename = prediction_model + "_" + str(self.res_domain_size)
pca_path = str(Config.get_git_root()) + "/Data/PCA/"
if "nr_pca_trainings" in kwargs:
if kwargs["nr_pca_trainings"] > 10:
raise ValueError(
"Number of training data sets for PCA can be 10 at max."
)
nr_trainings = kwargs["nr_pca_trainings"]
filename += f"_trainedon{nr_trainings}"
else:
nr_trainings = 10
temp_skip = 2
if paths.isfile(pca_path + filename + ".pkl"):
logger.log(
loglevel_info_multiple_run,
f"Loading PCA for {prediction_model} with domain size {self.res_domain_size} from {pca_path+filename}.",
)
self.pca = pickle.load(open(pca_path + filename + ".pkl", "rb"))
else:
logger.log(
loglevel_info_multiple_run,
f"Training PCA for {prediction_model} with domain size {self.res_domain_size}.",
)
if prediction_model == "1D_KuramotoSivashinsky":
max_len = 80000 // len(self.reservoirs)
logger.log(
loglevel_info_multiple_run,
f"Gather pca triningdata.",
)
pca_trainingdata = np.concatenate(
[
self._boundary_condition(
data=np.load(
str(Config.get_git_root())
+ f"/Data/{prediction_model}/TrainingData{i}.npy"
)[:max_len:temp_skip, np.newaxis, :],
add_ghostcells=True,
)
for i in range(nr_trainings)
],
axis=0,
)
pca_trainingdata = np.concatenate(
[
pca_trainingdata[self.reservoir_slices[i]]
for i in range(len(self.reservoir_slices))
],
axis=0,
)
elif prediction_model == "2D_AlievPanfilov":
max_len = 20000 // (len(self.reservoirs))
logger.log(
loglevel_info_multiple_run,
f"Gather pca triningdata.",
)
pca_trainingdata = np.concatenate(
[
self._boundary_condition(
data=np.load(
str(Config.get_git_root())
+ f"/Data/{prediction_model}/TrainingData{i}.npz"
)["vars"][:max_len:temp_skip, : self.system_variables],
add_ghostcells=True,
)
for i in range(nr_trainings)
],
axis=0,
)
pca_trainingdata = np.concatenate(
[
pca_trainingdata[self.reservoir_slices[i]]
for i in range(len(self.reservoir_slices))
],
axis=0,
)
else:
raise FileNotFoundError(
f"Training data for {prediction_model} not found."
)
logger.log(
loglevel_info_multiple_run,
f"Training PCA for domain size {self.res_domain_size} with {pca_trainingdata.shape[0]} Samples.",
)
self.pca = PCA()
self.pca.fit(pca_trainingdata.reshape(pca_trainingdata.shape[0], -1))
logger.log(
loglevel_info_multiple_run,
f"Saving PCA for {prediction_model} with domain size {self.res_domain_size} at {pca_path+filename+'.pkl'}.",
)
pickle.dump(self.pca, open(pca_path + filename + ".pkl", "wb"))