# Source code for scm.simple_active_learning.plams.simple_active_learning_job

import os
from pathlib import Path
from typing import Dict, List, Optional, Tuple, Union

from natsort import natsorted
from watchdog.events import FileModifiedEvent, PatternMatchingEventHandler
from watchdog.observers import Observer

from scm.akfreader import AKFReader
from scm.input_classes import drivers
from scm.libbase import KFError, KFFile
from scm.params.plams.paramsjob import ParAMSJob
from scm.plams import AMSJob, Molecule, Results, Settings, SingleJob, config, log
from scm.plams.core.errors import FileError


class SimpleActiveLearningJobLogTailHandler(PatternMatchingEventHandler):
    """Watchdog event handler that echoes newly written lines of a job's
    ``simple_active_learning.log`` to standard output.

    The handler remembers the file offset reached on the previous event, so
    each modification event only prints the text appended since then.
    """

    def __init__(self, job, jobmanager):
        """Set up pattern matching for the log file of ``job``.

        :param job: The job whose log file is tailed; its ``name`` and ``path`` are used.
        :param jobmanager: The job manager; its ``workdir`` is the root of the watched pattern.
        """
        log_pattern = os.path.join(jobmanager.workdir, f"{job.name}*", "simple_active_learning.log")
        super().__init__(
            patterns=[log_pattern],
            ignore_patterns=["*.rkf", "*.out"],
            ignore_directories=True,
            case_sensitive=True,
        )
        self._job = job
        # Offset in the log file up to which lines have already been printed.
        self._seekto = 0

    def on_any_event(self, event):
        """Print the lines appended to the job's log file since the last event.

        Only ``FileModifiedEvent`` events for ``<job.path>/simple_active_learning.log``
        are handled; everything else is ignored. If the file has disappeared,
        the remembered offset is reset to the start.
        """
        job_dir = self._job.path
        if job_dir is None:
            # The job has no directory yet, so there is nothing to compare against.
            return
        if event.src_path != os.path.join(job_dir, "simple_active_learning.log"):
            return
        if not isinstance(event, FileModifiedEvent):
            return

        try:
            with open(event.src_path, "r") as f:
                f.seek(self._seekto)
                # readline() (rather than plain iteration) keeps f.tell() usable below.
                for line in iter(f.readline, ""):
                    print(line.rstrip())
                self._seekto = f.tell()
        except FileNotFoundError:
            self._seekto = 0


