Source code for scm.glompo.optimizers.armc

import numpy as np
import warnings
from multiprocessing.connection import Connection
from pathlib import Path
from threading import Event
from typing import Any, Callable, Optional, Sequence, Set, Tuple, Union
from .baseoptimizer import BaseOptimizer, MinimizeResult
from ..core._backends import ChunkingQueue
from ...plams.core.settings import Settings

__all__ = ("ARMCOptimizer",)

# Create a range which does not contain 1.0
_MOVE_RANGE = np.delete(np.arange(start=0.9, stop=1.1, step=0.005), 20)
_MOVE_RANGE.setflags(write=False)


class StopCalledException(Exception):
    """Special error raised by ``ARMCOptimizer.callstop``."""


[docs]class ARMCOptimizer(BaseOptimizer): r"""An optimizer using the Adaptive Rate Monte Carlo (ARMC) algorithm. 1. A trial state, :math:`S_{\omega}`, is generated by moving a random parameter retrieved from a user-specified parameter set (*e.g.* atomic charge). By default, parameters moves are applied in a multiplicative manner. 2. The move is accepted if the new set of parameters, :math:`S_{\omega}`, lowers the auxiliary error (:math:`\Delta \varepsilon_{QM-MM}`) with respect to the previous set of accepted parameters :math:`S_{\omega-i}` (:math:`i > 0`; see :eq:`1`). The auxiliary error is calculated with a user-specified cost function. .. math:: :label: 1 p(\omega \leftarrow \omega-i) = \Biggl \lbrace { 1, \quad \Delta \varepsilon_{QM-MM} ( S_{\omega} ) \; \lt \; \Delta \varepsilon_{QM-MM} ( S_{\omega-i} ) \atop 0, \quad \Delta \varepsilon_{QM-MM} ( S_{\omega} ) \; \gt \; \Delta \varepsilon_{QM-MM} ( S_{\omega-i} ) } 3. The parameter history is updated. Either :math:`S_{\omega}` or :math:`S_{\omega-i}` is increased by the variable :math:`\phi` (see :eq:`2`) if, respectively, the new parameters are accepted or rejected. In this manner the underlying PES is continuously modified, preventing the optimizer from getting permanently stuck in a (local) parameter space minima. .. math:: :label: 2 \Delta \varepsilon_{QM-MM} ( S_{\omega} ) + \phi \quad \text{if} \quad \Delta \varepsilon_{QM-MM} ( S_{\omega} ) \; \lt \; \Delta \varepsilon_{QM-MM} ( S_{\omega-i} ) \atop \Delta \varepsilon_{QM-MM} ( S_{\omega-i} ) + \phi \quad \text{if} \quad \Delta \varepsilon_{QM-MM} ( S_{\omega} ) \; \gt \; \Delta \varepsilon_{QM-MM} ( S_{\omega-i} ) 4. The parameter :math:`\phi` is updated at regular intervals in order to maintain a constant acceptance rate :math:`\alpha_{t}`. This is illustrated in :eq:`3`, where :math:`\phi` is updated the beginning of every super-iteration :math:`\kappa`. In this example the total number of iterations, :math:`\kappa \omega`, is divided into :math:`\kappa` super- and :math:`\omega` sub-iterations. .. math:: :label: 3 \phi_{\kappa \omega} = \phi_{ ( \kappa - 1 ) \omega} * \gamma^{ \text{sgn} ( \alpha_{t} - \overline{\alpha}_{ ( \kappa - 1 ) }) } \quad \kappa = 1, 2, 3, ..., N :Parameters: _opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend See :class:`.BaseOptimizer`. phi The variable :math:`\phi`. gamma The constant :math:`\gamma`. a_target The target acceptance rate :math:`\alpha_{t}`. super_iter_len The total number of each super-iterations :math:`\kappa`. Total number of iterations: :math:`\kappa \omega`. sub_iter_len The length of each ARMC sub-iteration :math:`\omega`. Total number of iterations: :math:`\kappa \omega`. move_range An array-like object containing all allowed move sizes. move_func A callable for performing the moves. The callable should take 2 floats (*i.e.* a single value from the parameter set `S_{\omega-i}` and the move size) and return 1 float (*i.e.* an updated value for the parameter set `S_{\omega}`). :Attributes: phi : float The variable :math:`\phi`. gamma : float The constant :math:`\gamma`. a_target : float The target acceptance rate :math:`\alpha_{t}`. super_iter : range A range object with the total number of each super-iterations :math:`\kappa`. Total number of iterations: :math:`\kappa \omega`. sub_iter : range A range object with the length of each ARMC sub-iteration :math:`\omega`. Total number of iterations: :math:`\kappa \omega`. move_range : numpy.ndarray[float] An array-like object containing all allowed move sizes. move_func : Callable[[float, float], float] A callable for performing the moves. The callable should take 2 floats (*i.e.* a single value from the parameter set `S_{\omega-i}` and the move size) and return 1 float (*i.e.* an updated value for the parameter set `S_{\omega}`). run : Callable See the ``function`` parameter in :meth:`ARMCOptimizer.minimize`. bounds : numpy.ndarray or (None, None) An 2D array (or 2-tuple filled with ``None``) denoting minimum and maximum values for each to-be moved parameter. See the ``bounds`` parameter in :meth:`ARMCOptimizer.minimize`. x_best : numpy.ndarray[float] The parameter set which minimizes the user-specified error function. fx_best : float The error associated with ``x_best``. fx_old : float The error of the last set of accepted parameters. See Also: The paper describing the original ARMC implementation: Salvatore Cosseddu *et al.*, *J. Chem. Theory Comput.*, **2017**, *13*, 297–308 `10.1021/acs.jctc.6b01089 <https://doi.org/10.1021/acs.jctc.6b01089>`_. The Python-based implementation of ARMC this class is based on: Automated Forcefield Optimization Extension `github.com/nlesc-nano/auto-FOX <https://github.com/nlesc-nano/auto-FOX>`_. """ def __init__( self, _opt_id: Optional[int] = None, _signal_pipe: Optional[Connection] = None, _results_queue: Optional[ChunkingQueue] = None, _pause_flag: Optional[Event] = None, _is_log_detailed: bool = False, _workers: int = 1, _backend: str = "threads", phi: float = 1.0, gamma: float = 2.0, a_target: float = 0.25, super_iter_len: int = 1000, sub_iter_len: int = 100, move_range: Union[float, Sequence[float]] = _MOVE_RANGE, move_func: Callable[[float, float], float] = np.multiply, ): super().__init__( _opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend, phi=phi, gamma=gamma, a_target=a_target, super_iter_len=super_iter_len, sub_iter_len=sub_iter_len, move_range=move_range, move_func=move_func, ) if _workers > 1: warnings.warn( f"Number of workers provided for this optimizer is {_workers}, but Adaptive Rate Monte " f"Carlo (ARMC) does not support parallel function evaluations." ) self.phi = float(phi) self.gamma = float(gamma) self.a_target = float(a_target) self.super_iters_max = super_iter_len self.super_iters_used = 0 self.sub_iter_max = sub_iter_len self.sub_iter_used = 0 self.move_range = np.array(move_range, dtype=float, ndmin=1, copy=False) self.move_func = move_func # To-be set by ARMCOptimizer.minimize() and ARMCOptimizer._inner() self._run: Callable[[Sequence[float]], float] = None self._bounds: Optional[np.ndarray] = None self.x_best: np.ndarray = None self.x_current: np.ndarray = None self.fx_best: float = None self.fx_old: float = None self.acceptance: np.ndarray = None def __amssettings__(self, s: Settings) -> Settings: s.input.ams.Optimizer.Type = "AdaptiveRateMonteCarlo" s.input.ams.Optimizer.AdaptiveRateMonteCarlo.Phi = self.phi s.input.ams.Optimizer.AdaptiveRateMonteCarlo.Gamma = self.gamma s.input.ams.Optimizer.AdaptiveRateMonteCarlo.AcceptanceTarget = self.a_target s.input.ams.Optimizer.AdaptiveRateMonteCarlo.SuperIterations = self.super_iters_max s.input.ams.Optimizer.AdaptiveRateMonteCarlo.SubIterations = self.sub_iter_max s.input.ams.Optimizer.AdaptiveRateMonteCarlo.MoveRange = " ".join((str(n) for n in self.move_range)) return s @property def bounds(self) -> np.ndarray: """Get or set the ``bounds`` parameter of :meth:`ARMCOptimizer.minimize`.""" return self._bounds @bounds.setter def bounds(self, value: Optional[Sequence[Tuple[float, float]]] = None) -> None: if value is None: self._bounds = np.array((None, None)) else: try: x0_len, bounds_len = len(self.x_best), len(value) except TypeError: pass # self.x_best has not yet been set else: # Check of the bounds and parameter set are of the same length if x0_len != bounds_len: raise ValueError( f"The length of 'bounds' ({bounds_len}) does not match the " f"length of 'x0' ({x0_len})" ) self._bounds = np.array(value, dtype=float, ndmin=2, copy=False).T
[docs] def minimize( self, function: Callable[[Sequence[float]], float], x0: Union[float, Sequence[float]], bounds: Optional[Sequence[Tuple[float, float]]] = None, ) -> MinimizeResult: """Start the minimization process. Parameters: function : Callable The function to be minimized. x0 : array-like[float] A 1D array-like object representing the set of to-be optimized parameters :math:`S`. bounds : array-like[float], optional An (optional) 2D array-like object denoting minimum and maximum values for each to-be moved parameter. The sequence should be of the same length as ``x0``. """ # Parse arguments self._run = function self.bounds = bounds # Set initial values skip_first_reset = self.is_restart if not self.is_restart: self.x_best = self.x_current = x0 = np.array(x0, dtype=float, ndmin=1, copy=False) self.fx_best = self._run(x0) self.fx_old = self.phi_apply(self.fx_best) # Start the main loop try: for kappa in range(self.super_iters_used, self.super_iters_max): if skip_first_reset: skip_first_reset = False else: self.sub_iter_used = 0 self.acceptance = np.zeros(self.sub_iter_max, dtype=bool) for omega in range(self.sub_iter_used, self.sub_iter_max): self.x_current = self._inner(self.x_current, omega) self.sub_iter_used += 1 self.phi_update() self.super_iters_used += 1 except StopCalledException: # self.callstop() was called return MinimizeResult(success=False, fx=self.fx_best, x=self.x_best) else: if self._results_queue: self.message_manager(0, f"Optimizer convergence") return MinimizeResult(success=True, fx=self.fx_best, x=self.x_best)
def _inner(self, x0_old: np.ndarray, omega: int) -> np.ndarray: r"""Run the inner loop of :meth:`ARMCOptimizer.minimize`. Parameters: x0_old : numpy.ndarray[float] The last set of accepted parameters :math:`S_{\omega-i}`. omega : int The ARMC sub-iteration :math:`\omega`. acceptance : numpy.ndarray[bool] A boolean array for keeping track of accepted moves during the current sub-iteration. Returns: numpy.ndarray[float]: The accepted parameters :math:`S_{\omega}` or :math:`S_{\omega-i}`. Equivalent to ``x0_old`` if the new parameter set is not accepted. """ # Step 1: Perform a random move x0_new = self.move(x0_old, *self.bounds) # Step 2: Perform the calculation and extract the output of the fitness function fx_new = self._run(x0_new) fx_old = self.fx_old # GloMPO: Check the manager immediately after every call if self._results_queue: self.check_messages() self._pause_signal.wait() # Step 3: Evaluate the auxiliary error; accept if the new parameter set lowers the error accept = np.sum(fx_new - fx_old) < 0 # Step 4: Update the auxiliary error history & apply phi self.acceptance[omega] = accept if accept: if fx_new < self.fx_best: # We've got a new set of best parameters! self.fx_best = fx_new self.x_best = x0_new self.fx_old = self.phi_apply(fx_new) return x0_new else: self.fx_old = self.phi_apply(fx_old) return x0_old def callstop(self, reason: Any = None): """Signal to terminate the :meth:`minimize` loop while still returning |MinimizeResult|.""" raise StopCalledException(reason)
[docs] def move( self, x0: np.ndarray, x0_min: Optional[np.ndarray] = None, x0_max: Optional[np.ndarray] = None ) -> np.ndarray: r"""Create a copy of ``x0`` and apply a random move to it. The move will be applied with ``move_func`` (multiplication by default) using a random value from ``move_range``. Parameters: x0 : numpy.ndarray[float] A 1D array of parameters :math:`S_{\omega-i}`. value_min : numpy.ndarray[float], optional A 1D array minimum values for each to-be moved parameter. The array should be of the same length as ``x0``. value_max : numpy.ndarray[float], optional A 1D array maximum values for each to-be moved parameter. The array should be of the same length as ``x0``. Returns: numpy.ndarray[float] A copy of ``x0`` :math:`S_{\omega}`. A single value is moved within this parameter set. """ # Prepare parameters for self.move_func i: int = np.random.randint(len(x0)) - 1 # Index of a random item in x0 x1: float = x0[i] x2: float = np.random.choice(self.move_range) # Apply (and clip) the move value_unclipped = self.move_func(x1, x2) value_min = x0_min[i] if x0_min is not None else -np.inf value_max = x0_max[i] if x0_max is not None else np.inf value = np.clip(value_unclipped, value_min, value_max) # Return the new parameter array ret = x0.copy() ret[i] = value return ret
[docs] def phi_apply(self, aux_err: float) -> float: """Apply ``phi`` to the supplied auxiliary error: :math:`\\Delta \\varepsilon_{QM-MM} + \\phi`. """ return aux_err + self.phi
[docs] def phi_update(self) -> None: r"""Update the variable :math:`\phi` (``phi``). :math:`\phi` is updated based on the target acceptance rate, :math:`\alpha_{t}` (``a_target``), and the acceptance rate, ``acceptance``, of the current super-iteration: .. math:: \phi_{\kappa \omega} = \phi_{ ( \kappa - 1 ) \omega} * \gamma^{ \text{sgn} ( \alpha_{t} - \overline{\alpha}_{ ( \kappa - 1 ) }) } Parameters: acceptance : numpy.ndarray[bool] A boolean array for keeping track of accepted moves over the course of the current sub-iteration of :math:`\kappa`. """ sign = np.sign(self.a_target - np.mean(self.acceptance)) self.phi *= self.gamma**sign
[docs] def checkpoint_save( self, path: Union[Path, str], force: Optional[Set[str]] = None, block: Optional[Set[str]] = None ): super().checkpoint_save(path, force={"_bounds", "move_func"}, block={"bounds"})