# Source code for scm.glompo.optimizers.scipy

import warnings
from multiprocessing.connection import Connection
from pathlib import Path
from queue import Queue
from threading import Event
from typing import Callable, Optional, Set, Sequence, Tuple, Union

from scipy.optimize import minimize

from .baseoptimizer import BaseOptimizer, MinimizeResult
from ...plams.core.settings import Settings

__all__ = ("Scipy",)


class ManagerShutdownSignal(Exception):
    """Raised from inside a SciPy callback to abort the optimizer run when the GloMPO manager signals a stop."""


class GloMPOControl:
    """Callback passed to ``scipy.optimize.minimize`` to service GloMPO manager traffic between iterations.

    On each invocation it honours the manager's pause flag, processes any pending manager messages, and — if the
    manager has requested termination (``callstop`` sets ``parent.stop``) — notifies the manager and raises
    :class:`ManagerShutdownSignal` to break out of the SciPy optimization loop.
    """

    def __init__(self, parent: "Scipy"):
        # NOTE(review): the original annotation referenced "ScipyOptimizeWrapper", a class that does not exist in
        # this module; the wrapper that constructs this callback is the `Scipy` optimizer below.
        self.parent = parent

    def __call__(self, *args, **kwargs):
        # SciPy passes per-method callback arguments; they are ignored — this hook is purely for GloMPO control.
        if self.parent._results_queue:  # only active when running under a GloMPO manager
            self.parent._pause_signal.wait()  # block here while the manager holds the optimizer paused
            self.parent.check_messages()
            if self.parent.stop:  # callstop called through check_messages
                stop_cond = "GloMPO termination signal."
                # bool(stop_cond) is always True here (non-empty string); kept for log-message parity.
                self.parent.logger.debug("Stop = %s after message check from manager", bool(stop_cond))
                self.parent.message_manager(0, stop_cond)
                raise ManagerShutdownSignal


class Scipy(BaseOptimizer):
    """Wrapper around `scipy.optimize.minimize
    <https://docs.scipy.org/doc/scipy-1.4.1/reference/generated/scipy.optimize.minimize.html#scipy-optimize-minimize>`_.

    .. warning::

       This is quite a rough wrapper around SciPy's optimizers since the code is quite impenetrable to outside
       code, and callbacks do not function consistently. Therefore, most GloMPO functionality like checkpointing
       and information sharing is not available. Users are advised to try :class:`.Nevergrad` instead which
       offers an interface to the SciPy optimizers with full GloMPO functionality.

       This optimizer is also prone to hanging in certain edge cases, thus you are advised to set ``end_timeout``
       in the :class:`.GloMPOManager` to a reasonable value.

    :Parameters:

    _opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend
        See :class:`.BaseOptimizer`.
    method, jac, hess, tol
        Passed to the ``scipy.optimize.minimize`` arguments of the same name.
    ``**kwargs``
        Passed to the ``options`` argument of ``scipy.optimize.minimize``.
    """

    _scaler = "std"

    def __init__(
        self,
        _opt_id: int = None,
        _signal_pipe: Connection = None,
        _results_queue: Queue = None,
        _pause_flag: Event = None,
        _is_log_detailed: bool = False,
        _workers: int = 1,
        _backend: str = "threads",
        method: str = "Nelder-Mead",
        jac: Union[Callable, str, bool, None] = None,
        hess: Union[Callable, str, None] = None,
        tol: Optional[float] = None,
        **kwargs,
    ):
        super().__init__(
            _opt_id,
            _signal_pipe,
            _results_queue,
            _pause_flag,
            _is_log_detailed,
            _workers,
            _backend,
            method=method,
            jac=jac,
            hess=hess,
            tol=tol,
            **kwargs,
        )
        if _workers > 1:
            warnings.warn(
                f"Number of workers provided for this optimizer is {_workers}, but the Scipy algorithms do "
                f"not support parallel function evaluations."
            )
        self.stop = False  # flipped by callstop(); polled by the GloMPOControl callback
        # Keyword arguments forwarded verbatim to scipy.optimize.minimize on each minimize() call.
        self.opt_init_kwargs = {"method": method, "jac": jac, "hess": hess, "tol": tol, "options": kwargs}

    def __amssettings__(self, s: Settings) -> Settings:
        """Record this optimizer's configuration in the AMS input :class:`Settings` tree and return it."""
        s.input.ams.Optimizer.Type = "Scipy"
        s.input.ams.Optimizer.Scipy.Algorithm = self.opt_init_kwargs["method"]
        s.input.ams.Optimizer.Scipy.Jacobian = self.opt_init_kwargs["jac"]
        s.input.ams.Optimizer.Scipy.Hessian = self.opt_init_kwargs["hess"]
        s.input.ams.Optimizer.Scipy.Tolerance = self.opt_init_kwargs["tol"]
        for k, v in self.opt_init_kwargs["options"].items():
            s.input.ams.Optimizer.Scipy.Settings[k] = v
        return s

    def minimize(
        self, function: Callable[[Sequence[float]], float], x0: Sequence[float], bounds: Sequence[Tuple[float, float]]
    ) -> MinimizeResult:
        """Run ``scipy.optimize.minimize`` on *function* from *x0* within *bounds*.

        A :class:`GloMPOControl` callback services manager pause/stop signals between iterations; a manager stop
        aborts the SciPy loop via :class:`ManagerShutdownSignal` and returns whatever result was accumulated.
        """
        # Some SciPy methods warn that they ignore bounds; suppress that noise since bounds are always forwarded.
        warnings.filterwarnings("ignore", "Method .+ cannot handle constraints nor bounds.")

        # NOTE(review): general_opt is hard-coded False, so the `result.success` assignment below is dead code.
        # Preserved as-is to keep behavior identical; presumably a remnant of basinhopping-style support.
        general_opt = False
        callback = GloMPOControl(self)
        self.opt_init_kwargs["bounds"] = bounds

        result = MinimizeResult()
        try:
            sp_result = minimize(function, x0=x0, callback=callback, **self.opt_init_kwargs)
            try:
                # Different Scipy methods return different result structures
                sp_result = sp_result.lowest_optimization_result
            except (AttributeError, UnboundLocalError):
                pass

            if self._results_queue:
                self.message_manager(0, "Optimizer convergence")

            result.x = sp_result.x
            result.fx = sp_result.fun
            if general_opt:
                result.success = sp_result.success
        except ManagerShutdownSignal:
            # Manager-requested abort: return the (possibly default) result without error.
            pass

        return result

    def checkpoint_save(
        self, path: Union[Path, str], force: Optional[Set[str]] = None, block: Optional[Set[str]] = None
    ):
        """Warn that SciPy optimizers cannot checkpoint meaningfully, then delegate to the base implementation."""
        warnings.warn(
            "Checkpointing requested but this is not supported by Scipy optimizers. Checkpoints will not "
            "behave as expected."
        )
        super().checkpoint_save(path, force, block)

    def callstop(self, *args):
        """Manager-facing stop request; the GloMPOControl callback aborts the run on the next iteration."""
        self.stop = True