"""
Wrapper class for YAML configurations.
Use logger.INFO_1RUN for verbose output of this module's functionality.
.. codeauthor:: Gerrit Wellecke
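
Example:
    A minimal usage sketch (the config path and import path are hypothetical)::

        from pathlib import Path
        from drrc.config import Config  # import path assumed

        conf = Config(Path("Configs/example.yml"))  # relative to the git root
        print(conf)                    # dump the parsed YAML
        jobs = conf.param_scan_list()  # one sublist of parameters per cluster job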
"""
import itertools
import logging as log
import sys
import warnings
from pathlib import Path
from shutil import copy
import h5py
import numpy as np
import yaml
from git import Repo
from more_itertools import divide
from drrc.tools.logger_config import drrc_logger as logger
CLUSTER_CORES = 32
# resolve the numeric values of the custom log levels
loglevel_info_single_run = log.getLevelName("INFO_1RUN")
loglevel_info_multiple_run = log.getLevelName("INFO_nRUN")
class IndentDumper(yaml.Dumper):
    """YAML dumper needed for correct printing of config"""

    def increase_indent(self, flow=False, indentless=False):
        # force indented (non-indentless) block sequences so nested lists are
        # dumped with the expected indentation
        return super().increase_indent(flow, False)
class Config:
"""Configuration of a system from a YAML file"""
    def __init__(self, proj_path: Path):
        """Initialize the config from a path

        Args:
            proj_path (pathlib.Path): path to the configuration YAML, relative
                to the git repository's root
        """
        self.path = Path(proj_path)
        self.dict = self.parse_YAML()
        self.name = self.path.stem
        self.max_length = self["Jobscript"]["max_job_count"]
@property
def path(self):
"""pathlib.Path: Absolute path to the configuration YAML file for the current
system."""
return self._path
    @path.setter
    def path(self, proj_path: Path):
        """Prepend the git root to a path given relative to it"""
        # NOTE: joining with proj_path.absolute() would discard the git root
        # (pathlib drops the left operand when the right one is absolute)
        self._path = self.get_git_root() / proj_path
@property
def max_length(self):
"""int: Maximum length of param_scan_list"""
return self._max_length
    @max_length.setter
    def max_length(self, max_job_count):
        """Scale the job count if each job runs multiple parallel tasks"""
        try:
            self._max_length = max_job_count * self["Jobscript"]["tasks_per_job"]
        except (KeyError, TypeError):
            # no tasks_per_job given (e.g. qsub configs): one task per job
            self._max_length = max_job_count
@staticmethod
def get_git_root() -> Path:
"""Get root of the current git repository
Returns:
:class:`pathlib.Path`: absolute path to the git-root for the current system
Warning:
This does not really belong in this class.
"""
path = Path(
Repo(".", search_parent_directories=True).git.rev_parse("--show-toplevel")
).absolute()
return path
def get_data_dir(self) -> Path:
"""Get absolute path to the current systems data directory.
Returns:
:class:`pathlib.Path`: absolute path to the data directory for the current system
"""
return self.get_git_root() / Path(
f"Data/{self['Data']['model_dimension']}D_{self['Data']['model_name']}/"
)
def load_training_dataset(self, index: int) -> np.ndarray:
"""Load and return the training data.
Args:
index: The index of the training data set to load.
        Returns:
            One training data set with the minimal temporal length needed for
            iterative timeseries predictions, i.e.
            :code:`temporal_length=(transient_steps + training_steps + 1)`.

        Warning:
            This function should be modified to load only the variables that
            are needed, in order to save memory.
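
        Example:
            A usage sketch (config path hypothetical; the first axis is time)::

                conf = Config(Path("Configs/example.yml"))
                train = conf.load_training_dataset(0)
                # train.shape[0] == transient_steps + training_steps + 1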
"""
logger.log(loglevel_info_single_run, "Loading training data.")
        transient_steps = int(
            # NB: "tansient_length" follows the key spelling used in the YAML
            # files, so it must not be "corrected" here alone
            self["Data"]["Usage"]["tansient_length"]
            // self["Data"]["Creation"]["TemporalParameter"]["dt"]
        )
training_steps = int(
self["Data"]["Usage"]["training_length"]
// self["Data"]["Creation"]["TemporalParameter"]["dt"]
)
# Check if the number of training steps is larger than the length of the training data
if transient_steps + training_steps + 1 > (
self["Data"]["Creation"]["TemporalParameter"]["training_length"]
// self["Data"]["Creation"]["TemporalParameter"]["dt"]
):
training_steps = (
(
self["Data"]["Creation"]["TemporalParameter"]["training_length"]
// self["Data"]["Creation"]["TemporalParameter"]["dt"]
)
- transient_steps
- 1
)
logger.warning(
f"Transient steps + training steps + 1 is larger than the length of training data. Reducing the number of training steps to {training_steps} (the largest possible)."
)
# Load the training data
if self["Data"]["Usage"]["fileformat"] == ".npz":
training_data = np.load(
self.get_data_dir() / Path(f"TrainingData{index}.npz")
)["vars"][: (transient_steps + training_steps + 1)]
elif self["Data"]["Usage"]["fileformat"] == ".npy":
training_data = np.load(
self.get_data_dir() / Path(f"TrainingData{index}.npy")
)[: (transient_steps + training_steps + 1), np.newaxis, :]
else:
raise ValueError(
f"config['Data']['Usage']['fileformat'] needs to be either '.npz' or '.npy' but is {self['Data']['Usage']['fileformat']}"
)
logger.log(loglevel_info_single_run, "Done.")
return training_data
    def load_evaluation_datasets(self) -> list[np.ndarray]:
        """Load and return the evaluation data.

        Returns:
            A list of numpy arrays containing the first evaluation data sets.

        Warning:
            This function should be modified to load only the variables that
            are needed, in order to save memory.
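
        Example:
            A usage sketch (the dataset count is taken from the YAML)::

                evals = conf.load_evaluation_datasets()
                assert len(evals) == conf["Data"]["Usage"]["evaluation_datasets"]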
"""
logger.log(loglevel_info_single_run, "Loading evaluation data.")
transient_steps = int(
self["Data"]["Usage"]["tansient_length"]
// self["Data"]["Creation"]["TemporalParameter"]["dt"]
)
evaluation_steps = int(
self["Data"]["Usage"]["evaluation_length"]
// self["Data"]["Creation"]["TemporalParameter"]["dt"]
)
        # Check if the number of evaluation steps is larger than the length of
        # the evaluation data (both models share the same check)
        if self["Data"]["model_name"] in ("AlievPanfilov", "KuramotoSivashinsky"):
            if transient_steps + evaluation_steps + 1 > (
                self["Data"]["Creation"]["TemporalParameter"]["evaluation_length"]
                // self["Data"]["Creation"]["TemporalParameter"]["dt"]
            ):
                evaluation_steps = (
                    (
                        self["Data"]["Creation"]["TemporalParameter"][
                            "evaluation_length"
                        ]
                        // self["Data"]["Creation"]["TemporalParameter"]["dt"]
                    )
                    - transient_steps
                    - 1
                )
                logger.warning(
                    f"Transient steps + evaluation steps + 1 is larger than the "
                    f"length of the evaluation data. Reducing the number of "
                    f"evaluation steps to {evaluation_steps} (the largest "
                    f"possible)."
                )
# Load the evaluation data
if self["Data"]["Usage"]["fileformat"] == ".npz":
eval_data = [
np.load(self.get_data_dir() / Path(f"EvaluationData{i}.npz"))["vars"][
: transient_steps + evaluation_steps + 1
]
for i in range(self["Data"]["Usage"]["evaluation_datasets"])
]
elif self["Data"]["Usage"]["fileformat"] == ".npy":
eval_data = [
np.load(self.get_data_dir() / Path(f"EvaluationData{i}.npy"))[
: transient_steps + evaluation_steps + 1, np.newaxis, :
]
for i in range(self["Data"]["Usage"]["evaluation_datasets"])
]
else:
raise ValueError(
f"config['Data']['Usage']['fileformat'] needs to be either '.npz' or '.npy' but is {self['Data']['Usage']['fileformat']}"
)
logger.log(loglevel_info_single_run, "Done.")
return eval_data
def parse_YAML(self) -> dict:
"""Parse the config.yml file to get systems parameters
Returns:
:class:`dict`: representation of the config as given in the YAML file
"""
with open(self.path, "r") as file:
params = yaml.safe_load(file)
return params
def __str__(self) -> str:
"""Return configuration as string, e.g. for printing"""
conf_file = yaml.dump(self.dict, Dumper=IndentDumper, sort_keys=False)
return conf_file
def __getitem__(self, key: str):
"""Subscript access to config dict params"""
return self.dict[key]
def param_scan_list(self) -> list[list[dict]]:
r"""Return set of parameters for a cluster run
Returns:
            list of lists of dictionaries, where each sublist is to be
            understood as a single job in a job array
Note:
In order to run a simulation as a parameter scan, supply the Config YAML
with the following block:
.. code:: YAML
ParamScan:
A:
- "range"
- [3]
B:
- "range"
- [5, 10]
C:
- "range"
- [0, 100, 10]
D:
- "list"
- [1, 5, 13]
# useful shorthand for the above config:
ParamScan:
A: ["range", [3]]
B: ["range", [5, 10]]
C: ["range", [0, 100, 10]]
D: ["list", [1, 5, 13]]
            The arguments to the parameters must be lists where the first entry
            specifies the type of parameters to expect and the second specifies
            the values.
            If specified as :code:`"list"`, the values are simply taken as
            given.
            If instead specified as :code:`"range"`, the values are passed to
            :func:`numpy.arange` by list unpacking, e.g. :code:`np.arange(*B)`.
            The analogous types :code:`linspace` and :code:`geomspace` are also
            available; their values must then be :code:`start, stop, number`.
The above example would mean that
.. math::
A \in \{0, 1, 2\} \,,
B \in \{5, 6, 7, 8, 9\} \,,
C \in \{0, 10, 20, \ldots, 90\} \,,
D \in \{1, 5, 13\} \,.
            A job will then be started for each combination of :math:`A,B,C,D`.
If you intend to run a single execution of a fixed set of parameters, you
may set the following within the YAML
.. code:: YAML
ParamScan: null
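
        Example:
            A sketch of how a scan expands (values hypothetical; constant
            :code:`Parameters` entries omitted for brevity). With
            :code:`max_job_count: 2` and

            .. code:: YAML

                ParamScan:
                    A: ["list", [1, 2]]
                    B: ["range", [2]]

            the four combinations are split into two jobs::

                [[{'A': 1, 'B': 0}, {'A': 1, 'B': 1}],
                 [{'A': 2, 'B': 0}, {'A': 2, 'B': 1}]]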
"""
param_dict = {}
# parse constant variables
for parameter, value in self["Parameters"].items():
param_dict[parameter] = [value]
# if no ParamScan parameters are defined simply run a single execution
if self["ParamScan"] is None:
return [[self["Parameters"]]]
# parse parameter scan variables
for parameter, specs in self["ParamScan"].items():
param_dict.pop(parameter, None)
if specs[0] == "list":
param_dict[parameter] = specs[1]
elif specs[0] == "range":
param_dict[parameter] = np.arange(*specs[1])
elif specs[0] == "linspace":
param_dict[parameter] = np.linspace(*specs[1])
elif specs[0] == "geomspace":
param_dict[parameter] = np.geomspace(*specs[1])
            else:
                raise ValueError(
                    f"Unknown scan type {specs[0]!r} for parameter {parameter}; "
                    "must be one of 'list', 'range', 'linspace' or 'geomspace'"
                )
        # generate all combinations of the scan values and write a list of
        # parameters wherein each entry is a single job of the simulation
        workload = list(itertools.product(*param_dict.values()))
        # ensure that no sublist is empty
        if len(workload) < self.max_length:
            self.max_length = len(workload)
        # get a list of sublists, each being one job on the cluster
        params = [
            [dict(zip(param_dict.keys(), v)) for v in sublist]
            for sublist in divide(self.max_length, workload)
        ]
return params
def param_scan_len(self) -> int:
"""Total iterations of a cluster run
Warning:
This is **deprecated**! Use :code:`len(conf.param_scan_list())` instead
"""
warnings.warn("use len(Config.param_scan_list()) instead", DeprecationWarning)
return len(self.param_scan_list())
def jobscript_datadir(self, output_type: str) -> Path:
"""Path to raw data as defined in YAML
The cluster will expect data to be within a YAML format like such:
args:
output_type: The output type (:codes: `'ValidTimes', 'RunTimes', 'Memory'`), which defines the folder to be written into.
.. code:: yaml
Saving:
OutputDirectory: "path/to/data"
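
        Example:
            For a hypothetical config file :code:`example.yml` describing a 2D
            AlievPanfilov system, this resolves to (with
            :code:`output_type='ValidTimes'`)::

                <git-root>/Data/ValidTimes/2D_AlievPanfilov/example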
"""
if not (output_type in ["ValidTimes", "RunTimes", "Memory"]):
raise ValueError(
f"outputs must be one of 'ValidTimes', 'RunTimes', 'Memory', but is {output_type}"
)
outpath = (
Path(f"Data/{output_type}")
/ Path(f"{self['Data']['model_dimension']}D_{self['Data']['model_name']}")
/ self.name
)
return self.get_git_root() / Path(outpath)
def make_jobscript_datadir(
self, *, output_type: str, copy_yaml: bool = False
) -> None:
"""Create datadir as specified in Config if it doesn't exist
Args:
output_type (str): The output type (:codes: `'ValidTimes', 'RunTimes', 'Memory'`), which defines the folder to be written into.
copy_yaml: If set to True, the YAML will be copied to the directory created
Warning:
If the directory already exists, an error will be raised to avoid
overwriting previous data.
The recommended procedure is to either delete old data or rename the YAML
such that new data is written in a new directory.
"""
if not (output_type in ["ValidTimes", "RunTimes", "Memory"]):
raise ValueError(
f"outputs must be one of 'ValidTimes', 'RunTimes', 'Memory', but is {output_type}"
)
logger.log(
loglevel_info_multiple_run,
f"Generating ouput directory ({self.jobscript_datadir(output_type)})...",
)
self.jobscript_datadir(output_type).mkdir(parents=True, exist_ok=False)
if copy_yaml:
new_path = self.jobscript_datadir(output_type) / self.path.name
copy(self.path, new_path)
def generate_submission_script_from_YAML(
self, *, output_type: str, template: Path | None = None
) -> Path:
r"""Creates a shell script that can then be passed to qsub based on the
information contained in the Config.
Args:
output_type (str): The output type (:code:`'ValidTimes', 'RunTimes', 'Memory'`), which defines the folder to be written into.
template (:class:`pathlib.Path` or None):
Optional path to a different template than
:code:`drrc/templates/qsub.template`
Note:
For this to work, the configuration YAML must contain the following block:
.. code:: YAML
Jobscript:
# for qsub
Type: "qsub" # specify to run at MPI-DS
Cores: 4 # number of cores per job (should be divisor of 32)
max_job_count: 1000 # array job will have 1000 jobs
# optional:
force_parallel_queue: False # force job to run on teutates-mvapich2.q (optional)
# for slurm
Type: "sbatch" # specify submission command (template must have matching name!)
max_job_count: 2000 # 2000 jobs will be submitted as an array
tasks_per_job: 4 # each job will have 4 job steps
cores_per_task: 4 # each job step will use 4 cores
mem_per_task: 24 # and 24GB of memory
cluster: "GWDG" # use specific cluster options
time: "00:05:00" # each task will run at most 5 minutes
            This will create parallel jobs with 4 cores per task.
            On qsub this will always fill nodes of 32 cores.
            When using SLURM this additionally allows setting how many tasks
            are run in each job (potentially allowing for smaller jobs / faster
            queueing).
The resulting shell script will always be placed in the data directory as
returned by :func:`Config.jobscript_datadir` to ensure data is kept with the
submission script.
Warning:
There seems to be some configuration in place for :code:`cluster: "raven"`
and :code:`cluster: "viper"` such that total memory must always be defined.
I'm not quite sure yet how to write scripts for that.
**So right now this only works @GWDG!**
Important:
**If using** :code:`Type: "qsub"`:
Currently this is set up such that a single cluster node of 32 cores will
receive as many jobs as it can fit. For optimal use of the cluster
:code:`Cores` should be a divisor of 32.
When choosing this, keep in mind that per cluster node there are 192GB RAM.
If :code:`Cores` is set to 1, this function assumes that the job will be
submitted to the serial cluster and thus adapt the submission script.
However, one may set the optional argument
:code:`force_parallel_queue: True` to run 32 single-core jobs per node in
the parallel queue.
Jobs are submitted using:
.. code:: bash
qsub Submit-job.sh
            **If using** :code:`Type: "sbatch"`:
            In this case :code:`tasks_per_job, cores_per_task, mem_per_task,
            cluster, time` must be defined in the YAML.
            Note that :code:`mem_per_task` is the memory per job step: in the
            above example SLURM allocates :math:`24/4 = 6\,\mathrm{GB}` per CPU
            core and :math:`4 \cdot 24\,\mathrm{GB} = 96\,\mathrm{GB}` in
            total.
The job can then be submitted using
.. code:: bash
sbatch Submit-job.sh
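
        Example:
            A typical driver sketch (paths hypothetical)::

                conf = Config(Path("Configs/example.yml"))
                conf.make_jobscript_datadir(output_type="ValidTimes", copy_yaml=True)
                script = conf.generate_submission_script_from_YAML(
                    output_type="ValidTimes"
                )
                # then submit the returned script with qsub/sbatch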
"""
from jinja2 import Template
logger.log(loglevel_info_single_run, "Generating submission script...")
script_name = f"{self.jobscript_datadir(output_type)}/Submit-{self.name}.sh"
# allow for different workload managers
template_name = self["Jobscript"]["Type"]
# read template
if template is None:
template_path = (
Path(__file__).parent / "templates" / f"{template_name}.template"
)
else:
template_path = Path(template)
logger.log(loglevel_info_multiple_run, f"found template at {template_path}")
with open(template_path, "r") as f:
script_template = f.read()
# template variables
if template_name == "qsub":
# get optional parameters from YAML
fpq_value = self["Jobscript"].get("force_parallel_queue")
fpq = fpq_value if fpq_value is not None else False
# collect all placeholder values
script_vars = {
"JOB_NAME": self.name,
"OUTPUT_PATH": self.jobscript_datadir(output_type),
"GIT_ROOT": self.get_git_root(),
"EXECUTABLE": Path(sys.argv[0]).absolute(),
"NUM_CORES": self["Jobscript"]["Cores"],
"YAMLPATH": self.path,
"JOB_LENGTH": self.param_scan_len(),
"JOB_STRIDE": CLUSTER_CORES // self["Jobscript"]["Cores"],
# optional arguments
"FORCE_PARALLEL_QUEUE": fpq,
}
elif template_name == "sbatch":
# calculate total memory
mem_total = int(
self["Jobscript"]["tasks_per_job"] * self["Jobscript"]["mem_per_task"]
)
mem_per_cpu = int(
self["Jobscript"]["mem_per_task"] / self["Jobscript"]["cores_per_task"]
)
script_vars = {
"JOB_NAME": self.name,
"NTASKS": self["Jobscript"]["tasks_per_job"],
"OUTPUT_PATH": self.jobscript_datadir(output_type),
"GIT_ROOT": self.get_git_root(),
"EXECUTABLE": Path(sys.argv[0]).absolute(),
"NCORES": self["Jobscript"]["cores_per_task"],
"YAMLPATH": self.path,
"JOB_LENGTH": self["Jobscript"]["max_job_count"],
"MEM": mem_total,
"MEM_PER_CPU": mem_per_cpu,
"CLUSTER": self["Jobscript"]["cluster"],
"TIME": self["Jobscript"]["time"],
}
else:
            raise ValueError(
                f'"Type" must be "qsub" or "sbatch", but is {template_name!r}'
            )
# populate template
populated = Template(script_template).render(script_vars)
# write to file & make script executable
with open(script_name, "w") as f:
f.write(populated)
Path(script_name).chmod(0o755)
logger.log(loglevel_info_multiple_run, f"Generated {script_name}")
return Path(script_name).absolute()