import logging
import os
from abc import ABC, abstractmethod
from multiprocessing import Pool
from pathlib import Path
import numpy as np
import pandas as pd
from drrc.config import Config
class AutomaticPostprocessing:
"""This class enables the automatic concatenation of all subtask outputs into a
single file called "DataFrame.csv".
Within the respective job directories this still generates a unique path.
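Example:
    A minimal sketch of the intended use (the :code:`results` directory is
    purely illustrative and is expected to contain job folders with
    :code:`*-1.csv` raw outputs)::

        from pathlib import Path

        app = AutomaticPostprocessing(Path("results"))
        app.summary()                        # only reports what would be done
        app.auto_concatenate(Delete=False)
        app.auto_statisticsgeneration()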
"""
def __init__(self, root: Path):
"""Initialise automatic concatenation
Args:
root:
shallowest path from which to search for job directories, starting from
the git root
Todo:
- make this work with relative paths from the git root
- either use a root path or supply a yaml for single job post processing
"""
self.path = root
cpus = os.cpu_count()
# use half the available cores, but at least one
self.num_cores = max(1, cpus // 2) if cpus is not None else 1
def summary(self) -> None:
"""Print summary of the automatic concatenation
Note:
This function does not touch any of the files!
Its main purpose is debugging / checking that
:func:`AutomaticPostprocessing.auto_concatenate` behaves as expected.
"""
relevant_paths = [p.parent for p in Path(self.path).glob("**/*-1.csv")]
print(f"Checking in: {relevant_paths}")
for fp in relevant_paths:
if not (fp / Path("DataFrame.csv")).is_file():
print(f"Concatenation needed in: {fp}")
def auto_concatenate(self, Delete: bool = False) -> None:
"""Automatically concatenate all raw output files
Args:
Delete:
If True, then the raw output files are deleted after concatenation.
Default is False.
The result is then saved with the raw output in a file named
:code:`DataFrame.csv`
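Example:
    Sketch of the assumed job-directory layout (file names purely
    illustrative)::

        job_dir/
        ├── config.yml       # read to determine the expected number of rows
        ├── output-1.csv     # raw output, deleted afterwards if Delete=True
        ├── output-2.csv     # raw output, deleted afterwards if Delete=True
        └── DataFrame.csv    # written by this method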
"""
logging.info(f"Concatenating data in subfolders of {self.path}")
key = self.path.name
# make iterator of all paths to be considered
relevant_paths = list(Path(self.path).glob("**/*-1.csv"))
# in each path, check if DataFrame.csv file exists
for fp in relevant_paths:
fp = fp.parent
key_index = fp.parts.index(key)
# if not, then call self._concatenate with the current path
if not (fp / Path("DataFrame.csv")).is_file():
logging.info(f"{Path(*fp.parts[key_index:])}/DataFrame.csv' not found")
df = self._concatenate(fp)
# Check if both 'transformation' and 'Transformation' exist
if "transformation" in df.columns and "Transformation" in df.columns:
# If both exist, drop 'transformation'
df.drop("transformation", axis=1, inplace=True)
elif "transformation" in df.columns:
# If only 'transformation' exists, rename it to 'Transformation'
df.rename(
columns={"transformation": "Transformation"}, inplace=True
)
# If only 'Transformation' exists, do nothing
df.to_csv(fp / Path("DataFrame.csv"), index=False)
logging.info(
f"Saved concatenated data at {Path(*fp.parts[key_index:])}/DataFrame.csv"
)
else:
logging.info(
f"{Path(*fp.parts[key_index:])}/DataFrame.csv' already exists"
)
if Delete:
# delete all raw output files
for f in fp.glob("*-*.csv"):
f.unlink()
def _concatenate(self, path: Path) -> pd.DataFrame:
"""Concatenate a single cluster job
This takes all numbered :code:`.csv` DataFrames and concatenates them into one.
It checks whether the number of rows agrees with what is expected from the config; if not, a warning is logged.
Args:
path: path to output of cluster job
Returns:
:class:`pd.DataFrame` which contains all data from a single job
"""
# get expected number of files
conf = Config(list(path.glob("*.yml"))[0])
tasks = [
len(pars) * conf["Data"]["Usage"]["evaluation_datasets"]
for pars in conf.param_scan_list()
]
total_tasks = np.sum(tasks)
# get all relevant paths
relevant_paths = list(Path(path).glob("*-*.csv"))
# collect data into new DataFrame
# df = pd.concat([pd.read_csv(f) for f in tqdm(relevant_paths, desc=f"Concatenating {str(path.name)}")])
# iterate through all score files in parallel & write statistics
stat_list = []
with Pool(processes=self.num_cores) as pool:
logging.info(
f"Processing {len(relevant_paths)} files in {path.name} on {self.num_cores} cores."
)
stat_list += pool.starmap(
self._read_test_csv, [(f, tasks) for f in relevant_paths]
)
# return the DataFrame
df = pd.concat(stat_list, ignore_index=True)
if len(df) != total_tasks:
logging.warning(
f"Jobs died in {path.name}! Length of csv is {len(df)}, which does not match expected number of total entries {total_tasks}."
)
return df
def _read_test_csv(self, path: Path, jobs: list) -> pd.DataFrame:
"""Read a single csv file, test if it has the expected number of results and return the DataFrame.
Args:
path: path to csv file
jobs: list with the expected number of result rows for each task
Returns:
:class:`pd.DataFrame` which contains all data from a single output file
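Example:
    The task id is taken from the 1-based index in the file name, e.g. a
    (purely illustrative) file :code:`output-12.csv` corresponds to the
    zero-based task id 11 and is checked against :code:`jobs[11]`.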
"""
df = pd.read_csv(path)
# get zero based task id from 1 based file name
task_id = int(str(path.name).split("-")[-1].split(".")[0]) - 1
if len(df) != jobs[task_id]:
logging.warning(
f"Number of entries in {path.parent.name}/{path.name} is {len(df)} and does not match expected number of entries {jobs[task_id]}."
)
return df
def auto_statisticsgeneration(self) -> None:
"""Generate statistics from the concatenated DataFrame"""
logging.info(f"Generating statistics of data in subfolders of {self.path}")
key = self.path.name
# make iterator of all paths to be considered
relevant_paths = list(Path(self.path).glob("**/DataFrame.csv"))
# in each path, check if ProcessedValidTimes.csv already exists
for fp in relevant_paths:
fp = fp.parent
key_index = fp.parts.index(key)
# if not, generate statistics from DataFrame.csv and save them
if not (fp / Path("ProcessedValidTimes.csv")).is_file():
logging.info(
f"{Path(*fp.parts[key_index:])}/ProcessedValidTimes.csv not found"
)
self._generate_statistics(
pd.read_csv(fp / Path("DataFrame.csv"))
).to_csv(fp / Path("ProcessedValidTimes.csv"), index=False)
logging.info(
f"Saved processed data at {Path(*fp.parts[key_index:])}/ProcessedValidTimes.csv"
)
else:
logging.info(
f"{Path(*fp.parts[key_index:])}/ProcessedValidTimes.csv already exists"
)
def _generate_statistics(self, df: pd.DataFrame) -> pd.DataFrame:
"""Generate statistics from the concatenated DataFrame"""
if "transformation" in df.keys():
df.rename(columns={"transformation": "Transformation"}, inplace=True)
params = [
"adjacency_degree",
"adjacency_dense",
"input_bias",
"spatial_shape",
"system_variables",
"parallelreservoirs_ghosts",
"boundary_condition",
"identical_inputmatrix",
"identical_adjacencymatrix",
"identical_outputmatrix",
"training_includeinput",
"training_output_bias",
"adjacency_spectralradius",
"input_scaling",
"training_regularization",
"reservoir_leakage",
"parallelreservoirs_grid_shape",
"dimensionreduction_fraction",
"reservoir_nodes",
"Transformation",
]
return (
df.groupby(params)["valid_time"]
.agg(["mean", "std"])
.reset_index()
.rename(columns={"mean": "mean_ValidTime", "std": "std_ValidTime"})
)
class AnalyseClusterRunBase(ABC):
"""Base class for all analysis types we will run later.
This includes all basic functionality that will be used later, such as IO.
"""
def __init__(self, conf: Config):
"""Initialize a cluster run object for analysis
Args:
conf (Config):
A config object describing a scan that has previously been run on the cluster.
"""
self.conf = conf
@abstractmethod
def process(self) -> pd.DataFrame:
"""Read output of cluster run as defined by self.conf"""
pass
@abstractmethod
def save(self) -> None:
"""Save processed data in a file"""
pass
class HyperPostProcessing(AnalyseClusterRunBase):
"""Post-processing of our hyperparameter scans
This class is meant to take in raw data and generate the desired DataFrame
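Example:
    A minimal sketch of the intended use (the config path is purely
    illustrative)::

        conf = Config(Path("configs/hyper_scan.yml"))
        hpp = HyperPostProcessing(conf)
        df = hpp.process()   # one row of statistics per hyperparameter set
        hpp.save()           # writes DataFrame.csv to hpp.out_dir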
"""
def __init__(
self,
conf: Config,
data_name: str = "score_",
data_type: str = ".txt",
num_cores: int = max(1, (os.cpu_count() or 2) // 2),
):
"""Initialize post-processing
Important:
This class assumes that output files contain a numerical (1-based) index
between :code:`data_name` and :code:`data_type`!
Args:
conf (Config):
Config of the corresponding cluster run
data_name (str):
File name prefix, e.g. :code:`"score_"`
data_type (str):
File extension, e.g. :code:`".txt"`
num_cores (int):
Number of cores available for processing. Default is half the system's cores.
Note:
The path to the raw data (:attr:`data_dir`) and the path for saving the
DataFrame (:attr:`out_dir`) are both derived from
:code:`conf["Saving"]["OutputDirectory"]`.
"""
super().__init__(conf)
self.df = None
self.data_name = data_name
self.data_type = data_type
self.number_seeds = self.conf["Training"]["Datasets"]
self.number_evals = self.conf["Evaluation"]["Datasets"]
self.num_cores = num_cores
self.data_dir = Path(self.conf["Saving"]["OutputDirectory"])
self.out_dir = self.conf.get_git_root() / Path(
self.conf["Saving"]["OutputDirectory"]
)
@property
def data_dir(self):
return self._data_dir
@data_dir.setter
def data_dir(self, data_dir: Path):
"""Set path to raw data and search three possible git repos on file server, if
given path does not exist"""
# Testing and Creating Directories
self._data_dir = self.conf.get_git_root() / data_dir
testfile = self._get_raw_file(0)
# Find raw data in file system
FilesNotFound = False
try:
_ = np.loadtxt(testfile, skiprows=13)
except FileNotFoundError:
logging.info(
f"{testfile} not found in local git repository. Testing other users."
)
FilesNotFound = True
GitRepos = [
Path("/data.bmp/gwellecke/DRRC/"),
Path("/data.bmp/khollborn/DRRC/"),
Path(
"/data.bmp/lfleddermann/DataAnalysis/2023_Paper_Reservoir_DimensionReduction/DRRC/"
),
]
i = 0
while FilesNotFound and i < len(GitRepos):
try:
_ = np.loadtxt(GitRepos[i] / testfile, skiprows=13)
self._data_dir = GitRepos[i] / self._data_dir
logging.info(f"Using {self._data_dir} as Datadirectory.")
FilesNotFound = False
except FileNotFoundError:
logging.info(f"Files not found at {GitRepos[i]/self._data_dir}.")
pass
i += 1
if FilesNotFound:
raise FileNotFoundError("Data does not exist.")
@property
def out_dir(self):
return self._out_dir
@out_dir.setter
def out_dir(self, out_dir: Path):
"""Set variable and make directory for output"""
self._out_dir = out_dir
try:
os.makedirs(self._out_dir)
logging.info(f"Created Outputfolder: {self._out_dir}")
except FileExistsError:
logging.warning(
f"Outputdirectory {self._out_dir} already exists. Data might be overwritten."
)
def _get_raw_file(self, i: int) -> Path:
"""Generate filename for the n-th raw data file
Args:
i (int): Number of the file using zero-based indexing
Returns:
(Path): Path to the requested file
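Example:
    With the defaults :code:`data_name="score_"` and :code:`data_type=".txt"`,
    :code:`self._get_raw_file(0)` returns :code:`<data_dir>/score_1.txt`.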
"""
return self.data_dir / f"{self.data_name}{i + 1}{self.data_type}"
def _generate_statistics(self, i: int, params: dict):
"""Internal function to generate the statistics of :func:`self.process`
Warning:
This function will not return meaningful statistics if the NumPy array contains
any NaNs. NumPy has NaN-aware functions for this, but we do not use them yet.
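Example:
    Assumed layout of a raw score file (tab-separated, one skipped header
    line, the seed in the first column and one valid time per evaluation
    dataset; the values are purely illustrative)::

        <header line>
        42    10.3    9.7     11.1
        43    8.9     10.2    9.5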
"""
raw_file = self._get_raw_file(i)
try:
# new: (works with missing data)
col_names = ["seed"] + [f"val_{i}" for i in range(self.number_evals)]
raw_input = pd.read_csv(
raw_file, sep="\t", names=col_names, skiprows=1
).to_numpy()
tmp_Times = raw_input[:, 1:].astype(float) # N x M
# we don't ever use the seed...
# tmp_seeds = raw_input[:, 0] # N x 1
# calculate the statistics at this point and write them to the DataFrame later:
# mean, std and max of the valid time plus the average std per seed and per
# evaluation dataset are added directly to the parameter dict
params["mean_t"] = tmp_Times.mean()
params["std_t"] = tmp_Times.std()
params["max_t"] = tmp_Times.max()
params["avg_std_seed"] = tmp_Times.std(axis=0).mean()
params["avg_std_data"] = tmp_Times.std(axis=1).mean()
# write important information about cluster run
params["DR_type"] = self.conf["Reservoir"]["transform"]
params["DR_ratio"] = self.conf["Reservoir"]["fracin"]
params["num_res"] = self.conf["Reservoir"]["replicas"][0]
except (FileNotFoundError, ValueError) as error:
logging.warning(f"{error} opening {raw_file}")
return params
def process(
self,
) -> pd.DataFrame:
"""Extract Validtime data and seeds from :code:`.txt` files stored in
:code:`conf['Saving']['OutputDirectory']` into a :class:`pd.DataFrame`.
To accelerate pre-processing of raw data this function runs on half the system's
cores by default.
Args:
conf (Config):
:class:`Config` with ClusterRun information.
DataDir (Path):
Passed to Data, might deviate from Path in conf depending on user.
DataName (str):
Name of files with Validtime data up to iterator number.
Default :code:`score_`.
DataType (str):
File type of files with Validtime data. Default :code:`.txt`.
NumberSeeds (int):
Number of seeds/ different networks drawn. Default 10.
num_cores (int):
Number of CPU cores to use for the preprocessing.
Default is half the system's cores.
Return:
(pd.DataFrame):
Each row corresponds to a single set of hyperparameters.
Statistics are given for the following:
.. list-table::
* - :code:`mean_t`
- Mean valid time over all executions of the hyperparameter set
* - :code:`std_t`
- Standard variance of :code:`mean_t` over all executions of the
hyperparameter set
* - :code:`max_t`
- Maximum valid time over all executions of the hyperparameter set
* - :code:`avg_std_seed`
- Average standard deviation per training seed
* - :code:`avg_std_data`
- Average standard deviation per training dataset
"""
param_list = self.conf.param_scan_list()
stat_list = []
# iterate through all score files in parallel & write statistics
with Pool(processes=self.num_cores) as pool:
logging.info(
f"Processing {len(param_list)} files on {self.num_cores} cores."
)
stat_list += pool.starmap(self._generate_statistics, enumerate(param_list))
# return the DataFrame
self.df = pd.DataFrame(stat_list)
return self.df
def save(self) -> None:
"""Save dataframe to csv"""
self.df.to_csv(self.out_dir / Path("DataFrame.csv"))
return
def fix_raw_data(self) -> None:
"""Fix bad output raw data and save to new file with prefix :code:`fn_mod`
This function also modifies the expected filename.
Args:
fn_mod(str):
Modified prefix of the input file. Default is :code:`"rf"`, such that,
e.g. :code:`"score_0.txt" --> "rf-score_0.txt"`
"""
# loop over all raw files
file_count = len(self.conf.param_scan_list())
with Pool(processes=self.num_cores) as pool:
logging.info(f"Re-formatting {file_count} files on {self.num_cores} cores.")
pool.map(self._reformat_single_file, range(file_count))
def _reformat_single_file(self, file_index: int):
"""Takes a filename and creates the reformatted file.
Args:
file_index (int):
Index of the file to be reformatted, starting at 0.
"""
filename = self._get_raw_file(file_index)
old_filename = filename
filename = filename.parent / Path("old_" + str(filename.name))
try:
f = open(old_filename, "r")
seeds = []
valtimes = []
# skip the first line; the second line tells us whether the file is unformatted
f.readline()
if f.readline() == "seeds:\n":
os.rename(old_filename, filename)
# read seeds up to ':'
while True:
line = f.readline()
if line == ":\n":
break
seeds.append(line[:-1])
# then read the rest of the file, i.e. valtimes
while True:
line = f.readline()
if not line:
break
valtimes.append(line)
# join seeds with valtimes
reformatted = [seeds[i] + "\t" + valtimes[i] for i in range(len(seeds))]
f.close()
# write reformatted file
with open(old_filename, "w") as f:
f.writelines(reformatted)
if file_index % 100 == 0:
logging.debug(f"Reformated {file_index}.")
else:
logging.debug(f"{old_filename} already formatted")
except FileNotFoundError:
logging.info(f"{old_filename} doesnt exist.")