import warnings
from multiprocessing.connection import Connection
from pathlib import Path
from queue import Queue
from threading import Event
from typing import Callable, Optional, Set, Sequence, Tuple, Union
from scipy.optimize import minimize
from .baseoptimizer import BaseOptimizer, MinimizeResult
from ...plams.core.settings import Settings
__all__ = ("Scipy",)
class ManagerShutdownSignal(Exception):
    """Special exception to exit out of SciPy optimizers when signalled by the manager.

    Raised from the optimizer callback (:class:`GloMPOControl`) to unwind out of
    ``scipy.optimize.minimize`` and caught again in :meth:`Scipy.minimize`.
    """
class GloMPOControl:
    """Callback object handed to ``scipy.optimize.minimize`` by :class:`Scipy`.

    Each invocation gives the parent optimizer a chance to honour pause requests and to
    process messages; if the parent has been told to stop, the running SciPy routine is
    aborted by raising :class:`ManagerShutdownSignal`.

    :Parameters:

    parent
        The optimizer whose ``_results_queue``, ``_pause_signal``, ``check_messages``, ``stop``,
        ``logger`` and ``message_manager`` members are used by the callback.
    """

    def __init__(self, parent: "Scipy"):
        self.parent = parent

    def __call__(self, *args, **kwargs) -> None:
        """Run the GloMPO-specific checks; all arguments supplied by SciPy are ignored.

        :Raises:

        ManagerShutdownSignal
            If ``parent.stop`` is truthy after the parent has checked its messages.
        """
        parent = self.parent

        # Nothing to do when the parent has no (truthy) results queue.
        if not parent._results_queue:
            return

        # Block here while the pause flag is cleared, then process pending messages.
        parent._pause_signal.wait()
        parent.check_messages()

        if parent.stop:  # callstop called through check_messages
            stop_cond = "GloMPO termination signal."
            # The stop condition is a fixed non-empty string, so the logged flag is always True.
            parent.logger.debug("Stop = %s after message check from manager", True)
            parent.message_manager(0, stop_cond)
            raise ManagerShutdownSignal
class Scipy(BaseOptimizer):
    """Wrapper around `scipy.optimize.minimize
    <https://docs.scipy.org/doc/scipy-1.4.1/reference/generated/scipy.optimize.minimize.html#scipy-optimize-minimize>`_.

    .. warning::

       This is quite a rough wrapper around SciPy's optimizers since the code is quite impenetrable to outside code, and
       callbacks do not function consistently. Therefore, most GloMPO functionality like checkpointing and information
       sharing is not available. Users are advised to try :class:`.Nevergrad` instead which offers an interface to the
       SciPy optimizers with full GloMPO functionality.

       This optimizer is also prone to hanging in certain edge cases, thus you are advised to set ``end_timeout`` in the
       :class:`.GloMPOManager` to a reasonable value.

    :Parameters:

    _opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend
        See :class:`.BaseOptimizer`.
    method, jac, hess, tol
        Passed to the ``scipy.optimize.minimize`` arguments of the same name.
    ``**kwargs``
        Passed to the ``options`` argument of ``scipy.optimize.minimize``. Note that any keyword which is not one of
        the named parameters above (for example ``hessp``) ends up here.
    """

    _scaler = "std"

    def __init__(
        self,
        _opt_id: Optional[int] = None,
        _signal_pipe: Optional[Connection] = None,
        _results_queue: Optional[Queue] = None,
        _pause_flag: Optional[Event] = None,
        _is_log_detailed: bool = False,
        _workers: int = 1,
        _backend: str = "threads",
        method: str = "Nelder-Mead",
        jac: Union[Callable, str, bool, None] = None,
        hess: Union[Callable, str, None] = None,
        tol: Optional[float] = None,
        **kwargs,
    ):
        super().__init__(
            _opt_id,
            _signal_pipe,
            _results_queue,
            _pause_flag,
            _is_log_detailed,
            _workers,
            _backend,
            method=method,
            jac=jac,
            hess=hess,
            tol=tol,
            **kwargs,
        )

        if _workers > 1:
            warnings.warn(
                f"Number of workers provided for this optimizer is {_workers}, but the Scipy algorithms do "
                "not support parallel function evaluations."
            )

        # Set by callstop(); polled by the GloMPOControl callback during minimization.
        self.stop = False
        # Keyword arguments forwarded verbatim to scipy.optimize.minimize (``bounds`` is added in minimize()).
        self.opt_init_kwargs = {"method": method, "jac": jac, "hess": hess, "tol": tol, "options": kwargs}

    def __amssettings__(self, s: Settings) -> Settings:
        """Write this optimizer's configuration into ``s.input.ams.Optimizer`` and return ``s``."""
        scipy_block = s.input.ams.Optimizer.Scipy
        s.input.ams.Optimizer.Type = "Scipy"
        scipy_block.Algorithm = self.opt_init_kwargs["method"]
        scipy_block.Jacobian = self.opt_init_kwargs["jac"]
        scipy_block.Hessian = self.opt_init_kwargs["hess"]
        scipy_block.Tolerance = self.opt_init_kwargs["tol"]
        for key, value in self.opt_init_kwargs["options"].items():
            scipy_block.Settings[key] = value
        return s

    def minimize(
        self, function: Callable[[Sequence[float]], float], x0: Sequence[float], bounds: Sequence[Tuple[float, float]]
    ) -> MinimizeResult:
        """Minimize ``function`` from ``x0`` within ``bounds`` using ``scipy.optimize.minimize``.

        :Returns:

        MinimizeResult
            With ``x`` and ``fx`` taken from the SciPy result. If the run was aborted through
            :class:`ManagerShutdownSignal`, a freshly constructed :class:`MinimizeResult` is returned unchanged.
            The ``success`` attribute is never modified by this method.
        """
        # NOTE: this installs a process-wide warnings filter, not one scoped to this call.
        warnings.filterwarnings("ignore", "Method .+ cannot handle constraints nor bounds.")

        callback = GloMPOControl(self)
        self.opt_init_kwargs["bounds"] = bounds

        result = MinimizeResult()
        try:
            sp_result = minimize(function, x0=x0, callback=callback, **self.opt_init_kwargs)
        except ManagerShutdownSignal:
            # Raised by the callback when a stop was requested; fall through with the default result.
            pass
        else:
            # Different Scipy methods return different result structures
            sp_result = getattr(sp_result, "lowest_optimization_result", sp_result)

            if self._results_queue:
                self.message_manager(0, "Optimizer convergence")

            result.x = sp_result.x
            result.fx = sp_result.fun

        return result

    def checkpoint_save(
        self, path: Union[Path, str], force: Optional[Set[str]] = None, block: Optional[Set[str]] = None
    ):
        """Warn that checkpointing is unsupported for SciPy optimizers, then defer to the base implementation."""
        warnings.warn(
            "Checkpointing requested but this is not supported by Scipy optimizers. Checkpoints will not "
            "behave as expected."
        )
        super().checkpoint_save(path, force, block)

    def callstop(self, *args):
        """Flag the optimizer to stop; the flag is acted upon at the next callback invocation."""
        self.stop = True