Source code for coexist.base

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
# File   : base.py
# License: GNU v3.0
# Author : Andrei Leonard Nicusan <a.l.nicusan@bham.ac.uk>
# Date   : 01.09.2020


import  re
import  os
import  pickle
import  textwrap
from    abc         import  ABC, abstractmethod

import  numpy       as      np
import  pandas      as      pd

from    tqdm        import  tqdm
from    pyevtk.hl   import  pointsToVTK




def save(path, obj):
    '''Save a Coexist object to `path` using the binary Pickle format.
    '''
    with open(path, "wb") as f:
        pickle.dump(obj, f)
    return obj




def load(path):
    '''Load a Coexist object from a binary Pickle-formatted file.
    '''
    with open(path, "rb") as f:
        obj = pickle.load(f)
    return obj
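

# Usage sketch (illustrative, not part of the module; it assumes `save` and
# `load` are re-exported at package level as `coexist.save` / `coexist.load`,
# and the filename is a hypothetical placeholder): any picklable Coexist
# object can be round-tripped through a file.
#
#   >>> import coexist
#   >>> parameters = coexist.create_parameters(
#   >>>     variables = ["cor"],
#   >>>     minimums = [0.0],
#   >>>     maximums = [1.0],
#   >>> )
#   >>> coexist.save("parameters.pickle", parameters)  # returns `parameters`
#   >>> restored = coexist.load("parameters.pickle")   # same DataFrame back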




def to_vtk(
    dirname,
    positions,
    *,
    times = None,
    velocities = None,
    radii = None,
    verbose = True,
):
    '''Export particle `positions` and optionally `times`, `velocities` and
    `radii` to a folder `dirname` in the modern binary VTK format.

    Parameters
    ----------
    dirname : str
        A path to the directory to save particle data to.

    positions : (T,) list[(P, 3) np.ndarray]
        A list-like of particle locations at each timestep T; each list entry
        represents a single timestep and should contain a 2D NumPy array with
        columns formatted as [x, y, z] - each row represents a particle P.

    times : (T,) list[float], optional
        A list of times for each timestep given in `positions`.

    velocities : (T,) list[(P, 3) np.ndarray], optional
        Same as `positions`, a list of T timesteps with each entry being a 2D
        NumPy array containing P rows and 3 columns [vx, vy, vz].

    radii : (T,) list[(P,) np.ndarray], optional
        A list of length T (for T timesteps) with each entry containing a 1D
        NumPy array containing the radii of the P particles.

    verbose : bool, default True
        Print extra information while saving.
    '''

    # Type-checking inputs
    num_timesteps = len(positions)

    for i in range(num_timesteps):
        positions[i] = np.asarray(positions[i])

        if positions[i].ndim != 2 or positions[i].shape[1] != 3:
            raise ValueError(textwrap.fill((
                "The input `positions` must be a list of the simulated "
                "particles' locations - that is, a 3D array-like with axes "
                "(T, P, C), where T is the number of timesteps, P is the "
                "number of particles, and C are the (x, y, z) coordinates. "
                f"Received a list with length {num_timesteps} where at "
                f"timestep index {i} the array of particle positions had "
                f"shape `{positions[i].shape}`."
            )))

    if times is not None:
        times = np.asarray(times, order = "C")

        if times.ndim != 1 or len(times) != len(positions):
            raise ValueError(textwrap.fill((
                "The input `times` must be a 1D list of the times at which "
                "the particle locations were recorded. It should have the "
                "same length as the input `positions` array (= "
                f"{len(positions)}). Received an array with shape "
                f"`{times.shape}`."
            )))

    if velocities is not None:
        for i in range(num_timesteps):
            velocities[i] = np.asarray(velocities[i])

            if velocities[i].ndim != 2 or velocities[i].shape[1] != 3 or \
                    len(velocities) != len(positions) or \
                    len(velocities[i]) != len(positions[i]):
                raise ValueError(textwrap.fill((
                    "The input `velocities` must be a list of the simulated "
                    "particles' dimension-wise velocities - that is, a 3D "
                    "array-like with axes (T, P, V), where T is the number "
                    "of timesteps (same as for the input `positions`), P is "
                    "the number of particles, and V are the (vx, vy, vz) "
                    "velocities. "
                    f"Received a list with length {num_timesteps} where at "
                    f"timestep index {i} the array of particle velocities "
                    f"had shape `{velocities[i].shape}`."
                )))

    if radii is not None:
        for i in range(num_timesteps):
            radii[i] = np.asarray(radii[i])

            if radii[i].ndim != 1 or len(radii) != len(positions) or \
                    len(radii[i]) != len(positions[i]):
                raise ValueError(textwrap.fill((
                    "The input `radii` must be a list of the simulated "
                    "particles' radii - that is, a 2D array-like with axes "
                    "(T, R), where T is the number of timesteps (same as "
                    "for the input `positions`) and R are the radii for "
                    "each particle. "
                    f"Received a list with length {num_timesteps} where at "
                    f"timestep index {i} the array of particle radii had "
                    f"shape `{radii[i].shape}`."
                )))

    # If it doesn't exist, create directory to save VTK files
    if not os.path.isdir(dirname):
        os.mkdir(dirname)

    # Save each timestep in a file
    if verbose:
        positions = tqdm(positions)

    for i, pos in enumerate(positions):
        properties = dict()

        if times is not None:
            properties["time"] = np.ones(len(pos)) * times[i]

        if velocities is not None:
            vel = velocities[i]
            properties["velocity"] = (
                np.ascontiguousarray(vel[:, 0]),
                np.ascontiguousarray(vel[:, 1]),
                np.ascontiguousarray(vel[:, 2]),
            )

        if radii is not None:
            properties["radius"] = radii[i]

        pointsToVTK(
            os.path.join(dirname, f"locations_{i}"),
            np.ascontiguousarray(pos[:, 0]),
            np.ascontiguousarray(pos[:, 1]),
            np.ascontiguousarray(pos[:, 2]),
            data = properties,
        )

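

# Usage sketch (illustrative; it assumes `to_vtk` is re-exported at package
# level as `coexist.to_vtk`, and the directory name and random data below are
# hypothetical): export two timesteps of five particles, then open the
# resulting `locations_*` VTK files in e.g. ParaView.
#
#   >>> import numpy as np
#   >>> import coexist
#   >>> positions = [np.random.random((5, 3)) for _ in range(2)]
#   >>> velocities = [np.random.random((5, 3)) for _ in range(2)]
#   >>> radii = [np.full(5, 0.001) for _ in range(2)]
#   >>> coexist.to_vtk(
#   >>>     "particles_vtk",
#   >>>     positions,
#   >>>     times = [0.0, 0.1],
#   >>>     velocities = velocities,
#   >>>     radii = radii,
#   >>> )

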
def create_parameters(
    variables = [],
    minimums = [],
    maximums = [],
    values = None,
    sigma = None,
    **kwargs,
):
    '''Create a ``pandas.DataFrame`` storing ``Access`` free parameters'
    names, bounds, and optionally starting values and relative uncertainty.

    This is simply a helper returning a ``pandas.DataFrame`` with the format
    required by e.g. ``coexist.Access``.

    Only the `variables`, `minimums` and `maximums` are necessary. If unset,
    the initial ``values`` are set to halfway between the lower and upper
    bounds; the initial standard deviation ``sigma`` is set to 40% of this
    range, so that the entire space is explored.

    Parameters
    ----------
    variables : list[str], default []
        A list of the free parameters' names.

    minimums : list[float], default []
        A list with the same length as ``variables`` storing the lower bound
        for each corresponding variable.

    maximums : list[float], default []
        A list with the same length as ``variables`` storing the upper bound
        for each corresponding variable.

    values : list[float], optional
        The optimisation starting values; not essential as ACCES samples the
        space randomly anyway. If unset, they are set to halfway between
        ``minimums`` and ``maximums``.

    sigma : list[float], optional
        The initial standard deviation in each variable, setting how far away
        from the initial ``values`` the parameters will be sampled; the
        sampling is Gaussian. If unset, they are set to 40% of the data range
        (i.e. ``maximums`` - ``minimums``) so that the entire space is
        initially explored. ACCES will adapt and minimise this uncertainty.

    **kwargs : other keyword arguments
        Other columns to include in the returned parameters DataFrame, given
        as other lists with the same length as ``variables``.

    Returns
    -------
    pandas.DataFrame
        A table storing the initial ``value``, ``min``, ``max`` and ``sigma``
        (columns) for each free parameter (rows).

    Examples
    --------
    Create a DataFrame storing two free parameters, specifying the minimum
    and maximum bounds; notice that the starting guess and uncertainty are
    set automatically.

    >>> import coexist
    >>> parameters = coexist.create_parameters(
    >>>     variables = ["cor", "separation"],
    >>>     minimums = [-3, -7],
    >>>     maximums = [+5, +3],
    >>> )
    >>> parameters
                value  min  max  sigma
    cor           1.0 -3.0  5.0    3.2
    separation   -2.0 -7.0  3.0    4.0
    '''

    if not (len(variables) == len(minimums) == len(maximums)) or \
            (values is not None and len(values) != len(variables)) or \
            (sigma is not None and len(sigma) != len(variables)):
        raise ValueError(textwrap.fill(
            '''The input iterables `variables`, `minimums`, `maximums`,
            `values` and `sigma` (if defined) must all have the same
            length.'''
        ))

    minimums = np.array(minimums, dtype = float)
    maximums = np.array(maximums, dtype = float)

    if values is None:
        values = (maximums + minimums) / 2
    else:
        values = np.array(values, dtype = float)

    if (minimums >= maximums).any():
        raise ValueError(textwrap.fill(
            '''Found value in `maximums` that was smaller than or equal to
            the corresponding value in `minimums`.'''
        ))

    if sigma is None:
        sigma = 0.4 * (maximums - minimums)

    parameters = pd.DataFrame(
        data = {
            "value": values,
            "min": minimums,
            "max": maximums,
            "sigma": sigma,
            **kwargs,
        },
        index = variables,
    )

    return parameters

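

# Usage sketch (illustrative): extra keyword arguments become additional
# DataFrame columns, and `values` / `sigma` override the automatic defaults;
# the "units" column name below is a hypothetical example.
#
#   >>> import coexist
#   >>> parameters = coexist.create_parameters(
#   >>>     variables = ["N", "coeff"],
#   >>>     minimums = [100, 0.0],
#   >>>     maximums = [1000, 1.0],
#   >>>     values = [500, 0.3],
#   >>>     sigma = [100, 0.1],
#   >>>     units = ["-", "N m"],
#   >>> )

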
class Parameters(pd.DataFrame):
    '''Pandas DataFrame subclass with a custom constructor for dynamically
    changing LIGGGHTS simulation parameters - for `Coexist`.

    In order to dynamically change LIGGGHTS simulation parameters, a macro
    command must be run (e.g. `liggghts.command(fix m1 all property/global
    youngsModulus peratomtype 0.8e9)`). This class saves the data needed to
    modify simulation parameters in a DataFrame:

    1. The required *command template* to change a given parameter using
       LIGGGHTS equal-style variables. E.g. "fix m1 all property/global
       youngsModulus peratomtype ${youngmodP} ${youngmodP} ${youngmodP}" is a
       LIGGGHTS command which uses the ${youngmodP} variable.
    2. Per-parameter initial guesses.
    3. Per-parameter lower bounds (i.e. minimum valid value), optional.
    4. Per-parameter upper bounds (i.e. maximum valid value), optional.

    All those values are indexed by the parameter name. Below is an example
    that shows how to construct a `Parameters` class containing a
    hypothetical simulation's properties that will be dynamically changed.

    Examples
    --------
    In the example below, the command to change a single simulation parameter
    contains other variables which we won't modify:

    .. code-block:: python

        parameters = coexist.Parameters(
            variables = ["fricPP", "corPP"],
            commands = [
                "fix m4 all property/global coefficientFriction \
                    peratomtypepair 3 ${fricPP} ${fricPW} ${fricPSW} \
                                      ${fricPW} ${fric} ${fric} \
                                      ${fricPSW} ${fric} ${fric} ",
                "fix m3 all property/global coefficientRestitution \
                    peratomtypepair 3 ${corPP} ${corPP} ${corPP} \
                                      ${corPP} ${corPP} ${corPP} \
                                      ${corPP} ${corPP} ${corPP} ",
            ],
            values = [0.5, 0.5],
            minimums = [0.0, 0.0],
            maximums = [1.0, 1.0],
        )

        parameters
                                                          command  value  min  max  sigma
        fricPP  fix m4 all property/global coefficientFri...        0.5  0.0  1.0    0.4
        corPP   fix m3 all property/global coefficientRes...        0.5  0.0  1.0    0.4

    Notes
    -----
    As this class inherits from `pandas.DataFrame`, all methods from it are
    available after instantiation - only the constructor is custom.
    '''

    def __init__(
        self,
        *args,
        variables = [],
        commands = [],
        values = None,
        minimums = [],
        maximums = [],
        sigma = None,
        **kwargs,
    ):
        '''`Parameters` class constructor.

        Parameters
        ----------
        variables : list[str], default []
            An iterable containing the LIGGGHTS variable names that will be
            used for changing simulation parameters.

        commands : list[str], default []
            An iterable containing the macro commands required to modify
            LIGGGHTS simulation parameters, containing the variable names as
            `${varname}`. E.g. `"fix m1 all property/global youngsModulus
            peratomtype ${youngmodP} ${youngmodP} ${youngmodP}"`.

        values : list[float], optional
            An iterable containing the initial values for each LIGGGHTS
            simulation parameter. If unset, they are set to halfway between
            `minimums` and `maximums`.

        minimums : list[float], default []
            An iterable containing the lower bounds for each LIGGGHTS
            parameter. For non-existing bounds, use `None`, in which case
            also define a `sigma`.

        maximums : list[float], default []
            An iterable containing the upper bounds for each LIGGGHTS
            parameter. For non-existing bounds, use `None`, in which case
            also define a `sigma`.

        sigma : list[float], optional
            The standard deviation of the first population of solutions tried
            by the CMA-ES optimisation algorithm. If unset, it is computed as
            `0.4 * (maximums - minimums)`.
        '''

        pd.DataFrame.__init__(self, *args, **kwargs)

        if not (len(variables) == len(commands) == len(minimums) ==
                len(maximums)) or \
                (values is not None and len(values) != len(variables)):
            raise ValueError(textwrap.fill(
                '''The input iterables `variables`, `commands`, `values` (if
                defined), `minimums` and `maximums` must all have the same
                length.'''
            ))

        minimums = np.array(minimums, dtype = float)
        maximums = np.array(maximums, dtype = float)

        if values is None:
            values = (maximums + minimums) / 2
        else:
            values = np.array(values, dtype = float)

        if (minimums >= maximums).any():
            raise ValueError(textwrap.fill(
                '''Found value in `maximums` that was smaller than or equal
                to the corresponding value in `minimums`.'''
            ))

        if sigma is None:
            sigma = 0.4 * (maximums - minimums)
        elif len(sigma) != len(variables):
            raise ValueError(textwrap.fill(
                '''If defined, `sigma` must have the same length as the other
                input parameters.'''
            ))

        for cmd in commands:
            if re.search(r"\$\{\w+\}", cmd) is None:
                raise ValueError(textwrap.fill(
                    '''The strings in the input `commands` must contain at
                    least one substring `"${varname}"` (e.g. `"fix m2 all
                    property/global poissonsRatio peratomtype ${poissP}
                    ${poissP} ${poissP}"`), in which the right value will be
                    substituted when running the LIGGGHTS command.'''
                ))

        self["command"] = commands
        self["value"] = values
        self["min"] = minimums
        self["max"] = maximums
        self["sigma"] = sigma

        self.index = variables

    def copy(self, *args, **kwargs):
        parameters_copy = Parameters(
            variables = self.index.copy(),
            commands = self["command"].copy(),
            values = self["value"].copy(),
            minimums = self["min"].copy(),
            maximums = self["max"].copy(),
            sigma = self["sigma"].copy(),
        )

        return parameters_copy




class Experiment:
    '''Class encapsulating an experiment's recorded particle positions at
    multiple timesteps for `Coexist`.

    For a single timestep, the particles in a system can be represented as a
    2D array with columns [x, y, z]. For multiple timesteps, the positions
    arrays can be stacked, yielding a 3D array with shape (T, N, 3), where T
    is the number of timesteps, N is the number of particles, plus three
    columns for their cartesian coordinates.

    This class contains the recorded timesteps and the stacked 3D array of
    particle positions. It is used for the ground truth data that another DEM
    simulation will learn the parameters of.
    '''

    def __init__(
        self,
        times,
        positions_all,
        resolution,
        **kwargs
    ):
        '''`Experiment` class constructor.

        Parameters
        ----------
        times : list[float]
            A list (or vector) of timestamp values for all positions recorded
            in the experiment.

        positions_all : (T, N, M>=3) numpy.ndarray
            A 3D array-like with dimensions (T, N, M>=3), where T is the
            number of timesteps (corresponding exactly to the length of
            `times`), N is the number of particles in the system and M is the
            number of data columns per particle (at least 3 for their
            [x, y, z] coordinates, but can contain extra columns).

        resolution : float
            The expected error on the positions recorded.

        kwargs : keyword arguments
            Extra data that should be attached to this class as new
            attributes (e.g. `exp.droplets = 5` creates a new attribute).

        Raises
        ------
        ValueError
            If `times` is not a flat list-like, or `times` contains repeated
            values, or `positions_all` does not have exactly three dimensions
            and at least 3 columns (axis 2), or
            `len(times) != len(positions_all)`.
        '''

        times = np.asarray(times, dtype = float, order = "C")
        positions_all = np.asarray(positions_all, dtype = float, order = "C")

        if times.ndim != 1:
            raise ValueError(textwrap.fill((
                "The `times` input parameter must have exactly one "
                f"dimension. Received `ndim = {times.ndim}`."
            )))

        if len(np.unique(times)) != len(times):
            raise ValueError(textwrap.fill((
                "The `times` input parameter must have only unique values. "
                f"Found {len(times) - len(np.unique(times))} repeated values."
            )))

        if positions_all.ndim != 3 or positions_all.shape[2] < 3:
            raise ValueError(textwrap.fill((
                "The `positions_all` input parameter must have exactly three "
                "dimensions and at least three columns. Received "
                f"`shape = {positions_all.shape}`."
            )))

        if len(times) != len(positions_all):
            raise ValueError(textwrap.fill((
                "The input `positions_all` parameter must be a 3D array with "
                "the particles' positions at every timestep in `times`. The "
                "shape of `positions_all` should then be (T, N, 3), where T "
                "is the number of timesteps, N is the number of particles, "
                "plus three columns for x, y, z coordinates. Received "
                f"{len(times)} timesteps in `times` and {len(positions_all)} "
                "stacked arrays in `positions_all`."
            )))

        self.times = times
        self.positions_all = positions_all
        self.resolution = float(resolution)

        # Attach any extra keyword arguments as new attributes
        for k, v in kwargs.items():
            setattr(self, k, v)

    def positions(self, timestep):
        time_idx = np.argwhere(np.isclose(timestep, self.times))

        if len(time_idx) == 0:
            raise ValueError(textwrap.fill((
                "There are no values in `times` equal to the requested "
                f"timestep {timestep}."
            )))

        return self.positions_all[time_idx[0, 0]]

    def __str__(self):
        # Shown when calling print(class)
        docstr = (
            f"times:\n{self.times}\n\n"
            f"positions_all:\n{self.positions_all}"
        )

        return docstr

    def __repr__(self):
        # Shown when writing the class on a REPL
        docstr = (
            "Class instance that inherits from `pyCoexist.Experiment`.\n"
            f"Type:\n{type(self)}\n\n"
            "Attributes\n----------\n"
            f"{self.__str__()}"
        )

        return docstr

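

# Usage sketch (illustrative, with hypothetical data; it assumes `Experiment`
# is re-exported at package level): three recorded timesteps of ten
# particles, stacked into a (3, 10, 3) array, with an expected positional
# error of 0.1 mm. `positions(t)` returns the frame whose recorded time
# matches the requested value (via `np.isclose`).
#
#   >>> import numpy as np
#   >>> import coexist
#   >>> times = [0.0, 0.1, 0.2]
#   >>> positions_all = np.random.random((3, 10, 3))
#   >>> exp = coexist.Experiment(times, positions_all, resolution = 1e-4)
#   >>> frame = exp.positions(0.1)      # the (10, 3) positions at t = 0.1

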
class Simulation(ABC):
    '''Abstract class defining the interface a DEM simulation engine must
    implement to be used by the `Coexist` and `Access` algorithms.

    This interface is needed in order to manage a DEM simulation whose
    parameters will be modified dynamically as the algorithms *learn* them.

    Alternatively, this interface has enough features to allow setting up,
    running, and analysing DEM simulations in Python.

    Each method required is described below. As an example of an implemented
    LIGGGHTS front-end, see `LiggghtsSimulation`.
    '''

    @property
    @abstractmethod
    def parameters(self):
        '''A `coexist.Parameters` class instance containing the free
        simulation parameters, along with their bounds.
        '''
        pass

    @property
    @abstractmethod
    def step_size(self):
        '''The time duration of a single simulated step.
        '''
        pass

    @abstractmethod
    def step(self, num_steps: int):
        '''Move the simulation forward in time for `num_steps` steps.
        '''
        pass

    @abstractmethod
    def step_to(self, step: int):
        '''Move the simulation forward in time up to timestep `step`.
        '''
        pass

    @abstractmethod
    def step_time(self, duration: float):
        '''Move the simulation forward in time for `duration` seconds.
        '''
        pass

    @abstractmethod
    def step_to_time(self, time: float):
        '''Move the simulation forward up to time `time`.
        '''
        pass

    @abstractmethod
    def time(self) -> float:
        '''Return the current simulation time.
        '''
        pass

    @abstractmethod
    def timestep(self) -> int:
        '''Return the current simulation timestep.
        '''
        pass

    @abstractmethod
    def num_atoms(self) -> int:
        '''Return the number of simulated particles.
        '''
        pass

    @abstractmethod
    def radii(self) -> np.ndarray:
        '''A 1D numpy array listing the radius of each particle.
        '''
        pass

    @abstractmethod
    def positions(self) -> np.ndarray:
        '''Return a 2D numpy array of the particle positions at the current
        timestep, where the columns represent the cartesian coordinates.
        '''
        pass

    @abstractmethod
    def velocities(self) -> np.ndarray:
        '''Return the 2D numpy array of the particle velocities at the
        current timestep, where the columns represent the velocity in the
        x-, y-, and z-dimensions.
        '''
        pass

    @abstractmethod
    def set_position(self, particle_index: int, position: np.ndarray):
        '''Set the 3D position of the particle at an index (indexed from 0).
        '''
        pass

    @abstractmethod
    def set_velocity(self, particle_index: int, velocity: np.ndarray):
        '''Set the 3D velocity of the particle at an index (indexed from 0).
        '''
        pass

    @abstractmethod
    def copy(self):
        '''Create a deep copy of the simulation, along with all its state.

        It is important to be a deep copy as it will be used in asynchronous
        contexts.
        '''
        pass

    @abstractmethod
    def save(self, filename: str):
        '''Save the full state of a simulation to a file.
        '''
        pass

    @staticmethod
    @abstractmethod
    def load(filename: str):    # -> Simulation
        '''Load the full state of a simulation from a file.
        '''
        pass

    @abstractmethod
    def __setitem__(self, key, value):
        '''A custom class attribute setter (i.e. called when using the
        subscript notation `simulation[key] = value`) that sets a
        simulation's free parameter value.

        The free parameter must already exist in the `parameters` property.
        '''
        pass
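

# Interface sketch (illustrative, not a real engine; it assumes `Simulation`
# is re-exported at package level): a concrete subclass must implement every
# abstract method above before it can be instantiated. The class name
# `MinimalSimulation` and its constructor arguments are hypothetical; the
# call pattern is what `Coexist` / `Access` rely on.
#
#   >>> import coexist
#   >>>
#   >>> class MinimalSimulation(coexist.Simulation):
#   >>>     # ... all abstract methods implemented here ...
#   >>>
#   >>> sim = MinimalSimulation(...)
#   >>> sim.step_time(1.0)              # advance one simulated second
#   >>> pos = sim.positions()           # (N, 3) particle coordinates
#   >>> sim["corPP"] = 0.6              # set a free parameter (__setitem__)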