Source code for scm.glompo.optimizers.cmawrapper

""" Implementation of CMA-ES as a GloMPO compatible optimizer. """

import cma
import copy
import logging
import numpy as np
import pickle
import warnings
from cma.restricted_gaussian_sampler import GaussVDSampler, GaussVkDSampler
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor, as_completed
from multiprocessing.connection import Connection
from pathlib import Path
from queue import Queue
from threading import Event
from typing import Callable, Optional, Sequence, Tuple, Union, Set, Type
from ..common.wrappers import needs_optional_package
from .baseoptimizer import BaseOptimizer, MinimizeResult
from ...plams.core.settings import Settings

__all__ = ("CMAOptimizer",)


[docs]class CMAOptimizer(BaseOptimizer): """Wrapper around a CMA-ES python implementation. Note that this class is also *stand-alone*, this means it can be used independently of the GloMPO framework. It is also built in such a way that it :meth:`minimize` can be called multiple times on different functions. :Parameters: _opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend See :class:`.BaseOptimizer`. sigma0 Initial standard deviation of the multivariate normal distribution from which trials are drawn. One value for all parameters which means that all parameters must be scaled accordingly. sampler Allows the use of ``'GaussVDSampler'`` and ``'GaussVkDSampler'`` settings. verbose If ``True``, print status messages during the optimization, else no output will be printed. keep_files Directory in which to save files containing extra optimizer convergence information. force_injects If ``True``, injections of parameter vectors into the solver will be exact, guaranteeing that the solution will be in the next iteration's population. If ``False``, the injection will result in a direction relative nudge towards the vector. Forcing the injecting can limit global exploration but non-forced injections may have little effect. injection_interval If ``None``, injections are ignored by the optimizer. If an ``int`` is provided then injection are only accepted if at least ``injection_interval`` iterations have passed since the last injection. ``**cmasettings`` `CMA-ES <http://cma.gforge.inria.fr/apidocs-pycma/>`_ package-specific settings. See ``cma.s.pprint(cma.CMAOptions())`` for a list of available options. Most useful keys are: ``'timeout'``, ``'tolstagnation'``, ``'popsize'``. Additionally, the following options are supported through this class: * ``'minsigma'``: Termination if ``sigma < minsigma`` * ``'maxsigma'``: Limit sigma to ``min(sigma,maxsigma)`` * ``'storeC'``: If ``True``, creates a file called ``cma_C.npy`` in ``keep_files`` with the covariance matrix at every iteration. ``keep_files`` must also be provided and ``sampler`` must be the default (``'full'``). :Notes: #. Although not the default, by adjusting the injection settings above, the optimizer will inject the saved incumbent solution into the solver influencing the points sampled by the following iteration. The incumbent begins at ``x0`` and is updated by the inject method called by the GloMPO manager. #. If ``'popsize'`` is not provided during optimizer initialisation, it will be set to the number of ``BaseOptimizer.workers`` if this is larger than 1, else it will be set to the default: ``4 + int(3 * log(d))``. :References: Hansen, N. (2021). CMA-ES, Covariance Matrix Adaptation Evolution Strategy for non-linear numerical optimization in Python. *PyPI*. https://pypi.org/project/cma/ """ def __init__( self, _opt_id: Optional[int] = None, _signal_pipe: Optional[Connection] = None, _results_queue: Optional[Queue] = None, _pause_flag: Optional[Event] = None, _is_log_detailed: bool = False, _workers: int = 1, _backend: str = "threads", sigma0: float = 0.05, sampler: str = "full", verbose: bool = True, keep_files: Union[None, str, Path] = None, force_injects: Optional[bool] = None, injection_interval: Optional[int] = None, **cmasettings, ): super().__init__( _opt_id, _signal_pipe, _results_queue, _pause_flag, _is_log_detailed, _workers, _backend, sigma0=sigma0, sampler=sampler, verbose=verbose, keep_files=keep_files, force_injects=force_injects, injection_interval=injection_interval, **cmasettings, ) self.verbose = verbose self.es = None self.result = None self.sigma0 = sigma0 self.cmasettings = cmasettings self.popsize = cmasettings["popsize"] if "popsize" in cmasettings else -1 self.stop_called = False self._user_settings_input = copy.deepcopy(cmasettings) # Directly store user input for amssettings method if "sigma" in cmasettings: warnings.warn( "The use of the 'sigma' keyword has been deprecated. Please use 'sigma0' instead.'", DeprecationWarning ) self.sigma0 = cmasettings["sigma"] del cmasettings["sigma"] if keep_files is True: self.keep_files = Path("cmadata") elif keep_files: self.keep_files = Path(keep_files) else: self.keep_files = None if self.keep_files and self.opt_id: self.keep_files = self.keep_files.with_suffix(f".{self.opt_id:03}") if self.sigma0 <= 0: self.logger.critical("sigma0 value invalid. Please select a positive value.") raise ValueError("sigma0 value invalid. Please select a positive value.") self.force_injects = force_injects self.injection_interval = injection_interval self.injection_counter = 0 if self.keep_files and "storeC" in cmasettings and cmasettings["storeC"] is True: self.storeC = [] else: self.storeC = False # Sort all non-native CMA options into the custom cmaoptions key 'vv': customopts = {} for key, val in [*self.cmasettings.items()]: if key not in cma.CMAOptions().keys(): customopts[key] = val del self.cmasettings[key] self.cmasettings["vv"] = customopts self.cmasettings["verbose"] = -3 # Silence CMA Logger if "maxiter" not in cmasettings: # Deactivated to not interfere with GloMPO Stopping cmasettings["maxiter"] = float("inf") self.sampler = sampler if sampler == "vd": self.cmasettings = GaussVDSampler.extend_cma_options(self.cmasettings) elif sampler == "vkd": self.cmasettings = GaussVkDSampler.extend_cma_options(self.cmasettings) elif sampler == "full": pass else: raise ValueError(f"Sampler {sampler} not understood. Choose from ['full', 'vd', 'vkd'].") def __amssettings__(self, s: Settings) -> Settings: s.input.ams.Optimizer.Type = "CMAES" s.input.ams.Optimizer.CMAES.Sigma0 = self.sigma0 s.input.ams.Optimizer.CMAES.Sampler = self.sampler s.input.ams.Optimizer.CMAES.Popsize = self.popsize s.input.ams.Optimizer.CMAES.InjectionInterval = self.injection_interval if self.injection_interval else 0 s.input.ams.Optimizer.CMAES["Verbose"] = bool(self.verbose) s.input.ams.Optimizer.CMAES["KeepFiles"] = bool(self.keep_files) s.input.ams.Optimizer.CMAES["ForceInjections"] = bool(self.force_injects) for k, v in self._user_settings_input.items(): if k == "minsigma": s.input.ams.Optimizer.CMAES.MinSigma = v s.input.ams.Optimizer.CMAES.Settings[k] = v return s
[docs] def minimize( self, function: Callable[[Sequence[float]], float], x0: Sequence[float], bounds: Sequence[Tuple[float, float]] ) -> MinimizeResult: """Begin CMA-ES minimization loop. :Parameters: function bounds callbacks See :meth:`.BaseOptimizer.minimize` x0 See :meth:`.BaseOptimizer.minimize`. Force injected into the solver to guarantee it is evaluated. :Returns: MinimizeResult Location, function value and other optimization information about the lowest value found by the optimizer. """ task_settings = copy.deepcopy(self.cmasettings) if self.popsize == -1: self.popsize = self.workers if self.popsize == 0 or self.popsize == 1: self.popsize = 4 + int(3 * np.log(len(x0))) task_settings["popsize"] = self.popsize if self.popsize < self.workers: warnings.warn( f"'popsize'={self.popsize} is less than 'workers'={self.workers}. " f"This is an inefficient use of resources" ) self.logger.warning( "'popsize'=%d is less than 'workers'=%d. This is an inefficient use of resources", self.popsize, self.workers, ) if self.keep_files: self.keep_files.mkdir(parents=True, exist_ok=True) task_settings.update({"verb_filenameprefix": str(self.keep_files / "cma_")}) with (self.keep_files / "cma_C.dat").open("a") as file: print("# iter min(abs(C)i==j) max(abs(C)i==j) min(abs(C)i!=j) max(abs(C)i!=j)", file=file) # Crucial hack to actually make CMA controllable by manager task_settings["termination_callback"] = self._termination_callback if not self.is_restart: self.logger.info("Setting up fresh CMA") self.result = MinimizeResult() task_settings.update({"bounds": np.transpose(bounds).tolist()}) self.es = cma.CMAEvolutionStrategy(x0, self.sigma0, task_settings) self.es.inject([x0], force=True) if self.keep_files: self.es.logger.name_prefix = str(self.keep_files / "cma_") self.logger.debug("Entering optimization loop") i = self.es.countiter x = None es = self.es es_opts = self.es.opts["vv"] while not es.stop() and not any(es.callbackstop): # es.stop evaluates the callback but es.callbackstop needs to be checked, it is not returned in es.stop i += 1 self.logger.debug("Asking for parameter vectors") x = es.ask() self.logger.debug("Parameter vectors generated") fx = self._parallel_map(function, x) if len(x) != len(fx): self.logger.debug("Unfinished evaluation detected. Breaking out of loop") break if i == 1: self.incumbent = {"x": x0, "fx": fx[0]} es.tell(x, fx) self.logger.debug("Told solutions") if "minsigma" in es_opts and es.sigma < es_opts["minsigma"]: # Stop if sigma falls below minsigma self.callstop("Early CMA stop: 'minsigma'.") if "maxsigma" in es_opts: es.sigma = min(es.sigma, es_opts["maxsigma"]) if self.keep_files: es.logger.add() self._log() with warnings.catch_warnings(record=True) as W: # catch RuntimeWarning in cma/evolution_strategy.py:3399++ self.result.x, self.result.fx = es.result[:2] for w in W: if w.category == RuntimeWarning and "invalid" in str(w.message).lower(): warnings.warn(f"Warning: Ill-defined region. Optimization might not succeed.", RuntimeWarning) else: warnings.warn(w.message) if self.result.fx == float("inf"): self.logger.warning( "CMA iteration found no valid results." "fx = 'inf' and x = (first vector generated by es.ask())" ) self.result.x = x[0] self.logger.debug("Extracted x and fx from result") if self.verbose and i % 10 == 0 or i == 1: print(f"@ iter = {i} fx={self.result.fx:.2E} sigma={es.sigma:.3E}") if self._results_queue: self.check_messages() self.logger.debug("Checked messages") self._pause_signal.wait() self.logger.debug("Passed pause test") self.logger.debug("callbacks called") if ( self.incumbent["fx"] < min(fx) and self.injection_interval and i - self.injection_counter >= self.injection_interval ): self.injection_counter = i es.inject([self.incumbent["x"]], force=self.force_injects) self.logger.debug("Exited optimization loop") if "tolfun" in es.stop() or "tolfunhist" in es.stop(): warnings.warn( f"CMA quit due to a flat loss function. If this happens early, the optimized parameters are " f"likely not sensitive to your training set." ) if self.keep_files: self._log() self.result.x, self.result.fx = es.result[:2] self.result.success = np.isfinite(self.result.fx) and self.result.success if self.result.fx == float("inf"): self.logger.warning( "CMA iteration found no valid results. " "fx = 'inf' and x = (first vector generated by es.ask())" ) self.result.x = x[0] if self.verbose: print(f"Optimization terminated: success = {self.result.success}") print(f"Optimizer convergence {es.stop()}") print(f"Final fx={self.result.fx:.2E}") if self._results_queue: self.logger.debug("Messaging termination to manager.") self.message_manager(0, f"Optimizer convergence {es.stop()}") if es.stop() != "Checkpoint Shutdown": if self.keep_files: name = "cma_" if self._opt_id: name += f"opt{self._opt_id}_" name += "results.pkl" with (self.keep_files / name).open("wb") as file: self.logger.debug("Pickling results") pickle.dump( {key: getattr(es, key) for key in ("result", "sigma", "B", "C", "D", "popsize", "sigma0")}, file ) return self.result
def _parallel_map( self, function: Callable[[Sequence[float]], float], x: Sequence[Sequence[float]] ) -> Sequence[float]: """Returns the function evaluations for a given set of trial parameters, x. Calculations are distributed over threads or processes depending on the number of workers and backend selected. """ if self.workers > 1: pool_executor = ProcessPoolExecutor if self._backend == "processes" else ThreadPoolExecutor self.logger.debug("Executing within %s with %d workers", pool_executor.__name__, self.workers) with pool_executor(max_workers=self.workers) as executor: submitted = {slot: executor.submit(function, parms) for slot, parms in enumerate(x)} # For very slow evaluations this will allow evaluations to be interrupted. if self._results_queue: loop = 0 for _ in as_completed(submitted.values()): loop += 1 self.logger.debug("Result %d/%d returned.", loop, len(x)) self._pause_signal.wait() self.check_messages() if any(self.es.callbackstop): self.logger.debug("Stop command received during function evaluations.") cancelled = [future.cancel() for future in submitted.values()] if self.logger.isEnabledFor(logging.DEBUG): self.logger.debug("Aborted %d calls.", sum(cancelled)) break fx = [future.result() for future in submitted.values() if not future.cancelled()] else: self.logger.debug("Executing serially") fx = [function(i) for i in x] return fx def _log(self): if self.sampler == "full": # log some min-max values of the covariance matrix with (self.keep_files / "cma_C.dat").open("a") as file: absC = abs(self.es.C) diag = np.diagonal(absC) offdiag = absC[~np.eye(absC.shape[0], dtype=bool)] print( f"{self.es.countiter:06d} " f"{diag.min():.4e} " f"{diag.max():.4e} " f"{offdiag.min():.4e} " f"{offdiag.max():.4e}", file=file, ) if self.storeC is not False and self.sampler == "full": # store the covariance matrix history self.storeC.append(self.es.C) np.save(str(self.keep_files / "cma_C.npy"), np.array(self.storeC, dtype="float32")) def callstop(self, reason: str = "Manager termination signal"): if reason and self.verbose: print(reason) self.logger.debug("Calling stop. Reason = %s", reason) self.stop_called = True self.result.success = all([reason != cond for cond in ("GloMPO Crash", "Manager termination signal")]) def _termination_callback(self, *args, **kwargs) -> bool: return self.stop_called # We need to remove all the functions stored in self.es before we can # pickle the optimizer for checkpointing. Otherwise the references to the # type bound functions cause the entire optimizer to be pickled as a # subobject, including things like _results_queue, which can then not be # unpickled, see e.g.: # # https://stackoverflow.com/questions/27002753/python-pickle-throwing-filenotfounderror # # This was new in AMS2023, see SCMSUITE-8709. Probably related to the # update of the CMA library since AMS2022 ...
[docs] def checkpoint_save( self, path: Union[Path, str], force: Optional[Set[str]] = None, block: Optional[Set[str]] = None ): try: self.es.inopts["termination_callback"] = None self.es.opts["termination_callback"] = None super().checkpoint_save(path, force, block) finally: self.es.inopts["termination_callback"] = self._termination_callback self.es.opts["termination_callback"] = self._termination_callback
[docs] @classmethod @needs_optional_package("dill") def checkpoint_load(cls: Type["CMAOptimizer"], path: Union[Path, str], **kwargs) -> "CMAOptimizer": opt = super(CMAOptimizer, cls).checkpoint_load(path, **kwargs) opt.es.inopts["termination_callback"] = opt._termination_callback opt.es.opts["termination_callback"] = opt._termination_callback return opt