# Source code for scm.plams.interfaces.adfsuite.ams

import os
from os.path import join as opj
from typing import Dict, List, Literal, Set, Union

import numpy as np
from scm.plams.core.basejob import SingleJob
from scm.plams.core.errors import FileError, JobError, PlamsError, PTError, ResultsError
from scm.plams.core.functions import config, log, parse_heredoc
from scm.plams.core.private import sha256
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.mol.atom import Atom
from scm.plams.mol.bond import Bond
from scm.plams.mol.molecule import Molecule
from scm.plams.tools.converters import gaussian_output_to_ams, qe_output_to_ams, vasp_output_to_ams
from scm.plams.tools.kftools import KFFile, KFReader
from scm.plams.tools.units import Units

try:
    from scm.pisa.block import DriverBlock

    _has_scm_pisa = True
except ImportError:
    _has_scm_pisa = False

try:
    from scm.libbase import UnifiedChemicalSystem as ChemicalSystem

    _has_scm_unichemsys = True
except ImportError:
    _has_scm_unichemsys = False


try:
    from watchdog.events import FileModifiedEvent, PatternMatchingEventHandler
    from watchdog.observers import Observer

    _has_watchdog = True

    class AMSJobLogTailHandler(PatternMatchingEventHandler):

        def __init__(self, job, jobmanager):
            super().__init__(
                patterns=[os.path.join(jobmanager.workdir, f"{job.name}*", "ams.log")],
                ignore_patterns=["*.rkf", "*.out"],
                ignore_directories=True,
                case_sensitive=True,
            )
            self._job = job
            self._seekto = 0

        def on_any_event(self, event):
            if (
                self._job.path is not None
                and event.src_path == os.path.join(self._job.path, "ams.log")
                and isinstance(event, FileModifiedEvent)
            ):
                try:
                    with open(event.src_path, "r") as f:
                        f.seek(self._seekto)
                        while True:
                            line = f.readline()
                            if not line:
                                break
                            log(f"{self._job.name}: " + line[25:-1])
                        self._seekto = f.tell()
                except FileNotFoundError:
                    self._seekto = 0

except ImportError:
    _has_watchdog = False


__all__ = ["AMSJob", "AMSResults"]


class AMSResults(Results):
    """A specialized |Results| subclass for accessing the results of |AMSJob|."""

    def __init__(self, *args, **kwargs):
        Results.__init__(self, *args, **kwargs)
        self.rkfs = {}
    def collect(self):
        """Collect files present in the job folder. Use parent method from |Results|, then create an instance of |KFFile| for each ``.rkf`` file present in the job folder. Collect these files in the ``rkfs`` dictionary, with keys being filenames without the ``.rkf`` extension.

        The information about ``.rkf`` files generated by engines is taken from the main ``ams.rkf`` file.

        This method is called automatically during the final part of the job execution and there is no need to call it manually.
        """
        Results.collect(self)
        self.collect_rkfs()

    def collect_rkfs(self):
        rkfname = "ams.rkf"
        if rkfname in self.files:
            main = KFFile(opj(self.job.path, rkfname))
            n = main[("EngineResults", "nEntries")]
            for i in range(1, n + 1):
                files = main[("EngineResults", "Files({})".format(i))].split("\x00")
                if files[0].endswith(".rkf"):
                    key = files[0][:-4]
                    self.rkfs[key] = KFFile(opj(self.job.path, files[0]))
            self.rkfs["ams"] = main
        else:
            log("WARNING: Main KF file {} not present in {}".format(rkfname, self.job.path), 1)

    def _copy_to(self, newresults):
        super()._copy_to(newresults)
        newresults.rkfs = {}
        newresults.collect_rkfs()
    def refresh(self):
        """Refresh the contents of the ``files`` list. Use the parent method from |Results|, then look at |KFFile| instances present in the ``rkfs`` dictionary and check if they point to existing files. If not, try to reinstantiate them with the current job path (that can happen while loading a pickled job after the entire job folder was moved).
        """
        Results.refresh(self)
        to_remove = []
        for key, val in self.rkfs.items():
            if not os.path.isfile(val.path):
                if os.path.dirname(val.path) != self.job.path:
                    guessnewpath = opj(self.job.path, os.path.basename(val.path))
                    if os.path.isfile(guessnewpath):
                        self.rkfs[key] = KFFile(guessnewpath)
                    else:
                        to_remove.append(key)
                else:
                    to_remove.append(key)
        for i in to_remove:
            del self.rkfs[i]

    def engine_names(self):
        """Return a list of all names of engine specific ``.rkf`` files. The identifier of the main result file (``'ams'``) is not present in the returned list, only engine specific names are listed."""
        self.refresh()
        ret = list(self.rkfs.keys())
        ret.remove("ams")
        return ret
    def read_hybrid_term_rkf(self, section: str, variable: str, term: int, file="engine"):
        """Reads a Hybrid-termX-subengine.rkf file.

        The engine.rkf file contains a section EngineResults with Files(1), Files(2) etc. that point to the individual term .rkf files. This method reads the corresponding individual term .rkf file.

        Example: ``job.results.read_hybrid_term_rkf("AMSResults", "Energy", file="engine", term=1)``
        """
        kf_file = self.readrkf("EngineResults", f"Files({term})", file=file)
        kf = KFReader(os.path.join(self.job.path, kf_file))
        return kf.read(section, variable)
    def rkfpath(self, file: str = "ams"):
        """Return the absolute path of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
        """
        return self._access_rkf(lambda x: x.path, file)

    def readrkf(self, section, variable, file: str = "ams"):
        """Read data from *section*/*variable* of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.

        The type of the returned value depends on the type of *variable* defined inside the KF file. It can be: single int, list of ints, single float, list of floats, single boolean, list of booleans or string.

        .. note::

            If arguments *section* or *variable* are incorrect (not present in the chosen file), the returned value is ``None``. Please mind the fact that KF files are case sensitive.
        """
        return self._access_rkf(lambda x: x.read(section, variable), file)
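    # Usage sketch (editor's illustration, not part of the module): with `job` a finished
    # AMSJob, ``readrkf`` pulls single variables straight off the binary KF files, e.g.:
    #
    #   energy = job.results.readrkf("AMSResults", "Energy", file="engine")  # hartree
    #   nsteps = job.results.readrkf("History", "nEntries")                  # from ams.rkf
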
    def read_rkf_section(self, section: str, file: str = "ams"):
        """Return a dictionary with all variables from a given *section* of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.

        .. note::

            If *section* is not present in the chosen file, the returned value is an empty dictionary. Please mind the fact that KF files are case sensitive.
        """
        return self._access_rkf(lambda x: x.read_section(section), file)

    def get_rkf_skeleton(self, file: str = "ams") -> Dict[str, Set[str]]:
        """Return a dictionary with the structure of a chosen ``.rkf`` file. Each key corresponds to a section name with the value being a set of variable names present in that section.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
        """
        return self._access_rkf(lambda x: x.get_skeleton(), file)
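    # Usage sketch (editor's illustration): the skeleton is handy for discovering what a
    # given results file actually contains before reading from it:
    #
    #   skel = job.results.get_rkf_skeleton(file="engine")
    #   if "Vibrations" in skel:
    #       freqs = job.results.get_frequencies()
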
    def get_molecule(self, section: str, file: str = "ams"):
        """Return a |Molecule| instance stored in a given *section* of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.

        All data used by this method is taken from the chosen ``.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        sectiondict = self.read_rkf_section(section, file)
        if sectiondict:
            return Molecule._mol_from_rkf_section(sectiondict)

    def get_ase_atoms(self, section: str, file: str = "ams"):
        from ase import Atoms

        sectiondict = self.read_rkf_section(section, file)
        bohr2angstrom = 0.529177210903
        nLatticeVectors = sectiondict.get("nLatticeVectors", 0)
        pbc = [True] * nLatticeVectors + [False] * (3 - nLatticeVectors)
        if nLatticeVectors > 0:
            cell = np.zeros((3, 3))
            lattice = np.array(sectiondict.get("LatticeVectors")).reshape(-1, 3)
            cell[: lattice.shape[0], : lattice.shape[1]] = lattice * bohr2angstrom
        else:
            cell = None
        atomsymbols = sectiondict["AtomSymbols"].split()
        positions = np.array(sectiondict["Coords"]).reshape(-1, 3) * bohr2angstrom
        return Atoms(symbols=atomsymbols, positions=positions, pbc=pbc, cell=cell)
    def get_input_molecule(self):
        """Return a |Molecule| instance with the initial coordinates.

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        return self.get_molecule("InputMolecule", "ams")

    def get_input_molecules(self) -> Dict[str, Molecule]:
        """Return a dictionary mapping the name strings from the AMS input file to |Molecule| instances. The main molecule (aka the one that did not have a string in the block header in the input file) will be returned under the key of the empty string "".

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        mols: Dict[str, Molecule] = {}
        skel = self.get_rkf_skeleton()
        if "InputMolecule" in skel:
            mols[""] = self.get_molecule("InputMolecule")
        if "InputMolecules" in skel:
            num_named_molecules: int = self.readrkf("InputMolecules", "numNamedMolecules")
            for imol in range(1, num_named_molecules + 1):
                mols[self.readrkf("InputMolecules", f"Name({imol})")] = self.get_molecule(f"InputMolecule({imol})")
        return mols

    def get_main_molecule(self):
        """Return a |Molecule| instance with the final coordinates.

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        return self.get_molecule("Molecule", "ams")
    def get_main_ase_atoms(self, get_results: bool = False):
        """Return an ase.Atoms instance with the final coordinates.

        An alternative is to call ``toASE(results.get_main_molecule())`` to convert a Molecule to ASE Atoms.

        get_results : bool
            If True, the returned Atoms will have a SinglePointCalculator as their calculator, allowing you to call .get_potential_energy(), .get_forces(), and .get_stress() on the returned Atoms object.
        """
        from ase.calculators.singlepoint import SinglePointCalculator

        atoms = self.get_ase_atoms("Molecule", "ams")
        if get_results:
            atoms.set_calculator(SinglePointCalculator(atoms))
            try:
                energy = self.get_energy(unit="eV")
                atoms.calc.results["energy"] = energy
            except KeyError:
                pass
            try:
                forces = (-1) * np.array(self.get_gradients(dist_unit="angstrom", energy_unit="eV")).reshape(-1, 3)
                atoms.calc.results["forces"] = forces
            except KeyError:
                pass
            try:
                stress = np.array(self.get_stresstensor()).ravel() * Units.convert(1.0, "hartree/bohr^3", "eV/ang^3")
                if len(stress) == 9:
                    stress = stress.reshape(3, 3)
                    atoms.calc.results["stress"] = np.array(
                        [stress[0][0], stress[1][1], stress[2][2], stress[1][2], stress[0][2], stress[0][1]]
                    )
            except KeyError:
                pass
        return atoms
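    # Usage sketch (editor's illustration): with ASE installed, a finished job can be
    # bridged into ASE-based analysis in one call; `job` is assumed to be a finished AMSJob:
    #
    #   atoms = job.results.get_main_ase_atoms(get_results=True)
    #   print(atoms.get_potential_energy())  # eV, via the attached SinglePointCalculator
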
    def get_history_molecule(self, step: int):
        """Return a |Molecule| instance with coordinates taken from a particular *step* in the ``History`` section of the ``ams.rkf`` file.

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        if "ams" in self.rkfs:
            main = self.rkfs["ams"]
            if not self.is_valid_stepnumber(main, step):
                return
            coords = main.read("History", f"Coords({step})")
            coords = [coords[i : i + 3] for i in range(0, len(coords), 3)]
            if ("History", f"SystemVersion({step})") in main:
                system = self.get_system_version(main, step)
                mol = self.get_molecule(f"ChemicalSystem({system})")
                molsrc = f"ChemicalSystem({system})"
            else:
                mol = self.get_main_molecule()
                molsrc = "Molecule"
            assert mol is not None
            if len(mol) != len(coords):
                raise ResultsError(
                    f'Coordinates taken from "History%Coords({step})" have incompatible length with molecule from {molsrc} section'
                )
            for at, c in zip(mol, coords):
                at.move_to(c, unit="bohr")
            if ("History", "LatticeVectors(" + str(step) + ")") in main:
                lattice = Units.convert(main.read("History", "LatticeVectors(" + str(step) + ")"), "bohr", "angstrom")
                mol.lattice = [tuple(lattice[j : j + 3]) for j in range(0, len(lattice), 3)]
            # Bonds from the reference molecule are probably outdated. Let us never use them ...
            mol.delete_all_bonds()
            # ... but instead use bonds from the history section if they are available:
            if all(
                ("History", i) in main for i in [f"Bonds.Index({step})", f"Bonds.Atoms({step})", f"Bonds.Orders({step})"]
            ):
                index = main.read("History", f"Bonds.Index({step})")
                if not isinstance(index, list):
                    index = [index]
                atoms = main.read("History", f"Bonds.Atoms({step})")
                if not isinstance(atoms, list):
                    atoms = [atoms]
                orders = main.read("History", f"Bonds.Orders({step})")
                if not isinstance(orders, list):
                    orders = [orders]
                for i in range(len(index) - 1):
                    for j in range(index[i], index[i + 1]):
                        mol.add_bond(mol[i + 1], mol[atoms[j - 1]], orders[j - 1])
                if ("History", f"Bonds.CellShifts({step})") in main:
                    assert mol.lattice
                    cellShifts = main.read("History", f"Bonds.CellShifts({step})")
                    ndim = len(mol.lattice)
                    for i, b in enumerate(mol.bonds):
                        b.properties.suffix = " ".join(
                            [f"{cellShifts[ndim*i+j]}" for j in range(min(len(mol.lattice), ndim))]
                        )
            return mol
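    # Usage sketch (editor's illustration): iterating over an optimization trajectory;
    # History steps are 1-based, and `job` is assumed to be a finished geometry optimization:
    #
    #   n = job.results.get_history_length()
    #   trajectory = [job.results.get_history_molecule(step) for step in range(1, n + 1)]
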
    def is_valid_stepnumber(self, main, step: int):
        """Check if the requested step number is in the results file."""
        if "History" not in main:
            raise KeyError("'History' section not present in {}".format(main.path))
        n = main.read("History", "nEntries")
        if step > n:
            raise KeyError("Step {} not present in 'History' section of {}".format(step, main.path))
        return True

    def get_system_version(self, main, step: int):
        """Determine which Molecule version is requested."""
        if ("History", f"SystemVersion({step})") in main:
            version = main.read("History", f"SystemVersion({step})")
            if "SystemVersionHistory" in main:
                if ("SystemVersionHistory", "blockSize") in main:
                    blockSize = main.read("SystemVersionHistory", "blockSize")
                else:
                    blockSize = 1
                block = (version - 1) // blockSize + 1
                offset = (version - 1) % blockSize
                system = main.read("SystemVersionHistory", f"SectionNum({block})", return_as_list=True)[offset]
            else:
                system = version
        return system

    def get_history_variables(self, history_section: str = "History"):
        """Return a set of keynames stored in the specified history section of the ``ams.rkf`` file.

        The *history_section* argument should be a string representing the name of the history section (``History`` or ``MDHistory``).
        """
        if not "ams" in self.rkfs:
            return
        main = self.rkfs["ams"]
        keylist = [var for sec, var in main if sec == history_section]
        # Now throw out all the last parts
        return set([key.split("(")[0] for key in keylist if len(key.split("(")) > 1])

    def get_history_length(self, history_section: str = "History") -> int:
        """Returns the number of entries (nEntries) in the history section of the ``ams.rkf`` file."""
        return self.readrkf(history_section, "nEntries")
    def get_history_property(self, varname: str, history_section: str = "History"):
        """Return the values of *varname* in the history section *history_section*."""
        if not "ams" in self.rkfs:
            return
        main = self.rkfs["ams"]
        if history_section not in main:
            raise KeyError(f"The requested section '{history_section}' does not exist in {main.path}")
        if (history_section, "nScanCoord") in main:  # PESScan
            nentries = main.read(history_section, "nScanCoord")
            as_block = False
        elif (history_section, "nEntries") in main:
            nentries = main.read(history_section, "nEntries")
            as_block = self._values_stored_as_blocks(main, varname, history_section)
        if as_block:
            nblocks = main.read(history_section, "nBlocks")
            values = [
                main.read(history_section, f"{varname}({iblock})", return_as_list=True)
                for iblock in range(1, nblocks + 1)
            ]
            values = [val for blockvals in values for val in blockvals if isinstance(blockvals, list)]
        else:
            values = [main.read(history_section, f"{varname}({step})") for step in range(1, nentries + 1)]
        return values

    def get_property_at_step(self, step: int, varname: str, history_section: str = "History"):
        """Return the value of *varname* in the history section *history_section* at step *step*."""
        if not "ams" in self.rkfs:
            return
        main = self.rkfs["ams"]
        as_block = self._values_stored_as_blocks(main, varname, history_section)
        if as_block:
            blocksize = main.read(history_section, "blockSize")
            iblock = int(np.ceil(step / blocksize))
            # this can return something that isn't a list, for example an int
            value = main.read(history_section, f"{varname}({iblock})")
            try:
                value = value[(step % blocksize) - 1]
            except TypeError:  # TypeError: 'int' object is not subscriptable
                pass
        else:
            value = main.read(history_section, f"{varname}({step})")
        return value
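    # Usage sketch (editor's illustration): reading scalar time series from a finished job;
    # variable names depend on the job type and on what was actually written to ams.rkf:
    #
    #   energies = job.results.get_history_property("Energy")  # History section
    #   temp5 = job.results.get_property_at_step(5, "Temperature", history_section="MDHistory")
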
    def _values_stored_as_blocks(self, main, varname: str, history_section: str):
        """Determines whether the values of varname in a trajectory rkf file are stored in blocks"""
        nentries = main.read(history_section, "nEntries")
        as_block = False
        # This is extremely slow, because looping over main is very slow.
        # This is because the (sec,var) tuples are first stored in a set, then sorted, and only then yielded
        keylist = [var for sec, var in main if sec == history_section]
        if "nBlocks" in keylist:
            if not f"{varname}({nentries})" in keylist:
                as_block = True
        return as_block
    def get_atomic_temperatures_at_step(self, step: int, history_section: str = "MDHistory"):
        """Get all the atomic temperatures for step *step*.

        Note: Numbering of steps starts at 1.
        """
        if not "ams" in self.rkfs:
            return
        main = self.rkfs["ams"]
        if not self.is_valid_stepnumber(main, step):
            return

        # Read the masses
        molname = "Molecule"
        if ("History", f"SystemVersion({step})") in main:
            system = self.get_system_version(main, step)
            molname = "ChemicalSystem(%i)" % (system)
        masses = np.array(main.read(molname, "AtomMasses"))
        nats = len(masses)

        # Read the velocities
        velocities = np.array(self.get_property_at_step(step, "Velocities", history_section)).reshape((nats, 3))

        # Convert to SI units and compute temperatures
        m = masses * 1.0e-3 / Units.constants["NA"]
        vels = velocities * Units.conversion_ratio("Bohr", "Angstrom") * 1.0e5
        temperatures = (m.reshape((nats, 1)) * vels**2).sum(axis=1)
        temperatures /= 3 * Units.constants["k_B"]
        return temperatures
    def get_band_structure(self, bands=None, unit: str = "hartree", only_high_symmetry_points: bool = False):
        """Extracts the electronic band structure from DFTB or BAND calculations. The returned data can be plotted with ``plot_band_structure``.

        Note: for unrestricted calculations bands 0, 2, 4, ... are spin-up and bands 1, 3, 5, ... are spin-down.

        Returns: ``x``, ``y_spin_up``, ``y_spin_down``, ``labels``, ``fermi_energy``

        ``x``: 1D array of float

        ``y_spin_up``: 2D array of shape (len(x), len(bands)). Every column is a separate band. In units of ``unit``.

        ``y_spin_down``: 2D array of shape (len(x), len(bands)). Every column is a separate band. In units of ``unit``. If the calculation was restricted this is identical to y_spin_up.

        ``labels``: 1D list of str of length len(x). If a point is not a high-symmetry point then the label is an empty string.

        ``fermi_energy``: float. The Fermi energy (in units of ``unit``).

        Arguments:

        bands: list of int or None
            If None, all bands are returned. Note: the band indices start with 0.

        unit: str
            Unit of the returned band energies.

        only_high_symmetry_points: bool
            Return only the first point of each edge.
        """
        read_labels = True
        nBands = self.readrkf("band_curves", "nBands", file="engine")
        nEdges = self.readrkf("band_curves", "nEdges", file="engine")
        try:
            nSpin = self.readrkf("band_curves", "nSpin", file="engine")
        except KeyError:
            nSpin = 1
        if bands is None:
            bands = np.arange(nBands)
        spindown_bands = bands
        if nSpin == 2:
            spindown_bands = np.array(bands) + nBands
        prevmaxx = 0
        complete_spinup_data = []
        complete_spindown_data = []
        labels = []
        x = []
        for i in range(nEdges):
            my_x = self.readrkf("band_curves", f"Edge_{i+1}_xFor1DPlotting", file="engine")
            my_x = np.array(my_x) + prevmaxx
            prevmaxx = np.max(my_x)
            if read_labels:
                my_labels = self.readrkf("band_curves", f"Edge_{i+1}_labels", file="engine").split()
                if len(my_labels) == 2:
                    if only_high_symmetry_points:
                        labels += [my_labels[0]]  # only the first point of the curve
                    else:
                        labels += [my_labels[0]] + [""] * (len(my_x) - 2) + [my_labels[1]]
            if only_high_symmetry_points:
                x.append(my_x[0:1])
            else:
                x.append(my_x)
            A = self.readrkf("band_curves", f"Edge_{i+1}_bands", file="engine")
            A = np.array(A).reshape(-1, nBands * nSpin)
            spinup_data = A[:, bands]
            spindown_data = A[:, spindown_bands]
            if only_high_symmetry_points:
                spinup_data = np.reshape(spinup_data[0, :], (-1, len(bands)))
                spindown_data = np.reshape(spindown_data[0, :], (-1, len(spindown_bands)))
            complete_spinup_data.append(spinup_data)
            complete_spindown_data.append(spindown_data)
        complete_spinup_data = np.concatenate(complete_spinup_data)
        complete_spinup_data = Units.convert(complete_spinup_data, "hartree", unit)
        complete_spindown_data = np.concatenate(complete_spindown_data)
        complete_spindown_data = Units.convert(complete_spindown_data, "hartree", unit)
        x = np.concatenate(x).ravel()
        fermi_energy = self.readrkf("BandStructure", "FermiEnergy", file="engine")
        fermi_energy = Units.convert(fermi_energy, "hartree", unit)
        return x, complete_spinup_data, complete_spindown_data, labels, fermi_energy
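    # Usage sketch (editor's illustration): pulling the band structure in eV; `job` is
    # assumed to be a finished BAND or DFTB job with band structure data on the engine file:
    #
    #   x, y_up, y_down, labels, ef = job.results.get_band_structure(unit="eV")
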
    def get_engine_results(self, engine: str = None):
        """Return a dictionary with contents of ``AMSResults`` section from an engine results ``.rkf`` file.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read_section("AMSResults"), engine)

    def get_engine_properties(self, engine: str = None):
        """Return a dictionary with all the entries from ``Properties`` section from an engine results ``.rkf`` file.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """

        def properties(kf):
            if not "Properties" in kf:
                return {}
            # There are two *kinds* of "Properties" sections:
            # - ADF simply has a set of variables in the properties section
            # - DFTB and some other engines have a *scheme* for storing "Properties". See the fortran
            #   'RKFileModule' (RKFile.f90) for more details on this.
            ret = {}
            if ("Properties", "nEntries") in kf:
                # This is a 'RKFileModule' properties section:
                n = kf.read("Properties", "nEntries")
                for i in range(1, n + 1):
                    tp = kf.read("Properties", "Type({})".format(i)).strip()
                    stp = kf.read("Properties", "Subtype({})".format(i)).strip()
                    val = kf.read("Properties", "Value({})".format(i))
                    key = stp if stp.endswith(tp) else ("{} {}".format(stp, tp) if stp else tp)
                    ret[key] = val
            else:
                skeleton = kf.get_skeleton()
                for variable in skeleton["Properties"]:
                    ret[variable] = kf.read("Properties", variable)
            return ret

        return self._process_engine_results(properties, engine)
    def get_energy(self, unit: str = "hartree", engine: str = None):
        """Return final energy, expressed in *unit*. The final energy is found in AMSResults%Energy of the engine rkf file. You can find the meaning of final energy in the engine documentation.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read("AMSResults", "Energy"), engine) * Units.conversion_ratio(
            "au", unit
        )
    def get_energy_uncertainty(self, unit: str = "hartree", engine: str = None):
        """Return final energy uncertainty, expressed in *unit*. The final energy uncertainty is found in AMSResults%EnergyUncertainty of the engine rkf file. You can find the meaning of final energy uncertainty in the engine documentation.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(
            lambda x: x.read("AMSResults", "EnergyUncertainty"), engine
        ) * Units.conversion_ratio("au", unit)
    def get_gradients(self, energy_unit: str = "hartree", dist_unit: str = "bohr", engine: str = None) -> np.ndarray:
        """Return the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return (
            np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Gradients"), engine)).reshape(-1, 3)
            * Units.conversion_ratio("au", energy_unit)
            / Units.conversion_ratio("au", dist_unit)
        )

    def get_gradients_uncertainty(
        self, energy_unit: str = "hartree", dist_unit: str = "bohr", engine: str = None
    ) -> np.ndarray:
        """Return the uncertainty of the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return (
            np.asarray(
                self._process_engine_results(lambda x: x.read("AMSResults", "GradientsUncertainty"), engine)
            ).reshape(-1, 3)
            * Units.conversion_ratio("au", energy_unit)
            / Units.conversion_ratio("au", dist_unit)
        )

    def get_gradients_magnitude_uncertainty(
        self, energy_unit: str = "hartree", dist_unit: str = "bohr", engine: str = None
    ) -> np.ndarray:
        """Return the uncertainty of the magnitude of the gradients of the final energy, expressed in *energy_unit* / *dist_unit*. This is computed using error propagation.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return (
            np.asarray(
                self._process_engine_results(lambda x: x.read("AMSResults", "GradientsMagnitudeUncertainty"), engine)
            ).reshape(-1)
            * Units.conversion_ratio("au", energy_unit)
            / Units.conversion_ratio("au", dist_unit)
        )

    def get_stresstensor(self, engine: str = None) -> np.ndarray:
        """Return the final stress tensor, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "StressTensor"), engine)).reshape(
            len(self.get_input_molecule().lattice), -1
        )

    def get_hessian(self, engine: str = None) -> np.ndarray:
        """Return the Hessian matrix, i.e. the second derivative of the total energy with respect to the nuclear coordinates, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Hessian"), engine)).reshape(
            3 * len(self.get_input_molecule()), -1
        )

    def get_elastictensor(self, engine: str = None):
        """Return the elastic tensor, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        et_flat = np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "ElasticTensor"), engine))
        num_latvec = len(self.get_input_molecule().lattice)
        if num_latvec == 1:
            return et_flat.reshape(1, 1)
        elif num_latvec == 2:
            return et_flat.reshape(3, 3)
        else:
            return et_flat.reshape(6, 6)
    def get_frequencies(self, unit: str = "cm^-1", engine: str = None):
        """Return a numpy array of vibrational frequencies, expressed in *unit*.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        freqs = self._process_engine_results(lambda x: x.read("Vibrations", "Frequencies[cm-1]"), engine)
        freqs = np.array(freqs) if isinstance(freqs, list) else np.array([freqs])
        return freqs * Units.conversion_ratio("cm^-1", unit)
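    # Usage sketch (editor's illustration): after a frequencies calculation, checking for
    # imaginary modes (reported as negative numbers); `job` is assumed to be finished:
    #
    #   freqs = job.results.get_frequencies(unit="cm^-1")
    #   n_imaginary = (freqs < 0).sum()
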
    def get_frequency_spectrum(
        self,
        engine: str = None,
        broadening_type: Literal["gaussian", "lorentzian"] = "gaussian",
        broadening_width=40,
        min_x=0,
        max_x=4000,
        x_spacing=0.5,
        post_process: Literal["max_to_1"] = None,
    ):
        """Return the "frequency spectrum" (i.e. the frequencies broadened, each with intensity equal to 1).

        Units: frequencies are in cm-1. By default the intensities are in mode-count units; if post_process equals ``max_to_1`` they are in arbitrary units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        from scm.plams.tools.postprocess_results import broaden_results

        frequencies = self.get_frequencies(engine=engine)
        intensities = frequencies * 0 + 1
        x_data, y_data = broaden_results(
            centers=frequencies,
            areas=intensities,
            broadening_width=broadening_width,
            broadening_type=broadening_type,
            x_data=(min_x, max_x, x_spacing),
            post_process=post_process,
        )
        if post_process == "max_to_1":
            y_data /= np.max(y_data)
        return x_data, y_data
    def get_force_constants(self, engine: str = None):
        """Return a numpy array of force constants, expressed in atomic units (Hartree/bohr^2).

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        forceConstants = self._process_engine_results(lambda x: x.read("Vibrations", "ForceConstants"), engine)
        forceConstants = np.array(forceConstants) if isinstance(forceConstants, list) else np.array([forceConstants])
        return forceConstants

    def get_normal_modes(self, engine: str = None):
        """Return a numpy array of normal modes with shape: (num_normal_modes, num_atoms, 3), expressed in dimensionless units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        normal_modes_list = []
        num_normal_modes = self._process_engine_results(lambda x: x.read("Vibrations", "nNormalModes"), engine)
        for i in range(num_normal_modes):
            n_mode = np.array(
                self._process_engine_results(lambda x: x.read("Vibrations", f"NoWeightNormalMode({i+1})"), engine)
            ).reshape(-1, 3)
            normal_modes_list.append(n_mode)
        return np.array(normal_modes_list).reshape(num_normal_modes, -1, 3)

    def get_charges(self, engine: str = None):
        """Return the atomic charges, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Charges"), engine))

    def get_dipolemoment(self, engine: str = None):
        """Return the electric dipole moment, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "DipoleMoment"), engine))

    def get_dipolegradients(self, engine: str = None):
        """Return the nuclear gradients of the electric dipole moment, expressed in atomic units. This is a (3*numAtoms x 3) matrix.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(
            self._process_engine_results(lambda x: x.read("AMSResults", "DipoleGradients"), engine)
        ).reshape(-1, 3)
    def get_polarizability(self, engine: str = None):
        """Return the polarizability, expressed in atomic units [(e*bohr)^2/hartree]. This is a (3 x 3) matrix.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        p_components = np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "Polarizability"), engine))
        if p_components.shape == (6,):
            polarizability_matrix = np.array(
                [
                    [p_components[0], p_components[1], p_components[3]],
                    [p_components[1], p_components[2], p_components[4]],
                    [p_components[3], p_components[4], p_components[5]],
                ]
            )
        elif p_components.shape == (3, 3):
            polarizability_matrix = p_components
        else:
            raise ValueError(
                f"AMSResults%Polarizability has shape {p_components.shape}, expected (6,) [xx, xy, yy, xz, yz, zz] or (3, 3)"
            )
        return polarizability_matrix
    def get_zero_point_energy(self, unit: str = "hartree", engine: str = None):
        """Return zero point energy, expressed in *unit*. The zero point energy is found in Vibrations%ZeroPointEnergy of the engine rkf file.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(
            lambda x: x.read("Vibrations", "ZeroPointEnergy"), engine
        ) * Units.conversion_ratio("au", unit)

    def get_ir_intensities(self, engine: str = None):
        """Return the IR intensities in km/mol unit (kilometers/mol).

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(
            self._process_engine_results(lambda x: x.read("Vibrations", "Intensities[km/mol]"), engine)
        ).reshape(
            -1,
        )
    def get_ir_spectrum(
        self,
        engine: str = None,
        broadening_type: Literal["gaussian", "lorentzian"] = "gaussian",
        broadening_width=40,
        min_x=0,
        max_x=4000,
        x_spacing=0.5,
        post_process: Literal["all_intensities_to_1", "max_to_1"] = None,
    ):
        """Return the IR spectrum.

        Units: frequencies are in cm-1. By default the intensities are in km/mol (kilometers/mol); if post_process is ``all_intensities_to_1`` they are in mode-count units, and if it is ``max_to_1`` they are in arbitrary units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        from scm.plams.tools.postprocess_results import broaden_results

        frequencies = self.get_frequencies(engine=engine)
        if post_process == "all_intensities_to_1":
            intensities = frequencies * 0 + 1
        else:
            intensities = self.get_ir_intensities(engine=engine)
        x_data, y_data = broaden_results(
            centers=frequencies,
            areas=intensities,
            broadening_width=broadening_width,
            broadening_type=broadening_type,
            x_data=(min_x, max_x, x_spacing),
            post_process=post_process,
        )
        if post_process == "max_to_1":
            y_data /= np.max(y_data)
        return x_data, y_data
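    # Usage sketch (editor's illustration): a broadened IR spectrum ready for plotting,
    # e.g. with matplotlib; `job` is assumed to be a finished frequencies job:
    #
    #   x, y = job.results.get_ir_spectrum(broadening_type="lorentzian", broadening_width=20)
    #   # plt.plot(x, y)  # frequencies in cm-1 on x, km/mol on y
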
    def get_n_spin(self, engine: str = None):
        """n_spin is 1 in case of spin-restricted or spin-orbit coupling calculations, and 2 in case of spin-unrestricted calculations.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read("AMSResults", "nSpin"), engine)
    def get_orbital_energies(self, unit: str = "Hartree", engine: str = None):
        """Return the orbital energies in a numpy array of shape [nSpin,nOrbitals] (nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted).

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(
            np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "orbitalEnergies"), engine)).reshape(
                self.get_n_spin(engine), -1
            ),
            "Hartree",
            unit,
        )

    def get_orbital_occupations(self, engine: str = None):
        """Return the orbital occupations in a numpy array of shape [nSpin,nOrbitals]. For spin restricted calculations, the occupations will be between 0 and 2. For spin unrestricted or spin-orbit coupling the values will be between 0 and 1.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(
            self._process_engine_results(lambda x: x.read("AMSResults", "orbitalOccupations"), engine)
        ).reshape(self.get_n_spin(engine), -1)
    def get_homo_energies(self, unit: str = "Hartree", engine: str = None):
        """Return the homo energies per spin as a numpy array of size [nSpin]. nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted. See also :func:`~are_orbitals_fractionally_occupied`.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(
            np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "HOMOEnergy"), engine)).reshape(-1),
            "Hartree",
            unit,
        )

    def get_lumo_energies(self, unit: str = "Hartree", engine: str = None):
        """Return the lumo energies per spin as a numpy array of size [nSpin]. nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted. See also :func:`~are_orbitals_fractionally_occupied`.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(
            np.asarray(self._process_engine_results(lambda x: x.read("AMSResults", "LUMOEnergy"), engine)).reshape(-1),
            "Hartree",
            unit,
        )

    def get_smallest_homo_lumo_gap(self, unit: str = "Hartree", engine: str = None):
        """Returns a float containing the smallest HOMO-LUMO gap irrespective of spin (i.e. min(LUMO) - max(HOMO)). See also :func:`~are_orbitals_fractionally_occupied`.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(
            self._process_engine_results(lambda x: x.read("AMSResults", "SmallestHOMOLUMOGap"), engine), "Hartree", unit
        )
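    # Usage sketch (editor's illustration): frontier-orbital summary in eV; `job` is
    # assumed to be a finished calculation with orbital info on the engine file:
    #
    #   homo = job.results.get_homo_energies(unit="eV")
    #   lumo = job.results.get_lumo_energies(unit="eV")
    #   gap = job.results.get_smallest_homo_lumo_gap(unit="eV")
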
    def are_orbitals_fractionally_occupied(self, engine: str = None):
        """Returns a boolean indicating whether fractional occupations were detected. If that is the case, then the 'HOMO' and 'LUMO' labels are not well defined, since the demarcation between 'occupied' and 'empty' is somewhat arbitrary. See the AMSDriver documentation for more info.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read("AMSResults", "fractionalOccupation"), engine)
    def get_timings(self):
        """Return a dictionary with timing statistics of the job execution. Returned dictionary contains keys cpu, system and elapsed. The values are corresponding timings, expressed in seconds."""
        ret = {}
        try:
            # new AMS versions store timings on ams.rkf
            ret["elapsed"] = self.readrkf("General", "ElapsedTime")
            ret["system"] = self.readrkf("General", "SysTime")
            ret["cpu"] = self.readrkf("General", "CPUTime")
        except:
            # fall back to reading output, was needed for old AMS versions
            cpu = self.grep_output("Total cpu time:")
            system = self.grep_output("Total system time:")
            elapsed = self.grep_output("Total elapsed time:")
            ret["elapsed"] = float(elapsed[0].split()[-1])
            ret["system"] = float(system[0].split()[-1])
            ret["cpu"] = float(cpu[0].split()[-1])
        return ret
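    # Usage sketch (editor's illustration): a quick wall-clock report for a finished job:
    #
    #   t = job.results.get_timings()
    #   print(f"elapsed {t['elapsed']:.1f} s (cpu {t['cpu']:.1f} s)")
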
    def get_forcefield_params(self, engine: str = None):
        """Read all force field data from a forcefield.rkf file into self.

        The *engine* argument should be the identifier of the engine results ``.rkf`` file that contains the ForceField data. It can be omitted if there's only one engine results file in the job folder.
        """
        from scm.plams.interfaces.adfsuite.forcefieldparams import forcefield_params_from_kf

        return self._process_engine_results(forcefield_params_from_kf, engine)
    def get_exit_condition_message(self) -> str:
        """Tries to get the error message if the driver was stopped by an exit condition.

        Returns an empty string if no message was provided or the driver was not stopped by an exit condition."""
        try:
            msg = self.readrkf("History", "ExitConditionMsg")
            assert isinstance(msg, str)
            return msg
        except:
            return ""
    def get_poissonratio(self, engine: str = None):
        return self._process_engine_results(lambda x: x.read("AMSResults", "PoissonRatio"), engine)

    def get_youngmodulus(self, unit: str = "au", engine: str = None):
        return self._process_engine_results(
            lambda x: x.read("AMSResults", "YoungModulus"), engine
        ) * Units.conversion_ratio("au", unit)

    def get_shearmodulus(self, unit: str = "au", engine: str = None):
        return self._process_engine_results(
            lambda x: x.read("AMSResults", "ShearModulus"), engine
        ) * Units.conversion_ratio("au", unit)

    def get_bulkmodulus(self, unit: str = "au", engine: str = None):
        return self._process_engine_results(
            lambda x: x.read("AMSResults", "BulkModulus"), engine
        ) * Units.conversion_ratio("au", unit)
    def get_pesscan_results(self, molecules: bool = True):
        """For PESScan jobs, this function extracts information about the scan coordinates and energies.

        molecules : bool
            Whether to return a Molecule at each PES point.

        Returns a dictionary with the following keys:

        'RaveledScanCoords': a list of str with the names of the scan coordinates: ['sc_1_1','sc_1_2','sc_2_1','sc_2_2',...]
        'nRaveledScanCoords': the length of the previous list
        'RaveledUnits': a list of str with the units: ['bohr', 'bohr', 'radian', 'bohr', ...]
        'RaveledPESCoords': a nested list with the values of the scan coordinates: [[val_1_1_1,val_1_1_2,...],[val_1_2_1,val_1_2_2,...],[val_2_1_1,val_2_1_2,...],[val_2_2_1,val_2_2_2]]
        'ScanCoords': a nested list: [['sc_1_1','sc_1_2'],['sc_2_1','sc_2_2'],...]
        'nScanCoords': length of the previous list
        'Units': a nested list with the units: [['bohr', 'bohr'],['radian', 'bohr'], ...]
        'OrigScanCoords': a list of str in the newline-separated format stored on the .rkf file: ['sc_1_1\\nsc_1_2','sc_2_1\\nsc_2_2',...]
        'nPESPoints': int, number of PES points
        'PES': list of float, the energies at each PES point
        'Converged': list of bool, whether the geometry optimization at each PES point converged
        'Molecules': list of Molecule, the structure at each PES point. Only if molecules == True
        'HistoryIndices': list of int, the indices (1-based) in the History section which correspond to the Molecules and PES.
        'ConstrainedAtoms': set of int, all atom indices (1-based) that were part of any PESScan scan coordinates (other constrained atoms are not included)
        'Properties': list of dict. The dictionary keys are what can be found on the AMSResults section of the engine .rkf file. These will only be populated if "CalcPropertiesAtPESPoints" is set to Yes when running the PES scan.
        """
        import re

        from natsort import natsorted

        def tolist(x):
            if isinstance(x, list):
                return x
            else:
                return [x]

        nScanCoord = self.readrkf("PESScan", "nScanCoord")
        pes = tolist(self.readrkf("PESScan", "PES"))
        origscancoords = tolist(self.get_history_property("ScanCoord", history_section="PESScan"))
        # one scan coordinate may have several variables
        scancoords = [x.split("\n") for x in origscancoords]
        pescoords = tolist(self.readrkf("PESScan", "PESCoords"))
        pescoords = np.array(pescoords).reshape(-1, sum(len(x) for x in scancoords))
        pescoords = np.transpose(pescoords)
        units = []
        for i in range(nScanCoord):
            units.append([])
            for j in range(len(scancoords[i])):
                if (
                    scancoords[i][j] in ["a", "b", "c"]
                    or "Dist" in scancoords[i][j]
                    or "Coordinate" in scancoords[i][j]
                ):
                    units[-1].append("bohr")
                elif "Volume" in scancoords[i][j]:
                    units[-1].append("bohr^3")
                elif "Area" in scancoords[i][j]:
                    units[-1].append("bohr^2")
                else:
                    units[-1].append("radian")
        converged = tolist(self.readrkf("PESScan", "GOConverged"))
        converged = [bool(x) for x in converged]
        historyindices = tolist(self.readrkf("PESScan", "HistoryIndices"))
        ret = {}
        raveled_scancoords = [x[i] for x in scancoords for i in range(len(x))]  # 1d list
        raveled_units = [x[i] for x in units for i in range(len(x))]  # 1d list
        constrained_atoms = []
        for sc in raveled_scancoords:
            constrained_atoms.extend(re.findall(r"\((\d+)\)", sc))
        try:
            constrained_atoms = set(int(x) for x in constrained_atoms)
        except ValueError:
            constrained_atoms = set()
        ret["RaveledScanCoords"] = raveled_scancoords
        ret["nRaveledScanCoords"] = len(raveled_scancoords)
        ret["ConstrainedAtoms"] = constrained_atoms
        ret["ScanCoords"] = scancoords
        ret["nScanCoords"] = len(scancoords)
        ret["OrigScanCoords"] = origscancoords  # newline delimiter for joint scan coordinates
        ret["RaveledPESCoords"] = pescoords.tolist()
        ret["Units"] = units
        ret["RaveledUnits"] = raveled_units
        ret["Converged"] = converged
        ret["PES"] = pes
        ret["nPESPoints"] = len(pes)
        ret["HistoryIndices"] = historyindices
        if molecules:
            mols = []
            for ind in historyindices:
                mols.append(self.get_history_molecule(ind))
            ret["Molecules"] = mols
        ret["Properties"] = []
        for key in natsorted(title for title in self.rkfs.keys() if title.startswith("PESPoint")):
            amsresults = self.rkfs[key].read_section("AMSResults")
            ret["Properties"].append(amsresults)
        return ret
    def get_neb_results(self, molecules: bool = True, unit: str = "au"):
        """Returns a dictionary with results from a NEB calculation.

        molecules : bool
            Whether to include the 'Molecules' key in the return result

        unit : str
            Energy unit for the Energies, LeftBarrier, RightBarrier, and ReactionEnergy

        Returns: dict

        'nImages': number of images (excluding end points)
        'nIterations': number of iterations
        'Energies': list of energies (including end points)
        'Climbing': bool, whether climbing image NEB was used
        'LeftBarrier': float, left reaction barrier
        'RightBarrier': float, right reaction barrier
        'ReactionEnergy': float, reaction energy
        'HistoryIndices': list of int, same length as 'Energies', contains indices in the History section
        'Molecules': list of Molecule (including end points)
        """

        def tolist(x):
            if isinstance(x, list):
                return x
            else:
                return [x]

        ret = {}
        conversion_ratio = Units.conversion_ratio("au", unit)
        ret["nImages"] = self.readrkf("NEB", "nebImages")
        ret["nIterations"] = self.readrkf("NEB", "nebIterations")
        ret["Climbing"] = bool(self.readrkf("NEB", "climbing"))
        ret["HighestIndex"] = self.readrkf("NEB", "highestIndex")
        ret["LeftBarrier"] = self.readrkf("NEB", "LeftBarrier") * conversion_ratio
        ret["RightBarrier"] = self.readrkf("NEB", "RightBarrier") * conversion_ratio
        ret["ReactionEnergy"] = self.readrkf("NEB", "ReactionEnergy") * conversion_ratio
        history_dim = tolist(self.readrkf("NEB", "historyIndex@dim"))  # nimages, randombign
        history_dim.reverse()  # randombign, nimages
        history_indices_matrix = tolist(self.readrkf("NEB", "historyIndex"))
        # this matrix is padded with -1 values
        history_indices_matrix = np.array(history_indices_matrix).reshape(history_dim)
        history_indices = np.max(history_indices_matrix, axis=0, keepdims=False).tolist()
        if any(x == -1 for x in history_indices):
            raise ValueError("Found -1 in the 'converged' part of historyIndex. This should not happen!")
        ret["HistoryIndices"] = history_indices
        ret["Energies"] = [self.get_property_at_step(ind, "Energy") * conversion_ratio for ind in ret["HistoryIndices"]]
        if molecules:
            ret["Molecules"] = [self.get_history_molecule(ind) for ind in ret["HistoryIndices"]]
        return ret
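    # Usage sketch (editor's illustration): barriers from a finished NEB `job`, in eV:
    #
    #   neb = job.results.get_neb_results(molecules=False, unit="eV")
    #   print(neb["LeftBarrier"], neb["RightBarrier"], neb["ReactionEnergy"])
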
    def get_irc_results(self, molecules: bool = True, unit: str = "au"):
        """Returns a dictionary with results from an IRC calculation.

        molecules : bool
            Whether to include the 'Molecules' key in the return result

        unit : str
            Energy unit for the Energies, LeftBarrier, RightBarrier

        Returns: dict

        'Energies': list of energies (in ``unit``)
        'RelativeEnergies': list of energies relative to the first point
        'LeftBarrier': float, left reaction barrier
        'RightBarrier': float, right reaction barrier
        'Molecules': list of Molecule (including end points)
        'IRCDirection': IRC direction
        'PathLength': Path length in angstrom
        'IRCIteration': list of int
        'IRCGradMax': list of float
        'IRCGradRms': list of float
        'ArcLength': list of float
        """
        from itertools import compress

        def tolist(x):
            if isinstance(x, list):
                return x
            else:
                return [x]

        conversion_ratio = Units.conversion_ratio("au", unit)
        sec = "History"
        nEntries = self.readrkf(sec, "nEntries")
        items = [
            "IRCDirection",
            "Energy",
            "PathLength",
            "IRCIteration",
            "IRCGradMax",
            "IRCGradRms",
            "ArcLength",
            "HistoryIndices",
            "Molecules",
        ]
        d = {}
        forw = {}
        back = {}
        reformed = {}
        converged_mask = None
        forw_mask = None
        back_mask = None
        converged = self.get_history_property("Converged", history_section=sec)
        converged = tolist(converged)
        converged_mask = [x != 0 for x in converged]
        history_indices = [i for i, x in enumerate(converged_mask, 1) if x]  # raw, not rearranged
        for k in items:
            # first half is forward direction, second half is backward direction
            # second half should be reversed, path length made negative.
            if k == "Molecules":
                if not molecules:
                    continue
                d[k] = [self.get_history_molecule(ind) for ind in history_indices]  # rearrangement happens later
            elif k == "HistoryIndices":
                d[k] = history_indices  # rearrangement happens later
            else:
                d[k] = self.get_history_property(k, history_section=sec)
                d[k] = tolist(d[k])
                d[k] = list(compress(d[k], converged_mask))
            if k == "IRCDirection":
                forw_mask = [x == 1 for x in d[k]]
                back_mask = [x != 1 for x in d[k]]
                d[k] = ["Forward" if x == 1 else "Backward" if x == 2 else x for x in d[k]]
                n = len(d[k])
            forw[k] = list(compress(d[k], forw_mask))
            back[k] = list(compress(d[k], back_mask))
            back[k].reverse()
            if k == "PathLength":
                # print backwards direction as negative numbers
                back[k] = [-x for x in back[k]]
            reformed[k] = back[k] + forw[k]
        conversion_ratio = Units.convert(1.0, "au", unit)
        reformed["Energies"] = [x * conversion_ratio for x in reformed["Energy"]]
        del reformed["Energy"]
        max_energy = max(reformed["Energies"])
        if "Forward" in reformed["IRCDirection"]:
            reformed["LeftBarrier"] = max_energy - reformed["Energies"][0]
        if "Backward" in reformed["IRCDirection"]:
            reformed["RightBarrier"] = max_energy - reformed["Energies"][-1]
        reformed["RelativeEnergies"] = [x - reformed["Energies"][0] for x in reformed["Energies"]]
        return reformed
    def get_time_step(self, history_section: Literal["BinLog", "MDHistory"] = "MDHistory"):
        """Returns the time step between adjacent frames (NOT the TimeStep in the settings, but Timestep*SamplingFreq) in femtoseconds for MD simulation jobs"""
        time1 = self.get_property_at_step(1, "Time", history_section=history_section)
        time2 = self.get_property_at_step(2, "Time", history_section=history_section)
        time_step = time2 - time1
        return time_step

    def _get_integer_start_end_every_max(self, start_fs, end_fs, every_fs, max_dt_fs, time_step=None):
        """converts from femtoseconds to integers"""
        if time_step is None:
            time_step = self.get_time_step()
        start_step = round(start_fs / time_step)
        end_step = round(end_fs / time_step) if end_fs is not None else None
        every = round(every_fs / time_step) if every_fs is not None else 1
        max_dt = round((max_dt_fs / time_step) / every) if max_dt_fs is not None else None
        return start_step, end_step, every, max_dt
    def get_velocity_acf(
        self,
        start_fs=0,
        end_fs=None,
        every_fs=None,
        max_dt_fs=None,
        atom_indices=None,
        x=True,
        y=True,
        z=True,
        normalize=False,
    ):
        """Calculate the velocity autocorrelation function. Works only for trajectories with a constant number of atoms.

        start_fs : float
            Start time in femtoseconds. Defaults to 0 (first frame)

        end_fs : float
            End time in femtoseconds. Defaults to the end of the trajectory.

        max_dt_fs : float
            Maximum correlation time in femtoseconds.

        atom_indices : list of int
            Atom indices for which to calculate the velocity autocorrelation function. Defaults to all atoms. The atom indices start with 1.

        x : bool
            Whether to use the velocities along x

        y : bool
            Whether to use the velocities along y

        z : bool
            Whether to use the velocities along z

        normalize : bool
            Whether to normalize the velocity autocorrelation function so that it is 1 at time 0.

        Returns: 2-tuple (times, C)

        ``times`` is a 1D np array with times in femtoseconds. ``C`` is a 1D numpy array with shape (max_dt,) containing the autocorrelation function. If not ``normalize`` then the unit is angstrom^2/fs^2.
        """
        from scm.plams.trajectories.analysis import autocorrelation

        nEntries = self.readrkf("MDHistory", "nEntries")
        time_step = self.get_time_step()
        start_step, end_step, every, max_dt = self._get_integer_start_end_every_max(
            start_fs, end_fs, every_fs, max_dt_fs
        )
        data = self.get_history_property("Velocities", history_section="MDHistory")
        data = np.array(data).reshape(nEntries, -1, 3)[start_step:end_step:every]
        if atom_indices is not None:
            zero_based_atom_indices = [x - 1 for x in atom_indices]
            data = data[:, zero_based_atom_indices, :]
        n_dimensions = 3
        if not x or not y or not z:
            components = []
            if x:
                components += [0]
            if y:
                components += [1]
            if z:
                components += [2]
            data = data[:, :, components]
            n_dimensions = len(components)
        data *= Units.convert(1.0, "bohr", "angstrom")  # convert from bohr/fs to ang/fs
        # multiply by n_dimensions to undo the averaging per component and instead average per atom
        vacf = autocorrelation(data, max_dt=max_dt, normalize=normalize) * n_dimensions
        times = np.arange(len(vacf)) * time_step * every
        return times, vacf
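    # Usage sketch (editor's illustration): VACF restricted to the first 10 atoms of a
    # finished MD `job`, with a 500 fs correlation window:
    #
    #   times, vacf = job.results.get_velocity_acf(max_dt_fs=500, atom_indices=list(range(1, 11)))
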
    def get_dipole_history(self, dipole_unit="e*bohr"): """Returns the dipole moment history from the ``BinLog`` section as an (N, 3) numpy array in units of ``dipole_unit``.""" dipole_x = self.get_history_property(history_section="BinLog", varname="DipoleMoment_x") dipole_y = self.get_history_property(history_section="BinLog", varname="DipoleMoment_y") dipole_z = self.get_history_property(history_section="BinLog", varname="DipoleMoment_z") data = np.column_stack((dipole_x, dipole_y, dipole_z)) data *= Units.convert(1.0, "e*bohr", dipole_unit) return data
[docs]    def get_dipole_derivatives_acf( self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, x=True, y=True, z=True, normalize=False ): """ Calculate the dipole moment derivative autocorrelation function. start_fs : float Start time in femtoseconds. Defaults to 0 (first frame) end_fs : float End time in femtoseconds. Defaults to the end of the trajectory. every_fs : float Use data only every every_fs femtoseconds. Defaults to using every frame. max_dt_fs : float Maximum correlation time in femtoseconds. x : bool Whether to use the dipole moment component along x y : bool Whether to use the dipole moment component along y z : bool Whether to use the dipole moment component along z normalize : bool Whether to normalize the autocorrelation function so that it is 1 at time 0. Returns: 2-tuple (times, C) ``times`` is a 1D numpy array with times in femtoseconds. ``C`` is a 1D numpy array with shape (max_dt,) containing the autocorrelation function. If not ``normalize`` then the unit is (e*bohr/fs)^2. """ from scm.plams.trajectories.analysis import autocorrelation # nEntries = self.readrkf('BinLog', 'nEntries') time_step = self.get_time_step(history_section="BinLog") data = self.get_dipole_history() start_step, end_step, every, max_dt = self._get_integer_start_end_every_max( start_fs, end_fs, every_fs, max_dt_fs, time_step ) data = data[start_step:end_step:every, :] n_dimensions = 3 if not x or not y or not z: components = [] if x: components += [0] if y: components += [1] if z: components += [2] data = data[:, components] n_dimensions = len(components) data_deriv = np.diff(data, axis=0) / time_step if max_dt is None: # default value is 5000 or num_timesteps // 2 num_timesteps = data_deriv.shape[0] max_dt = num_timesteps // 2 if max_dt > 5000: max_dt = 5000 dipole_deriv_acf = ( autocorrelation(data_deriv, max_dt=max_dt, normalize=normalize) * n_dimensions ) # multiply by n_dimensions to undo the averaging per component times = np.arange(len(dipole_deriv_acf)) * time_step * every return times, dipole_deriv_acf
[docs]    def get_diffusion_coefficient_from_velocity_acf(self, times=None, acf=None, n_dimensions=3): """ Diffusion coefficient by integration of the velocity autocorrelation function. If ``times`` or ``acf`` is None, then a default velocity autocorrelation function will be calculated. times: 1D numpy array 1D numpy array with the times acf: 1D numpy array 1D numpy array with the velocity autocorrelation function in ang^2/fs^2 n_dimensions: int Number of dimensions that were used to calculate the autocorrelation function. Returns: 2-tuple (times, diffusion_coefficient) ``times`` are the times in femtoseconds. ``diffusion_coefficient`` is a 1D numpy array with the diffusion coefficients in m^2/s. """ from scipy.integrate import cumtrapz if times is None or acf is None: times, acf = self.get_velocity_acf(normalize=False) diffusion_coefficient = (1.0 / n_dimensions) * cumtrapz(acf, times) # ang^2/fs diffusion_coefficient *= 1e-20 / 1e-15 # convert ang^2/fs to m^2/s new_times = times[:-1] return np.array(new_times), diffusion_coefficient
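    # Usage sketch (illustrative; ``job`` is hypothetical): the diffusion
    # coefficient is the long-time plateau of the running integral returned here,
    # so the convergence of the curve should be inspected before trusting the
    # final value.
    #
    #     times, D = job.results.get_diffusion_coefficient_from_velocity_acf()
    #     print("D ~", D[-1], "m^2/s")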
[docs]    def get_power_spectrum(self, times=None, acf=None, max_dt_fs=None, max_freq=None, number_of_points=None): """ Calculates a power spectrum from the velocity autocorrelation function. If ``times`` or ``acf`` is None, then a default velocity autocorrelation function will be calculated. times: 1D numpy array of float The ``times`` returned by ``AMSResults.get_velocity_acf()``. acf: 1D numpy array of float The ``vacf`` returned by ``AMSResults.get_velocity_acf()`` max_dt_fs : float Maximum correlation time in femtoseconds. max_freq: float Maximum frequency (in cm^-1) of the returned power spectrum. Defaults to 5000 cm^-1. number_of_points: int Number of points in the returned power spectrum. Defaults to a number so that the spacing between points is about 1 cm^-1. Returns: 2-tuple (frequencies, intensities) ``frequencies`` : Frequencies in cm^-1 (1D numpy array). ``intensities``: Intensities (1D numpy array). """ from scm.plams.trajectories.analysis import power_spectrum if times is None or acf is None: times, acf = self.get_velocity_acf(max_dt_fs=max_dt_fs, normalize=True) return power_spectrum(times, acf, max_freq=max_freq, number_of_points=number_of_points)
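    # Usage sketch (illustrative; ``job`` is hypothetical): a power spectrum from a
    # normalized velocity autocorrelation function.
    #
    #     times, vacf = job.results.get_velocity_acf(max_dt_fs=2000, normalize=True)
    #     freq, intens = job.results.get_power_spectrum(times, vacf, max_freq=4000)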
[docs]    def get_ir_spectrum_md( self, times: np.ndarray = None, acf: np.ndarray = None, max_dt_fs: float = None, max_freq: float = 5000, number_of_points: int = None, ): """ Calculates an IR spectrum from the dipole moment derivative autocorrelation function. If ``times`` or ``acf`` is None, then a default dipole derivative autocorrelation function will be calculated. times: 1D numpy array of float The ``times`` returned by ``AMSResults.get_dipole_derivatives_acf()``. acf: 1D numpy array of float The ``acf`` returned by ``AMSResults.get_dipole_derivatives_acf()`` max_dt_fs : float Maximum correlation time in femtoseconds. max_freq: float Maximum frequency (in cm^-1) of the returned power spectrum. Defaults to 5000 cm^-1. number_of_points: int Number of points in the returned power spectrum. Defaults to a number so that the spacing between points is about 1 cm^-1. Returns: 2-tuple (frequencies, intensities) ``frequencies`` : Frequencies in cm^-1 (1D numpy array). ``intensities``: Intensities (1D numpy array). """ from scm.plams.trajectories.analysis import power_spectrum if times is None or acf is None: times, acf = self.get_dipole_derivatives_acf(max_dt_fs=max_dt_fs, normalize=True) return power_spectrum(times, acf, max_freq=max_freq, number_of_points=number_of_points)
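    # Usage sketch (illustrative; ``job`` is hypothetical): an MD-based IR spectrum
    # needs dipole moments in the BinLog section, i.e. the MD run must have been set
    # up to log the dipole moment at every step.
    #
    #     freq, intens = job.results.get_ir_spectrum_md(max_freq=4000)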
@staticmethod def _get_green_kubo_viscosity(pressuretensor, time_step, max_dt, volume, temperature, xy=True, yz=True, xz=True): from scipy.integrate import cumtrapz from scm.plams.tools.units import Units from scm.plams.trajectories.analysis import autocorrelation data = np.array(pressuretensor) components = [] if yz: components += [3] if xz: components += [4] if xy: components += [5] data = data[:, components] acf = autocorrelation(data, max_dt=max_dt) times = np.arange(len(acf)) * time_step integrated_acf = cumtrapz(acf, times, initial=0) integrated_times = np.arange(len(integrated_acf)) * time_step k_B = Units.constants["k_B"] au2Pa = Units.convert(1.0, "hartree/bohr^3", "Pa") viscosity = (volume * 1e-30) / (k_B * temperature) * integrated_acf * au2Pa**2 * 1e-15 * 1e3 return integrated_times, viscosity
[docs]    def get_green_kubo_viscosity( self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, xy=True, yz=True, xz=True, pressuretensor=None ): """ Calculates the viscosity using the Green-Kubo relation (integrating the off-diagonal pressure tensor autocorrelation function). start_fs : float Start time in femtoseconds. Defaults to 0 (first frame) end_fs : float End time in femtoseconds. Defaults to the end of the trajectory. every_fs : float Use data only every every_fs femtoseconds. Defaults to using every frame. max_dt_fs : float Maximum correlation time in femtoseconds. xy : bool Whether to use the xy off-diagonal elements yz : bool Whether to use the yz off-diagonal elements xz : bool Whether to use the xz off-diagonal elements pressuretensor : np.array shape (N,6) Pressure tensor in atomic units. If not specified, it will be read from the ams.rkf file. Returns: 2-tuple (times, C) ``times`` is a 1D numpy array with times in femtoseconds. ``C`` is a 1D numpy array with shape (max_dt,) containing the running viscosity integral (in mPa*s). It should converge to the viscosity value as the correlation time increases. """ from scipy.integrate import cumtrapz from scm.plams.tools.units import Units from scm.plams.trajectories.analysis import autocorrelation time_step = self.get_time_step() start_step, end_step, every, max_dt = self._get_integer_start_end_every_max( start_fs, end_fs, every_fs, max_dt_fs ) data = pressuretensor if data is None: data = self.get_history_property("PressureTensor", "MDHistory") data = [ x for x in data if x is not None ] # None might appear in currently running trajectories if the job was loaded with load_external data = np.array(data)[start_step:end_step:every] components = [] if yz: components += [3] if xz: components += [4] if xy: components += [5] data = data[:, components] acf = autocorrelation(data, max_dt=max_dt) times = np.arange(len(acf)) * time_step * every integrated_acf = cumtrapz(acf, times) integrated_times = np.arange(len(integrated_acf)) * time_step * every V = self.get_main_molecule().unit_cell_volume() try: T = self.get_history_property("Temperature", "MDHistory") T = np.array(T)[start_step:end_step:every] mean_T = np.mean(T) except KeyError: # might be triggered for currently running trajectories, then just use the first temperature mean_T = self.get_property_at_step(1, "Temperature", "MDHistory") k_B = Units.constants["k_B"] au2Pa = Units.convert(1.0, "hartree/bohr^3", "Pa") viscosity = (V * 1e-30) / (k_B * mean_T) * integrated_acf * au2Pa**2 * 1e-15 * 1e3 return integrated_times, viscosity
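    # Usage sketch (illustrative; ``job`` is hypothetical): the viscosity estimate
    # is the plateau value of the returned running integral; check that the curve
    # has flattened before quoting the final number.
    #
    #     times, eta = job.results.get_green_kubo_viscosity(max_dt_fs=5000)
    #     print("viscosity ~", eta[-1], "mPa*s")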
[docs]    def get_density_along_axis( self, axis: str = "z", density_type: str = "mass", start_fs=0, end_fs=None, every_fs=None, bin_width=0.1, atom_indices=None, ): """ Calculates the density of atoms along a Cartesian coordinate axis. This only works if the axis is perpendicular to the other two axes. The system must be 3D-periodic and the number of atoms cannot change during the trajectory. axis : str 'x', 'y', or 'z' density_type : str 'mass' gives the density in g/cm^3. 'number' gives the number density in ang^-3, 'histogram' gives the number of atoms per frame (so the number depends on the bin size) start_fs : float Start time in fs end_fs : float End time in fs every_fs : float Use data only every every_fs femtoseconds bin_width : float Bin width of the returned coordinates (in angstrom). atom_indices: list of int Indices (starting with 1) for the atoms to use for calculating the density. Returns: 2-tuple (coordinates, density) ``coordinates`` is a 1D array with the coordinates (in angstrom). ``density`` is a 1D array with the densities (in g/cm^3 or ang^-3 depending on ``density_type``). """ assert axis in ["x", "y", "z"], f"Unknown axis: {axis}. Must be 'x', 'y', or 'z'" assert density_type in [ "mass", "number", "histogram", ], f"Unknown density_type: {density_type}. Must be 'mass', 'number', or 'histogram'" main_mol = self.get_main_molecule() histogram = density_type == "histogram" bohr2ang = Units.convert(1.0, "bohr", "angstrom") start_step, end_step, every, _ = self._get_integer_start_end_every_max(start_fs, end_fs, every_fs, None) nEntries = self.readrkf("History", "nEntries") coords = np.array(self.get_history_property("Coords")).reshape(nEntries, -1, 3) coords = coords[start_step:end_step:every] n_frames = coords.shape[0] # average over the frames actually used (honors start_fs/end_fs/every_fs) axis2index = {"x": 0, "y": 1, "z": 2} index = axis2index[axis] if not histogram: other_indices = [0, 1, 2] other_indices.remove(index) assert ( len(main_mol.lattice) == 3 ), f"get_density_along_axis with density_type='mass' or 'number' can only be called for 3d-periodic systems. Current periodicity: {len(main_mol.lattice)}. Use density_type='histogram' instead." assert ( np.abs(np.dot(main_mol.lattice[other_indices[0]], main_mol.lattice[index])) < 1e-6 ), f"Axis {axis} must be perpendicular to the other two axes" assert ( np.abs(np.dot(main_mol.lattice[other_indices[1]], main_mol.lattice[index])) < 1e-6 ), f"Axis {axis} must be perpendicular to the other two axes" assert ( np.abs(main_mol.lattice[index][other_indices[0]]) < 1e-6 ), f"Density along {axis} requires that lattice vector {index+1} has only 1 non-zero component (along {axis}). The vector is {main_mol.lattice[index]}" assert ( np.abs(main_mol.lattice[index][other_indices[1]]) < 1e-6 ), f"Density along {axis} requires that lattice vector {index+1} has only 1 non-zero component (along {axis}). The vector is {main_mol.lattice[index]}"
lattice_vectors = np.array(self.get_history_property("LatticeVectors")).reshape(-1, 3, 3) lattice_vectors = lattice_vectors[start_step:end_step:every] * bohr2ang vec1s = lattice_vectors[:, other_indices[0], :] vec2s = lattice_vectors[:, other_indices[1], :] cross_products = np.cross(vec1s, vec2s) areas = np.linalg.norm(cross_products, axis=1) mean_area = np.mean(areas) coords = coords[:, :, index] if atom_indices is not None: zero_based_atom_indices = [x - 1 for x in atom_indices] coords = coords[:, zero_based_atom_indices] coords *= bohr2ang avogadro = Units.constants["NA"] min_c = np.min(coords) max_c = np.max(coords) num_bins = round((max_c - min_c) / bin_width + 1) bins = np.linspace(min_c, max_c, num_bins, endpoint=True) if density_type == "mass": masses = np.array(self.readrkf("Molecule", "AtomMasses")) if atom_indices is not None: masses = masses[zero_based_atom_indices] masses_broadcasted = np.broadcast_to(masses, coords.shape) hist, bin_edges = np.histogram(coords, weights=masses_broadcasted, bins=bins) volume_slice_ang3 = mean_area * (bin_edges[1] - bin_edges[0]) volume_slice_cm3 = volume_slice_ang3 * 1e-24 density = (1.0 / n_frames) * (hist / avogadro) / volume_slice_cm3 elif density_type == "number": hist, bin_edges = np.histogram(coords, bins=bins) volume_slice_ang3 = mean_area * (bin_edges[1] - bin_edges[0]) density = (1.0 / n_frames) * hist / volume_slice_ang3 elif density_type == "histogram": hist, bin_edges = np.histogram(coords, bins=bins) density = (1.0 / n_frames) * hist z = (bin_edges[:-1] + bin_edges[1:]) / 2.0 return z, density
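    # Usage sketch (illustrative; ``job`` is hypothetical): a mass density profile
    # along z for a 3D-periodic system, e.g. to inspect a liquid/vapour interface.
    #
    #     z, rho = job.results.get_density_along_axis(
    #         axis="z", density_type="mass", bin_width=0.25
    #     )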
[docs] def recreate_molecule(self) -> Union[None, Molecule, Dict[str, Molecule]]: """Recreate the input molecule(s) for the corresponding job based on files present in the job folder. This method is used by |load_external|. If ``ams.rkf`` is present in the job folder, extract data from the ``InputMolecule`` and ``InputMolecule(*)`` sections. """ if "ams" in self.rkfs: mols = self.get_input_molecules() if len(mols) == 0: return None elif len(mols) == 1 and "" in mols: return mols[""] else: return mols return None
[docs] def recreate_settings(self): """Recreate the input |Settings| instance for the corresponding job based on files present in the job folder. This method is used by |load_external|. If ``ams.rkf`` is present in the job folder, extract user input and parse it back to a |Settings| instance using ``scm.libbase`` module. Remove the ``system`` branch from that instance. """ if "ams" in self.rkfs: user_input = self.readrkf("General", "user input") try: from scm.libbase import InputParser inp = InputParser().to_settings("ams", user_input) except: log("Failed to recreate input settings from {}".format(self.rkfs["ams"].path)) return None s = Settings() s.input = inp if "system" in s.input.ams: del s.input.ams.system s.soft_update(config.job) return s return None
[docs] def ok(self): """Check if the execution of the associated :attr:`job` was successful or not. See :meth:`Job.ok<scm.plams.core.basejob.Job.ok>` for more information.""" return self.job.ok()
[docs]    def get_errormsg(self): """Tries to get an error message for the associated :attr:`job`. This method returns ``None`` if the associated job was successful. See :meth:`Job.get_errormsg<scm.plams.core.basejob.Job.errormsg>` for more information.""" return self.job.get_errormsg()
    @property def name(self): """Return the :attr:`job.name` of the job associated with this results instance.""" return self.job.name class EnergyLandscape: class State: def __init__( self, landscape, engfile, energy, mol, count, isTS, reactantsID=None, productsID=None, prefactorsFromReactant=None, prefactorsFromProduct=None, ): self._landscape = landscape self.engfile = engfile self.energy = energy self.molecule = mol self.count = count self.isTS = isTS self.reactantsID = reactantsID self.productsID = productsID self.prefactorsFromReactant = prefactorsFromReactant self.prefactorsFromProduct = prefactorsFromProduct @property def id(self): return self._landscape._states.index(self) + 1 @property def reactants(self): return self._landscape._states[self.reactantsID - 1] @property def products(self): return self._landscape._states[self.productsID - 1] def __str__(self): if self.isTS: lines = [ f"State {self.id}: {self.molecule.get_formula(False)} transition state @ {self.energy:.8f} Hartree (found {self.count} times" + (f", results on {self.engfile})" if self.engfile is not None else ")") ] if self.reactantsID is not None: lines += [f"  +- Reactants: {self.reactants}"] if self.productsID is not None: lines += [f"     Products:  {self.products}"] if self.reactantsID is not None and self.productsID is not None: eV = Units.convert(1.0, "eV", "hartree") lines += [ f"     Prefactors: {self.prefactorsFromReactant:.3E}:{self.prefactorsFromProduct:.3E} s^-1" ] dEforward = (self.energy - self._landscape._states[self.reactantsID - 1].energy) / eV dEbackward = (self.energy - self._landscape._states[self.productsID - 1].energy) / eV lines += [f"     Barriers: {dEforward:.3f}:{dEbackward:.3f} eV"] elif self.productsID is not None: lines += [f"     Prefactors: {self.prefactorsFromReactant:.3E}:?"] else: lines = [ f"State {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (found {self.count} times" + (f", results on {self.engfile})" if self.engfile is not None else ")") ] return "\n".join(lines) class Fragment: def __init__(self, landscape, engfile, energy, mol): self._landscape = landscape self.engfile = engfile self.energy = energy self.molecule = mol @property def id(self): return self._landscape._fragments.index(self) + 1 def __str__(self): lines = [ f"Fragment {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (results on {self.engfile})" ] return "\n".join(lines) class FragmentedState: def __init__( self, landscape, energy, composition, connections=None, adsorptionPrefactors=None, desorptionPrefactors=None, ): self._landscape = landscape self.energy = energy self.composition = composition self.connections = connections self.adsorptionPrefactors = adsorptionPrefactors self.desorptionPrefactors = desorptionPrefactors @property def id(self): return self._landscape._fstates.index(self) + 1 @property def fragments(self): return [self._landscape._fragments[i] for i in self.composition] def __str__(self): formula = "" for i, id in enumerate(self.composition): formula += self._landscape._fragments[id].molecule.get_formula(False) if i != len(self.composition) - 1: formula += "+" lines = [ f"FragmentedState {self.id}: {formula} local minimum @ {self.energy:.8f} Hartree (fragments {[i+1 for i in self.composition]})" ] for i, iState in enumerate(self.connections): lines += [f"  +- {self._landscape._states[iState]}"] if i == len(self.connections) - 1: lines += [ f"     Prefactors: {self.adsorptionPrefactors[i]:.3E}:{self.desorptionPrefactors[i]:.3E}" ] else: lines += [ f"  |  Prefactors: {self.adsorptionPrefactors[i]:.3E}:{self.desorptionPrefactors[i]:.3E}" ] return "\n".join(lines)
def __init__(self, results): self._states = [] self._fragments = [] self._fstates = [] if results is None: return if "EnergyLandscape" not in results.get_rkf_skeleton(): return sec = results.read_rkf_section("EnergyLandscape") # If there is only 1 state in the 'EnergyLandscape' section, some variables that are normally lists are instead plain built-in types (e.g. a 'float' instead of a 'list of floats'). # For convenience here we make sure that the following variables are always 'lists': for var in ["energies", "counts", "isTS", "reactants", "products"]: if not isinstance(sec[var], list): sec[var] = [sec[var]] nStates = sec["nStates"] for iState in range(nStates): energy = sec["energies"][iState] resfile = os.path.splitext(sec["fileNames"].split("\0")[iState])[0] mol = results.get_molecule("Molecule", file=resfile) count = sec["counts"][iState] if not sec["isTS"][iState]: self._states.append(AMSResults.EnergyLandscape.State(self, resfile, energy, mol, count, False)) else: reactantsID = sec["reactants"][iState] if sec["reactants"][iState] > 0 else None productsID = sec["products"][iState] if sec["products"][iState] > 0 else None prefactorsFromReactant = ( sec["prefactorsFromReactant"][iState] if sec["products"][iState] > 0 else None ) prefactorsFromProduct = ( sec["prefactorsFromProduct"][iState] if sec["products"][iState] > 0 else None ) self._states.append( AMSResults.EnergyLandscape.State( self, resfile, energy, mol, count, True, reactantsID, productsID, prefactorsFromReactant, prefactorsFromProduct, ) ) if "nFragments" not in sec: return nFragments = sec["nFragments"] for iFragment in range(nFragments): energy = sec["fragmentsEnergies"][iFragment] resfile = os.path.splitext(sec["fragmentsFileNames"].split("\0")[iFragment])[0] mol = results.get_molecule("Molecule", file=resfile) self._fragments.append(AMSResults.EnergyLandscape.Fragment(self, resfile, energy, mol)) if "nFStates" not in sec: return nFragmentedStates = sec["nFStates"] for iFState in range(nFragmentedStates): iEnergy = sec["fStatesEnergy(" + str(iFState + 1) + ")"] value = sec["fStatesComposition(" + str(iFState + 1) + ")"] iComposition = [i - 1 for i in value] if isinstance(value, list) else [value - 1] value = sec["fStatesConnections(" + str(iFState + 1) + ")"] iConnections = [i - 1 for i in value] if isinstance(value, list) else [value - 1] value = sec["fStatesAdsorptionPrefactors(" + str(iFState + 1) + ")"] iAdsorptionPrefactors = value if isinstance(value, list) else [value] value = sec["fStatesDesorptionPrefactors(" + str(iFState + 1) + ")"] iDesorptionPrefactors = value if isinstance(value, list) else [value] self._fstates.append( AMSResults.EnergyLandscape.FragmentedState( self, iEnergy, iComposition, iConnections, iAdsorptionPrefactors, iDesorptionPrefactors ) ) @property def minima(self): return [s for s in self._states if not s.isTS] @property def transition_states(self): return [s for s in self._states if s.isTS] @property def fragments(self): return [f for f in self._fragments] @property def fragmented_states(self): return [fs for fs in self._fstates] def __str__(self): lines = ["All stationary points:"] lines += ["======================"] for s in self._states: lines += [str(s)] for f in self._fragments: lines += [str(f)] for fs in self._fstates: lines += [str(fs)] return "\n".join(lines) def __getitem__(self, i): return self._states[i - 1] def __iter__(self): return iter(self._states) def __len__(self): return len(self._states)
[docs] def get_energy_landscape(self): """Returns the energy landscape obtained from a PESExploration job run by the AMS driver. The energy landscape is a set of stationary PES points (local minima and transition states). The returned object is of the type ``AMSResults.EnergyLandscape`` and offers convenient access to the states' energies, geometries as well as the information on which transition states connect which minima. .. code-block:: python el = results.get_energy_landscape() print(el) for state in el: print(f"Energy = {state.energy}") print(f"Is transition state = {state.isTS}") print("Geometry:", state.molecule) if state.isTS: print(f"Forward barrier: {state.energy - state.reactants.energy}") print(f"Backward barrier: {state.energy - state.products.energy}") """ return AMSResults.EnergyLandscape(self)
# ========================================================================= def _access_rkf(self, func, file="ams"): """A skeleton method for accessing any of the ``.rkf`` files produced by AMS. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. The *func* argument has to be a function to call on a chosen ``.rkf`` file. It should take one argument, an instance of |KFFile|. """ # Try unique engine: if file == "engine": names = self.engine_names() if len(names) == 1: return func(self.rkfs[names[0]]) else: raise ValueError( "You cannot use 'engine' as 'file' argument if the engine results file is not unique. Please use the real name of the file you wish to read" ) # Try: if file in self.rkfs: return func(self.rkfs[file]) # Try harder: filename = file + ".rkf" self.refresh() if filename in self.files: self.rkfs[file] = KFFile(opj(self.job.path, filename)) return func(self.rkfs[file]) # Surrender: raise FileError("File {} not present in {}".format(filename, self.job.path)) def _process_engine_results(self, func, engine: str = None): """A generic method skeleton for processing any engine results ``.rkf`` file. *func* should be a function that takes one argument (an instance of |KFFile|) and returns arbitrary data. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ names = self.engine_names() if engine is not None: if engine in names: return func(self.rkfs[engine]) else: raise FileError(f"File {engine}.rkf not present in {self.job.path}\n engine names found are: {names}") else: if len(names) == 1: return func(self.rkfs[names[0]]) elif len(names) == 0: raise FileError("There is no engine .rkf present in {}".format(self.job.path)) else: raise ValueError( "You need to specify the 'engine' argument when there are multiple engine result files present in the job folder" )
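    # Usage sketch (illustrative; ``results`` is a hypothetical AMSResults
    # instance): with exactly one engine .rkf file present, data can be read from
    # it via file='engine'; otherwise the real file name (e.g. 'dftb' for
    # dftb.rkf) must be given.
    #
    #     energy = results.readrkf("AMSResults", "Energy", file="engine")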
# =========================================================================== # =========================================================================== # ===========================================================================
[docs]class AMSJob(SingleJob): """A class representing a single computation with AMS driver. The corresponding results type is |AMSResults|.""" results: AMSResults molecule: Union[Molecule, Dict[str, Molecule], None] _result_type = AMSResults _command = "ams"
[docs] def __init__(self, molecule: Union[Molecule, Dict[str, Molecule], None] = None, *args, **kwargs): super().__init__(molecule, *args, **kwargs)
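    # Usage sketch (illustrative): a minimal geometry optimization; the molecule
    # file name and the engine choice below are arbitrary examples.
    #
    #     from scm.plams import AMSJob, Molecule, Settings, init, finish
    #
    #     init()
    #     s = Settings()
    #     s.input.ams.Task = "GeometryOptimization"
    #     s.input.dftb = Settings()  # empty engine block -> DFTB with default settings
    #     job = AMSJob(name="opt", molecule=Molecule("water.xyz"), settings=s)
    #     job.run()
    #     finish()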
[docs]    def run(self, jobrunner=None, jobmanager=None, watch=False, **kwargs) -> AMSResults: """Run the job using *jobmanager* and *jobrunner* (or defaults, if ``None``). If *watch* is set to ``True``, the contents of the AMS driver logfile will be forwarded line by line to the PLAMS logfile (and stdout), allowing for an easier monitoring of the running job. Note that the forwarding of the AMS driver logfile will make the call to this method block until the job's execution has finished, even when using a parallel |JobRunner|. Other keyword arguments (*\*\*kwargs*) are stored in ``run`` branch of job's settings. Returned value is the |AMSResults| instance associated with this job. """ if _has_watchdog and watch: if "default_jobmanager" in config: jobmanager = config.default_jobmanager else: raise PlamsError("No default jobmanager found. This probably means that PLAMS init() was not called.") observer = Observer() event_handler = AMSJobLogTailHandler(self, jobmanager) observer.schedule(event_handler, jobmanager.workdir, recursive=True) observer.start() try: results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs) results.wait() finally: observer.stop() observer.join() else: results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs) return results
[docs]    def get_input(self): """Generate the input file. This method is just a wrapper around :meth:`_serialize_input`. Each instance of |AMSJob| or |AMSResults| present as a value in ``settings.input`` branch is replaced with an absolute path to ``ams.rkf`` file of that job. If you need to use a path to some engine specific ``.rkf`` file rather than the main ``ams.rkf`` file, you can do it by supplying a tuple ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a string with the name of the ``.rkf`` file you want. For example, ``(myjob, 'dftb')`` will transform to the absolute path to ``dftb.rkf`` file in ``myjob``'s folder, if such a file is present. Instances of |KFFile| are replaced with absolute paths to corresponding files. """ special = { AMSJob: lambda x: x.results.rkfpath(), AMSResults: lambda x: x.rkfpath(), KFFile: lambda x: x.path, tuple: lambda x: AMSJob._tuple2rkf(x), } return self._serialize_input(special)
[docs] def get_runscript(self): """Generate the runscript. Returned string is of the form:: unset AMS_SWITCH_LOGFILE_AND_STDOUT AMS_JOBNAME=jobname AMS_RESULTSDIR=. $AMSBIN/ams [-n nproc] --input=jobname.in [>jobname.out] ``-n`` flag is added if ``settings.runscript.nproc`` exists. ``[>jobname.out]`` is used based on ``settings.runscript.stdout_redirect``. If ``settings.runscript.preamble_lines`` exists, those lines will be added to the runscript verbatim before the execution of AMS. If ``settings.runscript.postamble_lines`` exists, those lines will be added to the runscript verbatim after the execution of AMS. """ ret = "unset AMS_SWITCH_LOGFILE_AND_STDOUT\n" ret += "unset SCM_LOGFILE\n" if "nnode" in self.settings.runscript and config.slurm: # Running as a SLURM job step and user specified the number of nodes explicitly. ret += f'export SCM_SRUN_OPTIONS="$SCM_SRUN_OPTIONS -N {self.settings.runscript.nnode}"\n' elif "nproc" in self.settings.runscript and config.slurm: # Running as a SLURM job step and user asked for a specific number of tasks. # Make sure to use as few nodes as possible to avoid distributing jobs needlessly across nodes. # See: https://stackoverflow.com/questions/71382578 for nnode in range(1, len(config.slurm.tasks_per_node) + 1): if sum(config.slurm.tasks_per_node[0:nnode]) >= self.settings.runscript.nproc: break if nnode > 1: nnode = f"1-{nnode}" ret += f'export SCM_SRUN_OPTIONS="$SCM_SRUN_OPTIONS -N {nnode}"\n' if _has_scm_pisa and isinstance(self.settings.input, DriverBlock): if self.settings.input.Engine.name == "QuantumESPRESSO": ret += f"export SCM_DISABLE_MPI=1\n" else: if "QuantumEspresso" in self.settings.input: ret += f"export SCM_DISABLE_MPI=1\n" if "preamble_lines" in self.settings.runscript: for line in self.settings.runscript.preamble_lines: ret += f"{line}\n" ret += 'AMS_JOBNAME="{}" AMS_RESULTSDIR=. $AMSBIN/ams'.format(self.name) if "nproc" in self.settings.runscript: ret += " -n {}".format(self.settings.runscript.nproc) ret += ' --input="{}" < /dev/null'.format(self._filename("inp")) if self.settings.runscript.stdout_redirect: ret += ' >"{}"'.format(self._filename("out")) ret += "\n\n" if "postamble_lines" in self.settings.runscript: for line in self.settings.runscript.postamble_lines: ret += f"{line}\n" ret += "\n" return ret
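    # Usage sketch (illustrative): the runscript settings referenced above, e.g.
    #
    #     s.runscript.nproc = 8
    #     s.runscript.preamble_lines = ["module load ams"]  # hypothetical module name
    #     s.runscript.stdout_redirect = True
    #
    # would yield a runscript that executes ``$AMSBIN/ams -n 8`` with stdout
    # redirected to the job's .out file.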
[docs] def check(self): """Check if ``termination status`` variable from ``General`` section of main KF file equals ``NORMAL TERMINATION``.""" try: status = self.results.readrkf("General", "termination status") except: return False if "NORMAL TERMINATION" in status: if "errors" in status: log("Job {} reported errors. Please check the output".format(self._full_name()), 1) return False if "warnings" in status: log("Job {} reported warnings. Please check the output".format(self._full_name()), 1) return True return False
[docs] def get_errormsg(self): """Tries to get an error message for a failed job. This method returns ``None`` for successful jobs.""" if self.check(): return None else: # Something went wrong. The first place to check is the termination status on the ams.rkf. # If the AMS driver stopped with a known error (called StopIt in the Fortran code), the error will be in there. try: msg = self.results.readrkf("General", "termination status") if msg == "NORMAL TERMINATION with errors" or msg is None: # Apparently this wasn't a hard stop in the middle of the job. # Let's look for the last error in the logfile ... msg = self.results.grep_file("ams.log", "ERROR: ")[-1].partition("ERROR: ")[2] elif msg == "IN PROGRESS" and "$JN.err" in self.results: # If the status is still "IN PROGRESS", that probably means AMS was shut down hard from the outside. # E.g. it got SIGKILL from the scheduler for exceeding some resource limit. # In this case useful information may be found on stderr. with open(self.results["$JN.err"], "r") as err: errlines = err.read().splitlines() for el in reversed(errlines): if el != "" and not el.isspace(): msg = "Killed while IN PROGRESS: " + el break except: msg = "Could not determine error message. Please check the output manually." return msg
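    # Usage sketch (illustrative; ``job`` is hypothetical): the usual pattern for
    # diagnosing a failed run.
    #
    #     if not job.ok():
    #         print(job.get_errormsg())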
[docs] def hash_input(self): """Calculate the hash of the input file. All instances of |AMSJob| or |AMSResults| present as values in ``settings.input`` branch are replaced with hashes of corresponding job's inputs. Instances of |KFFile| are replaced with absolute paths to corresponding files. """ special = { AMSJob: lambda x: x.hash_input(), AMSResults: lambda x: x.job.hash_input(), KFFile: lambda x: x.path, tuple: lambda x: AMSJob._tuple2rkf(x), } return sha256(self._serialize_input(special))
# ========================================================================= def _serialize_input(self, special) -> str: """Transform the contents of ``settings.input`` branch into string with blocks, keys and values. First, the contents of ``settings.input`` are extended with entries returned by :meth:`_serialize_molecule`. Then the contents of ``settings.input.ams`` are used to generate AMS text input. Finally, every other (than ``ams``) entry in ``settings.input`` is used to generate engine specific input. Special values can be indicated with *special* argument, which should be a dictionary having types of objects as keys and functions translating these types to strings as values. """ def unspec(value): """Check if *value* is one of a special types and convert it to string if it is.""" for spec_type in special: if isinstance(value, spec_type): return special[spec_type](value) return value def serialize(key, value, indent, end="End"): """Given a *key* and its corresponding *value* from the |Settings| instance produce a snippet of the input file representing this pair. If the value is a nested |Settings| instance, use recursive calls to build the snippet for the entire block. Indent the result with *indent* spaces. """ ret = "" if isinstance(value, Settings): # Open block ... ret += " " * indent + key # ... with potential header if "_h" in value: ret += " " + unspec(value["_h"]) ret += "\n" # Free block, or explicitly placed entries with _1, _2, ... i = 1 while f"_{i}" in value: if isinstance(value[f"_{i}"], Settings): for ckey in value[f"_{i}"]: ret += serialize(ckey, value[f"_{i}"][ckey], indent + 2) else: ret += serialize("", value["_" + str(i)], indent + 2) i += 1 # Figure out the order in which we should serialize the entries in the block if "_o" in value: # Ordered block: we need to serialize the contents in the order given by "_o" children = [] for v in value["_o"]: ckey, _, idx = v.partition("[") if idx: # Child is a recurring entry, e.g. SubBlock[5] idx = int(idx.rstrip("]")) children.append((ckey, value[ckey][idx - 1])) else: # Child is a unique entry children.append((ckey, value[ckey])) else: # Unordered block: normal Python order when iterating over a dict/Settings children = [(ckey, value[ckey]) for ckey in value if not ckey.startswith("_")] # Serialize all children for ckey, cvalue in children: split_key = key.lower().split() split_ckey = ckey.lower().split() if len(split_key) > 0 and split_key[0] == "engine" and ckey.lower() == "input": ret += serialize(ckey, cvalue, indent + 2, "EndInput") # REB: For the hybrid engine. How to deal with the space in ckey (Engine DFTB)? Replace by underscore? elif len(split_ckey) > 0 and split_ckey[0] == "engine": engine = " ".join(ckey.split("_")) ret += serialize(engine, cvalue, indent + 2, end="EndEngine") + "\n" else: ret += serialize(ckey, cvalue, indent + 2) # Close block if key.lower() == "input": end = "endinput" ret += " " * indent + end + "\n" elif isinstance(value, list): for el in value: ret += serialize(key, el, indent, end) elif value == "" or value is True: ret += " " * indent + key + "\n" elif value is False or value is None: pass else: ret += " " * indent + key + " " + str(unspec(value)) + "\n" return ret if _has_scm_pisa and isinstance(self.settings.input, DriverBlock): # AMS specific way of writing input files: # self.settings.input is an input class that knows how to serialize itself to text input. 
txtinp = self.settings.input.get_input_string() systems = self._serialize_molecule() for system in systems: system_input = serialize("System", system, 0) # merge existing system block with one serialized from the molecule if hasattr(self.settings.input, "System") and self.settings.input.System.value_changed: if system["_h"]: split_line = f"System {system['_h']}\n" else: split_line = "System\n" start, end = txtinp.split(split_line) # strip duplicate 'End' from molecule system block system_input = "".join(system_input.splitlines(keepends=True)[:-1]) # insert system input in proper place txtinp = start + system_input + end else: txtinp += "\n" + system_input else: # Open-source PLAMS way of writing input files: # self.settings.input is a Settings object that we need to serialize to text input. fullinput = self.settings.input.copy() # prepare contents of 'system' block(s) more_systems = self._serialize_molecule() if more_systems: if "system" in fullinput.ams: # nonempty system block was already present in input.ams system = fullinput.ams.system system_list = system if isinstance(system, list) else [system] system_list_set = Settings({(s._h if "_h" in s else ""): s for s in system_list}) more_systems_set = Settings({(s._h if "_h" in s else ""): s for s in more_systems}) system_list_set += more_systems_set system_list = list(system_list_set.values()) system = system_list[0] if len(system_list) == 1 else system_list fullinput.ams.system = system else: fullinput.ams.system = more_systems[0] if len(more_systems) == 1 else more_systems txtinp = "" ams = fullinput.find_case("ams") # contents of the 'ams' block (AMS input) go first for item in fullinput[ams]: txtinp += serialize(item, fullinput[ams][item], 0) + "\n" # and then engines for engine in fullinput: if engine != ams: txtinp += serialize("Engine " + engine, fullinput[engine], 0, end="EndEngine") + "\n" return txtinp def _serialize_molecule(self): """Return a list of |Settings| instances containing the information about one or more |Molecule| instances stored in the ``molecule`` attribute. Molecular charge is taken from ``molecule.properties.charge``, if present. Additional, atom-specific information to be put in ``atoms`` block after XYZ coordinates can be supplied with ``atom.properties.suffix``. If the ``molecule`` attribute is a dictionary, the returned list is of the same length as the size of the dictionary. Keys from the dictionary are used as headers of returned ``system`` blocks. """ def serialize_to_settings(name, mol): if isinstance(mol, Molecule): sett = serialize_molecule_to_settings(mol) elif _has_scm_unichemsys and isinstance(mol, ChemicalSystem): sett = serialize_unichemsys_to_settings(mol) if name: sett._h = name return sett def serialize_unichemsys_to_settings(mol): from scm.libbase import InputParser sett = InputParser().to_settings(AMSJob._command, str(mol)) return sett.ams.system[0] def serialize_molecule_to_settings(mol): sett = Settings() if len(mol.lattice) in [1, 2] and mol.align_lattice(): log( "The lattice of {} Molecule supplied for job {} did not follow the convention required by AMS. I rotated the whole system for you. 
You're welcome".format( name if name else "main", self._full_name() ), 3, ) sett.Atoms._1 = [ atom.str(symbol=self._atom_symbol(atom), space=18, decimal=10, suffix=self._atom_suffix(atom)) for atom in mol ] if mol.lattice: sett.Lattice._1 = ["{:16.10f} {:16.10f} {:16.10f}".format(*vec) for vec in mol.lattice] if len(mol.bonds) > 0: lines = ["{} {} {}".format(mol.index(b.atom1), mol.index(b.atom2), b.order) for b in mol.bonds] # Add bond properties if they are defined sett.BondOrders._1 = [ ( "{} {}".format(text, mol.bonds[i].properties.suffix) if "suffix" in mol.bonds[i].properties else text ) for i, text in enumerate(lines) ] if "charge" in mol.properties: sett.Charge = mol.properties.charge if "regions" in mol.properties: # sett.Region = [Settings({'_h':name, '_1':['%s=%s'%(k,str(v)) for k,v in data.items()]}) for name, data in molecule.properties.regions.items()] sett.Region = [ Settings({"_h": name, "Properties": Settings({"_1": [line for line in data]})}) for name, data in mol.properties.regions.items() if isinstance(data, list) ] return sett if self.molecule is None: return Settings() moldict = {} if isinstance(self.molecule, Molecule) or (_has_scm_unichemsys and isinstance(self.molecule, ChemicalSystem)): moldict = {"": self.molecule} elif isinstance(self.molecule, dict): moldict = self.molecule else: raise JobError( "Incorrect 'molecule' attribute of job {}. 'molecule' should be a Molecule, a UnifiedChemicalSystem, a dictionary or None, and not {}".format( self._full_name(), type(self.molecule) ) ) ret = [serialize_to_settings(name, molecule) for name, molecule in moldict.items()] return ret # =========================================================================
[docs] def get_task(self): """Returns the AMS Task from the job's settings. If it does not exist, returns None.""" if ( isinstance(self.settings, Settings) and "input" in self.settings and "ams" in self.settings.input and "task" in self.settings.input.ams ): return self.settings.input.ams.task return None
@staticmethod def _atom_suffix(atom): """Return the suffix of an atom. Combines both ``properties.suffix`` as well as the other properties in a format that is suitable for inclusion in the AMSuite input.""" # Build key-value dictionary from properties keyval_dict = {} def serialize(sett, prefix=""): for key, val in sett.items(): if prefix == "" and key.lower() in ["suffix", "ghost", "name"]: # Special atomic properties that are handled by _atom_symbol() already. # Suffix handled explicitly below ... continue if isinstance(val, Settings): # Recursively serialize nested Settings object serialize(val, prefix + key + ".") continue elif isinstance(val, list): # Lists are converted to a comma separated string of elements val = ",".join(str(v) for v in val) elif isinstance(val, set): # Sets are converted to a *sorted* comma separated string of elements val = ",".join(str(v) for v in sorted(val)) key = prefix + key if not isinstance(val, str): val = str(val) if "\n" in val: raise ValueError(f"String representation of atomic property {key} may not include line breaks") if "{" in val or "}" in val: raise ValueError( f"String representation of atomic property {key} may not include curly brackets: {val}" ) if " " in val: # Ensure that space containing values are quoted if '"' not in val: val = '"' + val + '"' elif "'" not in val: val = "'" + val + "'" else: raise ValueError(f"Atomic property {key} can not include both single and double quotes: {val}") keyval_dict[key] = str(val) serialize(atom.properties) # Assemble and return complete suffix string ret = " ".join(f"{key}={val}" for key, val in keyval_dict.items()) if "suffix" in atom.properties: ret += " " + str(atom.properties.suffix) return ret.strip() @staticmethod def _atom_symbol(atom): """Return the atomic symbol of *atom*. Ensure proper formatting for AMSuite input taking into account ``ghost`` and ``name`` entries in ``properties`` of *atom*.""" smb = atom.symbol if "ghost" in atom.properties and atom.properties.ghost: smb = ("Gh." + smb).rstrip(".") if "name" in atom.properties: smb = (smb + "." + str(atom.properties.name)).lstrip(".") return smb @staticmethod def _tuple2rkf(arg): """Transform a pair ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a name of ``.rkf`` file (``ams`` or engine) to an absolute path to that ``.rkf`` file.""" if len(arg) == 2 and isinstance(arg[1], str): if isinstance(arg[0], AMSJob): return arg[0].results.rkfpath(arg[1]) if isinstance(arg[0], AMSResults): return arg[0].rkfpath(arg[1]) return str(arg)
[docs]    @classmethod def load_external(cls, path, settings=None, molecule=None, finalize=False, fmt="ams") -> "AMSJob": """Load an external job from *path*. In this context an "external job" is an execution of some external binary that was not managed by PLAMS, and hence does not have a ``.dill`` file. It can also be used in situations where the execution was started with PLAMS, but the Python process was terminated before the execution finished, resulting in steps 9-12 of :ref:`job-life-cycle` not happening. All the files produced by your computation should be placed in one folder and *path* should be the path to this folder or a file in this folder. The name of the folder is used as a job name. Input, output, error and runscript files, if present, should have names defined in ``_filenames`` class attribute (usually ``[jobname].in``, ``[jobname].out``, ``[jobname].err`` and ``[jobname].run``). It is not required to supply all these files, but in most cases one would like to use at least the output file, in order to use methods like :meth:`~scm.plams.core.results.Results.grep_output` or :meth:`~scm.plams.core.results.Results.get_output_chunk`. If *path* is an instance of an AMSJob, that instance is returned. This method is a class method, so it is called via class object and it returns an instance of that class:: >>> a = AMSJob.load_external(path='some/path/jobname') >>> type(a) scm.plams.interfaces.adfsuite.ams.AMSJob You can supply |Settings| and |Molecule| instances as *settings* and *molecule* parameters, they will end up attached to the returned job instance. If you don't do this, PLAMS will try to recreate them automatically using methods :meth:`~scm.plams.core.results.Results.recreate_settings` and :meth:`~scm.plams.core.results.Results.recreate_molecule` of the corresponding |Results| subclass. If no |Settings| instance is obtained in either way, the defaults from ``config.job`` are copied. You can set the *finalize* parameter to ``True`` if you wish to run the whole :meth:`~Job._finalize` on the newly created job. In that case PLAMS will perform the usual :meth:`~Job.check` to determine the job status (*successful* or *failed*), followed by cleaning of the job folder (|cleaning|), |postrun| and pickling (|pickling|). If *finalize* is ``False``, the status of the returned job is *copied*. fmt : str One of 'ams', 'qe', 'vasp', 'gaussian', or 'any'. 'ams': load a finished AMS job 'qe': convert a Quantum ESPRESSO .out file to ams.rkf and qe.rkf and load from those files 'vasp': convert a VASP OUTCAR file to ams.rkf and vasp.rkf and load from those files (see more below) 'gaussian': convert a Gaussian output file to AMS-style .rkf files and load from those 'any': auto-detect the format. This method can also be used to convert a finished VASP job to an AMSJob. If you supply the path to a folder containing OUTCAR, then a subdirectory will be created in this folder called AMSJob. In the AMSJob subdirectory, two files will be created: ams.rkf and vasp.rkf, that contain some of the results from the VASP calculation. If the AMSJob subdirectory already exists, the existing ams.rkf and vasp.rkf files will be reused. NOTE: the purpose of loading VASP data this way is to let you call for example job.results.get_energy() etc., not to run new VASP calculations!
""" if isinstance(path, cls): return path preferred_name = None fmt = fmt.lower() # first check if path is a VASP output # in which case call cls._vasp_to_ams, which will return a NEW path # containing ams.rkf and vasp.rkf (the .rkf files will be created if they do not exist) if os.path.isdir(path): preferred_name = os.path.basename(os.path.abspath(path)) if ( not os.path.exists(os.path.join(path, "ams.rkf")) and os.path.exists(os.path.join(path, "OUTCAR")) and (fmt == "vasp" or fmt == "any") ): path = vasp_output_to_ams(path, overwrite=False) elif os.path.exists(path) and os.path.basename(path) == "OUTCAR" and (fmt == "vasp" or fmt == "any"): preferred_name = os.path.basename(os.path.dirname(os.path.abspath(path))) path = vasp_output_to_ams(os.path.dirname(path), overwrite=False) elif os.path.exists(path): from ase.io.formats import filetype try: ft = filetype(path) # check if path is a Quantum ESPRESSO .out file # qe_output_to_ams returns a NEW path containng the ams.rkf and qe.rkf files (will be created if # they do not exist) if fmt == "qe": assert ft == "espresso-out", f"The file {path} does not seem to be a Quantum ESPRESSO output file" if fmt == "gaussian": assert ft == "gaussian-out", f"The file {path} does not seem to be a Gaussian output file" if ft == "espresso-out" and (fmt == "qe" or (fmt == "any" and not path.endswith("rkf"))): path = qe_output_to_ams(path, overwrite=False) elif ft == "gaussian-out" and (fmt == "gaussian" or (fmt == "any" and not path.endswith("rkf"))): path = gaussian_output_to_ams(path, overwrite=False) except Exception: # several types of exceptions can be raised, e.g. StopIteration and UnicodeDecodeError # assume that any exception means that the file was not a QE output file pass if not os.path.isdir(path): if os.path.exists(path): path = os.path.dirname(os.path.abspath(path)) elif os.path.isdir(path + ".results"): path = path + ".results" elif os.path.isdir(path + "results"): path = path + "results" else: raise FileError("Path {} does not exist, cannot load from it.".format(path)) job = super(AMSJob, cls).load_external(path, settings, molecule, finalize) if preferred_name is not None: job.name = preferred_name if job.name.endswith(".results") and len(job.name) > 8: job.name = job.name[:-8] return job
[docs]    @classmethod def from_input(cls, text_input: str, **kwargs) -> "AMSJob": """ Creates an AMSJob from AMS-style text input. This function requires that the SCM Python package is installed (if not, it will raise an ImportError). text_input : a multi-line string Returns: An AMSJob instance Example:: text = ''' Task GeometryOptimization Engine DFTB Model GFN1-xTB EndEngine ''' job = AMSJob.from_input(text) .. note:: If *molecule* is included in the keyword arguments to this method, the *text_input* may not contain any System blocks. In other words, the molecules to be used either need to come from the *text_input*, or the keyword argument, but not both. If *settings* is included in the keyword arguments to this method, the |Settings| created from the *text_input* will be soft updated with the settings from the keyword argument. In other words, the *text_input* takes precedence over the *settings* keyword argument. """ from scm.libbase import InputParser sett = Settings() sett.input = InputParser().to_settings(cls._command, text_input) mol = cls.settings_to_mol(sett) if mol: if "molecule" in kwargs: raise JobError("AMSJob.from_input(): molecule passed in both text_input and as keyword argument") else: mol = kwargs.pop("molecule", None) if "settings" in kwargs: sett.soft_update(kwargs.pop("settings")) return cls(molecule=mol, settings=sett, **kwargs)
[docs]    @classmethod def from_inputfile(cls, filename: str, heredoc_delimit: str = "eor", **kwargs) -> "AMSJob": """Construct an :class:`AMSJob` instance from an AMS inputfile or runfile. If a runscript is provided, then this method will attempt to extract the input file based on the heredoc delimiter (see *heredoc_delimit*). """ with open(filename, "r") as f: inp_file = parse_heredoc(f.read(), heredoc_delimit) return cls.from_input(inp_file, **kwargs)
[docs] @staticmethod def settings_to_mol(s: Settings) -> Dict[str, Molecule]: """Pop the `s.input.ams.system` block from a settings instance and convert it into a dictionary of molecules. The provided settings should be in the same style as the ones produced by the SCM input parser. Dictionary keys are taken from the header of each system block. The existing `s.input.ams.system` block is removed in the process, assuming it was present in the first place. """ from scm.plams.tools.units import Units def get_list(s): if "_1" in s and not "_2" in s and isinstance(s._1, list): return s._1 else: i = 1 l = [] while "_" + str(i) in s: l.append(s["_" + str(i)]) i += 1 return l def read_mol(settings_block: Settings) -> Molecule: """Retrieve single molecule from a single `s.input.ams.system` block.""" if "geometryfile" in settings_block: mol = Molecule(settings_block.geometryfile) else: mol = Molecule() if "_h" in settings_block.atoms: conv = Units.conversion_ratio(settings_block.atoms._h.strip("[]"), "Angstrom") else: conv = 1.0 for atom in get_list(settings_block.atoms): # Extract arguments for Atom() symbol, x, y, z, *suffix = atom.split(maxsplit=4) coords = conv * float(x), conv * float(y), conv * float(z) try: at = Atom(symbol=symbol, coords=coords) except PTError: # It's either a ghost atom and/or an atom with a custom name kwargs = {} if symbol.startswith("Gh."): # Ghost atom kwargs["ghost"] = True _, symbol = symbol.split(".", maxsplit=1) if "." in symbol: # Atom with a custom name symbol, kwargs["name"] = symbol.split(".", maxsplit=1) at = Atom(symbol=symbol, coords=coords, **kwargs) if suffix: at.properties.soft_update(AMSJob._atom_suffix_to_settings(suffix[0])) mol.add_atom(at) # Set the lattice vector if applicable if get_list(settings_block.lattice): if "_h" in settings_block.lattice: conv = Units.conversion_ratio(settings_block.lattice._h.strip("[]"), "Angstrom") else: conv = 1.0 mol.lattice = [tuple(conv * float(j) for j in i.split()) for i in get_list(settings_block.lattice)] # Add bonds for bond in get_list(settings_block.bondorders): _at1, _at2, _order, *suffix = bond.split(maxsplit=3) at1, at2, order = mol[int(_at1)], mol[int(_at2)], float(_order) plams_bond = Bond(at1, at2, order=order) if suffix: plams_bond.properties.suffix = suffix[0] mol.add_bond(plams_bond) # Set the molecular charge if settings_block.charge: mol.properties.charge = settings_block.charge # Set the region info (used in ACErxn) if settings_block.region: for s_reg in settings_block.region: if not "_h" in s_reg.keys(): raise JobError("Region block requires a header!") if s_reg.properties: mol.properties.regions[s_reg._h] = s_reg.properties._1 # Apply the supercell keyword if "supercell" in settings_block: if isinstance(settings_block.supercell, str): mol = mol.supercell(*[int(d) for d in settings_block.supercell.split()]) else: mol = mol.supercell(*settings_block.supercell) for at in mol: del at.properties.supercell mol.properties.name = str(settings_block.get("_h", "")) return mol # Raises a KeyError if the `system` key is absent with s.suppress_missing(): try: settings_list = s.input.ams.system if not isinstance(settings_list, list): settings_list = [settings_list] except KeyError: # The block s.input.ams.system is absent return None # Create a new dictionary with system headers as keys and molecules as values moldict: Dict[str, Molecule] = {} for settings_block in settings_list: key = ( str(settings_block._h) if ("_h" in settings_block) else "" ) # Empty string used as default system name. 
if key in moldict: raise KeyError(f"Duplicate system headers found in s.input.ams.system: {repr(key)}") moldict[key] = read_mol(settings_block) used_properties = ["atoms", "bondorders", "lattice", "charge", "region", "supercell"] for used_property in used_properties: if used_property in settings_block: settings_block.pop(used_property) s.input.ams.System = [system for system in settings_block if system] if not len(s.input.ams.System): del s.input.ams.System return moldict
@staticmethod def _atom_suffix_to_settings(suffix) -> Settings: def is_int(s): if "." in s: return False try: return int(s) == float(s) except ValueError: return False def is_float(s): try: float(s) return True except ValueError: return False import shlex tokens = shlex.split(suffix) # Remove possible spaces around = signs i = 0 while i < len(tokens): if tokens[i].endswith("="): tokens[i] += tokens.pop(i + 1) elif tokens[i].startswith("="): tokens[i - 1] += tokens.pop(i) else: i += 1 properties = Settings() for i, t in enumerate(tokens): key, _, val = t.partition("=") if not val: # Token that is not a key=value pair? Let's accumulate those in the plain text suffix. if "suffix" in properties: properties.suffix += " " + key else: properties.suffix = key continue else: # We have a value. Let's make an educated guess for its type. if val.lower() == "true": val = True elif val.lower() == "false": val = False elif is_int(val): val = int(val) elif is_float(val): val = float(val) elif "," in val: elem = val.split(",") if key.lower() == "region": # For regions we will output a set instead of a list val = set(elem) elif all(is_int(i) for i in elem): val = [int(i) for i in elem] elif all(is_float(f) for f in elem): val = [float(f) for f in elem] else: # Anything else will come out as a list of strings val = elem properties.set_nested(key.split("."), val) return properties @staticmethod def _add_region(atom: Atom, name: str): """ Converts atom.properties.region to a set if it isn't already a set. Adds the name to the set. If name is None, then only the conversion of atom.properties.region is performed. """ if "region" not in atom.properties: atom.properties.region = set() if isinstance(atom.properties.region, str): atom.properties.region = set([atom.properties.region]) if not isinstance(atom.properties.region, set): atom.properties.region = set(atom.properties.region) if name is None: return atom.properties.region.add(name)
def extract_engine_settings(settings: Settings) -> Settings: """ Extracts the engine settings from a Settings object. Example: s.input.ams.Task = "singlepoint" s.runscript.nproc = 1 s.input.ForceField.Type = "UFF" extract_engine_settings(s) will return Settings with s.input.ForceField.Type = "UFF" """ ret = Settings() if "input" in settings: for e in settings.input: if e != "ams": ret.input[e] = settings.input[e].copy() break return ret
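# Usage sketch (illustrative) for the helper above:
#
#     s = Settings()
#     s.input.ams.Task = "SinglePoint"
#     s.runscript.nproc = 1
#     s.input.ForceField.Type = "UFF"
#     engine_only = extract_engine_settings(s)  # keeps only input.ForceField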
[docs]def hybrid_committee_engine_settings(settings_list: List[Settings]) -> Settings: """ Creates the Settings for an AMS Hybrid engine that takes the average of all subengines as the prediction (for energies and forces). settings_list: list of Settings The Settings (at the top level) for each subengine. Returns: Settings (at the top level) for a Hybrid engine. """ def get_partial_settings(x: int) -> Settings: original_settings = settings_list[x] pure_settings = None pure_engine_name = "" for e in original_settings.input: if e != "ams": pure_engine_name = e pure_settings = original_settings.input[e] break assert pure_settings is not None pure_settings._h = f"{pure_engine_name} Engine{x+1}" return pure_settings N = len(settings_list) s = Settings() s.input.Hybrid.Engine = [get_partial_settings(x) for x in range(N)] s.input.Hybrid.Energy.Term = [f"Factor={1/N} Region=* UseCappingAtoms=No EngineID=Engine{x+1}" for x in range(N)] s.input.Hybrid.Committee.Enabled = "Yes" s.runscript.preamble_lines = ["export OMP_NUM_THREADS=1"] return s
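# Usage sketch (illustrative): averaging two hypothetical subengines into a single
# committee Hybrid engine; ``mol`` and the engine settings below are placeholders.
#
#     s1 = Settings(); s1.input.MLPotential.Model = "M3GNet-UP-2022"
#     s2 = Settings(); s2.input.ForceField.Type = "UFF"
#     committee = hybrid_committee_engine_settings([s1, s2])
#     job = AMSJob(name="committee", molecule=mol, settings=committee)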