Source code for fragmentmnp.fragmentmnp

from typing import Tuple
import numpy as np
import numpy.typing as npt
from scipy.integrate import solve_ivp
from scipy import interpolate
from schema import SchemaError
from . import validation
from .output import FMNPOutput
from ._errors import FMNPNumericalError


[docs]class FragmentMNP(): """ The class that controls usage of the FRAGMENT-MNP model Parameters ---------- config : dict Model configuration options data : dict Model input data validate: bool, default=True Should config and data be validated? It is strongly recommended to use validation, but this option is provided if you are *certain* your config and data are correct and wish to speed up model initialisation """ def __init__(self, config: dict, data: dict, validate: bool = True) -> None: """ Initialise the model """ # Validate the config and data (if we are meant to) if validate: config, data = self._validate_inputs(config, data) # If we passed validation (or aren't validating), save attributes self.config = config self.data = data # Set the number of particle size classes and timesteps, and # the times at which to store the computed solution self.n_size_classes = self.config['n_size_classes'] self.n_timesteps = self.config['n_timesteps'] self.t_eval = np.arange(0, self.n_timesteps) \ if self.config['solver_t_eval'] == 'integer' \ else self.config['solver_t_eval'] # Initial concentrations self.initial_concs = np.array(data['initial_concs']) # Set the particle phys-chem properties self.psd = self._set_psd() self.fsd = self.set_fsd(self.n_size_classes, self.psd, self.data['fsd_beta']) self.theta_1 = data['theta_1'] self.density = data['density'] self.k_diss = self._set_k_diss(data['k_diss'], config['k_diss_scaling_method'], self.psd, self.n_size_classes, data['k_diss_gamma']) self.k_frag = self._set_k_frag(data['k_frag'], self.theta_1, self.data['k_frag_tau'], self.psd, self.n_timesteps)
[docs] def run(self) -> FMNPOutput: r""" Run the model with the config and data provided at initialisation. Returns ------- :class:`fragmentmnp.output.FMNPOutput` object containing model output data. Notes ----- Internally the model numerically solves the following differential equation for each size class, to give a time series of mass concentrations `c`. `k` is the current size class, `i` are the daughter size classes. .. math:: \frac{dc_k}{dt} = -k_{\text{frag},k} c_k + \Sigma_i f_{i,k} k_{\text{frag},i} c_i - k_{\text{diss},k} c_k Here, :math:`k_{\text{frag},k}` is the fragmentation rate of size class `k`, :math:`f_{i,k}` is the fraction of daughter fragments produced from a fragmenting particle of size `i` that are of size `k`, and :math:`k_{\text{diss},k}` is the dissolution rate from size class `k`. Mass concentrations are converted to particle number concentrations by assuming spherical particles with the density given in the input data. """ # Define the initial value problem to pass to SciPy to solve. # This must satisfy c'(t) = f(t, c) with initial values given in data. def f(t, c): # Get the number of size classes and create results to be filled N = self.n_size_classes dcdt = np.empty(N) # Interpolate the time-dependent parameters to the specific # timestep given (which will be a float, rather than integer index) t_model = np.arange(self.n_timesteps) f = interpolate.interp1d(t_model, self.k_frag, axis=0, fill_value='extrapolate') k_frag = f(t) # Loop over the size classes and perform the calculation for k in np.arange(N): # The differential equation that is being solved dcdt[k] = - k_frag[k] * c[k] \ + np.sum(self.fsd[:, k] * k_frag * c[:N]) \ - self.k_diss[k] * c[k] # Return the solution for all of the size classes return dcdt # Numerically solve this given the initial values for c soln = solve_ivp(fun=f, method=self.config['solver_method'], t_span=(0, self.n_timesteps), y0=self.initial_concs, t_eval=self.t_eval, rtol=self.config['solver_rtol'], atol=self.config['solver_atol'], max_step=self.config['solver_max_step']) # If we didn't find a solution, raise an error if not soln.success: raise FMNPNumericalError('Model solution could not be ' + f'found: {soln.message}') # Calculate the timeseries of mass concentration lost to dissolution # from the solution j_diss = self.k_diss[:, None] * soln.y # Use this to calculate the cumulative mass concentration # lost to dissolution c_diss = np.cumsum(j_diss, axis=1) # Now we have the solutions as mass concentrations, we can # convert to particle number concentrations n = self.mass_to_particle_number(soln.y) n_diss = self.mass_to_particle_number(c_diss) # Return the solution in an FMNPOutput object return FMNPOutput(soln.t, soln.y, n, c_diss, n_diss, soln, self.psd)
[docs] def mass_to_particle_number(self, mass): """ Convert mass (concentration) to particle number (concentration). Here, particles are assumed to be spherical, but this function can be overloaded to account for different shape particles. """ n = mass / (self.density * (4.0/3.0) * np.pi * (self.psd[:, None] / 2) ** 3) return n
def _set_psd(self) -> npt.NDArray[np.float64]: """ Calculate the particle size distribution based on the config options passed in `self.config` """ if 'particle_size_classes' in self.config: # If a particle size distribution has been specified, use that psd = np.array(self.config['particle_size_classes']) elif 'particle_size_range' in self.config: # Else, construct the size distribution from the given size range psd = np.logspace(*self.config['particle_size_range'], self.n_size_classes) else: # TODO move this check into validation raise ValueError('particle_size_classes or particle_size_range ' + 'must be present in the model config, but ' + 'neither were found.') return psd @staticmethod def _set_k_frag(k_frag: float, theta_1: float, tau: float, psd: npt.NDArray[np.float64], n_timesteps: int) -> npt.NDArray[np.float64]: r""" Set the fragmentation rate `k_frag` based on either the average `k_frag` for the median particle size bin and `theta_1` (surface energy empirical parameter), or directly if a distribution is provided. `tau` (time dependence empirical parameter) then scales `k_frag` to be dependent on time. Parameters ---------- k_frag : float or iterable Either the average :math:`k_frag` for the median particle size bin, or a distribution of :math:`k_frag` values theta_1 : float The surface energy empirical parameter :math:`\theta_1` tau : float The time-dependence parameter :math:`\tau` psd : np.ndarray The particle size distribution n_timesteps : int The number of model timesteps Returns ------- k_frag = np.ndarray (n_timesteps, n_size_classes) Fragmentation rate array over timesteps and size classes """ # Check if k_frag is a scalar value, in which case we need # to calculate a distribution based on theta1 if isinstance(k_frag, (int, float)): # Get the proportionality constant k_prop = k_frag / (np.median(psd) ** (2 * theta_1)) # Now create the array of k_frags k_frag_dist = k_prop * psd ** (2 * theta_1) # We presume fragmentation from the smallest size class # can't happen, and the only loss from this size class # is from dissolution k_frag_dist[0] = 0.0 # Else just set k_frag directly from the provided array. # Validation makes sure this is the correct length else: k_frag_dist = np.array(k_frag) # Now set the time dependence of k_frag using tau t = np.arange(1, n_timesteps + 1) k_frag_2d = k_frag_dist * t[:, np.newaxis] ** tau / np.median(t) ** tau return k_frag_2d
[docs] @staticmethod def set_fsd(n: int, psd: npt.NDArray[np.float64], beta: float) -> npt.NDArray[np.float64]: r""" Set the fragment size distribution matrix, assuming that fragmentation events result in a split in mass between daughter fragments that scales proportionally to :math:`d^\beta`, where :math:`d` is the particle diameter and :math:`\beta` is an empirical fragment size distribution parameter. For example, if :math:`\beta` is negative, then a larger proportion of the fragmenting mass goes to smaller size classes than larger. For an equal split between daughter size classes, set :math:`\beta` to 0. Parameters ---------- n : int Number of particle size classes psd : np.ndarray Particle size distribution beta : float Fragment size distribution empirical parameter Returns ------- np.ndarray Matrix of fragment size distributions for all size classes """ # Start with a zero-filled array of shape (N,N) fsd = np.zeros((n, n)) # Fill with the split to daughter size classes scaled # proportionally to d^beta for i in np.arange(1, n): fsd[i, :-(n-i)] = (psd[:-(n-i)] ** beta / np.sum(psd[:-(n-i)] ** beta)) return fsd
@staticmethod def _set_k_diss(k_diss: float, scaling_method: str, psd: npt.NDArray[np.float64], n_size_classes: int, gamma: float = 1.0) -> npt.NDArray[np.float64]: """ Set the dissolution rate for each of the size classes, based on an average dissolution rate Parameters ---------- k_diss : float Either average dissolution rate across size classes, or the full distribution. scaling_method: str How to scale ``k_diss`` across size classes? Either `constant` or `surface_area`. If `constant`, the same ``k_diss`` is used for all size classes. If `surface_area`, ``k_diss`` is scaled according to particle surface area per unit volume of polymer. psd : np.ndarray The particle size distribution n_size_classes : int The number of particle size classes Returns ------- np.ndarray Dissolution rate distribution Notes ----- At the moment, we are assuming spherical particles when scaling by surface area. This might change. """ # Check if k_diss is a scalar value, in which case we need # to calculate a distribution based on gamma if isinstance(k_diss, (int, float)): # What scaling method has been chosen? if scaling_method == 'constant': # Use the average k_diss across all size classes k_diss_dist = np.full((n_size_classes,), k_diss) elif scaling_method == 'surface_area': # Scale the dissolution according to surface area per unit # volume, assuming our particles are spheres k_diss_dist = k_diss * FragmentMNP._f_surface_area(psd, gamma) else: # We shouldn't get here, if validation has been performed! raise ValueError('Invalid k_diss_scaling_factor provided: ', {scaling_method}) # Otherwise we will have been given a distribution, so use # that directly else: k_diss_dist = k_diss return k_diss_dist @staticmethod def _f_surface_area(psd: npt.NDArray[np.float64], gamma: float = 1.0) -> npt.NDArray[np.float64]: r""" Calculate the scaling factor for surface area, which is defined as the ratio of the surface area to volume ratio of the polymer for each size class to the median size class, such that ``f`` is 1 for the median size class, larger for the smaller size classes (because there are more particles per unit volume), and smaller for larger size classes. An empirical parameter ``gamma`` linearly scales the factor by :math:`f^\gamma`. Parameters ---------- psd : np.ndarray The particle size distribution gamma: float Empirical scaling factor that scales ``f`` as :math:`s^\gamma`, where ``s`` is the surface area to volume ratio of each size class. Therefore, if ``gamma`` is 1, then ``k_diss`` scales directly with ``s``. Returns ------- np.ndarray Surface area scaling factor Notes ----- By assuming spherical particles, calculating their volumes and surface areas and simplifying the algebra, ``f`` can be defined as .. math:: f_\text{s} = \left(\frac{s}{\hat{s}}\right)^\gamma where :math:`s` is the surface area to volume ratio, and :math:`\hat{s}` is the median of :math:`s`: .. math:: s = \frac{4 \pi r_\text{max}}{\textbf{r}} Here, :math:`r_\text{max}` is the radius of the largest particle size class, and :math:`\textbf{r}` is an array of the particle size class radii. """ # Calculate ratio of surface area to the largest volume and scale # to the median (so f = 1 for the median size class) surface_area_volume_ratio = (4 * np.pi * (psd.max() / 2) ** 3) / psd f = surface_area_volume_ratio / np.median(surface_area_volume_ratio) return f ** gamma @staticmethod def _validate_inputs(config: dict, data: dict) -> Tuple[dict, dict]: """ Validate the config and data dicts passed to the model Parameters ---------- config : dict Model config dict data : dict Model input data dict Returns ------- Tuple[dict, dict] Config and data dicts validated and filled with defaults """ # Try and validate the config try: # Returns the config dict with defaults filled config = validation.validate_config(config) except SchemaError as err: raise SchemaError('Model config did not pass validation!') from err # Try and validate data try: # Returns the data dict with defaults filled data = validation.validate_data(data, config) except SchemaError as err: raise SchemaError('Input data did not pass validation!') from err # Return the config and data with filled defaults return config, data