#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File : access.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date : 30.01.2022
import re
import os
import sys
import time
import textwrap
import contextlib
import subprocess
import pickle
import shutil
import warnings
from datetime import datetime, timedelta
from concurrent.futures import ProcessPoolExecutor
import numpy as np
import pandas as pd
import toml
import cma
import coexist
from .base import Simulation
from . import schedulers
from .combiners import Product
from .code_trees import code_contains_variable
from .code_trees import code_substitute_variable
from .utilities import autorepr
from .utilities import SignalHandlerKI
signal_handler = SignalHandlerKI()
[docs]@autorepr(short = {"script"})
class AccessSetup:
'''Structure storing constant attributes for an ACCES optimisation run.
Code validation and generation are handled too.
Attributes
----------
parameters : pd.DataFrame
The free parameters extracted from the user script.
parameters_scaled : pd.DataFrame
The free parameters scaled to the phenotype space, such that the
initial variance (`sigma`) is unity.
scaling : np.ndarray
A vector of values that the free parameters are scaled by; it is the
initial variance (`sigma`) given by the user.
script : str
The modified user script that will be executed.
population : int
The number of simulations to be run in parallel in each epoch.
target : float
The target scaled variance - i.e. decrease the uncertainty from the
initial 1 down to `target`.
seed: int
The random seed defining a single ACCES run.
rng: np.random.Generator
The random number generator used, seeded with `seed`.
'''
[docs] def __init__(self, script_path):
'''Given a path to a user-defined simulation script, extract the free
parameters and generate the ACCES script.
'''
# Uninitialised parameters (will be set later)
self.population = None
self.target = None
self.rng = None
self.seed = None
# Extract parameters and generate ACCES script
with open(script_path, "r") as f:
user_code = f.readlines()
# Find the two parameter definition directives
params_start_line = None
params_end_line = None
regex_prefix = r"#+\s*ACCES{1,2}\s+PARAMETERS"
params_start_finder = re.compile(regex_prefix + r"\s+START")
params_end_finder = re.compile(regex_prefix + r"\s+END")
for i, line in enumerate(user_code):
if params_start_finder.match(line):
params_start_line = i
if params_end_finder.match(line):
params_end_line = i
if params_start_line is None or params_end_line is None:
raise NameError(textwrap.fill((
f"The user script found in file `{script_path}` did not "
"contain the blocks `# ACCESS PARAMETERS START` and "
"`# ACCESS PARAMETERS END`. Please define your simulation "
"free parameters between these two comments / directives."
)))
# Execute the code between the two directives to get the initial
# `parameters`. `exec` saves all the code's variables in the
# `parameters_exec` dictionary
user_params_code = "".join(
user_code[params_start_line:params_end_line]
)
user_params_exec = dict()
exec(user_params_code, user_params_exec)
if "parameters" not in user_params_exec:
raise NameError(textwrap.fill((
"The code between the user script's directives "
"`# ACCESS PARAMETERS START` and "
"`# ACCESS PARAMETERS END` does not define a variable "
"named exactly `parameters`."
)))
self.validate_parameters(user_params_exec["parameters"])
self.parameters = user_params_exec["parameters"]
if not code_contains_variable(user_code, "error"):
raise NameError(textwrap.fill((
f"The user script found in file `{script_path}` does not "
"define the required variable `error`."
)))
# Substitute the `parameters` creation in the user code with loading
# them from an ACCESS-defined location
parameters_code = [
"\n# Unpickle `parameters` from this script's first " +
"command-line argument and set\n",
'# `access_id` to a unique simulation ID\n',
code_substitute_variable(
user_code[params_start_line:params_end_line],
"parameters",
('with open(sys.argv[1], "rb") as f:\n'
' parameters = pickle.load(f)\n')
)
]
# Also define a unique ACCESS ID for each simulation
parameters_code += (
'access_id = int(sys.argv[1].split(".")[-2])\n'
)
# Read in the `async_access_template.py` code template and find the
# code injection directives
template_code_path = os.path.join(
os.path.split(coexist.__file__)[0],
"template_access_script.py"
)
with open(template_code_path, "r") as f:
template_code = f.readlines()
for i, line in enumerate(template_code):
if line.startswith("# ACCESS INJECT USER CODE START"):
inject_start_line = i
if line.startswith("# ACCESS INJECT USER CODE END"):
inject_end_line = i
generated_code = "".join((
template_code[:inject_start_line + 1] +
user_code[:params_start_line + 1] +
parameters_code +
user_code[params_end_line:] +
template_code[inject_end_line:]
))
self.script = generated_code
# Scale free parameters (+ bounds and sigma) to unit variance
self.scaling = self.parameters["sigma"].to_numpy().copy()
self.parameters_scaled = self.parameters.copy()
for i in range(len(self.parameters_scaled.columns)):
self.parameters_scaled.iloc[:, i] /= self.scaling
[docs] @staticmethod
def validate_parameters(parameters):
'''Validate the free parameters extracted from a user script (a
``pandas.DataFrame``).
'''
if not isinstance(parameters, pd.DataFrame):
raise ValueError(textwrap.fill((
"The `parameters` variable defined in the user script is "
"not a pandas.DataFrame instance (or subclass thereof)."
)))
if len(parameters) < 2:
raise ValueError(textwrap.fill((
"The `parameters` DataFrame defined in the user script must "
"have at least two free parameters defined. Found only"
f"`len(parameters) = {len(parameters)}`."
)))
columns_needed = ["value", "min", "max", "sigma"]
if not all(c in parameters.columns for c in columns_needed):
raise ValueError(textwrap.fill((
"The `parameters` DataFrame defined in the user script must "
"have at least four columns defined: ['value', 'min', "
f"'max', 'sigma']. Found these: `{parameters.columns}`. You "
"can use the `coexist.create_parameters` function for this."
)))
[docs] def setup_complete(self, population, target, seed):
'''Set up the final attributes before starting the ACCES run - i.e.
the ones set in the ``Access.learn`` method.
'''
# Type-checking inputs and setting attributes
self.population = int(population)
self.target = float(target)
if seed is None:
self.seed = np.random.randint(1000, 10_000)
else:
self.seed = int(seed)
self.rng = np.random.default_rng(self.seed)
[docs] def starting_guess(self):
'''Return the initial parameter combinations to start CMA-ES with.
'''
# First guess, scaled
x0 = self.parameters_scaled["value"].to_numpy()
bounds = [
self.parameters_scaled["min"].to_numpy(),
self.parameters_scaled["max"].to_numpy(),
]
return x0, bounds
[docs]@autorepr
class AccessPaths:
'''Structure handling IO and storing all paths relevant for an ACCES run.
Loading and saving epochs and history are handled too.
Attributes
----------
directory : str
Path to the ACCES directory, e.g. ``access_seed123``.
results : str
Path to the results directory.
outputs : str
Path to the outputs directory.
script : str
Path to the ACCES-modified user script.
setup : str
Path to the saved ACCES setup.
epochs : str
Path to the epochs CSV data file.
epochs_scaled : str
Path to the scaled epochs CSV data file.
history : str
Path to the historical CSV data file.
history_scaled : str
Path to the scaled historical CSV data file.
'''
[docs] def __init__(
self,
directory: str = None,
results: str = None,
outputs: str = None,
script: str = None,
setup: str = None,
epochs: str = None,
epochs_scaled: str = None,
history: str = None,
history_scaled: str = None,
):
self.directory = directory
self.results = results
self.outputs = outputs
self.script = script
self.setup = setup
self.epochs = epochs
self.epochs_scaled = epochs_scaled
self.history = history
self.history_scaled = history_scaled
[docs] def create_directories(self, access):
'''Given a ``coexist.Access`` instance, create the required directory
hierarchy for a single ACCES run.
'''
# Include the random seed used in the `access_seed<seed>` dirpath
self.directory = f"access_seed{access.setup.seed}"
self.results = os.path.join(self.directory, "results")
self.outputs = os.path.join(self.directory, "outputs")
if access.verbose >= 3:
now = datetime.now().strftime(r"%H:%M:%S on %d/%m/%Y")
print(
"\n" + "=" * 80 + "\n" +
f"Starting ACCES run at {now} in directory "
f"`{self.directory}`.",
flush = True,
)
# Include the population size in the history filename to ensure future
# runs don't accidentally use wrong numbers of solutions per epoch
#
# Results history
self.history = os.path.join(
self.directory,
f"history_pop{access.setup.population}.csv",
)
self.history_scaled = os.path.join(
self.directory,
f"history_pop{access.setup.population}_scaled.csv",
)
# Per-epoch data
self.epochs = os.path.join(
self.directory,
f"epochs_pop{access.setup.population}.csv",
)
self.epochs_scaled = os.path.join(
self.directory,
f"epochs_pop{access.setup.population}_scaled.csv",
)
self.script = os.path.join(self.directory, "access_script.py")
self.setup = os.path.join(self.directory, "access_setup.toml")
# Create directories
if not os.path.isdir(self.directory):
os.mkdir(self.directory)
elif access.verbose >= 3:
pass
if not os.path.isdir(self.results):
os.mkdir(self.results)
if not os.path.isdir(self.outputs):
os.mkdir(self.outputs)
# Save information about the run
now = datetime.now().strftime("%H:%M:%S on %d/%m/%Y")
logfile = os.path.join(self.directory, "loginfo.txt")
with open(logfile, "a", encoding = "utf-8") as f:
f.write((
80 * "-" + "\n" +
f"Starting ACCESS run at {now}\n\n" +
access.__repr__() + "\n"
))
readmefile = os.path.join(self.directory, "readme.rst")
with open(readmefile, "w", encoding = "utf-8") as f:
f.write(textwrap.dedent(f'''
ACCES Optimisation Run Directory
--------------------------------
This directory was generated by ACCES at {now}.
You can load the data saved by ACCES into a tidy Python object
using ``coexist.AccessData(<dirpath>)``; alternatively, the
calibration / optimisation results may be read from the CSV
files (see below) in any external program.
All data can be accessed as the calibration progresses to check
intermediate results. This `{self.directory}` directory
is self-contained and may be archived / moved outside the
initial ACCES folder.
You are welcome to read and use the data saved here, but
modifying or removing files is not recommended (except for the
files inside `outputs` and `results`, see below).
File Hierarchy
--------------
For a given simulation, ACCES runs are uniquely determined by a
seed for the stochastic algorithms and the number of parameter
combinations to try in one epoch (i.e. the population size).
The resulting hierarchy looks like this:
::
access_seed<seed_number>
├── access_script.py
├── access_setup.toml
├── epochs_pop<population_size>.csv
├── epochs_pop<population_size>_scaled.csv
├── history_pop<population_size>.csv
├── history_pop<population_size>_scaled.csv
├── loginfo.txt
├── readme.rst
├── outputs
│ ├── stderr.0.log
│ ...
│ ├── stdout.0.log
│ ...
└── results
├── parameters.0.pickle
...
├── result.0.pickle
...
The `access_seed<seed_number>` is the current directory where
ACCES saves simulation results; naturally, don't execute
multiple ACCES runs with the same random seeds in the same
directory.
The `access_script.py` Python script is the modified user-code
that will be executed for each function evaluation.
The `access_setup.toml` file saves the constant ACCES objects
for a given run: relevant paths, parameters, population size,
target uncertainty and random seed. The TOML format it uses is
human-readable and may be loaded into a Python ``dict`` using:
.. code-block:: python
import toml
with open("filepath.toml") as f:
obj = toml.load(f)
The `epochs_pop<population_size>.csv` file stores relevant data
for each epoch: the current parameter optimum estimates and
uncertainties; the other `_scaled.csv` file stores the same,
but scaled to the internal CMA-ES "phenotype space". Notably,
the uncertainties are scaled between 0 (perfect estimate) and
1 (the initial estimate); if the parameter response is very
nonlinear / convolved / inexistent, the uncertainty may
increase beyond 1. The column names are given in the file; the
number of rows is equal to the number of epochs completed.
The `history_pop<population_size>.csv` file stores the history
of ACCES results: the parameter combinations tried and the
evaluated error values for each epoch. The other `_scaled.csv`
file stores the same, but scaled to the internal CMA-ES
"phenotype space". The column names are given in the file; all
epochs are concatenated, so the number of rows will be the
population size multiplied by the number of executed epochs.
The `loginfo.txt` file logs when ACCES runs are (re)started.
This `readme.rst` file is self-explanatory - you're reading it!
The `outputs` directory stores the captured `stdout` and
`stderr` messages that were emitted during each simulation.
The `results` directory stores the parameter values tried for
each simulation (e.g. `parameters.0.pickle`) and the
corresponding error value found (e.g. `result.0.pickle`).
The `outputs` and `results` directories are only needed for
logging purposes; you may safely remove the files inside *only
for the completed ACCES epochs*.
'''))
[docs] def update_paths(self, prefix):
'''Translate all paths saved in this class relative to a new `prefix`
(which will replace the `directory` attribute).
Please ensure that the `prefix` directory contains the required ACCES
files.
'''
self.directory = prefix
for attr in ["results", "outputs", "script", "setup", "epochs",
"epochs_scaled", "history", "history_scaled"]:
prev = getattr(self, attr)
if isinstance(prev, str):
setattr(
self,
attr,
os.path.join(prefix, os.path.split(prev)[1])
)
[docs] def save_history(self, setup, progress):
'''Given an ``AccessSetup`` and ``AccessProgress`` instance, save the
results history.
'''
np.savetxt(
self.history,
progress.history,
header = " ".join(setup.parameters.index.to_list() + ["error"]),
)
np.savetxt(
self.history_scaled,
progress.history_scaled,
header = " ".join(setup.parameters.index.to_list() + ["error"]),
)
[docs] def load_history(self, access):
'''Load previous results into ``access.progress``.
'''
# History columns: [param1, param2, ..., error_value] for each function
# evaluation
if os.path.isfile(self.history):
access.progress.history = np.loadtxt(
self.history, dtype = float
)
else:
access.progress.history = None
# Scaling and unscaling parameter values introduce numerical errors
# that confuse the optimiser. Thus save unscaled values separately
if os.path.isfile(self.history_scaled):
if access.verbose >= 3:
print(
"Found previous ACCES results in " +
f"`{self.history_scaled}`.\n" + "=" * 80 + "\n",
flush = True,
)
access.progress.history_scaled = np.loadtxt(
self.history_scaled, dtype = float
)
else:
access.progress.history_scaled = None
[docs] def save_epochs(self, setup, progress):
'''Given an ``AccessSetup`` and ``AccessProgress`` instance, save the
optimisation epochs.
'''
np.savetxt(
self.epochs,
progress.epochs,
header = " ".join(
[f"{p}_mean" for p in setup.parameters.index] +
[f"{p}_std" for p in setup.parameters.index] +
["overall_std"]
),
)
np.savetxt(
self.epochs_scaled,
progress.epochs_scaled,
header = " ".join(
[f"{p}_mean" for p in setup.parameters.index] +
[f"{p}_std" for p in setup.parameters.index] +
["overall_std"]
),
)
[docs] def load_epochs(self, access):
'''Given an ``Access`` instance, load previous ACCES runs' `epochs` and
`epochs_scaled` into ``access.progress``.
'''
# Epochs columns: [param1_mean, param2_mean, ..., param1_std,
# param2_std,..., overall_std] for each epochs
if os.path.isfile(self.epochs):
access.progress.epochs = np.loadtxt(
self.epochs, dtype = float
)
else:
access.progress.epochs = np.empty(
(0, 2 * len(access.setup.parameters) + 1)
)
if os.path.isfile(self.epochs_scaled):
access.progress.epochs_scaled = np.loadtxt(
self.epochs_scaled, dtype = float
)
else:
access.progress.epochs_scaled = np.empty(
(0, 2 * len(access.setup.parameters) + 1)
)
[docs] def copy(self):
'''Create a copy of an `AccessPaths` object.
'''
return AccessPaths(
directory = self.directory,
results = self.results,
outputs = self.outputs,
script = self.script,
setup = self.setup,
epochs = self.epochs,
epochs_scaled = self.epochs_scaled,
history = self.history,
history_scaled = self.history_scaled,
)
[docs]@autorepr(short = True)
class AccessProgress:
'''Structure saving the current ACCES optimisation progress.
The `epochs` array has columns [mean_param1, mean_param2, ..., std_param1,
std_param2, ..., std_overall] for each epoch.
The `history` array has columns [param1, param2, ..., error] for each
function evaluation.
Attributes
----------
epochs: np.ndarray
Matrix with columns [mean_param1, mean_param2, ..., std_param1,
std_param2, ..., std_overall] with one row per epoch.
epochs_scaled: np.ndarray
Same as ``epochs``, scaled such that the initial variance (``sigma``)
becomes unity.
history: np.ndarray = None
Matrix with columns [param1, param2, ..., error] for each parameter
combination tried - i.e. ``population * num_epochs``.
history_scaled: np.ndarray = None
Same as ``history``, scaled such that the initial variance (``sigma``)
becomes unity.
stdout: str = None
The latest unique recorded stdout message.
stderr: str = None
The latest unique recorded stderr message.
'''
[docs] def __init__(
self,
epochs: np.ndarray = None,
epochs_scaled: np.ndarray = None,
history: np.ndarray = None,
history_scaled: np.ndarray = None,
stdout: str = None,
stderr: str = None,
):
self.epochs = None
self.epochs_scaled = None
self.history = None
self.history_scaled = None
self.stdout = None
self.stderr = None
[docs] def update_epochs(self, es, scaling):
'''Update each epoch array after an ACCES run has been completed.
'''
self.epochs = np.vstack((
self.epochs,
np.hstack((
es.result.xfavorite * scaling,
es.result.stds * scaling,
es.sigma,
)),
))
self.epochs_scaled = np.vstack((
self.epochs_scaled,
np.hstack([es.result.xfavorite, es.result.stds, es.sigma]),
))
[docs] def update_history(self, es, scaling, solutions, results):
'''Update the ACCES history with the latest simulation solutions and
results.
'''
solutions = np.asarray(solutions)
current = np.c_[solutions * scaling, results]
current_scaled = np.c_[solutions, results]
# Check that we have the correct number of columns - if all simulations
# in an epoch crashed, we'll have a single error column filled with NaN
history = self.history
history_scaled = self.history_scaled
if self.history is None:
# First epoch
self.history = current
self.history_scaled = current_scaled
return
if np.isnan(self.history[:, -1]).all():
# History does not have enough columns - pad with NaNs
pad = np.full((
self.history.shape[0],
current.shape[1] - self.history.shape[1],
), np.nan)
history = np.c_[self.history, pad]
history_scaled = np.c_[self.history_scaled, pad]
if np.isnan(current[:, -1]).all():
# Current does not have enough columns - pad with NaNs
pad = np.full((
current.shape[0],
self.history.shape[1] - current.shape[1],
), np.nan)
current = np.c_[current, pad]
current_scaled = np.c_[current_scaled, pad]
self.history = np.vstack((history, current))
self.history_scaled = np.vstack((history_scaled, current_scaled))
[docs] def gather_results(
self,
processes,
paths,
result_paths,
multi_objective,
verbose,
):
'''Check whether the jobs have finished and retrieve the standard
deviation, errors and the combined total error.
'''
results = []
# stdout_rec = []
# stderr_rec = []
crashed = []
# Occasionally check if jobs finished
wait = 0.1 # Time between checking results
waited = 0. # Total time waited
logged = 0 # Number of times logged remaining simulations
tlog = 30 * 60 # Time until logging remaining simulations again
while wait != 0:
done = sum((p.poll() is not None for p in processes))
if done == len(processes):
wait = 0
for i, proc in enumerate(processes):
proc_index = int(proc.args[-1].split(".")[-2])
proc.communicate()
# Load result if the file exists, otherwise set it to NaN
if os.path.isfile(result_paths[i]):
with open(result_paths[i], "rb") as f:
errors = pickle.load(f)
if hasattr(errors, "__iter__"):
errors = np.array(errors, dtype = float)
else:
errors = np.array([errors], dtype = float)
combined = multi_objective.combine(errors)
results.append(np.append(errors, combined))
else:
results.append(None)
crashed.append(proc_index)
# Every `remaining` seconds print remaining jobs
if verbose >= 4 and wait != 0 and waited > (logged + 1) * tlog:
logged += 1
tlog *= 1.5
remaining = " ".join([
p.args[-1].split(".")[-2]
for p in processes if p.poll() is None
])
minutes = int(waited / 60)
if minutes > 60:
timer = f"{minutes // 60} h {minutes % 60} min"
else:
timer = f"{minutes} min"
print((
f" * Remaining jobs after {timer}:\n" +
textwrap.indent(textwrap.fill(remaining), " * ")
), flush = True)
# Wait for increasing numbers of seconds until checking for results
# again - at most 1 minute
time.sleep(wait)
waited += wait
wait = min(wait * 1.5, 60)
return results, crashed
[docs]@autorepr
class Access:
'''Optimise an arbitrary user-defined script's parameters in parallel.
A minimal user script - saved in a separate file - would be:
::
# In file "script_filepath.py"
# ACCESS PARAMETERS START
import coexist
parameters = coexist.create_parameters(
variables = ["fp1", "fp2"],
minimums = [-3, -7],
maximums = [+5, +3],
)
access_id = 0 # Optional
# ACCESS PARAMETERS END
x = parameters.at["fp1", "value"]
y = parameters.at["fp2", "value"]
error = x ** 2 + y ** 2
This script defines two free parameters to optimise "fp1" and "fp2" with
ranges [-3, +5] and [-7, +3] and saves an error value to be optimised
in the variable `error`. To optimise it, run in another file:
::
# In file "access_learn.py"
import coexist
access = coexist.Access("script_filepath.py")
access.learn(num_solutions = 10, target_sigma = 0.1, random_seed = 42)
Once you run `access.learn()`, a folder named "access_42" is
generated which stores all information about this access run, including all
simulation data. You can load this data using ``coexist.AccessData.read``
even while the optimisation is still running.
In general, an ACCESS user script must define one simulation whose
parameters will be optimised this way:
1. Use a variable named "parameters" to define this simulation's free /
optimisable parameters. Create it using `coexist.create_parameters`.
An initial guess can also be set here.
2. The `parameters` creation should be **fully self-contained** between two
``# ACCESS PARAMETERS START`` and ``# ACCESS PARAMETERS END`` comments -
i.e. it should not depend on code ran before the block.
3. By the end of the simulation script, define a variable named ``error``
storing a single number representing this simulation's error value.
Notice that there is no limitation on how the error value is calculated. It
can be any simulation, executed in any way - even externally; just launch
a separate process from the Python script, run the simulation, extract data
back into the Python script and set ``error`` to what you need optimised.
If you need to save data to disk, use file names containing the
``access_id`` variable which is set to a unique integer ID for each
simulation, so that you don't overwrite existing files when simulations are
executed in parallel.
For more information on the implementation details and how parallel
execution of your user script is achieved, check out the generated file
"access_seed<seed>/access_script.py" after running `access.learn()`.
Attributes
----------
setup : coexist.access.AccessSetup
Structure storing given ACCES configuration, containing the free
``parameters``, number of solutions to try in parallel ``population``,
target uncertainty ``target``, seeded random number generator ``rng``
and the ``seed``.
paths : coexist.access.AccessPaths
Structure storing paths to the ACCES ``directory``, saved ``state``,
``simulations`` tried, captured ``outputs`` directory, and paths to
the previous results ``history`` and ``history_scaled``.
progress : coexist.access.AccessProgress
Structure storing ACCES optimisation run progress - ``epochs``,
``history`` (and CMA-scaled versions of them) and latest ``stdout`` and
``stderr`` messages.
scheduler : coexist.schedulers.Scheduler subclass
The scheduler used to launch each simulation in parallel.
multi_objective : object, optional
An object defining the method `combine`, combining multiple errors into
a single value.
verbose : int, optional
Integer denoting the level of verbosity, where 0 is quiet and 5 is
maximally verbose.
'''
[docs] def __init__(
self,
script_path: str,
scheduler = schedulers.LocalScheduler(),
):
'''`Access` class constructor.
Parameters
----------
script_path : str
A path to a user-defined script that runs one simulation. It should
use the free / optimisable parameters saved in a pandas.DataFrame
named exactly ``parameters``, defined between two comments
``# ACCESS PARAMETERS START`` and ``# ACCESS PARAMETERS END``.
By the end of the script, one variable named `error` must be
defined containing the error value, a number.
scheduler : coexist.schedulers.Scheduler subclass
Scheduler used to spawn function evaluations / simulations in
parallel. The default ``LocalScheduler`` simply starts new Python
interpreters on the local machine for executing the user's script.
See the other schedulers in `coexist.schedulers` for e.g. spawning
jobs on a supercomputing cluster.
'''
# Creating class attributes
self.setup = AccessSetup(script_path)
self.paths = AccessPaths()
self.progress = AccessProgress()
# Type-check scheduler
if not isinstance(scheduler, schedulers.Scheduler):
raise TypeError(textwrap.fill((
"The input `scheduler` must be a subclass of `coexist."
f"schedulers.Scheduler`. Received {type(scheduler)}."
)))
self.scheduler = scheduler
# Will be set in `learn`
self.multi_objective = None
self.verbose = None
[docs] def learn(
self,
num_solutions = 8,
target_sigma = 0.1,
random_seed = None,
multi_objective = Product(),
verbose = 4,
):
'''Learn the free `parameters` from the user script that minimise the
`error` variable by trying `num_solutions` parameter combinations at
a time until the overall uncertainty becomes lower than `target_sigma`.
For `multi_objective` optimisation, use a `coexist.combiner` to combine
multiple error values into a single one.
'''
# Type-checking inputs
if not hasattr(multi_objective, "combine"):
raise TypeError(textwrap.fill((
"The input `mulit_objective` has no attribute `combine`. "
"Check you are using a `coexist.combiner` to combine "
"multiple error values into a single combined error."
)))
# Set last setup attributes and create ACCES directories
self.verbose = int(verbose)
self.setup.setup_complete(num_solutions, target_sigma, random_seed)
self.paths.create_directories(self)
self.multi_objective = multi_objective
# Save this ACCES run's files
self.save_setup()
# Load previous history and epochs into self.progress
self.paths.load_history(self)
self.paths.load_epochs(self)
# Scale sigma, bounds, solutions, results to unit variance
scaling = self.setup.scaling
x0, bounds = self.setup.starting_guess()
sigma0 = 1.
# Instantiate CMA-ES optimiser; silence initial CMA-ES message
with open(os.devnull, "w") as f, contextlib.redirect_stdout(f), \
warnings.catch_warnings():
# CMA-ES sometimes warns about changing the initial standard
# deviation; ACCES users don't control that, so we can hide it
warnings.simplefilter("ignore", UserWarning)
es = cma.CMAEvolutionStrategy(x0, sigma0, dict(
bounds = bounds,
popsize = self.setup.population,
randn = lambda *args: self.setup.rng.standard_normal(args),
verbose = 3 if self.verbose >= 3 else -9,
))
es.logger = cma.CMADataLogger(
os.path.join(self.paths.directory, "cache", "")
)
# Start optimisation: ask the optimiser for parameter combinations
# (solutions), run the simulations between `start_index:end_index` and
# feed the results back to CMA-ES.
epoch = 0
while not es.stop():
solutions = es.ask()
# If we have historical data, inject it for each epoch
if self.has_historical(epoch):
self.inject_historical(es, epoch)
epoch += 1
if self.finished(es):
break
continue
if self.verbose >= 2:
self.print_before_eval(es, epoch, scaling)
# Save current epoch's mean, sigma
self.progress.update_epochs(es, scaling)
# Evaluate each solution - i.e. run simulations in parallel
results = self.evaluate_solutions(solutions * scaling, epoch)
# We already warn about crashed simulations, so hide CMA-ES ones
with warnings.catch_warnings():
warnings.simplefilter("ignore", UserWarning)
es.tell(solutions, results[:, -1])
epoch += 1
# Save historical data as function evaluations are very expensive
self.progress.update_history(es, scaling, solutions, results)
self.paths.save_epochs(self.setup, self.progress)
self.paths.save_history(self.setup, self.progress)
if self.verbose >= 2:
self.print_after_eval(es, epoch, solutions, scaling, results)
if self.finished(es):
break
if es.result.xbest is None:
raise ValueError(textwrap.fill((
"No parameter combination was evaluated successfully. All "
"simulations crashed - please check the error logs in the "
f"`{self.paths.outputs}` folder."
)))
if self.verbose >= 1:
self.print_finished(es, scaling)
return AccessData.read(self.paths.directory)
[docs] def save_setup(self):
'''Save current ACCES run's modified script (py) and state (toml).
'''
with open(self.paths.script, "w") as f:
f.write(self.setup.script)
setup_dict = dict(
paths = self.paths.__dict__,
setup = dict(
parameters = self.setup.parameters.to_dict(),
parameters_scaled = self.setup.parameters_scaled.to_dict(),
scaling = self.setup.scaling.tolist(),
population = self.setup.population,
target = self.setup.target,
seed = self.setup.seed,
),
)
with open(self.paths.setup, "w") as f:
toml.dump(setup_dict, f)
[docs] def has_historical(self, epoch):
'''Check ACCES still has historical solutions to inject.
'''
if (
self.progress.history_scaled is not None and
epoch * self.setup.population < len(self.progress.history_scaled)
):
return True
return False
[docs] def inject_historical(self, es, epoch):
'''Inject the CMA-ES optimiser with pre-computed (historical) results.
The solutions must have a Gaussian distribution in each problem
dimension - though the standard deviation can vary for each of them.
Ideally, this should only use historical values that CMA-ES asked for
in a previous ACCESS run.
'''
pop = self.setup.population
num_params = len(self.setup.parameters)
results_scaled = self.progress.history_scaled[
(epoch * pop):(epoch * pop + pop)
]
es.tell(results_scaled[:, :num_params], results_scaled[:, -1])
if self.verbose >= 1:
ns = len(self.progress.history_scaled)
maxlen = len(str(ns))
print((
f"Injected {(epoch + 1) * len(results_scaled):>{maxlen}} / "
f"{ns} historical solutions"
))
[docs] def print_before_eval(self, es, epoch, scaling):
'''Print current estimates before evaluating current epoch.
'''
info = pd.DataFrame(
np.vstack((
es.result.xfavorite * scaling,
es.result.stds * scaling,
es.result.stds,
)),
index = ["estimate", "uncertainty", "scaled_std"],
columns = self.setup.parameters.index,
)
# Display all the DataFrame columns and rows
old_max_columns = pd.get_option("display.max_columns")
old_max_rows = pd.get_option("display.max_rows")
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
head = "=" * 80
line = "-" * 80
# Current time, formatted
now_str = datetime.now().strftime(r"%H:%M:%S")
# Save and print time elapsed since last epoch
now = time.time()
elapsed = now - getattr(self, "_elapsed", now)
setattr(self, "_elapsed", now)
if elapsed == 0:
since_str = ""
else:
hours, remainder = divmod(int(elapsed), 3600)
minutes, seconds = divmod(remainder, 60)
elapsed_str = f"{minutes:02}:{seconds:02}"
if hours > 0:
elapsed_str = f"{hours:02}:" + elapsed_str
since_str = f" | Since Last {elapsed_str}"
print((
f"{head}\n"
f"Epoch {epoch:>4} | Population {self.setup.population:>4} | "
f"Time {now_str}{since_str}\n"
f"{line}\n"
f"Scaled overall standard deviation: {es.sigma}\n"
f"{info}\n"
), flush = True)
pd.set_option("display.max_columns", old_max_columns)
pd.set_option("display.max_rows", old_max_rows)
[docs] def print_status_eval(self, crashed):
'''Print logged stdout and stderr messages and crashed simulations
after evaluating an epoch.
'''
if len(crashed):
line = "-" * 80
crashed_str = textwrap.fill(" ".join(
str(c) for c in crashed
))
print(
line + "\n" +
"No results were found for these jobs:\n" +
textwrap.indent(crashed_str, " ") + "\n" +
"They crashed or terminated early; for details, check the "
f"output logs in:\n {self.paths.outputs}\n"
"The error values for these simulations were set to NaN.\n" +
line + "\n",
flush = True,
)
[docs] def print_after_eval(
self,
es,
epoch,
solutions,
scaling,
results,
):
'''Display parameter combinations evaluated in the current epoch and
the corresponding errors found.
'''
# Display evaluation results: solutions, error values, etc.
sols_results = np.c_[solutions * scaling, results]
cols = self.setup.parameters.index.to_list() + [
f"error{i}" for i in range(results.shape[1] - 1)
] + ["error"]
# Store solutions and results in a DataFrame for easy pretty printing
pop = len(results)
sols_results = pd.DataFrame(
data = sols_results,
columns = cols,
index = range(epoch * pop - pop, epoch * pop),
)
# Display all the DataFrame columns and rows
old_max_columns = pd.get_option("display.max_columns")
old_max_rows = pd.get_option("display.max_rows")
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
print((
f"{sols_results}\n"
f"Total function evaluations: {es.result.evaluations}\n"
), flush = True)
pd.set_option("display.max_columns", old_max_columns)
pd.set_option("display.max_rows", old_max_rows)
[docs] def print_finished(self, es, scaling):
'''Display final message after successful convergence on optimum
parameters.
'''
solutions = list(es.result.xbest * scaling) + [es.result.fbest]
stds = list(es.result.stds * scaling) + [" "]
proc = os.path.join(
self.paths.results,
f"parameters.{es.result.evals_best - 1}.pickle",
)
info = pd.DataFrame(
[solutions, stds],
index = ["value", "sigma"],
columns = self.setup.parameters.index.to_list() + ["error"],
)
line = "=" * 80
print((
f"\n{line}\n"
f"The best result was found in {es.result.iterations} epochs:\n"
f"{textwrap.indent(str(info), ' ')}\n\n"
"These results were found for the job:\n"
f" {proc}\n"
f"{line}"
), flush = True)
[docs] def finished(self, es):
'''Check if the optimisation run is done and display the best solution
found for the target sigma value.
'''
# If overall sigma went below target
if es.sigma < self.setup.target:
if self.verbose >= 1:
print((
"\nOptimal solution found within `target_sigma`, i.e. "
f"{self.setup.target * 100}%:\n"
f" sigma = {es.sigma} < {self.setup.target}"
), flush = True)
return True
# If all individual sigmas went below target
if np.all(es.result.stds < self.setup.target):
if self.verbose >= 1:
print((
"\nAll parameters found within `target_sigma`, i.e. "
f"{self.setup.target * 100}%:\n"
f" scaled_std = {es.result.stds} < {self.setup.target}"
), flush = True)
return True
return False
[docs] def evaluate_solutions(self, solutions, epoch):
'''Evaluate the parameter combinations given in `solutions` for the
current `epoch` in parallel.
'''
# Aliases
param_names = self.setup.parameters.index
pop = self.setup.population
start_index = epoch * pop
# For every solution to try, start a separate OS process that runs the
# `access_seed<seed>/access_code.py` script, which computes and saves
# an error value
processes = []
# These are this epoch's paths to save the simulation outputs to; they
# will be given to `self.paths.script` as command-line arguments
parameters_paths = [
os.path.join(
self.paths.results,
f"parameters.{start_index + i}.pickle",
) for i in range(pop)
]
result_paths = [
os.path.join(
self.paths.results,
f"result.{start_index + i}.pickle",
) for i in range(pop)
]
output_files = [
open(
os.path.join(
self.paths.outputs,
f"output.{start_index + i}.log"
),
"w",
) for i in range(pop)
]
# Catch the KeyboardInterrupt (Ctrl-C) signal to shut down the spawned
# processes before aborting.
try:
signal_handler.set()
# Spawn a separate process for every solution to try / sim to run
for i, sol in enumerate(solutions):
# Create new set of parameters and save them to disk
parameters = self.setup.parameters.copy()
for j, sol_val in enumerate(sol):
parameters.at[param_names[j], "value"] = sol_val
with open(parameters_paths[i], "wb") as f:
pickle.dump(parameters, f)
# Get job scheduling command
scheduler_cmd = self.scheduler.schedule(
self.paths.directory,
start_index + i,
)
processes.append(
subprocess.Popen(
scheduler_cmd + [
self.paths.script,
parameters_paths[i],
result_paths[i],
],
stdout = output_files[i],
stderr = subprocess.STDOUT,
)
)
# Gather results and crashed simulations
results, crashed = self.progress.gather_results(
processes,
self.paths,
result_paths,
self.multi_objective,
self.verbose,
)
except KeyboardInterrupt:
for proc in processes:
proc.kill()
raise
finally:
signal_handler.unset()
for of in output_files:
of.close()
if self.verbose >= 1:
self.print_status_eval(crashed)
# Find number of error values returned for one successful simulation
num_errors = 1
for i, res in enumerate(results):
if res is not None:
if num_errors == 1:
num_errors = len(res)
continue
if len(res) != num_errors:
raise ValueError(textwrap.fill((
f"The simulation at index {start_index + i} returned "
f"{len(res) - 1} error values, while previous "
f"simulations had {num_errors - 1} error values."
)))
# Substitute results that are None (i.e. crashed) with rows of NaNs
for i in range(len(results)):
if results[i] is None:
results[i] = np.full(num_errors, np.nan)
return np.array(results)
class AccessFileNotFoundLegacy:
pass
[docs]class AccessData:
'''Access (pun intended) data generated by a ``coexist.Access`` run; read
it in using ``coexist.AccessData.read("access_seed<seed>")``.
Attributes
----------
paths : AccessPaths
Struct-like object storing relevant paths in the given ACCES directory.
parameters : pd.DataFrame
The optimum free parameters found (final or intermediate if ACCES is
still running).
parameters_scaled : pd.DataFrame
The optimum free parameters found, divided by ``scaling`` such that the
initial variance in the parameter values was unity.
scaling : np.ndarray
A vector with the values to scale each parameter by - they are the
initial variances (``sigma``).
population : int
The number of simulations to run in parallel within a single epoch, or
number of parameter combinations to try at once.
num_epochs : int
The number of epochs that were successfully executed.
target : float
The target variance, where the initial parameter uncertainty must be
decreased from 1 to ``target``.
seed : int
The random number generator seed uniquely defining this ACCES run.
epochs : np.ndarray
Matrix with columns [mean_param1, mean_param2, ..., std_param1,
std_param2, ..., std_overall] with one row per epoch.
epochs_scaled : np.ndarray
Same as ``epochs``, scaled such that the initial variance (``sigma``)
becomes unity.
results : np.ndarray
Matrix with columns [param1, param2, ..., error] for each parameter
combination tried - i.e. ``population * num_epochs``.
results_scaled : np.ndarray
Same as ``results``, scaled such that the initial variance (``sigma``)
becomes unity.
Examples
--------
Suppose you run ``coexist.Access.learn(random_seed = 123)`` - then a
directory "access_seed123/" would be generated. Access (yes, still
intended) all data generated in a Python-friendly format using:
>>> import coexist
>>> data = coexist.AccessData("access_123")
>>> data
AccessData
--------------------------------------------------------------------------
paths ╎ AccessPaths(...)
parameters ╎ value min max sigma
╎ fp1 -0.005312 -5.0 10.0 0.024483
╎ fp2 0.003409 -5.0 10.0 0.034576
╎ fp3 0.296074 -5.0 10.0 2.078181
num_epochs ╎ 31
target ╎ 0.1
seed ╎ 123
epochs ╎ DataFrame(fp1_mean, fp2_mean, fp3_mean, fp1_std, fp2_std,
╎ fp3_std, overall_std)
epochs_scaled ╎ DataFrame(fp1_mean, fp2_mean, fp3_mean, fp1_std, fp2_std,
╎ fp3_std, overall_std)
results ╎ DataFrame(fp1, fp2, fp3, error)
results_scaled ╎ DataFrame(fp1, fp2, fp3, error)
'''
[docs] def __init__(self, access_path = "."):
'''Read in data generated by ``coexist.Access``; the `access_path` can
be either the "`access_info_<hash>`" directory itself, or its
parent directory.
'''
access_path = find_access_path(access_path)
# Check for data in the legacy coexist-0.1.0 format
legacy_finder = re.compile(r"opt_history_[0-9]+.csv")
if any(legacy_finder.match(f) for f in os.listdir(access_path)):
self.legacy(access_path)
return
setup_path = os.path.join(access_path, "access_setup.toml")
with open(setup_path) as f:
setup_dict = toml.load(f)
# Update paths prefix in case files are read from a different location
# than they were created at
paths = AccessPaths(**setup_dict["paths"])
paths.update_paths(access_path)
parameters = pd.DataFrame.from_dict(
setup_dict["setup"]["parameters"]
)
parameters_scaled = pd.DataFrame.from_dict(
setup_dict["setup"]["parameters_scaled"]
)
scaling = np.array(setup_dict["setup"]["scaling"])
population = setup_dict["setup"]["population"]
target = setup_dict["setup"]["target"]
seed = setup_dict["setup"]["seed"]
history = np.loadtxt(paths.history)
columns = parameters.index.to_list() + [
f"error{i}"
for i in range(history.shape[1] - len(parameters) - 1)
] + ["error"]
results = pd.DataFrame(history, columns = columns, dtype = float)
history_scaled = np.loadtxt(paths.history_scaled)
columns_scaled = parameters.index.to_list() + [
f"error{i}"
for i in range(history_scaled.shape[1] - len(parameters) - 1)
] + ["error"]
results_scaled = pd.DataFrame(
history_scaled,
columns = columns_scaled,
dtype = float,
)
epochs = pd.DataFrame(
np.loadtxt(paths.epochs),
columns = (
[f"{p}_mean" for p in parameters.index] +
[f"{p}_std" for p in parameters.index] +
["overall_std"]
),
dtype = float,
)
epochs_scaled = pd.DataFrame(
np.loadtxt(paths.epochs_scaled),
columns = (
[f"{p}_mean" for p in parameters.index] +
[f"{p}_std" for p in parameters.index] +
["overall_std"]
),
dtype = float,
)
num_epochs = len(epochs)
# Set parameters' values to the best results
ns = len(parameters)
parameters["value"] = results.iloc[results["error"].idxmin()][:-1]
parameters["sigma"] = epochs.iloc[-1, ns:ns + ns].to_numpy()
parameters_scaled["value"] = results_scaled.iloc[
results_scaled["error"].idxmin()
][:-1]
parameters_scaled["sigma"] = epochs_scaled.iloc[
-1, ns:ns + ns
].to_numpy()
# Set class attributes
self.paths = paths
self.parameters = parameters
self.parameters_scaled = parameters_scaled
self.scaling = scaling
self.population = population
self.num_epochs = num_epochs
self.target = target
self.seed = seed
self.epochs = epochs
self.epochs_scaled = epochs_scaled
self.results = results
self.results_scaled = results_scaled
[docs] @staticmethod
def empty():
'''Create an empty `AccessData` object that you can set attributes
to directly.
Examples
--------
Create an empty `AccessData` object:
>>> import coexist
>>> data = coexist.AccessData.empty()
'''
return AccessData.__new__(AccessData)
[docs] @staticmethod
def read(access_path = "."):
'''Read in data generated by ``coexist.Access``; the `access_path` can
be either the "`access_seed<hash>`" directory itself, or its
parent directory.
Here for backwards-compatibility; you can instantiate the class
directly with the ``access_path``, e.g. ``AccessData(".")``.
'''
return AccessData(access_path)
[docs] def legacy(self, access_path):
'''Read in data from legacy coexist-0.1.0 ACCES format; this is
normally called automatically by ``AccessData.read``.
'''
# Check all legacy files exist
legacy_files = ["access_code.py", "access_info.pickle"]
if any(not os.path.isfile(os.path.join(access_path, f))
for f in legacy_files):
raise FileNotFoundError(textwrap.fill((
f"The legacy AccessData files `{legacy_files}` were not found "
f"in `{access_path}`."
)))
# Find legacy history file
history_finder = re.compile(r"opt_history_[0-9]+\.csv")
for f in os.listdir(access_path):
if history_finder.search(f):
history_path = os.path.join(access_path, f)
history_scaled_path = (
history_path.split(".csv")[0] + "_scaled.csv"
)
num_solutions = int(
re.split(r"opt_history_|\.csv", history_path)[1]
)
break
else:
raise FileNotFoundError(textwrap.fill((
f"No legacy history file was found in `{access_path}`."
)))
with open(os.path.join(access_path, "access_info.pickle"), "rb") as f:
access_info = pickle.load(f)
history = np.loadtxt(history_path)
history_scaled = np.loadtxt(history_scaled_path)
# Translate legacy data into modern format
notfound = AccessFileNotFoundLegacy()
paths = AccessPaths(
directory = access_path,
results = os.path.join(access_path, "simulations"),
outputs = os.path.join(access_path, "outputs"),
script = os.path.join(access_path, "access_code.py"),
setup = notfound,
epochs = notfound,
epochs_scaled = notfound,
history = history_path,
history_scaled = history_scaled_path,
)
parameters = access_info.parameters
population = num_solutions
num_epochs = len(history) // population
target = access_info.target_sigma
seed = access_info.random_seed
# Infer scaled parameters
pop = population
nparams = len(parameters)
scaling = np.mean(
history[:, :nparams] / history_scaled[:, :nparams],
axis = 0,
)
parameters_scaled = parameters.copy()
for i in range(len(parameters_scaled.columns)):
parameters_scaled.iloc[:, i] /= scaling
# Infer epochs data
means = np.array([
history[i * pop:i * pop + pop, :nparams].mean(axis = 0)
for i in range(num_epochs)
])
stds = history[::pop, nparams:2 * nparams]
overall_stds = history[::pop, -2]
epochs = pd.DataFrame(
np.c_[means, stds, overall_stds],
columns = (
[f"{p}_mean" for p in parameters.index] +
[f"{p}_std" for p in parameters.index] +
["overall_std"]
),
dtype = float,
)
means = np.array([
history_scaled[i * pop:i * pop + pop, :nparams].mean(axis = 0)
for i in range(num_epochs)
])
stds = history_scaled[::pop, nparams:2 * nparams]
overall_stds = history_scaled[::pop, -2]
epochs_scaled = pd.DataFrame(
np.c_[means, stds, overall_stds],
columns = (
[f"{p}_mean" for p in parameters.index] +
[f"{p}_std" for p in parameters.index] +
["overall_std"]
),
dtype = float,
)
# Translate history data
results = pd.DataFrame(
history[:, list(range(nparams)) + [-1]],
columns = parameters.index.to_list() + ["error"],
dtype = float,
)
results_scaled = pd.DataFrame(
history_scaled[:, list(range(nparams)) + [-1]],
columns = parameters.index.to_list() + ["error"],
dtype = float,
)
# Set parameters' values to the best results
ns = len(parameters)
parameters["value"] = results.iloc[results["error"].idxmin()][:-1]
parameters["sigma"] = epochs.iloc[-1, ns:ns + ns].to_numpy()
parameters_scaled["value"] = results_scaled.iloc[
results_scaled["error"].idxmin()
][:-1]
parameters_scaled["sigma"] = epochs_scaled.iloc[
-1, ns:ns + ns
].to_numpy()
# Set class attributes
self.paths = paths
self.parameters = parameters
self.parameters_scaled = parameters_scaled
self.scaling = scaling
self.population = population
self.num_epochs = num_epochs
self.target = target
self.seed = seed
self.epochs = epochs
self.epochs_scaled = epochs_scaled
self.results = results
self.results_scaled = results_scaled
[docs] def copy(self):
'''Return copy of `AccessData` object.
'''
data = AccessData.empty()
data.paths = self.paths.copy()
data.parameters = self.parameters.copy()
data.parameters_scaled = self.parameters_scaled.copy()
data.scaling = self.scaling.copy()
data.population = self.population
data.num_epochs = self.num_epochs
data.target = self.target
data.seed = self.seed
data.epochs = self.epochs.copy()
data.epochs_scaled = self.epochs_scaled.copy()
data.results = self.results.copy()
data.results_scaled = self.results_scaled.copy()
return data
[docs] def save(self, dirname):
'''Save `AccessData` to a new directory at `dirname`.
'''
# Copy previous folder to new location
shutil.copytree(self.paths.directory, dirname)
self.paths.update_paths(dirname)
# Save history
to_pad = len(self.results.columns) - len(self.parameters) - 1
columns = self.parameters.index.to_list() + [
f"error{i}" for i in range(to_pad)
] + ["error"]
np.savetxt(
self.paths.history,
self.results.to_numpy(),
header = " ".join(columns),
)
np.savetxt(
self.paths.history_scaled,
self.results_scaled.to_numpy(),
header = " ".join(columns),
)
# Save epochs
np.savetxt(
self.paths.epochs,
self.epochs.to_numpy(),
header = " ".join(
[f"{p}_mean" for p in self.parameters.index] +
[f"{p}_std" for p in self.parameters.index] +
["overall_std"]
),
)
np.savetxt(
self.paths.epochs_scaled,
self.epochs_scaled.to_numpy(),
header = " ".join(
[f"{p}_mean" for p in self.parameters.index] +
[f"{p}_std" for p in self.parameters.index] +
["overall_std"]
),
)
# Save setup
setup_dict = dict(
paths = self.paths.__dict__,
setup = dict(
parameters = self.parameters.to_dict(),
parameters_scaled = self.parameters_scaled.to_dict(),
scaling = self.scaling.tolist(),
population = self.population,
target = self.target,
seed = self.seed,
),
)
with open(self.paths.setup, "w") as f:
toml.dump(setup_dict, f)
def __getitem__(self, index):
# Select AccessData epochs
if isinstance(index, int):
# Allow negative indices
while index < 0:
index += self.num_epochs
if index >= self.num_epochs:
raise IndexError(textwrap.fill((
f"The index=`{index}` is out of bounds for AccessData "
f"with {self.num_epochs} epochs."
)))
data = self.copy()
data.num_epochs = 1
data.epochs = self.epochs.iloc[index:index + 1]
data.epochs_scaled = self.epochs_scaled.iloc[index:index + 1]
data.results = self.results.iloc[
index * self.population:(index + 1) * self.population
]
data.results_scaled = self.results_scaled.iloc[
index * self.population:(index + 1) * self.population
]
return data
elif isinstance(index, slice):
if index.step is not None and index.step != 1:
raise ValueError(textwrap.fill((
"Indexing with a `slice.step` is not yet available. "
"If this would be useful for you please get in touch."
)))
start = index.start if index.start is not None else 0
stop = index.stop if index.stop is not None else self.num_epochs
# Allow negative indices
while start < 0:
stop += self.num_epochs
while stop < 0:
stop += self.num_epochs
if stop > self.num_epochs or start >= self.num_epochs or \
start >= stop:
raise IndexError(textwrap.fill((
f"The slice=`{start}:{stop}` is out of bounds for "
f"AccessData with {self.num_epochs} epochs."
)))
data = self.copy()
data.num_epochs = stop - start
data.epochs = self.epochs.iloc[start:stop]
data.epochs_scaled = self.epochs_scaled.iloc[start:stop]
data.results = self.results.iloc[
start * self.population:stop * self.population
]
data.results_scaled = self.results_scaled.iloc[
start * self.population:stop * self.population
]
return data
else:
raise TypeError(textwrap.fill((
"Epoch selection via subscripting is only possible with "
"integer / slice indices (e.g. `access_data[5]` or "
"`access_data[2:5]`). Received index with type "
f"`{type(index)}`."
)))
def __repr__(self):
name = self.__class__.__name__
underline = "-" * 80
def wrap(text, prep = 30):
return textwrap.fill(
text, width = 80,
initial_indent = prep * " ",
subsequent_indent = prep * " ",
)[prep:]
'''Combine the column data for the relevant parameters.
'''
cols = wrap(", ".join(self.epochs.columns))
epochs = f"DataFrame({cols})"
cols = wrap(", ".join(self.epochs_scaled.columns))
epochs_scaled = f"DataFrame({cols})"
cols = wrap(", ".join(self.results.columns))
results = f"DataFrame({cols})"
cols = wrap(", ".join(self.results_scaled.columns))
results_scaled = f"DataFrame({cols})"
parameters = str(self.parameters).split("\n")
parameters = "\n".join(
parameters[0:1] +
[20 * " " + p for p in parameters[1:]]
)
parameters_scaled = str(self.parameters_scaled).split("\n")
parameters_scaled = "\n".join(
parameters_scaled[0:1] +
[20 * " " + p for p in parameters_scaled[1:]]
)
docstr = (
f"{name}\n"
f"{underline}\n"
f"paths {self.paths.__class__.__name__}(...)\n"
f"parameters {parameters}\n"
f"parameters_scaled {parameters_scaled}\n"
f"scaling {self.scaling}\n"
f"population {self.population}\n"
f"num_epochs {self.num_epochs}\n"
f"target {self.target}\n"
f"seed {self.seed}\n"
f"epochs {epochs}\n"
f"epochs_scaled {epochs_scaled}\n"
f"results {results}\n"
f"results_scaled {results_scaled}\n"
)
# Add vertical line
docstr = docstr.split("\n")
for i in range(2, len(docstr) - 1):
d = docstr[i]
docstr[i] = d[:18] + "╎" + d[19:]
return "\n".join(docstr)
def find_access_path(path):
'''Locate the `access_seed<seed>` directory.
'''
finder = re.compile(r"access_seed[0-9]+")
# The directory itself
if finder.match(path):
return path
# The parent
matched = [f for f in os.listdir(path) if finder.match(f)]
if len(matched) == 1:
return os.path.join(path, matched[0])
elif len(matched) > 1:
raise RuntimeError((
f"Multiple ACCES directories were found at `{path}`:\n"
f"{matched}\n\n"
"Use the full path to the ACCES directory you want."
))
# If no `access_seed<seed>` was found, return the path as is in case it was
# renamed but ACCES files are inside still
return path
class AccessCoupled:
def __init__(
self,
simulations: Simulation,
scheduler = [sys.executable],
max_workers = None,
):
'''`Access` class constructor.
Parameters
----------
simulations: Simulation or list[Simulation]
The particle simulation object, implementing the
`coexist.Simulation` interface (e.g. `LiggghtsSimulation`).
Alternatively, this can be a *list of simulations*, in which case
multiple simulations will be run **for the same parameter
combination tried**. This allows the implementation of error
functions using multiple simulation regimes.
scheduler: list[str], default [sys.executable]
The executable that will run individual simulations as separate
Python scripts. In the simplest case, it would be just
`["python3"]`. Alternatively, this can be used to schedule
simulations on a cluster / supercomputer, e.g.
`["srun", "-n", "1", "python3"]` for SLURM. Each list element is
a piece of a shell command that will be run.
max_workers: int, optional
The maximum number of threads used by the main Python script to
run the error function on each simulation result. If `None`, the
output from `len(os.sched_getaffinity(0))` is used.
'''
# Type-checking inputs
def error_simulations(simulations):
# Print error message if the input `simulations` does not have the
# required type
raise TypeError(textwrap.fill((
"The `simulation` input parameter must be an instance of "
"`coexist.Simulation` (or a subclass thereof) or a list "
f"of simulations. Received type `{type(simulations)}`."
)))
if isinstance(simulations, Simulation):
self.simulations = [simulations]
elif hasattr(simulations, "__iter__"):
if not len(simulations) >= 1:
error_simulations(simulations)
params = simulations[0].parameters
for sim in simulations:
if not isinstance(sim, Simulation):
error_simulations(simulations)
if not params.equals(sim.parameters):
raise ValueError((
"The simulation parameters (`Simulation.parameters` "
"attribute) must be the same across all simulations "
"in the input list. The two unequal parameters are:\n"
f"{params}\n\n"
f"{sim.parameters}\n"
))
self.simulations = simulations
else:
error_simulations(simulations)
# Setting class attributes
self.scheduler = list(scheduler)
if max_workers is None:
self.max_workers = len(os.sched_getaffinity(0))
else:
self.max_workers = int(max_workers)
# These are the constant class attributes relevant for a single
# ACCESS run. They will be set in the `learn` method
self.rng = None
self.error = None
self.start_times = None
self.end_times = None
self.num_checkpoints = None
self.num_solutions = None
self.target_sigma = None
self.use_historical = None
self.save_positions = None
self.save_path = None
self.simulations_path = None
self.outputs_path = None
self.classes_path = None
self.sim_classes_paths = None
self.sim_paths = None
self.history_path = None
self.history_scaled_path = None
self.random_seed = None
self.verbose = None
# Message printed to the stdout and stderr by spawned OS processes
self._stdout = None
self._stderr = None
def learn(
self,
error,
start_times,
end_times,
num_checkpoints = 100,
num_solutions = 10,
target_sigma = 0.1,
use_historical = True,
save_positions = True,
random_seed = None,
verbose = True,
):
# Type-checking inputs
if not callable(error):
raise TypeError(textwrap.fill((
"The input `error` must be a function (i.e. callable). "
f"Received `{error}` with type `{type(error)}`."
)))
self.error = error
if hasattr(start_times, "__iter__"):
self.start_times = [float(st) for st in start_times]
else:
self.start_times = [float(start_times) for _ in self.simulations]
if hasattr(end_times, "__iter__"):
self.end_times = [float(et) for et in end_times]
else:
self.end_times = [float(end_times) for _ in self.simulations]
if len(self.start_times) != len(self.simulations) or \
len(self.end_times) != len(self.simulations):
raise ValueError(textwrap.fill((
"The input `start_times` and `end_times` must have the same "
"number of elements as the number of simulations. Received "
f"{len(self.start_times)} start times / {len(self.end_times)} "
f"end times for {len(self.simulations)} simulations."
)))
if hasattr(num_checkpoints, "__iter__"):
self.num_checkpoints = [int(nc) for nc in num_checkpoints]
else:
self.num_checkpoints = [
int(num_checkpoints)
for _ in self.simulations
]
if len(self.num_checkpoints) != len(self.simulations):
raise ValueError(textwrap.fill((
"The input `num_checkpoints` must be either a single value or "
"a list with the same number of elements as the number of "
f"simulations. Received {len(self.num_checkpoints)} "
f"checkpoints for {len(self.simulations)} simulations."
)))
self.num_solutions = int(num_solutions)
self.target_sigma = float(target_sigma)
self.use_historical = bool(use_historical)
self.save_positions = bool(save_positions)
self.random_seed = random_seed
self.verbose = bool(verbose)
# Setting constant class attributes
self.rng = np.random.default_rng(self.random_seed)
# Aliases
sims = self.simulations
rng = self.rng
# The random hash that represents this optimisation run. If a random
# seed was specified, this will make the simulation and optimisations
# fully deterministic (i.e. repeatable)
rand_hash = str(round(abs(rng.random() * 1e6)))
self.save_path = f"access_info_{rand_hash}"
self.simulations_path = f"{self.save_path}/simulations"
self.classes_path = f"{self.save_path}/classes"
self.outputs_path = f"{self.save_path}/outputs"
self.sim_classes_paths = [
f"{self.classes_path}/simulation_{i}_class.pickle"
for i in range(len(self.simulations))
]
self.sim_paths = [
f"{self.classes_path}/simulation_{i}"
for i in range(len(self.simulations))
]
# Check if we have historical data about the optimisation - these are
# pre-computed values for this exact simulation, random seed, and
# number of solutions
self.history_path = (
f"{self.save_path}/opt_history_{self.num_solutions}.csv"
)
self.history_scaled_path = (
f"{self.save_path}/opt_history_{self.num_solutions}_scaled.csv"
)
# Create all required paths above if they don't exist already
self.create_directories()
# History columns: [param1, param2, ..., stddev_param1, stddev_param2,
# ..., stddev_all, error_value]
if use_historical and os.path.isfile(self.history_path):
history = np.loadtxt(self.history_path, dtype = float)
elif use_historical:
history = []
else:
history = None
# Scaling and unscaling parameter values introduce numerical errors
# that confuse the optimiser. Thus save unscaled values separately
if self.use_historical and os.path.isfile(self.history_scaled_path):
history_scaled = np.loadtxt(
self.history_scaled_path, dtype = float
)
elif self.use_historical:
history_scaled = []
else:
history_scaled = None
# Minimum and maximum possible values for the DEM parameters
params_mins = sims[0].parameters["min"].to_numpy()
params_maxs = sims[0].parameters["max"].to_numpy()
# If any `sigma` value is smaller than 5% (max - min), clip it
for sim in sims:
sim.parameters["sigma"].clip(
lower = 0.05 * (params_maxs - params_mins),
inplace = True,
)
# Scale sigma, bounds, solutions, results to unit variance
scaling = sims[0].parameters["sigma"].to_numpy()
# First guess, scaled
x0 = sims[0].parameters["value"].to_numpy() / scaling
sigma0 = 1.0
bounds = [
params_mins / scaling,
params_maxs / scaling
]
# Instantiate CMA-ES optimiser
es = cma.CMAEvolutionStrategy(x0, sigma0, dict(
bounds = bounds,
popsize = self.num_solutions,
randn = lambda *args: rng.standard_normal(args),
verbose = 3 if self.verbose else -9,
))
# Start optimisation: ask the optimiser for parameter combinations
# (solutions), run the simulations between `start_index:end_index` and
# feed the results back to CMA-ES.
epoch = 0
while not es.stop():
solutions = es.ask()
# If we have historical data, inject it for each epoch
if self.use_historical and \
epoch * self.num_solutions < len(history_scaled):
self.inject_historical(es, history_scaled, epoch)
epoch += 1
if self.finished(es):
break
continue
if self.verbose:
self.print_before_eval(es, solutions)
results = self.try_solutions(solutions * scaling, epoch)
es.tell(solutions, results)
epoch += 1
# Save every step's historical data as function evaluations are
# very expensive. Save columns [param1, param2, ..., stdev_param1,
# stdev_param2, ..., stdev_all, error_val].
if self.use_historical:
if not isinstance(history, list):
history = list(history)
for sol, res in zip(solutions, results):
history.append(
list(sol * scaling) +
list(es.result.stds * scaling) +
[es.sigma, res]
)
np.savetxt(self.history_path, history)
# Save scaled values separately to avoid numerical errors
if not isinstance(history_scaled, list):
history_scaled = list(history_scaled)
for sol, res in zip(solutions, results):
history_scaled.append(
list(sol) +
list(es.result.stds) +
[es.sigma, res]
)
np.savetxt(self.history_scaled_path, history_scaled)
if self.verbose:
self.print_after_eval(es, solutions, scaling, results)
if self.finished(es):
break
solutions = es.result.xbest * scaling
stds = es.result.stds * scaling
if self.verbose:
print(f"Best results for solutions: {solutions}", flush = True)
# Run the simulation with the best parameters found and save the
# results to disk. Return the paths to the saved values
radii_paths, positions_paths, velocities_paths = \
self.run_simulation_best(solutions, stds)
return radii_paths, positions_paths, velocities_paths
def run_simulation_best(self, solutions, stds):
'''Save the radii, positions, and velocities for the simulations with
the best parameter values.
'''
# Path for saving the simulations with the best parameters
best_path = f"{self.save_path}/best"
if not os.path.isdir(best_path):
os.mkdir(best_path)
# Paths for saving the best simulations' outputs
best_radii_paths = [
f"{best_path}/best_radii_{i}.npy"
for i in range(len(self.simulations))
]
best_positions_paths = [
f"{best_path}/best_positions_{i}.npy"
for i in range(len(self.simulations))
]
best_velocities_paths = [
f"{best_path}/best_velocities_{i}.npy"
for i in range(len(self.simulations))
]
# Change sigma, min and max based on optimisation results
param_names = self.simulations[0].parameters.index
for sim in self.simulations:
sim.parameters["sigma"] = stds
# Change parameters to the best solution
for i, sol_val in enumerate(solutions):
for sim in self.simulations:
sim[param_names[i]] = sol_val
# Run each simulation with the best parameters found. This cannot be
# done in parallel as the simulation library might be thread-unsafe
for i, sim in enumerate(self.simulations):
if self.verbose:
print((
f"Running the simulation at index {i} with the best "
"parameter values found..."
))
checkpoints = np.linspace(
self.start_times[i],
self.end_times[i],
self.num_checkpoints[i],
)
positions = []
velocities = []
for t in checkpoints:
sim.step_to_time(t)
positions.append(sim.positions())
velocities.append(sim.velocities())
radii = sim.radii()
positions = np.array(positions, dtype = float)
velocities = np.array(velocities, dtype = float)
np.save(best_radii_paths[i], radii)
np.save(best_positions_paths[i], positions)
np.save(best_velocities_paths[i], velocities)
error = self.error(
best_radii_paths,
best_positions_paths,
best_velocities_paths,
)
if self.verbose:
print((f"Error (computed by the `error` function) for solution: "
f"{error}\n---"), flush = True)
return best_radii_paths, best_positions_paths, best_velocities_paths
def create_directories(self):
# Save the current simulation state in a `restarts` folder
if not os.path.isdir(self.save_path):
os.mkdir(self.save_path)
# Save positions and parameters in a new folder inside `restarts`
if not os.path.isdir(self.simulations_path):
os.mkdir(self.simulations_path)
# Save classes objects in a new folder inside `restarts`
if not os.path.isdir(self.classes_path):
os.mkdir(self.classes_path)
# Save simulation outputs (stderr and stdout) in a new folder
if not os.path.isdir(self.outputs_path):
os.mkdir(self.outputs_path)
# Serialize the simulations' concrete class so that it can be
# reconstructed even if it is a `coexist.Simulation` subclass
for i in range(len(self.simulations)):
with open(self.sim_classes_paths[i], "wb") as f:
pickle.dump(self.simulations[i].__class__, f)
# Save current checkpoint and extra data for parallel computation
self.simulations[i].save(self.sim_paths[i])
infofile = f"{self.save_path}/opt_run_info.txt"
with open(infofile, "a", encoding = "utf-8") as f:
now = datetime.now().strftime("%H:%M:%S - %D")
f.writelines([
"--------------------------------------------------------\n",
f"Starting ACCESS run at {now}\n\n",
"Simulations:\n"
f"{self.simulations}\n\n",
f"start_times = {self.start_times}\n",
f"end_times = {self.end_times}\n",
f"num_checkpoints = {self.num_checkpoints}\n",
f"target_sigma = {self.target_sigma}\n",
f"num_solutions = {self.num_solutions}\n",
f"random_seed = {self.random_seed}\n",
f"use_historical = {self.use_historical}\n",
f"save_positions = {self.save_positions}\n\n",
f"save_path = {self.save_path}\n",
f"simulations_path = {self.simulations_path}\n",
f"classes_path = {self.classes_path}\n",
f"outputs_path = {self.outputs_path}\n",
f"sim_classes_paths = {self.sim_classes_paths}\n",
f"sim_paths = {self.sim_paths}\n\n",
f"history_path = {self.history_path}\n",
f"history_scaled_path = {self.history_scaled_path}\n",
"--------------------------------------------------------\n\n",
])
def inject_historical(self, es, history_scaled, epoch):
'''Inject the CMA-ES optimiser with pre-computed (historical) results.
The solutions must have a Gaussian distribution in each problem
dimension - though the standard deviation can vary for each of them.
Ideally, this should only use historical values that CMA-ES asked for
in a previous ACCESS run.
'''
ns = self.num_solutions
num_params = len(self.simulations[0].parameters)
results_scaled = history_scaled[(epoch * ns):(epoch * ns + ns)]
es.tell(results_scaled[:, :num_params], results_scaled[:, -1])
if self.verbose:
print((
f"Injected {(epoch + 1) * len(results_scaled)} / "
f"{len(history_scaled)} historical solutions."
))
def print_before_eval(self, es, solutions):
'''Print the individual and overal scaled standard deviations along
with the parameter combinations to try.
'''
print((
f"Scaled overall standard deviation: {es.sigma}\n"
f"Scaled individual standard deviations:\n{es.result.stds}"
f"\n\nTrying {len(solutions)} parameter combinations..."
), flush = True)
def print_after_eval(
self,
es,
solutions,
scaling,
results,
):
# Display evaluation results: solutions, error values, etc.
cols = list(self.simulations[0].parameters.index) + ["error"]
sols_results = np.hstack((
solutions * scaling,
results[:, np.newaxis],
))
# Store solutions and results in a DataFrame for easy pretty printing
sols_results = pd.DataFrame(
data = sols_results,
columns = cols,
index = None,
)
# Display all the DataFrame columns and rows
old_max_columns = pd.get_option("display.max_columns")
old_max_rows = pd.get_option("display.max_rows")
pd.set_option("display.max_columns", None)
pd.set_option("display.max_rows", None)
print((
f"{sols_results}\n"
f"Function evaluations: {es.result.evaluations}\n---"
), flush = True)
pd.set_option("display.max_columns", old_max_columns)
pd.set_option("display.max_rows", old_max_rows)
def finished(self, es):
'''If the overal sigma value is less than the target sigma value,
finish the simulation.
'''
if es.sigma < self.target_sigma:
if self.verbose:
print((
"Optimal solution found within `target_sigma`, i.e. "
f"{self.target_sigma * 100}%:\n"
f"sigma = {es.sigma} < {self.target_sigma}"
), flush = True)
return True
return False
def std_outputs(self, run_index, sim_index, stdout, stderr):
'''If new errors and outputs are produced, write them to the correct
olders.
'''
# If we had new errors, write them to `error.log`.
if len(stderr) and stderr != self._stderr:
self._stderr = stderr.decode("utf-8")
error_path = (
f"{self.outputs_path}/"
f"error_{run_index}_{sim_index}.log"
)
print((
"A new error ocurred while running simulation (run "
f"{run_index} / index {sim_index}):\n"
f"{self._stderr}\n\n"
f"Writing error message to `{error_path}`\n"
))
with open(error_path, "w") as f:
f.write(self._stderr)
# If we had new outputs, write them to `output.log`
if len(stdout) and stdout != self._stdout:
self._stdout = stdout.decode("utf-8")
output_path = (
f"{self.outputs_path}/"
f"output_{run_index}_{sim_index}.log"
)
print((
"A new message was outputted while running simulation (run "
f"{run_index} / index {sim_index}):\n"
f"{self._stdout}\n\n"
f"Writing output message to `{output_path}`\n"
))
with open(output_path, "w") as f:
f.write(self._stdout)
def simulations_save_paths(self, epoch):
'''For every simulation run in every parameter combination, append the
simulation, radii, position and velocity path to a corresponding list.
'''
sim_paths = []
radii_paths = []
positions_paths = []
velocities_paths = []
start_index = epoch * self.num_solutions
# For every parameter combination...
for i in range(self.num_solutions):
sim_run = []
radii_run = []
positions_run = []
velocities_run = []
# For every simulation run...
for j in range(len(self.simulations)):
sim_run.append((
f"{self.simulations_path}/"
f"opt_{start_index + i}_{j}"
))
radii_run.append((
f"{self.simulations_path}/"
f"opt_{start_index + i}_{j}_radii.npy"
))
positions_run.append((
f"{self.simulations_path}/"
f"opt_{start_index + i}_{j}_positions.npy"
))
velocities_run.append((
f"{self.simulations_path}/"
f"opt_{start_index + i}_{j}_velocities.npy"
))
sim_paths.append(sim_run)
radii_paths.append(radii_run)
positions_paths.append(positions_run)
velocities_paths.append(velocities_run)
return sim_paths, radii_paths, positions_paths, velocities_paths
def try_solutions(self, solutions, epoch):
'''For every solution to try and simulate a run, start a separate OS
process that runs the `async_access_error.py` file and saves the
positions in a `.npy` file.
'''
# Aliases
param_names = self.simulations[0].parameters.index
# Path to `async_access_error.py`
async_xi = os.path.join(
os.path.split(coexist.__file__)[0],
"async_access_error.py"
)
processes = []
# These are all lists of lists: axis 0 is the parameter combination to
# try, while axis 1 is the particular simulation to run
sim_paths, radii_paths, positions_paths, velocities_paths = \
self.simulations_save_paths(epoch)
# Catch the KeyboardInterrupt (Ctrl-C) signal to shut down the spawned
# processes before aborting.
try:
# Run all simulations in `self.simulations` for each parameter
# combination in `solutions`
for i, sol in enumerate(solutions):
single_run_processes = []
# For every simulation in `self.simulations`, start a new proc
for j, sim in enumerate(self.simulations):
# Change parameter values to save them along with the
# full simulation state
for k, sol_val in enumerate(sol):
sim.parameters.at[param_names[k], "value"] = sol_val
sim.save(sim_paths[i][j])
single_run_processes.append(
subprocess.Popen(
self.scheduler + [ # Python interpreter path
async_xi, # async_access_error.py path
self.sim_classes_paths[j],
sim_paths[i][j],
str(self.start_times[j]),
str(self.end_times[j]),
str(self.num_checkpoints[j]),
radii_paths[i][j],
positions_paths[i][j],
velocities_paths[i][j],
],
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
)
)
processes.append(single_run_processes)
# Compute the error function values in a parallel environment.
with ProcessPoolExecutor(max_workers = self.max_workers) \
as executor:
futures = []
# Get the output from each OS process / simulation
for i, run_procs in enumerate(processes):
# Check simulations didn't crash
crashed = False
for j, proc in enumerate(run_procs):
stdout, stderr = proc.communicate()
proc_index = epoch * self.num_solutions + i
self.std_outputs(proc_index, j, stdout, stderr)
# Only load simulations if they exist - i.e. no errors
# occurred
if not (os.path.isfile(radii_paths[i][j]) and
os.path.isfile(positions_paths[i][j]) and
os.path.isfile(velocities_paths[i][j])):
print((
"At least one of the simulation files "
f"{radii_paths[i][j]}, "
f"{positions_paths[i][j]} or "
f"{velocities_paths[i][j]}, was not found; "
f"the simulation (run {i} / index {j}) most "
"likely crashed. Check the error, output and "
"LIGGGHTS logs for what went wrong. The error "
"value for this simulation run is set to NaN."
))
crashed = True
# If no simulations crashed in this run, execute the error
# function in parallel
if not crashed:
futures.append(
executor.submit(
self.error,
radii_paths[i],
positions_paths[i],
velocities_paths[i],
)
)
else:
futures.append(None)
# Crashed solutions will have np.nan as a value.
results = np.full(len(solutions), np.nan)
for i, f in enumerate(futures):
if f is not None:
results[i] = f.result()
except KeyboardInterrupt:
for proc_run in processes:
for proc in proc_run:
proc.kill()
sys.exit(130)
# If `save_positions` is not True, remove all simulation files
if not self.save_positions:
for radii_run in radii_paths:
[os.remove(rp) for rp in radii_run]
for positions_run in positions_paths:
[os.remove(pp) for pp in positions_run]
for velocities_run in velocities_paths:
[os.remove(vp) for vp in velocities_run]
return results