import numpy as np
import warnings
from multiprocessing.connection import Connection
from pathlib import Path
from threading import Event
from typing import Any, Callable, Optional, Sequence, Set, Tuple, Union
from .baseoptimizer import BaseOptimizer, MinimizeResult
from ..core._backends import ChunkingQueue
from ...plams.core.settings import Settings
__all__ = ("ARMCOptimizer",)
# Create a range which does not contain 1.0
_MOVE_RANGE = np.delete(np.arange(start=0.9, stop=1.1, step=0.005), 20)
_MOVE_RANGE.setflags(write=False)
class StopCalledException(Exception):
"""Special error raised by ``ARMCOptimizer.callstop``."""
[docs]class ARMCOptimizer(BaseOptimizer):
r"""An optimizer using the Adaptive Rate Monte Carlo (ARMC) algorithm.
1. A trial state, :math:`S_{\omega}`, is generated by moving a random parameter
retrieved from a user-specified parameter set (*e.g.* atomic charge).
By default, parameters moves are applied in a multiplicative manner.
2. The move is accepted if the new set of parameters, :math:`S_{\omega}`, lowers
the auxiliary error (:math:`\Delta \varepsilon_{QM-MM}`) with respect to
the previous set of accepted parameters :math:`S_{\omega-i}` (:math:`i > 0`; see :eq:`1`).
The auxiliary error is calculated with a user-specified cost function.
.. math::
:label: 1
p(\omega \leftarrow \omega-i) =
\Biggl \lbrace
{
1, \quad
\Delta \varepsilon_{QM-MM} ( S_{\omega} )
\; \lt \;
\Delta \varepsilon_{QM-MM} ( S_{\omega-i} )
\atop
0, \quad
\Delta \varepsilon_{QM-MM} ( S_{\omega} )
\; \gt \;
\Delta \varepsilon_{QM-MM} ( S_{\omega-i} )
}
3. The parameter history is updated.
Either :math:`S_{\omega}` or :math:`S_{\omega-i}` is increased by the variable
:math:`\phi` (see :eq:`2`) if, respectively, the new parameters are accepted or rejected.
In this manner the underlying PES is continuously modified, preventing the optimizer
from getting permanently stuck in a (local) parameter space minima.
.. math::
:label: 2
\Delta \varepsilon_{QM-MM} ( S_{\omega} ) + \phi \quad \text{if}
\quad \Delta \varepsilon_{QM-MM} ( S_{\omega} ) \;
\lt \; \Delta \varepsilon_{QM-MM} ( S_{\omega-i} )
\atop
\Delta \varepsilon_{QM-MM} ( S_{\omega-i} ) + \phi \quad \text{if}
\quad \Delta \varepsilon_{QM-MM} ( S_{\omega} ) \; \gt
\; \Delta \varepsilon_{QM-MM} ( S_{\omega-i} )
4. The parameter :math:`\phi` is updated at regular intervals
in order to maintain a constant acceptance rate :math:`\alpha_{t}`.
This is illustrated in :eq:`3`, where :math:`\phi` is updated
the beginning of every super-iteration :math:`\kappa`. In this example
the total number of iterations, :math:`\kappa \omega`, is divided into
:math:`\kappa` super- and :math:`\omega` sub-iterations.
.. math::
:label: 3
\phi_{\kappa \omega} =
\phi_{ ( \kappa - 1 ) \omega} * \gamma^{
\text{sgn} ( \alpha_{t} - \overline{\alpha}_{ ( \kappa - 1 ) })
}
\quad
\kappa = 1, 2, 3, ..., N
:Parameters:
_opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend
See :class:`.BaseOptimizer`.
phi
The variable :math:`\phi`.
gamma
The constant :math:`\gamma`.
a_target
The target acceptance rate :math:`\alpha_{t}`.
super_iter_len
The total number of each super-iterations :math:`\kappa`.
Total number of iterations: :math:`\kappa \omega`.
sub_iter_len
The length of each ARMC sub-iteration :math:`\omega`.
Total number of iterations: :math:`\kappa \omega`.
move_range
An array-like object containing all allowed move sizes.
move_func
A callable for performing the moves.
The callable should take 2 floats
(*i.e.* a single value from the parameter set `S_{\omega-i}` and the move size) and
return 1 float (*i.e.* an updated value for the parameter set `S_{\omega}`).
:Attributes:
phi : float
The variable :math:`\phi`.
gamma : float
The constant :math:`\gamma`.
a_target : float
The target acceptance rate :math:`\alpha_{t}`.
super_iter : range
A range object with the total number of each super-iterations :math:`\kappa`.
Total number of iterations: :math:`\kappa \omega`.
sub_iter : range
A range object with the length of each ARMC sub-iteration :math:`\omega`.
Total number of iterations: :math:`\kappa \omega`.
move_range : numpy.ndarray[float]
An array-like object containing all allowed move sizes.
move_func : Callable[[float, float], float]
A callable for performing the moves.
The callable should take 2 floats
(*i.e.* a single value from the parameter set `S_{\omega-i}` and the move size) and
return 1 float (*i.e.* an updated value for the parameter set `S_{\omega}`).
run : Callable
See the ``function`` parameter in :meth:`ARMCOptimizer.minimize`.
bounds : numpy.ndarray or (None, None)
An 2D array (or 2-tuple filled with ``None``) denoting minimum and maximum values for
each to-be moved parameter.
See the ``bounds`` parameter in :meth:`ARMCOptimizer.minimize`.
x_best : numpy.ndarray[float]
The parameter set which minimizes the user-specified error function.
fx_best : float
The error associated with ``x_best``.
fx_old : float
The error of the last set of accepted parameters.
See Also:
The paper describing the original ARMC implementation:
Salvatore Cosseddu *et al.*, *J. Chem. Theory Comput.*, **2017**, *13*, 297–308
`10.1021/acs.jctc.6b01089 <https://doi.org/10.1021/acs.jctc.6b01089>`_.
The Python-based implementation of ARMC this class is based on:
Automated Forcefield Optimization Extension
`github.com/nlesc-nano/auto-FOX <https://github.com/nlesc-nano/auto-FOX>`_.
"""
def __init__(
self,
_opt_id: Optional[int] = None,
_signal_pipe: Optional[Connection] = None,
_results_queue: Optional[ChunkingQueue] = None,
_pause_flag: Optional[Event] = None,
_is_log_detailed: bool = False,
_workers: int = 1,
_backend: str = "threads",
phi: float = 1.0,
gamma: float = 2.0,
a_target: float = 0.25,
super_iter_len: int = 1000,
sub_iter_len: int = 100,
move_range: Union[float, Sequence[float]] = _MOVE_RANGE,
move_func: Callable[[float, float], float] = np.multiply,
):
super().__init__(
_opt_id,
_signal_pipe,
_results_queue,
_pause_flag,
_is_log_detailed,
_workers,
_backend,
phi=phi,
gamma=gamma,
a_target=a_target,
super_iter_len=super_iter_len,
sub_iter_len=sub_iter_len,
move_range=move_range,
move_func=move_func,
)
if _workers > 1:
warnings.warn(
f"Number of workers provided for this optimizer is {_workers}, but Adaptive Rate Monte "
f"Carlo (ARMC) does not support parallel function evaluations."
)
self.phi = float(phi)
self.gamma = float(gamma)
self.a_target = float(a_target)
self.super_iters_max = super_iter_len
self.super_iters_used = 0
self.sub_iter_max = sub_iter_len
self.sub_iter_used = 0
self.move_range = np.array(move_range, dtype=float, ndmin=1, copy=False)
self.move_func = move_func
# To-be set by ARMCOptimizer.minimize() and ARMCOptimizer._inner()
self._run: Callable[[Sequence[float]], float] = None
self._bounds: Optional[np.ndarray] = None
self.x_best: np.ndarray = None
self.x_current: np.ndarray = None
self.fx_best: float = None
self.fx_old: float = None
self.acceptance: np.ndarray = None
def __amssettings__(self, s: Settings) -> Settings:
s.input.ams.Optimizer.Type = "AdaptiveRateMonteCarlo"
s.input.ams.Optimizer.AdaptiveRateMonteCarlo.Phi = self.phi
s.input.ams.Optimizer.AdaptiveRateMonteCarlo.Gamma = self.gamma
s.input.ams.Optimizer.AdaptiveRateMonteCarlo.AcceptanceTarget = self.a_target
s.input.ams.Optimizer.AdaptiveRateMonteCarlo.SuperIterations = self.super_iters_max
s.input.ams.Optimizer.AdaptiveRateMonteCarlo.SubIterations = self.sub_iter_max
s.input.ams.Optimizer.AdaptiveRateMonteCarlo.MoveRange = " ".join((str(n) for n in self.move_range))
return s
@property
def bounds(self) -> np.ndarray:
"""Get or set the ``bounds`` parameter of :meth:`ARMCOptimizer.minimize`."""
return self._bounds
@bounds.setter
def bounds(self, value: Optional[Sequence[Tuple[float, float]]] = None) -> None:
if value is None:
self._bounds = np.array((None, None))
else:
try:
x0_len, bounds_len = len(self.x_best), len(value)
except TypeError:
pass # self.x_best has not yet been set
else: # Check of the bounds and parameter set are of the same length
if x0_len != bounds_len:
raise ValueError(
f"The length of 'bounds' ({bounds_len}) does not match the " f"length of 'x0' ({x0_len})"
)
self._bounds = np.array(value, dtype=float, ndmin=2, copy=False).T
[docs] def minimize(
self,
function: Callable[[Sequence[float]], float],
x0: Union[float, Sequence[float]],
bounds: Optional[Sequence[Tuple[float, float]]] = None,
) -> MinimizeResult:
"""Start the minimization process.
Parameters:
function : Callable
The function to be minimized.
x0 : array-like[float]
A 1D array-like object representing the set of to-be optimized parameters :math:`S`.
bounds : array-like[float], optional
An (optional) 2D array-like object denoting minimum and maximum values for
each to-be moved parameter.
The sequence should be of the same length as ``x0``.
"""
# Parse arguments
self._run = function
self.bounds = bounds
# Set initial values
skip_first_reset = self.is_restart
if not self.is_restart:
self.x_best = self.x_current = x0 = np.array(x0, dtype=float, ndmin=1, copy=False)
self.fx_best = self._run(x0)
self.fx_old = self.phi_apply(self.fx_best)
# Start the main loop
try:
for kappa in range(self.super_iters_used, self.super_iters_max):
if skip_first_reset:
skip_first_reset = False
else:
self.sub_iter_used = 0
self.acceptance = np.zeros(self.sub_iter_max, dtype=bool)
for omega in range(self.sub_iter_used, self.sub_iter_max):
self.x_current = self._inner(self.x_current, omega)
self.sub_iter_used += 1
self.phi_update()
self.super_iters_used += 1
except StopCalledException: # self.callstop() was called
return MinimizeResult(success=False, fx=self.fx_best, x=self.x_best)
else:
if self._results_queue:
self.message_manager(0, f"Optimizer convergence")
return MinimizeResult(success=True, fx=self.fx_best, x=self.x_best)
def _inner(self, x0_old: np.ndarray, omega: int) -> np.ndarray:
r"""Run the inner loop of :meth:`ARMCOptimizer.minimize`.
Parameters:
x0_old : numpy.ndarray[float]
The last set of accepted parameters :math:`S_{\omega-i}`.
omega : int
The ARMC sub-iteration :math:`\omega`.
acceptance : numpy.ndarray[bool]
A boolean array for keeping track of accepted moves during the current sub-iteration.
Returns:
numpy.ndarray[float]:
The accepted parameters :math:`S_{\omega}` or :math:`S_{\omega-i}`.
Equivalent to ``x0_old`` if the new parameter set is not accepted.
"""
# Step 1: Perform a random move
x0_new = self.move(x0_old, *self.bounds)
# Step 2: Perform the calculation and extract the output of the fitness function
fx_new = self._run(x0_new)
fx_old = self.fx_old
# GloMPO: Check the manager immediately after every call
if self._results_queue:
self.check_messages()
self._pause_signal.wait()
# Step 3: Evaluate the auxiliary error; accept if the new parameter set lowers the error
accept = np.sum(fx_new - fx_old) < 0
# Step 4: Update the auxiliary error history & apply phi
self.acceptance[omega] = accept
if accept:
if fx_new < self.fx_best: # We've got a new set of best parameters!
self.fx_best = fx_new
self.x_best = x0_new
self.fx_old = self.phi_apply(fx_new)
return x0_new
else:
self.fx_old = self.phi_apply(fx_old)
return x0_old
def callstop(self, reason: Any = None):
"""Signal to terminate the :meth:`minimize` loop while still returning |MinimizeResult|."""
raise StopCalledException(reason)
[docs] def move(
self, x0: np.ndarray, x0_min: Optional[np.ndarray] = None, x0_max: Optional[np.ndarray] = None
) -> np.ndarray:
r"""Create a copy of ``x0`` and apply a random move to it.
The move will be applied with ``move_func`` (multiplication by default)
using a random value from ``move_range``.
Parameters:
x0 : numpy.ndarray[float]
A 1D array of parameters :math:`S_{\omega-i}`.
value_min : numpy.ndarray[float], optional
A 1D array minimum values for each to-be moved parameter.
The array should be of the same length as ``x0``.
value_max : numpy.ndarray[float], optional
A 1D array maximum values for each to-be moved parameter.
The array should be of the same length as ``x0``.
Returns:
numpy.ndarray[float]
A copy of ``x0`` :math:`S_{\omega}`.
A single value is moved within this parameter set.
"""
# Prepare parameters for self.move_func
i: int = np.random.randint(len(x0)) - 1 # Index of a random item in x0
x1: float = x0[i]
x2: float = np.random.choice(self.move_range)
# Apply (and clip) the move
value_unclipped = self.move_func(x1, x2)
value_min = x0_min[i] if x0_min is not None else -np.inf
value_max = x0_max[i] if x0_max is not None else np.inf
value = np.clip(value_unclipped, value_min, value_max)
# Return the new parameter array
ret = x0.copy()
ret[i] = value
return ret
[docs] def phi_apply(self, aux_err: float) -> float:
"""Apply ``phi`` to the supplied auxiliary error:
:math:`\\Delta \\varepsilon_{QM-MM} + \\phi`.
"""
return aux_err + self.phi
[docs] def phi_update(self) -> None:
r"""Update the variable :math:`\phi` (``phi``).
:math:`\phi` is updated based on the target acceptance rate,
:math:`\alpha_{t}` (``a_target``), and the
acceptance rate, ``acceptance``, of the current super-iteration:
.. math::
\phi_{\kappa \omega} =
\phi_{ ( \kappa - 1 ) \omega} * \gamma^{
\text{sgn} ( \alpha_{t} - \overline{\alpha}_{ ( \kappa - 1 ) })
}
Parameters:
acceptance : numpy.ndarray[bool]
A boolean array for keeping track of accepted moves over the course
of the current sub-iteration of :math:`\kappa`.
"""
sign = np.sign(self.a_target - np.mean(self.acceptance))
self.phi *= self.gamma**sign
[docs] def checkpoint_save(
self, path: Union[Path, str], force: Optional[Set[str]] = None, block: Optional[Set[str]] = None
):
super().checkpoint_save(path, force={"_bounds", "move_func"}, block={"bounds"})