class SimpleActiveLearningResults(Results):
    """Results class for SimpleActiveLearningJob."""

    def get_errormsg(self) -> str:
        """Return the error message of this calculation, if any was raised.

        Delegates to ``self.job.get_errormsg()``.

        :return: String containing the error message.
        :rtype: str
        """
        return self.job.get_errormsg()

    def rkfpath(self, file="simple_active_learning") -> str:
        """Return the resolved path to ``<file>.rkf`` inside the job directory.

        :param file: Either ``"simple_active_learning"`` (default) or ``"ams"``.
        :raises ValueError: If ``file`` is not one of the two accepted names.
        :return: Path to the requested rkf file.
        :rtype: str
        """
        accepted = ("simple_active_learning", "ams")
        if file not in accepted:
            raise ValueError(
                f"Unknown file argument to rkfpath(), expected one of simple_active_learning or ams: {file}"
            )
        rkf = Path(self.job.path) / f"{file}.rkf"
        return str(rkf.resolve())

    def recreate_molecule(self) -> Dict[str, Molecule]:
        """Obtain the input molecule(s) from the rkf file associated with this result.

        A single ``Molecule`` is wrapped in a dictionary under the empty-string key.

        :return: The input molecule(s).
        :rtype: Dict[str, Molecule]
        """
        recreated = SimpleActiveLearningJob.from_rkf(self.rkfpath()).molecule
        return {"": recreated} if isinstance(recreated, Molecule) else recreated

    def recreate_settings(self) -> Settings:
        """Obtain the original input from the rkf file associated with this result.

        :return: The original input Settings.
        :rtype: Settings
        """
        recreated_job = SimpleActiveLearningJob.from_rkf(self.rkfpath())
        return recreated_job.settings

    def readrkf(self, section: str, variable: str):
        """Read ``section%variable`` from simple_active_learning.rkf.

        :param section: Name of the section in the rkf file.
        :param variable: Name of the variable inside that section.
        :return: Whatever ``AKFReader.read`` returns for ``"<section>%<variable>"``.
        """
        reader = AKFReader(self.rkfpath())
        return reader.read(f"{section}%{variable}")

    def _get_directory(
        self,
        suffix: str,
        allow_extra: bool,
        extra: Optional[str] = None,
        step: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> str:
        """Locate a sub-directory of the job directory for the given ``suffix``.

        Resolution order:

        1. If ``step`` and ``attempt`` are given: ``step{step}_attempt{attempt}_{suffix}``
           (which must exist).
        2. If ``allow_extra``: the directory named ``extra``, when it exists.
        3. The last ``step*_attempt*_{suffix}`` directory in natural sort order.
        4. ``initial_{suffix}``, then ``loaded_{suffix}``.

        :raises ValueError: If only one of ``step``/``attempt`` is given, or if
            ``allow_extra`` is true without an ``extra`` name.
        :raises FileNotFoundError: If no matching directory exists.
        :return: Path to the directory as a string.
        """
        # step and attempt only make sense as a pair.
        if (step is None) != (attempt is None):
            raise ValueError(f"Must specify both step and attempt. Got {step=} {attempt=}")
        if allow_extra and not extra:
            raise ValueError("Must specify extra if allow_extra=True")

        job_dir = Path(self.job.path)

        if step is not None:
            explicit = job_dir / f"step{step}_attempt{attempt}_{suffix}"
            if explicit.exists():
                return str(explicit)
            raise FileNotFoundError(f"Couldn't find {explicit}")

        if allow_extra:
            # extra is guaranteed to be non-empty here by the validation above.
            extra_dir = job_dir / extra
            if extra_dir.exists():
                return str(extra_dir)

        # try to autodetect: take the naturally-sorted last step/attempt directory
        matches = natsorted(job_dir.glob(f"step*_attempt*_{suffix}"))
        if matches:
            return str(matches[-1])

        for fallback_name in (f"initial_{suffix}", f"loaded_{suffix}"):
            fallback = job_dir / fallback_name
            if fallback.exists():
                return str(fallback)

        raise FileNotFoundError(f"Couldn't find any {suffix} directory")

    def get_simulation_directory(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> str:
        """
        Returns the path to a simulation directory.

        step: optional, int
            The step number. If not specified will be autodetected to the last step.

        attempt: optional, int
            The attempt number. If not specified will be autodetected to the last attempt.

        allow_final: bool
            If True and step=None and attempt=None, then it will return final_production_simulation if it exists.
        """
        return self._get_directory(
            "simulation",
            allow_extra=allow_final,
            extra="final_production_simulation",
            step=step,
            attempt=attempt,
        )

    def get_main_molecule(self, allow_final: bool = True) -> Union[Molecule, None]:
        """Returns ``get_main_molecule()`` of the results of the main simulation job.

        The simulation job is loaded with ``AMSJob.load_external`` from the directory
        returned by ``get_simulation_directory(allow_final=allow_final)``.

        :param allow_final: Passed on to ``get_simulation_directory``, defaults to True
        :type allow_final: bool, optional
        :return: The result of ``get_main_molecule()`` on the loaded simulation job.
        :rtype: Union[Molecule, None]
        """
        simulation_dir = self.get_simulation_directory(allow_final=allow_final)
        simulation_job = AMSJob.load_external(simulation_dir)
        return simulation_job.results.get_main_molecule()

    def get_params_results_directory(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> str:
        """
        Returns the path to a ParAMS results directory that can be loaded with ParAMSJob.load_external
        or used as LoadModel in ParAMS or SimpleActiveLearning input.

        step: optional, int
            The step number. If not specified will be autodetected to the last step.

        attempt: optional, int
            The attempt number. If not specified will be autodetected to the last attempt.

        allow_final: bool
            If True and step=None and attempt=None, then it will return final_training/results if it exists.

        Raises FileNotFoundError if the training directory has no ``results`` folder.
        """
        params_dir = self._get_directory(
            "training",
            allow_extra=allow_final,
            extra="final_training",
            step=step,
            attempt=attempt,
        )
        results_dir = Path(params_dir) / "results"
        if results_dir.exists():
            return str(results_dir)
        raise FileNotFoundError(f"The ParAMS directory {params_dir} does not contain a results folder.")

    def get_params_job(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> ParAMSJob:
        """Returns the ParAMSJob loaded from ``get_params_results_directory``.

        With the default arguments this is the latest ParAMSJob. It can be used to
        analyze results from the parametrization.

        :param step: The step number, autodetected if None, defaults to None
        :type step: Optional[int], optional
        :param attempt: The attempt number, autodetected if None, defaults to None
        :type attempt: Optional[int], optional
        :param allow_final: Passed on to ``get_params_results_directory``, defaults to True
        :type allow_final: bool, optional
        :return: The job loaded with ``ParAMSJob.load_external``.
        :rtype: ParAMSJob
        """
        results_dir = self.get_params_results_directory(step=step, attempt=attempt, allow_final=allow_final)
        return ParAMSJob.load_external(results_dir)

    def get_production_engine_settings(
        self, step: Optional[int] = None, attempt: Optional[int] = None, allow_final: bool = True
    ) -> Settings:
        """Returns the production engine settings from the ParAMSJob"""
        params_job = self.get_params_job(step=step, attempt=attempt, allow_final=allow_final)
        return params_job.results.get_production_engine_settings()

    def get_reference_data_directory(
        self,
        step: Optional[int] = None,
        attempt: Optional[int] = None,
    ) -> str:
        """
        Returns the path to a reference data directory that can be opened in the ParAMS GUI or which lets you
        initialize a ParAMSJob with ParAMSJob.from_yaml()

        step: optional, int
            The step number. If not specified will be autodetected to the last step.

        attempt: optional, int
            The attempt number. If not specified will be autodetected to the last attempt.
        """
        return self._get_directory("reference_data", allow_extra=False, step=step, attempt=attempt)
class SimpleActiveLearningJob(SingleJob):
    """
    PLAMS Job class for running Simple Active Learning.

    This class inherits from the PLAMS SingleJob class. For usage, see the SingleJob documentation.

    If you supply a Settings object to the constructor, it will be converted to a PISA (Python Input System for AMS) object.

    Attributes:

    * ``input``: an alias for self.settings.input
    """

    _result_type = SimpleActiveLearningResults
    results: SimpleActiveLearningResults
    _command = "simple_active_learning"
    _json_definitions = "simple_active_learning"
    _subblock_end = "End"

    def __init__(
        self,
        name: str = "simple_active_learning_job",
        driver: Optional[drivers.SimpleActiveLearning] = None,
        settings: Optional[Settings] = None,
        molecule: Optional[Union[Molecule, Dict[str, Molecule]]] = None,
        **kwargs,
    ):
        """
        Initialize the SimpleActiveLearningJob.

        name : str
            The name of the job

        driver : scm.input_classes.drivers.SimpleActiveLearning
            PISA object describing the input to the SimpleActiveLearning program

        settings: scm.plams.Settings
            All settings for the job. Input settings in the PLAMS settings format under ``settings.input``
            are automatically converted to the PISA format. You can specify ``settings.runscript.nproc``
            to set the total number of cores to run on.

        molecule: scm.plams.Molecule or Dict[str, scm.plams.Molecule]
            The initial system in PLAMS Molecule format, or if the simulation requires multiple input
            system, given as a dictionary where the main system has an empty string ``""`` as the key.
        """
        super().__init__(name=name, settings=settings, molecule=molecule, **kwargs)
        if driver is not None:
            # An explicit PISA driver takes precedence over settings.input.
            self.settings.input = driver
        elif self.settings.input:
            # Convert PLAMS-style input settings to PISA via their text representation.
            text_input = AMSJob(settings=self.settings).get_input()
            self.settings.input = drivers.SimpleActiveLearning.from_text(text_input)
        else:
            self.settings.input = drivers.SimpleActiveLearning()

    @classmethod
    def load_external(cls, path: Union[str, Path], finalize: bool = False) -> "SimpleActiveLearningJob":
        """Load a previous SimpleActiveLearning job from disk.

        ``path`` may be the job directory itself, a file inside the job directory
        (its parent directory is then used), or a job path given without its
        ``.results`` / ``results`` suffix.

        :param path: A SimpleActiveLearning results folder.
        :type path: Union[str, Path]
        :param finalize: See SingleJob, defaults to False
        :type finalize: bool, optional
        :raises FileError: When the path does not exist.
        :return: An initialized SimpleActiveLearningJob
        :rtype: SimpleActiveLearningJob
        """
        path = Path(path)
        if not os.path.isdir(path):
            if os.path.exists(path):
                # A file inside the job directory was given: use its directory.
                path = os.path.dirname(os.path.abspath(path))
            else:
                # The suffix must be appended to the path *string* ("job" -> "job.results").
                # Joining it as a child of a non-existent path could never match.
                for candidate in (Path(f"{path}.results"), Path(f"{path}results")):
                    if candidate.is_dir():
                        path = candidate
                        break
                else:
                    raise FileError("Path {} does not exist, cannot load from it.".format(path))

        job = super().load_external(path, finalize=finalize)

        # Strip a trailing ".results" from the job name, unless that is the whole name.
        results_suffix = ".results"
        if job.name.endswith(results_suffix) and len(job.name) > len(results_suffix):
            job.name = job.name[: -len(results_suffix)]
        return job

    @classmethod
    def from_rkf(cls, path: str) -> "SimpleActiveLearningJob":
        """Initialize a job from a simple_active_learning.rkf file.

        The text input is read from the ``General%user input`` entry of the file.

        :param path: Path to a simple_active_learning.rkf file
        :type path: str
        :return: A new SimpleActiveLearningJob instance based on the information found in path.
        :rtype: SimpleActiveLearningJob
        """
        with KFFile(path) as kf:
            text_input = kf.read_string("General", "user input")
        return cls.from_input(text_input)

    @classmethod
    def from_input(cls, text_input: str) -> "SimpleActiveLearningJob":
        """Initialize a job from text input.

        :param text_input: A multiline text input
        :type text_input: str
        :return: A SimpleActiveLearningJob
        :rtype: SimpleActiveLearningJob
        """
        driver = drivers.SimpleActiveLearning.from_text(text_input)
        # Moves the System block out of the driver and into a PLAMS molecule.
        molecule = cls._extract_mol_from_pisa(driver)
        return SimpleActiveLearningJob(driver=driver, molecule=molecule)

    @staticmethod
    def _extract_mol_from_pisa(pisa: drivers.SimpleActiveLearning) -> Union[Molecule, Dict[str, Molecule]]:
        """Remove a molecule from a System block in the SimpleActiveLearning PISA object and return it as molecule(s)"""
        settings = Settings()
        settings.input.ams.system = pisa.to_settings().system
        molecule = AMSJob.settings_to_mol(settings)
        # Replace the System block on the PISA object with a fresh one;
        # object.__setattr__ bypasses any custom __setattr__ on the PISA object.
        object.__setattr__(pisa, "System", pisa._System("System"))
        return molecule

    def get_errormsg(self) -> str:
        """Returns the contents of the jobname.err file if it exists. If the file does not exist an empty string is returned.

        :return: The error message
        :rtype: str
        """
        try:
            with open(self.results["$JN.err"], "r") as err:
                return err.read()
        except (FileNotFoundError, FileError):
            # Results.__getitem__ reports a file missing from the job folder as
            # FileError; open() reports a vanished file as FileNotFoundError.
            return ""

    def get_input(self) -> str:
        """Obtain the input string.

        Delegates to ``AMSJob.get_input`` called on this job.

        :return: An input string.
        :rtype: str
        """
        return AMSJob.get_input(self)

    def get_runscript(self) -> str:
        """
        Generates the runscript. Use ``self.settings.runscript.preamble_lines = ['line1', 'line2']``
        or similarly for ``self.settings.runscript.postamble_lines`` to set custom settings.

        ``self.settings.runscript.nproc`` controls the total number of cores to run on.
        """
        runscript = self.settings.runscript
        filename = self._filename("inp")

        parts = [f"{line}\n" for line in runscript.get("preamble_lines", "")]

        # need to use `pwd` here and not "." since "." doesn't expand
        parts.append(f'AMS_JOBNAME="{self.name}" AMS_RESULTSDIR="`pwd`" "$AMSBIN/{self._command}" ')
        nproc = runscript.get("nproc", None)
        if nproc:
            parts.append(f"-n {nproc} ")
        parts.append(f'< "{filename}"\n')

        parts.extend(f"{line}\n" for line in runscript.get("postamble_lines", ""))
        return "".join(parts)

    def check(self) -> bool:
        """Returns True if "NORMAL TERMINATION" is given in the General section of simple_active_learning.rkf."""
        with KFFile(self.results.rkfpath()) as kf:
            termination = kf.read_string("General", "termination status")
        return "NORMAL TERMINATION" in termination

    def ok(self) -> bool:
        """Synonym for check()"""
        return self.check()

    def run(self, jobrunner=None, jobmanager=None, watch: bool = False, **kwargs) -> SimpleActiveLearningResults:
        """
        Runs the job.

        jobrunner, jobmanager:
            Passed on to ``SingleJob.run``.

        watch: bool
            If True, new lines of the job's simple_active_learning.log are printed to stdout while
            the job runs, and this call waits for the results before returning. If no ``jobmanager``
            is given, ``config.default_jobmanager`` is used; a RuntimeError is raised if there is none.
        """
        if not watch:
            return super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs)

        # Only fall back to the default job manager when the caller did not supply
        # one; an explicitly passed jobmanager must not be silently replaced.
        if jobmanager is None:
            if "default_jobmanager" not in config:
                raise RuntimeError(
                    "No default jobmanager found. This probably means that PLAMS init() was not called."
                )
            jobmanager = config.default_jobmanager

        observer = Observer()
        event_handler = SimpleActiveLearningJobLogTailHandler(self, jobmanager)
        observer.schedule(event_handler, jobmanager.workdir, recursive=True)
        observer.start()
        try:
            results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs)
            # Block until the job is done so the log keeps being tailed meanwhile.
            results.wait()
        finally:
            observer.stop()
            observer.join()
        return results

    @property
    def input(self) -> drivers.SimpleActiveLearning:
        """PISA format input"""
        return self.settings.input

    @input.setter
    def input(self, value: drivers.SimpleActiveLearning):
        self.settings.input = value

    # The helpers below reuse the AMSJob implementations on this job.

    def _serialize_input(self, s):
        return AMSJob._serialize_input(self, s)

    def _serialize_molecule(self):
        return AMSJob._serialize_molecule(self)

    @staticmethod
    def _atom_symbol(s):
        return AMSJob._atom_symbol(s)

    @staticmethod
    def _atom_suffix(s):
        return AMSJob._atom_suffix(s)