Recipes

These recipe workflows are shipped with PLAMS and can be imported directly. Each recipe includes details, API documentation, and source code.

COSMO-RS Compound

Generate .coskf files equivalent to AMS GUI task “COSMO-RS Compound”.

Overview

Use this recipe when you want an easy Python interface for generating COSMO-RS compound files (.coskf) from one or many input structures.

API

API reference
class ADFCOSMORSCompoundJob(molecule, coskf_name=None, coskf_dir=None, preoptimization=None, singlepoint=False, settings=None, mol_info=None, hbc_from_MESP=False, **kwargs)[source]

A class for performing the equivalent of Task: COSMO-RS Compound in the AMS GUI

Parameters:
  • molecule – a PLAMS Molecule

Keyword Arguments:
  • coskf_name – A name for the generated .coskf file. If nothing is specified, the name of the job will be used.

  • coskf_dir – The directory in which to place the generated .coskf file. If nothing is specified, the file will be put in the plams directory corresponding to the job.

  • preoptimization – If None, do not preoptimize with a fast engine prior to the optimization with ADF. Otherwise, it can be one of ‘UFF’, ‘GAFF’, ‘GFNFF’, ‘GFN1-xTB’, ‘ANI-2x’, ‘M3GNet-UP-2022’. Note that you need valid licenses for ForceField or DFTB or MLPotential to use these preoptimizers.

  • singlepoint (bool) – Run a single point calculation in the gas phase and with solvation (no geometry optimization) on the given Molecule to generate the .coskf file. Cannot be combined with preoptimization.

  • settings (Settings) – A Settings object. Commonly used entries are settings.runscript.nproc and settings.input.adf.custom_options. If ‘adf’ is in settings.input, it should be provided without the solvation block.

  • mol_info (dict) – an optional dictionary containing information that will be written to the Compound Data section within the COSKF file.

  • hbc_from_MESP (bool) – Defaults to False. Performs DENSF analysis to determine the hydrogen bond center (HBC) used in COSMOSAC-DHB-MESP.

  • name – an optional name for the calculation directory
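
The coskf_name handling above can be sketched in plain Python (a simplified rendering of the rule, not the library code; resolve_coskf_name and job_name are illustrative names):

```python
def resolve_coskf_name(coskf_name, job_name):
    """Resolve the final .coskf filename: fall back to the job name,
    and append the .coskf extension when it is missing."""
    if coskf_name is None:
        return f"{job_name}.coskf"
    if not coskf_name.endswith(".coskf"):
        return coskf_name + ".coskf"
    return coskf_name

print(resolve_coskf_name(None, "H2O"))     # falls back to the job name
print(resolve_coskf_name("Water", "H2O"))  # extension appended
```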

Example

mol = from_smiles('O')
job = ADFCOSMORSCompoundJob(
    molecule = mol,
    preoptimization = 'UFF',
    coskf_dir = 'coskfs',
    coskf_name = 'Water',
    name = 'H2O',
    mol_info = {'CAS':'7732-18-5'}
)
job.run()
print(job.results.coskfpath())
_result_type

alias of ADFCOSMORSCompoundResults

static _get_radii()[source]

Method to get the atomic radii from solvent.txt (for some elements the radii are instead the Klamt radii)

Return type:

Dict[str, float]

static adf_settings(solvation, settings=None, elements=None, atomic_ion=False)[source]

Returns ADF settings with or without solvation

If solvation == True, then also include the solvation block.

Parameters:
  • solvation (bool) –

  • elements (List[str] | None) –

  • atomic_ion (bool) –

Return type:

Settings
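
The defaulting rule can be sketched with nested dicts standing in for a Settings object (a simplified sketch; apply_adf_defaults is an illustrative name, and the default values mirror the source listing below):

```python
def apply_adf_defaults(adf_input):
    """Apply the BP86/TZP level-of-theory defaults only when the user
    supplied neither a basis nor an xc functional."""
    if "basis" not in adf_input and "xc" not in adf_input:
        adf_input.update({
            "basis": {"Type": "TZP", "Core": "Small"},
            "xc": {"GGA": "BP86"},
            "Symmetry": "NOSYM",
            "BeckeGrid": {"Quality": "Good"},
        })
    return adf_input

defaults = apply_adf_defaults({})
print(defaults["xc"]["GGA"])  # BP86

# Any user-supplied xc (an illustrative PBE0 entry) suppresses all defaults:
custom = apply_adf_defaults({"xc": {"LibXC": "PBE0"}})
print("basis" in custom)  # False
```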

static convert_to_coskf(rkf_path, coskf_name, plams_dir, coskf_dir=None, mol_info=None, densf_path=None)[source]

Convert an adf.rkf file into a .coskf file

Parameters:
  • rkf_path (str) – absolute path to adf.rkf

  • coskf_name (str) – the name of the .coskf file

  • plams_dir (str) – path to the PLAMS job directory in which to write the .coskf file

  • coskf_dir (Optional[str]) – additional path to store the .coskf file

  • mol_info (Optional[Dict[str, Union[float, int, str]]]) – Optional information to write out in the “Compound Data” section of the .coskf file

  • densf_path (Optional[str]) – path to the densf output .t41 file

Return type:

None
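
The output placement can be sketched with the standard library (a pure-Python stand-in for the path handling only; the KF conversion itself requires AMS, and place_coskf is an illustrative name):

```python
import os
import shutil
import tempfile

def place_coskf(coskf_name, plams_dir, coskf_dir=None):
    """Write the .coskf into the PLAMS job directory and, when coskf_dir
    is given, copy it into that additional output directory."""
    coskf_path = os.path.join(plams_dir, coskf_name)
    with open(coskf_path, "wb"):
        pass  # stand-in for the KFFile COSMO / Compound Data writes
    if coskf_dir is not None:
        shutil.copy2(coskf_path, os.path.join(coskf_dir, coskf_name))
    return coskf_path

with tempfile.TemporaryDirectory() as plams_dir, tempfile.TemporaryDirectory() as out:
    place_coskf("Water.coskf", plams_dir, coskf_dir=out)
    print(os.path.exists(os.path.join(out, "Water.coskf")))  # True
```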

static update_hbc_to_coskf(coskf, visulization=False)[source]

Determine the hydrogen bond center for an existing COSKF file

Parameters:
  • coskf (str) – Existing COSKF file

  • visulization (bool) – Visualize the hydrogen bond center

Return type:

None

class ADFCOSMORSCompoundResults(job)[source]

Results class for ADFCOSMORSCompoundJob

Parameters:

job (Job) –

coskfpath()[source]

Returns the path to the resulting .coskf

get_main_molecule()[source]

Returns the optimized molecule

get_input_molecule()[source]

Returns the input molecule

get_sigma_profile(subsection='profil')[source]

Returns the sigma profile of the molecule. For more details see CRSResults.get_sigma_profile.

Parameters:

subsection (str) –

Code

Recipe module source: adfcosmorscompound.py
import os, shutil
from collections import OrderedDict
from typing import List, Optional, Union, Dict, Literal, Tuple

from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.interfaces.adfsuite.crs import CRSJob
from scm.plams.interfaces.adfsuite.densf import DensfJob

from scm.plams.tools.periodic_table import PeriodicTable
from scm.plams.mol.molecule import Molecule
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.functions import add_to_instance, requires_optional_package
from scm.plams.interfaces.adfsuite.quickjobs import model_to_settings

from scm.plams.tools.hbc_utilities import parse_mesp, write_HBC_to_COSKF, view_HBC
import numpy as np

__all__ = ["ADFCOSMORSCompoundJob", "ADFCOSMORSCompoundResults"]


class ADFCOSMORSCompoundResults(Results):
    """Results class for ADFCOSMORSCompoundJob"""

    def coskfpath(self):
        """
        Returns the path to the resulting .coskf
        """
        return os.path.join(self.job.path, self.job.coskf_name)

    def get_main_molecule(self):
        """
        Returns the optimized molecule
        """

        return self.job.children["solv"].results.get_main_molecule()

    def get_input_molecule(self):
        """
        Returns the input molecule
        """
        for job in self.job.children.values():
            return job.results.get_input_molecule()

    def get_sigma_profile(self, subsection: str = "profil"):
        """
        Returns the sigma profile of the molecule. For more details see `CRSResults.get_sigma_profile`.
        """
        return self.job.children["crs"].results.get_sigma_profile(subsection=subsection)


class ADFCOSMORSCompoundJob(MultiJob):
    """
    A class for performing the equivalent of Task: COSMO-RS Compound in the AMS GUI

    Args:
        molecule : a PLAMS |Molecule|

    Keyword Args:
        coskf_name  : A name for the generated .coskf file.  If nothing is specified, the name of the job will be used.
        coskf_dir  : The directory in which to place the generated .coskf file.  If nothing is specified, the file will be put in the plams directory corresponding to the job.
        preoptimization  : If None, do not preoptimize with a fast engine prior to the optimization with ADF. Otherwise, it can be one of 'UFF', 'GAFF', 'GFNFF', 'GFN1-xTB', 'ANI-2x', 'M3GNet-UP-2022'. Note that you need valid licenses for ForceField or DFTB or MLPotential to use these preoptimizers.
        singlepoint (bool) : Run a single point calculation in the gas phase and with solvation (no geometry optimization) on the given Molecule to generate the .coskf file. Cannot be combined with ``preoptimization``.
        settings (Settings) : A |Settings| object. Commonly used entries are settings.runscript.nproc and settings.input.adf.custom_options. If 'adf' is in settings.input, it should be provided without the solvation block.
        mol_info (dict) : an optional dictionary containing information that will be written to the Compound Data section within the COSKF file.
        hbc_from_MESP (bool) : Defaults to False. Performs DENSF analysis to determine the hydrogen bond center (HBC) used in COSMOSAC-DHB-MESP.
        name : an optional name for the calculation directory

    Example:

        .. code-block:: python

            mol = from_smiles('O')
            job = ADFCOSMORSCompoundJob(
                molecule = mol,
                preoptimization = 'UFF',
                coskf_dir = 'coskfs',
                coskf_name = 'Water',
                name = 'H2O',
                mol_info = {'CAS':'7732-18-5'}
            )
            job.run()
            print(job.results.coskfpath())

    """

    _result_type = ADFCOSMORSCompoundResults

    def __init__(
        self,
        molecule: Union[Molecule, None],
        coskf_name: Optional[str] = None,
        coskf_dir: Optional[str] = None,
        preoptimization: Optional[str] = None,
        singlepoint: bool = False,
        settings: Optional[Settings] = None,
        mol_info: Optional[Dict[str, Union[float, int, str]]] = None,
        hbc_from_MESP: bool = False,
        **kwargs,
    ):
        """

        Class for running the equivalent of "COSMO-RS Compound" in the AMS
        GUI. Note that these are ADF calculations, not COSMO-RS
        calculations!

        Initialize two or three jobs:

        1. (Optional): Preoptimization with a fast engine: a force field, semi-empirical, or machine-learning method ('UFF', 'GAFF', 'GFNFF', 'GFN1-xTB', 'ANI-2x' or 'M3GNet-UP-2022')
        Note: A valid license for ForceField or DFTB or MLPotential is required.
        2. Gasphase optimization or single-point calculation (BP86, TZP, BeckeGrid Quality Good)
        3. Take optimized structure and run singlepoint with implicit solvation

        Access the result .coskf file with ``job.results.coskfpath()``.
        Note: unless coskf_name is given, this file will be called
        jobname.coskf, where jobname is the name of the ADFCOSMORSCompoundJob.

        """
        if preoptimization and singlepoint:
            raise ValueError("Cannot combine preoptimization with singlepoint")

        MultiJob.__init__(self, children=OrderedDict(), **kwargs)
        self.input_molecule = molecule

        self.mol_info = dict()
        if mol_info is not None:
            self.mol_info.update(mol_info)

        self.settings = settings or Settings()

        self.coskf_name = coskf_name
        self.coskf_dir = coskf_dir
        self.hbc_from_MESP = hbc_from_MESP

        if self.coskf_dir is not None and not os.path.exists(self.coskf_dir):
            os.mkdir(self.coskf_dir)

        if self.coskf_name is None:
            self.coskf_name = f"{self.name}.coskf"
        elif isinstance(self.coskf_name, str) and not self.coskf_name.endswith(".coskf"):
            self.coskf_name += ".coskf"

        gas_s = Settings()
        gas_s += ADFCOSMORSCompoundJob.adf_settings(solvation=False, settings=self.settings)
        gas_job = AMSJob(settings=gas_s, name="gas")

        if not singlepoint:
            gas_job.settings.input.ams.Task = "GeometryOptimization"

            if preoptimization:
                preoptimization_s = Settings()
                preoptimization_s.runscript.nproc = 1
                preoptimization_s.input.ams.Task = "GeometryOptimization"
                preoptimization_s += model_to_settings(preoptimization)
                preoptimization_job = AMSJob(
                    settings=preoptimization_s, name="preoptimization", molecule=self.input_molecule
                )
                self.children["preoptimization"] = preoptimization_job

        elif singlepoint:
            gas_job.settings.input.ams.Task = "SinglePoint"

        @add_to_instance(gas_job)
        def prerun(self):  # noqa: F811
            if not singlepoint and preoptimization:
                self.molecule = self.parent.children["preoptimization"].results.get_main_molecule()
            else:
                self.molecule = self.parent.input_molecule
            self.parent.mol_info, self.parent.atomic_ion = ADFCOSMORSCompoundJob.get_compound_properties(
                self.molecule, self.parent.mol_info
            )

        self.children["gas"] = gas_job

        if self.hbc_from_MESP:
            densf_job = DensfJob(settings=ADFCOSMORSCompoundJob.densf_settings(), name="densf")
            self.children["densf"] = densf_job

            @add_to_instance(densf_job)
            def prerun(self):  # noqa: F811
                gas_job.results.wait()
                self.inputjob = f"../gas/adf.rkf #{self.parent.name}"

        solv_s = Settings()
        solv_s.input.ams.Task = "SinglePoint"
        solv_job = AMSJob(settings=solv_s, name="solv")

        @add_to_instance(solv_job)
        def prerun(self):  # noqa: F811
            gas_job.results.wait()
            self.settings.input.ams.EngineRestart = "../gas/adf.rkf"
            self.settings.input.ams.LoadSystem.File = "../gas/ams.rkf"
            molecule_charge = gas_job.results.get_main_molecule().properties.get("charge", 0)
            self.settings.input.ams.LoadSystem._1 = f"# {self.parent.name}"
            self.settings.input.ams.LoadSystem._2 = f"# charge {molecule_charge}"
            self.settings += ADFCOSMORSCompoundJob.adf_settings(
                solvation=True,
                settings=self.parent.settings,
                elements=list(set(at.symbol for at in self.parent.input_molecule)),
                atomic_ion=self.parent.atomic_ion,
            )

        @add_to_instance(solv_job)
        def postrun(self):
            if self.parent.hbc_from_MESP:
                densf_job.results.wait()
                densf_path = densf_job.results.kfpath()
            else:
                densf_path = None
            ADFCOSMORSCompoundJob.convert_to_coskf(
                rkf_path=self.results.rkfpath(file="adf"),
                coskf_name=self.parent.coskf_name,
                plams_dir=self.parent.path,
                coskf_dir=self.parent.coskf_dir,
                mol_info=self.parent.mol_info,
                densf_path=densf_path,
            )

        self.children["solv"] = solv_job

        sigma_s = Settings()
        sigma_s.input.property._h = "PURESIGMAPROFILE"

        compounds = [Settings()]
        sigma_s.input.compound = compounds
        crsjob = CRSJob(settings=sigma_s, name="sigma")

        @add_to_instance(crsjob)
        def prerun(self):  # noqa F811
            self.parent.children["solv"].results.wait()
            self.settings.input.compound[0]._h = os.path.join(self.parent.path, self.parent.coskf_name)

        self.children["crs"] = crsjob

    @staticmethod
    def get_compound_properties(
        mol: Molecule, mol_info: Optional[Dict[str, Union[float, int, str]]] = None
    ) -> Tuple[Dict[str, Union[float, int, str]], bool]:

        if mol_info is None:
            mol_info = dict()
        mol_info["Molar Mass"] = mol.get_mass()
        mol_info["Formula"] = mol.get_formula()
        try:
            rings = mol.locate_rings()
            flatten_atoms = [atom for subring in rings for atom in subring]
            nring = len(set(flatten_atoms))
            mol_info["Nring"] = int(nring)
        except Exception:
            pass

        atomic_ion = len(mol.atoms) == 1

        return mol_info, atomic_ion

    @staticmethod
    def _get_radii() -> Dict[str, float]:
        """Method to get the atomic radii from solvent.txt (for some elements the radii are instead the Klamt radii)"""
        with open(os.path.expandvars("$AMSHOME/data/gui/solvent.txt"), "r") as f:
            mod_allinger_radii = [float(x) for i, x in enumerate(f) if i > 0]
        radii = {PeriodicTable.get_symbol(i): r for i, r in enumerate(mod_allinger_radii, 1) if i <= 118}
        klamt_radii = {
            "H": 1.30,
            "C": 2.00,
            "N": 1.83,
            "O": 1.72,
            "F": 1.72,
            "Si": 2.48,
            "P": 2.13,
            "S": 2.16,
            "Cl": 2.05,
            "Br": 2.16,
            "I": 2.32,
        }
        radii.update(klamt_radii)

        return radii

    @staticmethod
    def solvation_settings(elements: Optional[List[str]] = None, atomic_ion: bool = False) -> Settings:
        sett = Settings()

        radii = {
            "H": 1.3,
            "He": 1.275,
            "Li": 2.125,
            "Be": 1.858,
            "B": 1.792,
            "C": 2.0,
            "N": 1.83,
            "O": 1.72,
            "F": 1.72,
            "Ne": 1.333,
            "Na": 2.25,
            "Mg": 2.025,
            "Al": 1.967,
            "Si": 2.48,
            "P": 2.13,
            "S": 2.16,
            "Cl": 2.05,
            "Ar": 1.658,
            "K": 2.575,
            "Ca": 2.342,
            "Sc": 2.175,
            "Ti": 1.992,
            "V": 1.908,
            "Cr": 1.875,
            "Mn": 1.867,
            "Fe": 1.858,
            "Co": 1.858,
            "Ni": 1.85,
            "Cu": 1.883,
            "Zn": 1.908,
            "Ga": 2.05,
            "Ge": 2.033,
            "As": 1.967,
            "Se": 1.908,
            "Br": 2.16,
            "Kr": 1.792,
            "Rb": 2.708,
            "Sr": 2.5,
            "Y": 2.258,
            "Zr": 2.117,
            "Nb": 2.025,
            "Mo": 1.992,
            "Tc": 1.967,
            "Ru": 1.95,
            "Rh": 1.95,
            "Pd": 1.975,
            "Ag": 2.025,
            "Cd": 2.083,
            "In": 2.2,
            "Sn": 2.158,
            "Sb": 2.1,
            "Te": 2.033,
            "I": 2.32,
            "Xe": 1.9,
            "Cs": 2.867,
            "Ba": 2.558,
            "La": 2.317,
            "Ce": 2.283,
            "Pr": 2.275,
            "Nd": 2.275,
            "Pm": 2.267,
            "Sm": 2.258,
            "Eu": 2.45,
            "Gd": 2.258,
            "Tb": 2.25,
            "Dy": 2.242,
            "Ho": 2.225,
            "Er": 2.225,
            "Tm": 2.225,
            "Yb": 2.325,
            "Lu": 2.208,
            "Hf": 2.108,
            "Ta": 2.025,
            "W": 1.992,
            "Re": 1.975,
            "Os": 1.958,
            "Ir": 1.967,
            "Pt": 1.992,
            "Au": 2.025,
            "Hg": 2.108,
            "Tl": 2.158,
            "Pb": 2.283,
            "Bi": 2.217,
            "Po": 2.158,
            "At": 2.092,
            "Rn": 2.025,
            "Fr": 3.033,
            "Ra": 2.725,
            "Ac": 2.567,
            "Th": 2.283,
            "Pa": 2.2,
            "U": 2.1,
            "Np": 2.1,
            "Pu": 2.1,
            "Am": 2.1,
            "Cm": 2.1,
            "Bk": 2.1,
            "Cf": 2.1,
            "Es": 2.1,
            "Fm": 2.1,
            "Md": 2.1,
            "No": 2.1,
            "Lr": 2.1,
            "Rf": 2.1,
            "Db": 2.1,
            "Sg": 2.1,
            "Bh": 2.1,
            "Hs": 2.1,
            "Mt": 2.1,
            "Ds": 2.1,
            "Rg": 2.1,
            "Cn": 2.1,
            "Nh": 2.1,
            "Fl": 2.1,
            "Mc": 2.1,
            "Lv": 2.1,
            "Ts": 2.1,
            "Og": 2.1,
        }  # from _get_radii()

        if elements:
            radii = {k: radii[k] for k in sorted(elements)}

        if atomic_ion:
            charge_method = "method=atom corr"
        else:
            charge_method = "method=Conj corr"

        sett.input.adf.solvation = {
            "surf": "Delley",
            "solv": "name=CRS cav0=0.0 cav1=0.0",
            "charged": charge_method,
            "c-mat": "Exact",
            "scf": "Var All",
            "radii": radii,
        }
        return sett

    @staticmethod
    def adf_settings(
        solvation: bool, settings=None, elements: Optional[List[str]] = None, atomic_ion: bool = False
    ) -> Settings:
        """
        Returns ADF settings with or without solvation

        If solvation == True, then also include the solvation block.
        """

        s = Settings()
        if settings:
            s = settings.copy()
        if "basis" not in s.input.adf and "xc" not in s.input.adf:
            s.input.adf.Basis.Type = "TZP"
            s.input.adf.Basis.Core = "Small"
            s.input.adf.XC.GGA = "BP86"
            s.input.adf.Symmetry = "NOSYM"
            s.input.adf.BeckeGrid.Quality = "Good"
        if solvation:
            s += ADFCOSMORSCompoundJob.solvation_settings(elements=elements, atomic_ion=atomic_ion)
        return s

    @staticmethod
    def densf_settings(grid: Literal["Medium", "Fine"] = "Medium") -> Settings:
        s = Settings()
        s.input.GRID = f"{grid}\nEnd"
        s.input.Density = "SCF"
        s.input.Potential = "COUL SCF"
        return s

    @staticmethod
    @requires_optional_package("scm.base")
    def convert_to_coskf(
        rkf_path: str,
        coskf_name: str,
        plams_dir: str,
        coskf_dir: Optional[str] = None,
        mol_info: Optional[Dict[str, Union[float, int, str]]] = None,
        densf_path: Optional[str] = None,
    ) -> None:
        """
        Convert an adf.rkf file into a .coskf file

        Args:
            rkf_path (str) : absolute path to adf.rkf
            coskf_name (str) : the name of the .coskf file
            plams_dir (str) : path to the PLAMS job directory in which to write the .coskf file
            coskf_dir (Optional[str]) : additional path to store the .coskf file
            mol_info (Optional[Dict[str, Union[float, int, str]]]) : Optional information to write out in the "Compound Data" section of the .coskf file
            densf_path (Optional[str]) : path to the densf output .t41 file
        """
        from scm.base import KFFile

        with KFFile(rkf_path) as rkf:
            cosmo = rkf.read_section("COSMO")

        coskf_path = os.path.join(plams_dir, coskf_name)

        with KFFile(coskf_path, autosave=False) as rkf:
            for key, value in cosmo.items():
                rkf.write("COSMO", key, float(value) if isinstance(value, np.float64) else value)
            for key, value in mol_info.items():
                rkf.write("Compound Data", key, float(value) if isinstance(value, np.float64) else value)

        if densf_path is not None:
            HBC_xyz, HBC_atom, HBC_angle, HBC_info = parse_mesp(densf_path, coskf_path)
            write_HBC_to_COSKF(coskf_path, HBC_xyz, HBC_atom, HBC_angle, HBC_info)

        if coskf_dir is not None:
            shutil.copy2(coskf_path, os.path.join(coskf_dir, coskf_name))

    @staticmethod
    def update_hbc_to_coskf(coskf: str, visulization: bool = False) -> None:
        """
        Determine the hydrogen bond center for an existing COSKF file

        Args:
            coskf (str) : Existing COSKF file
            visulization (bool) : Visualize the hydrogen bond center
        """
        molecule = Molecule(coskf)
        coskf_name = os.path.basename(coskf).replace(".coskf", "")

        atomic_ion = len(molecule.atoms) == 1
        gas_settings = ADFCOSMORSCompoundJob.adf_settings(solvation=False, atomic_ion=atomic_ion)
        gas_settings.input.ams.Task = "SinglePoint"
        gas_job = AMSJob(molecule=molecule, settings=gas_settings, name=f"gas_{coskf_name}")
        gas_job.run()

        gas_rkf = gas_job.results.rkfpath(file="adf")
        densf_settings = ADFCOSMORSCompoundJob.densf_settings()
        densf_job = DensfJob(gas_rkf, settings=densf_settings, name=f"densf_{coskf_name}")
        densf_job.run()

        t41 = densf_job.results.kfpath()

        HBC_xyz, HBC_atom, HBC_angle, HBC_info = parse_mesp(t41, coskf)
        write_HBC_to_COSKF(coskf, HBC_xyz, HBC_atom, HBC_angle, HBC_info)

        if visulization:
            view_HBC(coskf)

COSMO-RS Conformers

Build customizable conformer workflows that end in ADF/COSMO calculations for COSMO-RS.

Overview

This recipe builds a configurable conformer-generation pipeline and produces .coskf files for the selected conformers.

API

API reference
class ADFCOSMORSConfJob(molecule, conf_gen=None, first_filter=None, additional=None, final_filter=None, adf_singlepoint=False, initial_conformers=500, coskf_dir=None, coskf_name=None, mol_info=None, hbc_from_MESP=False, **kwargs)[source]

Multi-step conformer workflow for generating COSMO-RS .coskf files:

job_0: initial conformer generation
(default RDKit generator or user-provided conf_gen)
  |-- apply first_filter (if provided)
  v
job_i: optional refinement stages from additional
(for each (Settings, Filter) tuple, e.g. DFTB)
  |-- apply its filter after each stage
  v
adf_gas_job: gas-phase ADF conformer job
  |-- if adf_singlepoint=True: single-point only
  v
final_filter_job: apply final_filter (if provided)
  v
Branch on hbc_from_MESP:
  |-- False: replay_job -> COSMO results (.coskf)
  `-- True: per conformer job_cosmo + job_densf
        `-> COSMO + hydrogen bond center (.coskf)
Parameters:
  • molecule (Molecule) – Input PLAMS molecule.

  • conf_gen (ConformersJob | None) – User-provided job for initial conformer generation.

  • initial_conformers (int) – Number of conformers generated by the default conf_gen (RDKit). Defaults to 500.

  • first_filter (ADFCOSMORSConfFilter | None) – Filter applied after initial conformer generation.

  • additional (List[Tuple[Settings, ADFCOSMORSConfFilter | None]] | None) – Optional extra conformer-refinement stages and their filters (e.g., with a fast engine such as DFTB).

  • final_filter (ADFCOSMORSConfFilter | None) – Filter applied after gas-phase ADF conformer generation.

  • adf_singlepoint (bool) – If True, run gas-phase ADF conformer jobs without optimization. Defaults to False.

  • hbc_from_MESP (bool) – If True, determine the hydrogen-bond center (hbc) via DENSF analysis (COSMOSAC-DHB-MESP). Defaults to False.

  • coskf_dir (str | None) – Output directory for generated .coskf files.

  • coskf_name (str | None) – Base filename for conformers (e.g., {coskf_name}_{i}.coskf). Defaults to conformer.

  • mol_info (Dict[str, float | int | str] | None) – Extra metadata written to “Compound Data” (e.g. IUPAC, CAS, SMILES and Other Name).

  • name – Optional PLAMS job name. Defaults to plams.
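
The staged refine-then-filter flow above can be simulated in plain Python (an illustrative stand-in, not the library API: refinement is represented by an energy-update function and each stage's filter by an energy window in kcal/mol):

```python
def run_stages(conformers, stages):
    """conformers: list of (label, energy) pairs.
    stages: list of (refine, window) pairs; each stage re-evaluates the
    energies, then drops conformers lying more than `window` above the
    stage minimum (window=None keeps everything)."""
    for refine, window in stages:
        conformers = [(label, refine(e)) for label, e in conformers]
        if window is not None:
            emin = min(e for _, e in conformers)
            conformers = [(label, e) for label, e in conformers if e - emin <= window]
    return conformers

confs = [("a", 0.0), ("b", 1.5), ("c", 4.0)]
# one cheap stage (identity refinement) with a 2 kcal/mol window
print(run_stages(confs, [(lambda e: e, 2.0)]))  # [('a', 0.0), ('b', 1.5)]
```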

_result_type

alias of ADFCOSMORSConfResults

new_children()[source]

Generate new children jobs.

This method is useful when some of the children jobs are not known beforehand and need to be generated based on other children jobs, for example in any kind of self-consistent procedure.

The goal of this method is to produce a new portion of children jobs. Newly created jobs should be returned in a container compatible with self.children (e.g. list for list, dict for dict). No adjustment of newly created jobs’ parent attribute is needed. This method cannot modify the _active_children attribute.

The method defined here is a default template, returning None, which means no new children jobs are generated and the entire execution of the parent job consists only of running jobs initially found in self.children. To modify this behavior you can override this method in a MultiJob subclass or you can use one of the Binding decorators, just like with the prerun and postrun methods.

prerun()[source]

Actions to take before the actual job execution.

This method is initially empty, it can be defined in subclasses or directly added to either the whole class or a single instance using Binding decorators.

postrun()[source]

Actions to take just after the actual job execution.

This method is initially empty, it can be defined in subclasses or directly added to either the whole class or a single instance using Binding decorators.

class ADFCOSMORSConfFilter(max_num_confs=None, max_energy_range=None)[source]

This class allows the user to specify criteria on which to filter sets of conformers.

Parameters:
  • max_num_confs (int | None) – The maximum number of conformers to bring forward to the next step. These are automatically ordered by energy, so the first n lowest energy structures are carried forward to the next step.

  • max_energy_range (float | None) – The maximum allowable difference (in kcal/mol) between the lowest energy conformer and the highest energy conformer that will be carried forward. For example, for a max_energy_range of 2 kcal/mol and a lowest energy structure of 1 kcal/mol, any structure with an energy above 3 (1+2) kcal/mol will be filtered out.
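
The two criteria can be rendered in plain Python (a simplified sketch of the filter semantics described above, not the library implementation; filter_conformers is an illustrative name and energies are in kcal/mol):

```python
def filter_conformers(energies, max_num_confs=None, max_energy_range=None):
    """Keep the lowest-energy conformers: none lying more than
    max_energy_range above the minimum, and at most max_num_confs of them."""
    kept = sorted(energies)
    if max_energy_range is not None:
        kept = [e for e in kept if e - kept[0] <= max_energy_range]
    if max_num_confs is not None:
        kept = kept[:max_num_confs]
    return kept

energies = [1.0, 4.5, 2.2, 3.0]
print(filter_conformers(energies, max_energy_range=2.0))  # [1.0, 2.2, 3.0]
print(filter_conformers(energies, max_num_confs=2))       # [1.0, 2.2]
```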

class ADFCOSMORSConfResults(job)[source]
Parameters:

job (Job) –

Code

Recipe module source: adfcosmorsconformers.py
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.interfaces.adfsuite.densf import DensfJob
from scm.plams.mol.molecule import Molecule
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.functions import log
from scm.plams.recipes.adfcosmorscompound import ADFCOSMORSCompoundJob
from scm.conformers import ConformersJob
from scm.base import KFFile, Units
from typing import List, Optional, Union, Dict, Tuple
from collections import OrderedDict
import os, re
from pathlib import Path
import numpy as np


class ADFCOSMORSConfFilter:
    """
    This class allows the user to specify criteria on which to filter sets of conformers.

    Args:
        max_num_confs : The maximum number of conformers to bring forward to the next step.  These are automatically ordered by energy, so the first *n* lowest energy structures are carried forward to the next step.
        max_energy_range : The maximum allowable difference (in kcal/mol) between the lowest energy conformer and the highest energy conformer that will be carried forward.  For example, for a max_energy_range of 2 kcal/mol and a lowest energy structure of 1 kcal/mol, any structure with an energy above 3 (1+2) kcal/mol will be filtered out.
    """

    def __init__(self, max_num_confs: Optional[int] = None, max_energy_range: Optional[float] = None):
        self.max_num_confs = max_num_confs
        self.max_energy_range = max_energy_range


class ADFCOSMORSConfResults(Results):
    pass


class ADFCOSMORSConfJob(MultiJob):
    """
    Multi-step conformer workflow for generating COSMO-RS `.coskf` files:

    .. code-block:: text

        job_0: initial conformer generation
        (default RDKit generator or user-provided conf_gen)
          |-- apply first_filter (if provided)
          v
        job_i: optional refinement stages from additional
        (for each (Settings, Filter) tuple, e.g. DFTB)
          |-- apply its filter after each stage
          v
        adf_gas_job: gas-phase ADF conformer job
          |-- if adf_singlepoint=True: single-point only
          v
        final_filter_job: apply final_filter (if provided)
          v
        Branch on hbc_from_MESP:
          |-- False: replay_job -> COSMO results (.coskf)
          `-- True: per conformer job_cosmo + job_densf
                `-> COSMO + hydrogen bond center (.coskf)

    Args:
        molecule: Input PLAMS molecule.
        conf_gen: User-provided job for initial conformer generation.

        initial_conformers: Number of conformers generated by the default `conf_gen` (RDKit). Defaults to `500`.
        first_filter: Filter applied after initial conformer generation.
        additional: Optional extra conformer-refinement stages and their filters (e.g., with a fast engine such as DFTB).
        final_filter: Filter applied after gas-phase ADF conformer generation.

        adf_singlepoint: If `True`, run gas-phase ADF conformer jobs without optimization. Defaults to `False`.
        hbc_from_MESP: If `True`, determine the hydrogen-bond center (hbc) via DENSF analysis (COSMOSAC-DHB-MESP). Defaults to `False`.

        coskf_dir: Output directory for generated `.coskf` files.
        coskf_name: Base filename for conformers (e.g., `{coskf_name}_{i}.coskf`). Defaults to `conformer`.
        mol_info: Extra metadata written to "Compound Data" (e.g. `IUPAC`, `CAS`, `SMILES` and `Other Name`).
        name: Optional PLAMS job name. Defaults to `plams`.
    """

    _result_type = ADFCOSMORSConfResults

    def __init__(
        self,
        molecule: Molecule,
        conf_gen: Optional[ConformersJob] = None,
        first_filter: Optional[ADFCOSMORSConfFilter] = None,
        additional: Optional[List[Tuple[Settings, Optional[ADFCOSMORSConfFilter]]]] = None,
        final_filter: Optional[ADFCOSMORSConfFilter] = None,
        adf_singlepoint: bool = False,
        initial_conformers: int = 500,
        coskf_dir: Optional[str] = None,
        coskf_name: Optional[str] = None,
        mol_info: Optional[Dict[str, Union[float, int, str]]] = None,
        hbc_from_MESP: bool = False,
        **kwargs,
    ):

        MultiJob.__init__(self, children=OrderedDict(), **kwargs)

        self.mol = molecule

        self.mol_info, self.atomic_ion = self._get_compound_properties(molecule, mol_info)

        self.initial_conformers = initial_conformers
        self.adf_singlepoint = adf_singlepoint

        self.coskf_dir = coskf_dir
        self.coskf_name = coskf_name if coskf_name is not None else "conformer"

        if self.coskf_dir is not None and not os.path.exists(self.coskf_dir):
            os.makedirs(self.coskf_dir, exist_ok=True)

        self._has_valid_conf_gen = conf_gen is None or isinstance(conf_gen, ConformersJob)

        if conf_gen is not None and self._has_valid_conf_gen:
            self.job_0 = conf_gen
        else:
            self.job_0 = self._make_default_confgen_job()

        self.job_settings = [self.job_0.settings]
        self.filters = [None, first_filter]

        if additional is not None:
            for js, f in additional:
                self.job_settings.append(js)
                self.filters.append(f)

        self.final_filter = final_filter
        self.filters.append(final_filter)

        self.hbc_from_MESP = hbc_from_MESP

        self.children["job_0"] = self.job_0

    @staticmethod
    def _get_compound_properties(
        mol: Molecule, mol_info: Optional[Dict[str, Union[float, int, str]]] = None
    ) -> Tuple[Dict[str, Union[float, int, str]], bool]:

        if mol_info is None:
            mol_info = dict()
        mol_info["Molar Mass"] = mol.get_mass()
        mol_info["Formula"] = mol.get_formula()
        try:
            rings = mol.locate_rings()
            flatten_atoms = [atom for subring in rings for atom in subring]
            nring = len(set(flatten_atoms))
            mol_info["Nring"] = int(nring)
        except Exception as exc:
            log(f"[ADFCOSMORSConfJob] Nring could not be determined: {exc}")

        atomic_ion = len(mol.atoms) == 1

        return mol_info, atomic_ion

    def new_children(self):
        # job_1 ... job_N: optional intermediate conformer refinement jobs
        for job_id in range(1, len(self.job_settings)):
            key = f"job_{job_id}"
            if key not in self.children:
                return {key: self._make_intermediate_job(job_id)}

        # ADF gas-phase conformer job
        adf_job_id = len(self.job_settings)
        if "adf_gas_job" not in self.children:
            return {"adf_gas_job": self._make_adf_job(adf_job_id)}

        # Optional final filtering step on ADF conformers
        if self.final_filter is not None and "final_filter_job" not in self.children:
            return {"final_filter_job": self._make_filter_job(adf_job_id + 1)}

        # In the MESP branch, jobs_cosmo/jobs_densf are created in postrun():
        # the final number of conformers is only known after all filtering is complete.

        # In the non-MESP branch, COSMO results for all conformers are generated via one replay job.
        if not self.hbc_from_MESP and "replay_job" not in self.children:
            replay_id = adf_job_id + (2 if self.final_filter is not None else 1)
            return {"replay_job": self._make_replay_job(replay_id)}
        return None

    def prerun(self):
        if not self._has_valid_conf_gen:
            msg = (
                "[ADFCOSMORSConfJob] Wrong type for argument conf_gen. Expected ConformersJob instance. "
                "Using the default conformer generator."
            )
            log(msg)

        if not self._has_valid_filter_settings():
            msg = "[ADFCOSMORSConfJob] Wrong type for filter or settings"
            log(msg)
            raise TypeError(msg)

    def postrun(self):
        self.adf_gas_job = self.children["adf_gas_job"]
        self.final_adf_job = self._get_final_adf_job()
        self._iterate_conformers()

    def _get_final_adf_job(self):
        if self.final_filter is not None and "final_filter_job" in self.children:
            return self.children["final_filter_job"]
        return self.children["adf_gas_job"]

    def _make_default_confgen_job(self):

        sett = Settings()
        sett.input.AMS.Generator.RDKit.InitialNConformers = self.initial_conformers

        return ConformersJob(name="job0_conformers_uff", molecule=self.mol, settings=sett)

    def _make_intermediate_job(self, job_id):

        settings = self.job_settings[job_id].copy()
        settings.input.AMS.InputConformersSet = self.children[f"job_{job_id - 1}"].results
        self._add_filter(settings, job_id)

        return ConformersJob(name=f"job{job_id}_additional", settings=settings)

    def _make_adf_job(self, job_id):

        sett = ADFCOSMORSCompoundJob.adf_settings(False, atomic_ion=self.atomic_ion)
        if not self.adf_singlepoint:
            sett.input.AMS.Task = "Optimize"
            sett.input.AMS.GeometryOptimization.UseAMSWorker = "False"
            # sett.input.ams.GeometryOptimization.ConvergenceQuality = 'Good'
        else:
            sett.input.AMS.Task = "Score"
        sett.input.AMS.InputConformersSet = self.children[f"job_{job_id - 1}"].results
        if self.hbc_from_MESP:
            sett.input.AMS.GeometryOptimization.Keep = "All"
            sett.input.AMS.Output.KeepWorkDir = "Yes"

        self._add_filter(sett, job_id)

        return ConformersJob(name=f"job{job_id}_adf", settings=sett)

    def _make_filter_job(self, job_id):

        sett = Settings()
        sett.input.AMS.Task = "Filter"
        sett.input.AMS.InputConformersSet = self.children["adf_gas_job"].results
        self._add_filter(sett, job_id)

        return ConformersJob(name=f"job{job_id}_adf_filter", settings=sett)

    def _iterate_conformers(self):
        if self.hbc_from_MESP:
            self._iterate_conformers_mesp()
        else:
            self._iterate_conformers_replay()

    def _iterate_conformers_replay(self):

        if self.coskf_dir is None:
            self.coskf_dir = self.final_adf_job.path

        self.replay_job = self.children["replay_job"]
        self.replay_job.results.wait()

        conformer_E = self.final_adf_job.results.get_energies("Ha")

        for i, conf_E in enumerate(conformer_E):
            frame_name = f"Frame{i+1}"

            cosmo_section = self.replay_job.results.read_rkf_section("COSMO", frame_name)
            cosmo_section["Gas Phase Bond Energy"] = conf_E
            coskfname = f"{self.coskf_name}_{i}.coskf"
            fullname = Path(self.coskf_dir) / coskfname
            fullname.unlink(missing_ok=True)

            coskf = KFFile(str(fullname), autosave=False)
            for key, val in cosmo_section.items():
                coskf.write("COSMO", key, val)

            for key, value in self.mol_info.items():
                coskf.write("Compound Data", key, value)
            coskf.save()

    def _iterate_conformers_mesp(self):

        if self.coskf_dir is None:
            self.coskf_dir = self.final_adf_job.path

        # prepare gas phase rkf files for per-conformer COSMO + DENSF reruns
        rkf_path = Path(self.adf_gas_job.results.rkfpath()).parent
        gas_rkf_path = list(rkf_path.glob("go*"))
        gas_rkf_path = sorted(gas_rkf_path, key=lambda p: int(re.search(r"go(\d+)", p.as_posix()).group(1)))

        au2ha = Units.conversion_factor("au", "hartree")
        gas_rkf_energy = []
        for conf_rkf_dir in gas_rkf_path:
            rkf = KFFile(str(conf_rkf_dir / "adf.rkf"))
            egas = rkf.read_real("AMSResults", "Energy") * au2ha
            gas_rkf_energy.append(egas)

        conformer_E = self.final_adf_job.results.get_energies("Ha")

        num_conformer = len(conformer_E)
        self.jobs_cosmo = [None] * num_conformer
        self.jobs_densf = [None] * num_conformer

        conformer_names = [f"{self.coskf_name}_{i}.coskf" for i in range(num_conformer)]

        used_rkf_idx = set()
        for i, (conf_E, coskfname) in enumerate(zip(conformer_E, conformer_names)):
            Path(self.coskf_dir, coskfname).unlink(missing_ok=True)

            matches = [
                idx
                for idx, ref_E in enumerate(gas_rkf_energy)
                if idx not in used_rkf_idx and np.isclose(conf_E, ref_E, rtol=1e-12, atol=1e-8)
            ]

            if not matches:
                msg = f"[ADFCOSMORSConfJob] No match found for conf_E={conf_E:.12f}"
                log(msg)
                raise FloatingPointError(msg)

            if len(matches) > 1:
                rkf_idx = min(matches, key=lambda j: abs(conf_E - gas_rkf_energy[j]))
                msg = (
                    f"[ADFCOSMORSConfJob] Multiple matches for conf_E={conf_E:.12f}, "
                    f"choose idx={rkf_idx} with min |dE|={abs(conf_E - gas_rkf_energy[rkf_idx]):.3e}"
                )
                log(msg)
            else:
                rkf_idx = matches[0]

            used_rkf_idx.add(rkf_idx)

            log(f"[ADFCOSMORSConfJob] matched gas_energy index {rkf_idx} for conformer {i}")

            self.jobs_cosmo[i] = self._make_cosmo_job(gas_rkf_path[rkf_idx], i)
            self.jobs_cosmo[i].run()
            self.jobs_densf[i] = self._make_densf_job(gas_rkf_path[rkf_idx], i)
            self.jobs_densf[i].run()

            self._make_conformer_coskf(i, coskfname)

    def _make_conformer_coskf(self, i, name):
        self.jobs_cosmo[i].results.wait()

        t41 = None
        if self.hbc_from_MESP:
            self.jobs_densf[i].results.wait()
            t41 = self.jobs_densf[i].results.kfpath()

        ADFCOSMORSCompoundJob.convert_to_coskf(
            rkf_path=self.jobs_cosmo[i].results.rkfpath(file="adf"),
            coskf_name=name,
            plams_dir=self.jobs_cosmo[i].path,
            coskf_dir=self.coskf_dir,
            mol_info=self.mol_info,
            densf_path=t41,
        )

    def _make_replay_job(self, job_id):

        sett = ADFCOSMORSCompoundJob.adf_settings(
            True, elements=list(set(at.symbol for at in self.mol)), atomic_ion=self.atomic_ion
        )
        sett.input.AMS.Task = "Replay"
        self._get_final_adf_job().results.wait()
        sett.input.AMS.Replay.File = self._get_final_adf_job().results["conformers.rkf"]
        sett.input.AMS.Replay.StoreAllResultFiles = "True"

        return AMSJob(name=f"job{job_id}_replay", settings=sett)

    def _make_cosmo_job(self, rkf_path, i):

        sett = ADFCOSMORSCompoundJob.adf_settings(
            True, elements=list(set(at.symbol for at in self.mol)), atomic_ion=self.atomic_ion
        )
        sett.input.AMS.Task = "SinglePoint"
        sett.input.AMS.LoadSystem.File = str(rkf_path / "ams.rkf")
        sett.input.AMS.EngineRestart = str(rkf_path / "adf.rkf")

        return AMSJob(name=f"job_cosmo_c{i}", settings=sett)

    def _make_densf_job(self, rkf_path, i):

        densf_job = DensfJob(settings=ADFCOSMORSCompoundJob.densf_settings())
        densf_job.inputjob = str(rkf_path / "adf.rkf")
        densf_job.name = f"job_densf_c{i}"
        return densf_job

    def _add_filter(self, sett, job_id):

        if job_id < len(self.filters):
            filt = self.filters[job_id]
        else:
            filt = self.final_filter
        if filt is not None:
            if filt.max_num_confs is not None:
                sett.input.AMS.InputMaxConfs = filt.max_num_confs
            if filt.max_energy_range is not None:
                sett.input.AMS.InputMaxEnergy = filt.max_energy_range

    def _has_valid_filter_settings(self):

        for js, f in zip(self.job_settings, self.filters):
            if js is not None and not isinstance(js, Settings):
                log(f"[ADFCOSMORSConfJob] Wrong type for job_settings : {js}")
                return False
            if f is not None and not isinstance(f, ADFCOSMORSConfFilter):
                log(f"[ADFCOSMORSConfJob] Wrong type for filter : {f}")
                return False
        return True

MD Jobs

Convenience job classes for MD simulation setup, restart/spawning, and common trajectory analyses.

Overview

These classes wrap common AMS MolecularDynamics patterns (NVE/NVT/NPT), restarts and spawned ensembles, plus trajectory analysis jobs.

API

API reference
class AMSMDJob(velocities=None, timestep=None, samplingfreq=None, nsteps=None, checkpointfrequency=None, engineresultsfreq=None, writevelocities=None, writebonds=None, writemolecules=None, writecharges=None, writeenginegradients=None, calcpressure=None, molecule=None, temperature=None, thermostat=None, tau=None, thermostat_region=None, pressure=None, barostat=None, barostat_tau=None, scale=None, equal=None, constantvolume=None, binlog_time=None, binlog_pressuretensor=None, binlog_dipolemoment=None, _enforce_thermostat=False, _enforce_barostat=False, **kwargs)[source]
molecule: Molecule

The initial structure

name: str

The name of the job

settings: Settings

Settings for the job. You should normally populate neither settings.input.ams.MolecularDynamics nor settings.input.ams.Task

velocities: float or AMSJob or str (path/to/ams.rkf) or 2-tuple (path/to/ams.rkf, frame-number)

If float, it is taken as the temperature. If AMSJob or str, the velocities are taken from the EndVelocities section of the corresponding ams.rkf file. If 2-tuple, the first item must be a path to an ams.rkf, and the second item an integer specifying the frame number - the velocities are then read from History%Velocities(framenumber).

timestep: float

Timestep

samplingfreq: int

Sampling frequency

nsteps: int

Length of simulation

Trajectory options:

checkpointfrequency: int

How frequently to write MDStep*.rkf checkpoint files

writevelocities: bool

Whether to save velocities to ams.rkf (needed for example to restart from individual frames or to calculate velocity autocorrelation functions)

writebonds: bool

Whether to save bonds to ams.rkf

writemolecules: bool

Whether to write molecules to ams.rkf

writeenginegradients: bool

Whether to save engine gradients (negative of atomic forces) to ams.rkf

Thermostat (NVT, NPT) options:

thermostat: str

‘Berendsen’ or ‘NHC’

tau: float

Thermostat time constant (fs)

temperature: float or tuple of floats

Temperature (K). If a tuple/list of floats, the Thermostat.Duration option will be set to evenly divided intervals.

thermostat_region: str

Region for which to apply the thermostat

Barostat (NPT) options:

barostat: str

‘Berendsen’ or ‘MTK’

barostat_tau: float

Barostat time constant (fs)

pressure: float

Barostat pressure (pascal)

equal: str

‘XYZ’ etc.

scale: str

‘XYZ’ etc.

constantvolume: bool

Constant volume?

Other options:

calcpressure: bool

Whether to calculate pressure for each frame.

binlog_time: bool

Whether to log the time at every timestep in the BinLog section on ams.rkf

binlog_pressuretensor: bool

Whether to log the pressure tensor at every timestep in the BinLog section on ams.rkf
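The velocities argument described above dispatches on its type. A plain-Python sketch of that dispatch (an illustrative stand-in, not the PLAMS implementation; the 300 K fallback mirrors the class default):

```python
def classify_velocities(velocities, default_temperature=300.0):
    """Mimic AMSMDJob's velocity-input dispatch (illustrative only)."""
    if velocities is None or isinstance(velocities, (int, float)):
        # draw random initial velocities at the given temperature (K)
        return ("Random", velocities or default_temperature)
    if isinstance(velocities, tuple):
        # (path/to/ams.rkf, frame-number): read History%Velocities(frame)
        path, frame = velocities
        return ("Input", (path, frame))
    # AMSJob instance or path string: EndVelocities from ams.rkf
    return ("FromFile", velocities)

print(classify_velocities(350.0))           # ('Random', 350.0)
print(classify_velocities(("ams.rkf", 7)))  # ('Input', ('ams.rkf', 7))
```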

get_velocities_from(other_job, frame=None, update_molecule=True)[source]

Function to update the InitialVelocities block in self. It is normally not needed; instead use e.g. the AMSNVEJob.restart_from() function.

This function can be called in prerun() methods for MultiJobs

classmethod restart_from(other_job, frame=None, settings=None, use_prerun=False, **kwargs)[source]
other_job: AMSJob

The job to restart from.

frame: int

Which frame to read the structure and velocities from. If None, the final structure and end velocities will be used (section Molecule and MDResults%EndVelocities)

settings: Settings

Settings that override any other settings. All settings from other_job (e.g. the engine settings) are inherited by default but they can be overridden here.

use_prerun: bool

If True, the molecule and velocities will only be read from other_job inside the prerun() method. Set this to True to prevent PLAMS from waiting for other_job to finish as soon as the new job is defined.

kwargs: many options

See the docstring for AMSMDJob.
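The settings inheritance can be pictured with a plain-dict sketch: everything from other_job is copied, the user-supplied settings win on conflict, and the blocks that restart_from regenerates (InitialVelocities, System) are dropped. Names here are illustrative, not the PLAMS API:

```python
def merged_restart_settings(other_job_settings, override=None):
    """Illustrative stand-in for the settings merge done by restart_from."""
    merged = dict(other_job_settings)   # inherit everything from other_job
    if override:
        merged.update(override)         # user settings override inherited ones
    for stale in ("InitialVelocities", "System"):
        merged.pop(stale, None)         # regenerated from the restart source
    return merged

base = {"Engine": "ForceField", "InitialVelocities": "...", "System": "..."}
print(merged_restart_settings(base, {"NSteps": 2000}))
# {'Engine': 'ForceField', 'NSteps': 2000}
```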

class AMSNVEJob(**kwargs)[source]

A class for running NVE MD simulations

class AMSNVTJob(**kwargs)[source]

A class for running NVT MD simulations

class AMSNPTJob(**kwargs)[source]

A class for running NPT MD simulations

class AMSNPTResults(*args, **kwargs)[source]
Parameters:
  • args (Any) –

  • kwargs (Any) –

get_equilibrated_molecule(equilibration_fraction=0.667, return_index=False)[source]

Discards the first equilibration_fraction of the trajectory, averages the density over the remaining frames, and returns the molecule from the remaining trajectory whose density is closest to that average.
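The selection logic amounts to: discard the first fraction of frames, average the densities of the rest, and pick the remaining frame closest to that average. A minimal numpy sketch (illustrative; the real method returns a Molecule, and optionally its history index):

```python
import numpy as np

def equilibrated_frame_index(densities, equilibration_fraction=0.667):
    """Return the index (into the full trajectory) of the frame whose density
    is closest to the average density of the post-equilibration part."""
    densities = np.asarray(densities, dtype=float)
    start = int(len(densities) * equilibration_fraction)  # frames to discard
    tail = densities[start:]
    target = tail.mean()
    return start + int(np.argmin(np.abs(tail - target)))

dens = [0.5, 0.7, 0.9, 1.00, 1.02, 0.98]  # toy density trace (g/cm^3)
print(equilibrated_frame_index(dens))      # → 4
```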

class AMSNVESpawnerJob(previous_job, n_nve=1, name='nvespawnerjob', **kwargs)[source]

A class for running multiple NVE simulations with initial structures/velocities taken from an NVT trajectory. The NVT trajectory must contain the velocities!

__init__(previous_job, n_nve=1, name='nvespawnerjob', **kwargs)[source]
previous_job: AMSJob

An AMSJob with an MD trajectory. Must contain velocities (WriteVelocities Yes). Note that the trajectory should have been equilibrated before it starts.

n_nve: int

The number of NVE simulations to spawn

All other settings can be set as for an AMSNVEJob (e.g. nsteps).

prerun()[source]

Constructs the children jobs

class AMSMDScanDensityJob(molecule, scan_density_upper=1.4, startstep=None, **kwargs)[source]

A class for scanning the density using MD Deformations

Parameters:

molecule (Molecule | Dict[str, Molecule] | ChemicalSystem | Dict[str, ChemicalSystem] | None) –

class AMSMSDJob(previous_job=None, max_correlation_time_fs=10000, start_time_fit_fs=2000, atom_indices=None, **kwargs)[source]

A convenient class wrapping around the trajectory analysis MSD tool

Parameters:
  • max_correlation_time_fs (float) –

  • start_time_fit_fs (float) –

  • atom_indices (List[int]) –

__init__(previous_job=None, max_correlation_time_fs=10000, start_time_fit_fs=2000, atom_indices=None, **kwargs)[source]
previous_job: AMSJob

An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.

max_correlation_time_fs: float

Maximum correlation time in femtoseconds

start_time_fit_fs: float

Smallest correlation time for the linear fit

atom_indices: List[int]

If None, use all atoms. Otherwise use the specified atom indices (starting with 1)

kwargs: dict

Other options to AMSAnalysisJob

get_input()[source]

Generate the input file

prerun()[source]

Constructs the final settings

postrun()[source]

Creates msd.txt, fit_msd.txt, and D.txt
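postrun fits the linear part of the MSD beyond start_time_fit_fs; in three dimensions the Einstein relation MSD(t) ≈ 6 D t gives the diffusion coefficient as slope/6. A self-contained sketch with synthetic data (illustrative, not the AMS analysis tool; units follow the inputs):

```python
import numpy as np

def diffusion_coefficient(t_fs, msd_ang2, start_time_fit_fs=2000.0):
    """Fit MSD (angstrom^2) vs time (fs) for t >= start_time_fit_fs and
    return D = slope / 6 (3D Einstein relation), in angstrom^2/fs."""
    t = np.asarray(t_fs, dtype=float)
    msd = np.asarray(msd_ang2, dtype=float)
    mask = t >= start_time_fit_fs          # keep only the diffusive regime
    slope, intercept = np.polyfit(t[mask], msd[mask], 1)
    return slope / 6.0

t = np.linspace(0, 10000, 101)
msd = 6 * 2.3e-4 * t                       # synthetic: D = 2.3e-4 angstrom^2/fs
print(diffusion_coefficient(t, msd))       # ~2.3e-4
```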

class AMSRDFJob(previous_job=None, atom_indices=None, atom_indices_to=None, rmin=0.5, rmax=6.0, rstep=0.1, **kwargs)[source]
__init__(previous_job=None, atom_indices=None, atom_indices_to=None, rmin=0.5, rmax=6.0, rstep=0.1, **kwargs)[source]
previous_job: AMSJob

AMSJob with finished MD trajectory.

atom_indices: List[int]

Atom indices (starting with 1). If None, calculate RDF from all atoms.

atom_indices_to: List[int]

Atom indices (starting with 1). If None, calculate RDF to all atoms.

rmin: float

Minimum distance (angstrom)

rmax: float

Maximum distance (angstrom)

rstep: float

Bin width for the histogram (angstrom)

get_input()[source]

Generate the input file

prerun()[source]

Creates the final settings. Do not call or override this method.

postrun()[source]

Creates rdf.txt. Do not call or override this method.
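The rmin/rmax/rstep parameters define a distance histogram; normalizing each bin by the ideal-gas expectation turns the histogram into g(r). A schematic numpy sketch operating on precomputed pair distances (illustrative only; the AMS tool works directly on the trajectory):

```python
import numpy as np

def rdf_from_distances(distances, n_from, n_to, volume, rmin=0.5, rmax=6.0, rstep=0.1):
    """Bin pair distances (angstrom) and normalize by ideal-gas shell counts."""
    nbins = int(round((rmax - rmin) / rstep))
    edges = rmin + rstep * np.arange(nbins + 1)
    counts, edges = np.histogram(distances, bins=edges)
    shell_vol = 4.0 / 3.0 * np.pi * (edges[1:] ** 3 - edges[:-1] ** 3)
    ideal = n_from * n_to / volume * shell_vol  # expected pair counts per bin
    r = 0.5 * (edges[:-1] + edges[1:])          # bin centers
    return r, counts / ideal
```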

class AMSVACFJob(previous_job=None, max_correlation_time_fs=5000, max_freq=5000, atom_indices=None, **kwargs)[source]

A class for calculating the velocity autocorrelation function and its power spectrum

__init__(previous_job=None, max_correlation_time_fs=5000, max_freq=5000, atom_indices=None, **kwargs)[source]
previous_job: AMSJob

An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.

max_correlation_time_fs: float

Maximum correlation time in femtoseconds

max_freq: float

The maximum frequency for the power spectrum in cm^-1

atom_indices: List[int]

Atom indices (starting with 1). If None, use all atoms.

get_input()[source]

Generate the input file

prerun()[source]

Creates final settings

postrun()[source]

Creates vacf.txt and power_spectrum.txt
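The VACF is the time correlation <v(0)·v(t)>, averaged over atoms and time origins and normalized to 1 at t = 0; the power spectrum up to max_freq then follows from its cosine transform. A direct sketch of the correlation part (illustrative only; production code would use an FFT-based estimator):

```python
import numpy as np

def vacf(velocities):
    """Normalized velocity autocorrelation <v(0)·v(t)> / <v(0)·v(0)>.

    velocities: array of shape (nframes, natoms, 3), e.g. in angstrom/fs.
    Averages over atoms and all available time origins (direct O(N^2) loop).
    """
    v = np.asarray(velocities, dtype=float)
    n = v.shape[0]
    c = np.array([np.mean(np.sum(v[: n - t] * v[t:], axis=-1)) for t in range(n)])
    return c / c[0]

# a constant-velocity trajectory is perfectly correlated: VACF == 1 everywhere
print(vacf(np.ones((6, 2, 3))))
```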

class AMSViscosityFromBinLogJob(previous_job=None, **kwargs)[source]

A convenient class wrapping around the trajectory analysis ViscosityFromBinLog tool. Only runs with default input options (max correlation time 10% of the total trajectory).

__init__(previous_job=None, **kwargs)[source]
previous_job: AMSJob

An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts. It must have been run with the BinLog%PressureTensor option set.

kwargs: dict

Other options to AMSAnalysisJob

get_input()[source]

Generate the input file

prerun()[source]

Constructs the final settings

postrun()[source]

Creates viscosity_integral, fit_viscosity_integral, and viscosity.txt
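ViscosityFromBinLog is a Green-Kubo style analysis: the shear viscosity is V/(k_B T) times the time integral of the off-diagonal pressure tensor autocorrelation, which is why BinLog%PressureTensor must have been recorded at every timestep. A schematic single-component sketch (illustrative only; unit handling and averaging over tensor components are omitted):

```python
import numpy as np

def green_kubo_viscosity(pxy, dt, volume, kT):
    """Schematic Green-Kubo estimate from one off-diagonal pressure tensor
    component: eta = V/(kT) * integral of <P_xy(0) P_xy(t)> dt.
    Truncates the correlation at half the trajectory (the AMS tool's
    default uses 10% of the total trajectory)."""
    pxy = np.asarray(pxy, dtype=float)
    n = len(pxy)
    acf = np.array([np.mean(pxy[: n - t] * pxy[t:]) for t in range(n // 2)])
    integral = dt * (acf.sum() - 0.5 * (acf[0] + acf[-1]))  # trapezoid rule
    return volume / kT * integral

print(green_kubo_viscosity(np.ones(10), 1.0, 2.0, 1.0))  # → 8.0
```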

Code

Recipe module source: amsmdjob.py
from scm.plams.core.functions import add_to_instance
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSJob, AMSResults
import numpy as np
from scm.plams.tools.kftools import KFFile
from scm.plams.tools.units import Units
from typing import Union
import scm.plams as plams

__all__ = ["AMSMDJob", "AMSNVEJob", "AMSNVTJob", "AMSNPTJob"]


class AMSMDJob(AMSJob):
    """

    molecule: Molecule
        The initial structure

    name: str
        The name of the job

    settings: Settings
        Settings for the job. You should normally populate neither settings.input.ams.MolecularDynamics nor settings.input.ams.Task

    velocities: float or AMSJob or str (path/to/ams.rkf) or 2-tuple (path/to/ams.rkf, frame-number)
        If float, it is taken as the temperature. If AMSJob or str, the velocities are taken from the EndVelocities section of the corresponding ams.rkf file. If 2-tuple, the first item must be a path to an ams.rkf, and the second item an integer specifying the frame number - the velocities are then read from History%Velocities(framenumber).

    timestep: float
        Timestep

    samplingfreq: int
        Sampling frequency

    nsteps: int
        Length of simulation

    **Trajectory options**:

    checkpointfrequency: int
        How frequently to write MDStep*.rkf checkpoint files

    writevelocities: bool
        Whether to save velocities to ams.rkf (needed for example to restart from individual frames or to calculate velocity autocorrelation functions)

    writebonds: bool
        Whether to save bonds to ams.rkf

    writemolecules: bool
        Whether to write molecules to ams.rkf

    writeenginegradients: bool
        Whether to save engine gradients (negative of atomic forces) to ams.rkf

    **Thermostat (NVT, NPT) options**:

    thermostat: str
        'Berendsen' or 'NHC'

    tau: float
        Thermostat time constant (fs)

    temperature: float or tuple of floats
        Temperature (K). If a tuple/list of floats, the Thermostat.Duration option will be set to evenly divided intervals.

    thermostat_region: str
        Region for which to apply the thermostat

    **Barostat (NPT) options**:

    barostat: str
        'Berendsen' or 'MTK'

    barostat_tau: float
        Barostat time constant (fs)

    pressure: float
        Barostat pressure (pascal)

    equal: str
        'XYZ' etc.

    scale: str
        'XYZ' etc.

    constantvolume: bool
        Constant volume?

    **Other options**:

    calcpressure: bool
        Whether to calculate pressure for each frame.

    binlog_time: bool
        Whether to log the time at every timestep in the BinLog section on ams.rkf

    binlog_pressuretensor: bool
        Whether to log the pressure tensor at every timestep in the BinLog section on ams.rkf



    """

    default_nsteps = 1000
    default_timestep = 0.25
    default_samplingfreq = 100

    default_thermostat = "NHC"
    default_temperature = 300
    default_tau_multiplier = 400  # get tau by multiplying the timestep with this number
    default_thermostat_region = None

    default_barostat = "MTK"
    default_pressure = 1e5
    default_barostat_tau_multiplier = 4000  # get barostat_tau by multiplying the timestep with this number
    default_scale = "XYZ"
    default_equal = "None"
    default_constantvolume = "False"

    default_checkpointfrequency = 1000
    default_writevelocities = "True"
    default_writebonds = "True"
    default_writecharges = "True"
    default_writemolecules = "True"
    default_writeenginegradients = "False"

    default_calcpressure = "False"
    default_binlog_time = "False"
    default_binlog_pressuretensor = "False"
    default_binlog_dipolemoment = "False"

    def __init__(
        self,
        velocities=None,
        timestep=None,
        samplingfreq: int = None,
        nsteps: int = None,
        checkpointfrequency=None,
        engineresultsfreq=None,
        writevelocities=None,
        writebonds=None,
        writemolecules=None,
        writecharges=None,
        writeenginegradients=None,
        calcpressure=None,
        molecule: Union[plams.Molecule, AMSJob, AMSResults] = None,
        temperature: float = None,
        thermostat=None,
        tau=None,
        thermostat_region=None,
        pressure=None,
        barostat=None,
        barostat_tau=None,
        scale=None,
        equal=None,
        constantvolume=None,
        binlog_time=None,
        binlog_pressuretensor=None,
        binlog_dipolemoment=None,
        _enforce_thermostat=False,
        _enforce_barostat=False,
        **kwargs,
    ):
        """ """

        if isinstance(molecule, AMSJob):
            molecule = molecule.results.get_main_molecule()
        if isinstance(molecule, AMSResults):
            molecule = molecule.get_main_molecule()
        AMSJob.__init__(self, molecule=molecule, **kwargs)

        self.settings.input.ams.Task = "MolecularDynamics"
        mdsett = self.settings.input.ams.MolecularDynamics

        mdsett.TimeStep = timestep or mdsett.TimeStep or self.default_timestep
        mdsett.Trajectory.SamplingFreq = samplingfreq or mdsett.Trajectory.SamplingFreq or self.default_samplingfreq
        if engineresultsfreq is not None:
            # keyword was introduced in AMS2025, so avoid always setting this input option in order
            # to be able to use the AMSMDJob class with older AMS versions.
            mdsett.Trajectory.EngineResultsFreq = engineresultsfreq
        mdsett.NSteps = nsteps or mdsett.NSteps or self.default_nsteps

        mdsett.Trajectory.WriteVelocities = (
            str(writevelocities)
            if writevelocities is not None
            else mdsett.Trajectory.WriteVelocities or self.default_writevelocities
        )
        mdsett.Trajectory.WriteBonds = (
            str(writebonds) if writebonds is not None else mdsett.Trajectory.WriteBonds or self.default_writebonds
        )
        mdsett.Trajectory.WriteMolecules = (
            str(writemolecules)
            if writemolecules is not None
            else mdsett.Trajectory.WriteMolecules or self.default_writemolecules
        )
        mdsett.Trajectory.WriteCharges = (
            str(writecharges)
            if writecharges is not None
            else mdsett.Trajectory.WriteCharges or self.default_writecharges
        )
        mdsett.Trajectory.WriteEngineGradients = (
            str(writeenginegradients)
            if writeenginegradients is not None
            else mdsett.Trajectory.WriteEngineGradients or self.default_writeenginegradients
        )
        mdsett.CalcPressure = (
            str(calcpressure) if calcpressure is not None else mdsett.CalcPressure or self.default_calcpressure
        )
        mdsett.Checkpoint.Frequency = (
            checkpointfrequency or mdsett.Checkpoint.Frequency or self.default_checkpointfrequency
        )

        mdsett.BinLog.Time = (
            str(binlog_time) if binlog_time is not None else mdsett.BinLog.Time or self.default_binlog_time
        )
        mdsett.BinLog.PressureTensor = (
            str(binlog_pressuretensor)
            if binlog_pressuretensor is not None
            else mdsett.BinLog.PressureTensor or self.default_binlog_pressuretensor
        )
        mdsett.BinLog.DipoleMoment = (
            str(binlog_dipolemoment)
            if binlog_dipolemoment is not None
            else mdsett.BinLog.DipoleMoment or self.default_binlog_dipolemoment
        )

        if velocities is None and temperature is not None:
            velocities = self._get_initial_temperature(temperature)
        self.settings += self._velocities2settings(velocities)

        if temperature or thermostat or _enforce_thermostat:
            self.settings.update(
                self._get_thermostat_settings(
                    thermostat=thermostat,
                    temperature=temperature,
                    tau=tau,
                    thermostat_region=thermostat_region,
                    nsteps=int(mdsett.NSteps),
                )
            )

        if pressure or barostat or _enforce_barostat:
            self.settings.update(
                self._get_barostat_settings(
                    pressure=pressure,
                    barostat=barostat,
                    barostat_tau=barostat_tau,
                    scale=scale,
                    equal=equal,
                    constantvolume=constantvolume,
                )
            )

    def remove_blocks(self, blocks=None):
        if blocks:
            for block in blocks:
                if block in self.settings.input.ams.MolecularDynamics:
                    del self.settings.input.ams.MolecularDynamics[block]

    @staticmethod
    def _get_initial_temperature(temperature):
        try:
            return temperature[0]
        except TypeError:
            return temperature

    @staticmethod
    def _velocities2settings(velocities):
        s = Settings()
        if velocities is None or isinstance(velocities, (int, float)):
            s.input.ams.MolecularDynamics.InitialVelocities.Type = "Random"
            s.input.ams.MolecularDynamics.InitialVelocities.Temperature = velocities or AMSMDJob.default_temperature
        elif isinstance(velocities, tuple):
            # file and frame number
            f = velocities[0]
            frame = velocities[1]
            vels = KFFile(f).read("MDHistory", f"Velocities({frame})")
            vels = np.array(vels).reshape(-1, 3) * Units.convert(1.0, "bohr", "angstrom")  # angstrom/fs
            s.input.ams.MolecularDynamics.InitialVelocities.Type = "Input"
            values = ""
            for x in vels:
                values += 6 * " " + " ".join(str(y) for y in x) + "\n"
            s.input.ams.MolecularDynamics.InitialVelocities.Values._h = "   # From {} frame {}".format(f, frame)
            s.input.ams.MolecularDynamics.InitialVelocities.Values._1 = values
        else:
            s.input.ams.MolecularDynamics.InitialVelocities.Type = "FromFile"
            s.input.ams.MolecularDynamics.InitialVelocities.File = velocities
        return s

    def get_velocities_from(self, other_job, frame=None, update_molecule=True):
        """
        Function to update the InitialVelocities block in self. It is normally not needed; instead use e.g. the AMSNVEJob.restart_from() function.

        This function can be called in prerun() methods for MultiJobs
        """
        _, velocities, molecule, _ = self._get_restart_job_velocities_molecule(other_job, frame=frame)
        del self.settings.input.ams.MolecularDynamics.InitialVelocities
        self.settings += self._velocities2settings(velocities)

        if update_molecule:
            self.molecule = molecule

    @staticmethod
    def _get_restart_job_velocities_molecule(other_job, frame=None, settings=None, get_velocities_molecule=True):
        """
        other_job: str or AMSMDJob

        get_velocities_molecule : bool
            Whether to get the velocities and molecule right now. Set to False to not access other_job.results (for use with MultiJob prerun() methods)

        Returns: (other_job [AMSJob], velocities, molecule, extra_settings [Settings])
        """
        if isinstance(other_job, str):
            other_job = AMSJob.load_external(other_job)
        if get_velocities_molecule:
            if frame:
                velocities = (other_job.results.rkfpath(), frame)
                molecule = other_job.results.get_history_molecule(frame)
            else:
                velocities = other_job
                molecule = other_job.results.get_main_molecule()
        else:
            velocities = None
            molecule = None

        extra_settings = other_job.settings.copy()
        if settings:
            extra_settings.update(settings)

        if "InitialVelocities" in extra_settings.input.ams.MolecularDynamics:
            del extra_settings.input.ams.MolecularDynamics.InitialVelocities

        if "System" in extra_settings.input.ams:
            del extra_settings.input.ams.System

        return other_job, velocities, molecule, extra_settings

    def _get_thermostat_settings(self, thermostat, temperature, tau, thermostat_region, nsteps: int):
        s = Settings()
        prev_thermostat_settings = self.settings.input.ams.MolecularDynamics.Thermostat
        if isinstance(prev_thermostat_settings, list):
            prev_thermostat_settings = prev_thermostat_settings[0]
        s.input.ams.MolecularDynamics.Thermostat.Type = (
            thermostat or prev_thermostat_settings.Type or self.default_thermostat
        )
        try:
            n_temperatures = len(temperature)
            my_temperature = " ".join(str(x) for x in temperature)
            if n_temperatures > 1:
                nsteps_per_temperature = nsteps // (len(temperature) - 1)
                s.input.ams.MolecularDynamics.Thermostat.Duration = " ".join(
                    [str(nsteps_per_temperature)] * (n_temperatures - 1)
                )
            else:
                s.input.ams.MolecularDynamics.Thermostat.Duration = None
        except TypeError:
            my_temperature = temperature
            s.input.ams.MolecularDynamics.Thermostat.Duration = None

        s.input.ams.MolecularDynamics.Thermostat.Region = (
            thermostat_region or prev_thermostat_settings.get("Region", None) or self.default_thermostat_region
        )
        s.input.ams.MolecularDynamics.Thermostat.Temperature = (
            my_temperature or prev_thermostat_settings.Temperature or self.default_temperature
        )
        s.input.ams.MolecularDynamics.Thermostat.Tau = (
            tau
            or prev_thermostat_settings.Tau
            or float(self.settings.input.ams.MolecularDynamics.TimeStep) * AMSMDJob.default_tau_multiplier
        )
        return s

    def _get_barostat_settings(self, pressure, barostat, barostat_tau, scale, equal, constantvolume):
        s = Settings()
        self.settings.input.ams.MolecularDynamics.Barostat.Type = (
            barostat or self.settings.input.ams.MolecularDynamics.Barostat.Type or self.default_barostat
        )
        self.settings.input.ams.MolecularDynamics.Barostat.Pressure = (
            pressure
            if pressure is not None
            else self.settings.input.ams.MolecularDynamics.Barostat.Pressure or self.default_pressure
        )
        self.settings.input.ams.MolecularDynamics.Barostat.Tau = (
            barostat_tau
            or self.settings.input.ams.MolecularDynamics.Barostat.Tau
            or float(self.settings.input.ams.MolecularDynamics.TimeStep) * AMSMDJob.default_barostat_tau_multiplier
        )
        self.settings.input.ams.MolecularDynamics.Barostat.Scale = (
            scale or self.settings.input.ams.MolecularDynamics.Barostat.Scale or self.default_scale
        )
        self.settings.input.ams.MolecularDynamics.Barostat.Equal = (
            equal or self.settings.input.ams.MolecularDynamics.Barostat.Equal or self.default_equal
        )
        self.settings.input.ams.MolecularDynamics.Barostat.ConstantVolume = (
            str(constantvolume)
            if constantvolume is not None
            else self.settings.input.ams.MolecularDynamics.Barostat.ConstantVolume or self.default_constantvolume
        )
        return s

    @classmethod
    def restart_from(cls, other_job, frame=None, settings=None, use_prerun=False, **kwargs):
        """

        other_job: AMSJob
            The job to restart from.

        frame: int
            Which frame to read the structure and velocities from. If None, the final structure and end velocities are used (from the Molecule section and MDResults%EndVelocities).

        settings: Settings
            Settings that override any other settings. All settings from other_job (e.g. the engine settings) are inherited by default but they can be overridden here.

        use_prerun: bool
            If True, the molecule and velocities will only be read from other_job inside the prerun() method. Set this to True to prevent PLAMS from waiting for other_job to finish as soon as the new job is defined.

        kwargs: many options
            See the docstring for AMSMDJob.

        """
        other_job, velocities, molecule, extra_settings = cls._get_restart_job_velocities_molecule(
            other_job, frame, settings, get_velocities_molecule=not use_prerun
        )
        job = cls(settings=extra_settings, velocities=velocities, molecule=molecule, **kwargs)

        if use_prerun:

            @add_to_instance(job)
            def prerun(self):  # noqa F811
                self.get_velocities_from(other_job, frame=frame, update_molecule=True)

        return job


class AMSNVEJob(AMSMDJob):
    """
    A class for running NVE MD simulations
    """

    def __init__(self, **kwargs):
        AMSMDJob.__init__(self, **kwargs)
        self.remove_blocks(["thermostat", "barostat", "deformation", "nanoreactor"])


class AMSNVTJob(AMSMDJob):
    """
    A class for running NVT MD simulations
    """

    def __init__(self, **kwargs):
        AMSMDJob.__init__(self, _enforce_thermostat=True, **kwargs)
        self.remove_blocks(["barostat", "deformation", "nanoreactor"])


class AMSNPTResults(AMSResults):
    def get_equilibrated_molecule(self, equilibration_fraction=0.667, return_index=False):
        """

        Discards the first equilibration_fraction of the trajectory.
        Calculates the average density of the rest. Returns the molecule
        with the closest density to the average density among the remaining
        trajectory.

        """
        densities = self.job.results.get_history_property("Density", "MDHistory")
        analyze_from = int(len(densities) * equilibration_fraction)
        # take structure closest to the target density
        avg_density = np.mean(densities[analyze_from:])  # amu/bohr^3
        delta = np.array(densities[analyze_from:]) - avg_density
        delta = np.abs(delta)
        min_index = np.argmin(delta)
        min_index += analyze_from
        mol = self.job.results.get_history_molecule(min_index + 1)

        if return_index:
            return mol, min_index + 1

        return mol


class AMSNPTJob(AMSMDJob):
    """
    A class for running NPT MD simulations
    """

    _result_type = AMSNPTResults

    def __init__(self, **kwargs):
        AMSMDJob.__init__(self, _enforce_thermostat=True, _enforce_barostat=True, **kwargs)
        self.settings.input.ams.MolecularDynamics.CalcPressure = "True"
        self.remove_blocks(["deformation", "nanoreactor"])
Recipe module source: nvespawner.py
from collections import OrderedDict

from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.enums import JobStatus

import numpy as np
from scm.plams.recipes.md.amsmdjob import AMSNVEJob

__all__ = ["AMSNVESpawnerJob", "AMSNVESpawnerResults"]


class AMSNVESpawnerResults(Results):
    """Results class for AMSNVESpawnerJob"""

    def get_all_dipole_derivatives_acf(
        self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, x=True, y=True, z=True, normalize=False
    ):
        ret_x, ret_y = [], []
        for job in self.job.children.values():
            xv, yv = job.results.get_dipole_derivatives_acf(
                start_fs=start_fs,
                end_fs=end_fs,
                every_fs=every_fs,
                max_dt_fs=max_dt_fs,
                x=x,
                y=y,
                z=z,
                normalize=normalize,
            )
            ret_x.append(xv)
            ret_y.append(yv)

        return ret_x, ret_y

    def get_dipole_derivatives_acf(
        self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, x=True, y=True, z=True, normalize=False
    ):
        all_x, all_y = self.get_all_dipole_derivatives_acf(
            start_fs=start_fs, end_fs=end_fs, every_fs=every_fs, max_dt_fs=max_dt_fs, x=x, y=y, z=z, normalize=normalize
        )
        return np.mean(all_x, axis=0), np.mean(all_y, axis=0)

    def get_mean_temperature(self) -> float:
        avg_T = np.mean([job.results.readrkf("MDResults", "MeanTemperature") for job in self.job.children.values()])

        return avg_T


class AMSNVESpawnerJob(MultiJob):
    """A class for running multiple NVE simulations with initial structures/velocities taken from an NVT trajectory. The NVT trajectory must contain the velocities!"""

    _result_type = AMSNVESpawnerResults

    def _default_settings(self):
        s = Settings()
        s.input.ForceField.Type = "GAFF"
        s.input.ForceField.AnteChamberIntegration = "Yes"
        return s

    def __init__(self, previous_job, n_nve=1, name="nvespawnerjob", **kwargs):  # needs to be finished
        """
        previous_job: AMSJob
            An AMSJob with an MD trajectory. Must contain velocities (WriteVelocities Yes). Note that the trajectory should have been equilibrated before it starts.

        n_nve : int
            The number of NVE simulations to spawn

        All other settings can be set as for an AMSNVEJob (e.g. ``nsteps``).

        """
        MultiJob.__init__(self, children=OrderedDict(), name=name)

        self.previous_job = previous_job
        self.n_nve = n_nve
        self.nve_constructor_settings = kwargs
        self.nve_jobs = []

    def prerun(self):  # noqa F811
        """
        Constructs the children jobs
        """

        # use previously run previous_job
        assert self.previous_job.status != JobStatus.CREATED, "You can only pass in a finished AMSJob"
        try:
            self.previous_job.results.readrkf("MDHistory", "Velocities(1)")
        except KeyError:
            raise KeyError("Couldn't read velocities from {}".format(self.previous_job.results.rkfpath()))

        nframes_in_history = self.previous_job.results.readrkf("History", "nEntries")

        if self.n_nve > 0:
            interval = nframes_in_history // self.n_nve
            frames = np.linspace(interval, nframes_in_history, self.n_nve, dtype=int)
            for i, frame in enumerate(frames):
                # the NVE jobs only become different inside prerun(), so make their settings differ here to keep PLAMS's rerun prevention from skipping them
                name = f"nve{i+1}"
                self.settings.input.ams["#"] = name  # add a comment line

                self.children[name] = AMSNVEJob.restart_from(
                    self.previous_job, frame=frame, name=name, use_prerun=True, **self.nve_constructor_settings
                )
                self.children[name].from_frame = frame

                self.nve_jobs.append(self.children[name])
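The frames from which the NVE children are spawned are spread evenly over the parent trajectory, from `nframes // n_nve` up to the last frame. A small pure-Python sketch of that selection (the function name `spawn_frames` is made up; the recipe itself uses `np.linspace(interval, nframes_in_history, n_nve, dtype=int)`):

```python
def spawn_frames(nframes_in_history: int, n_nve: int):
    """Evenly spaced 1-based frame numbers, mirroring the
    np.linspace call in AMSNVESpawnerJob.prerun()."""
    interval = nframes_in_history // n_nve
    if n_nve == 1:
        return [interval]
    step = (nframes_in_history - interval) / (n_nve - 1)
    return [int(interval + i * step) for i in range(n_nve)]
```

So a 1000-frame NVT trajectory with `n_nve=4` spawns NVE jobs from frames 250, 500, 750, and 1000.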
Recipe module source: scandensity.py
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSResults
import numpy as np
from scm.plams.recipes.md.amsmdjob import AMSNVTJob

__all__ = ["AMSMDScanDensityJob", "AMSMDScanDensityResults"]


class AMSMDScanDensityResults(AMSResults):
    """Results class for AMSMDScanDensityJob"""

    def get_lowest_energy_index(self, variable="Energy", history_section="History"):
        """
        Returns the 1-based index of the lowest energy molecule
        """
        energies = self.get_history_property(variable, history_section)
        minindex = np.argmin(energies) + 1
        return minindex

    def get_lowest_energy_molecule(self, variable="TotalEnergy"):
        return self.get_history_molecule(self.get_lowest_energy_index(variable, "MDHistory"))


class AMSMDScanDensityJob(AMSNVTJob):
    """A class for scanning the density using MD Deformations"""

    _result_type = AMSMDScanDensityResults

    def __init__(self, molecule, scan_density_upper=1.4, startstep=None, **kwargs):
        AMSNVTJob.__init__(self, molecule=molecule, **kwargs)

        from_density = self.molecule.get_density() * 1e-3
        orig_length = self.molecule.cell_lengths()
        density_ratio = from_density / scan_density_upper
        new_length = [x * density_ratio ** (1 / 3) for x in orig_length]

        self.scan_density_upper = scan_density_upper
        self.startstep = startstep or 1

        s = Settings()
        s.input.ams.MolecularDynamics.Deformation.TargetLength = " ".join([str(x) for x in new_length])
        s.input.ams.MolecularDynamics.Deformation.StartStep = self.startstep

        self.settings += s
Recipe module source: trajectoryanalysis.py
import copy
from collections import OrderedDict, defaultdict
from collections.abc import Iterable
from typing import List

import numpy as np
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.functions import requires_optional_package
from scm.plams.interfaces.adfsuite.amsanalysis import AMSAnalysisJob, AMSAnalysisResults
from scm.plams.tools.units import Units
from scm.plams.core.enums import JobStatus

__all__ = ["AMSRDFJob", "AMSMSDJob", "AMSMSDResults", "AMSVACFJob", "AMSVACFResults"]


class MDAnalysisSettingsError(Exception):
    """
    Error in supplied settings object
    """


class AMSConvenientAnalysisJob(AMSAnalysisJob):
    _task = "None"

    def __init__(self, previous_job=None, atom_indices=None, **kwargs):  # needs to be finished
        """
        previous_job: AMSJob
            An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.

        All other options can be set as for an AMSAnalysisJob.

        """
        AMSAnalysisJob.__init__(self, **kwargs)
        if not self._has_settings_entry(self.settings.input, "Task"):
            self.settings.input.Task = self._task

        self.previous_job = previous_job
        self.atom_indices = atom_indices
        self._settings_updated = False

    def _get_max_dt_frames(self, max_correlation_time_fs):
        if max_correlation_time_fs is None:
            return None

        # Adjust for multiple previous_jobs
        prevjobs = self.previous_job
        if not isinstance(prevjobs, list):
            prevjobs = [prevjobs]

        # Read history and time step
        # FIXME: Does not account for a Range or StepSize block (which the user may have provided)
        historylength = 0
        for prevjob in prevjobs:
            historylength += prevjob.results.readrkf("History", "nEntries")
        timestep = prevjobs[0].results.get_time_step()
        max_dt_frames = int(max_correlation_time_fs / timestep)
        max_dt_frames = min(max_dt_frames, historylength // 2)
        return max_dt_frames

    def get_input(self):
        """
        Generate the input file
        """
        self._settings_to_list(self.settings.input, self._task)

        if not self._settings_updated:
            if self.atom_indices and self._parent_write_atoms:
                section = getattr(self.settings.input, self._task)
                for entry in section:
                    if not (self._has_settings_entry(entry, "Atoms") and self._has_settings_entry(entry.Atoms, "Atom")):
                        self._add_nonunique_settings_entries(entry.Atoms, "Atom", self.atom_indices)
                    else:
                        msg = "Atom indices cannot be supplied as argument if already present in settings"
                        raise MDAnalysisSettingsError(msg)
            self._settings_updated = True
        return super().get_input()

    @staticmethod
    def _settings_to_list(settings, entry, nentries=1):
        """
        Convert the main settings object for this Task to a list of settings objects
        """
        if isinstance(settings, Settings):
            # This is not a PISA object
            if AMSConvenientAnalysisJob._has_settings_entry(settings, entry):
                # If this is not a list-type object, make it into one
                if isinstance(settings[entry], dict) or not (isinstance(settings[entry], Iterable)):
                    settings[entry] = [settings[entry]]
            else:
                settings[entry] = [Settings() for i in range(nentries)]
        else:
            # Workaround for a PISA bug, where the object can have updated values but length 0
            block = getattr(settings, entry)
            if len(block) == 0:
                if block.value_changed:
                    s = copy.deepcopy(block)
                    block[0] = s
                else:
                    # access each index so that the block ends up with nentries entries
                    for i in range(nentries):
                        s = block[i]

    @staticmethod
    def _has_settings_entry(settings, entry):
        """
        Check if this Settings or PISA object contains this entry already
        """
        if isinstance(settings, Settings):
            return entry in settings
        else:
            return getattr(settings, entry).value_changed

    @staticmethod
    def _add_nonunique_settings_entries(settings, key, entries):
        """
        For non-default entries, multiple entries can be supplied

        * ``settings`` -- Settings or PISA object to which the entries should be added
        * ``entries`` -- Iterator for multiple settings entries
        """
        if not isinstance(settings, Settings):
            for i, entry in enumerate(entries):
                subsettings = getattr(settings, key)
                subsettings[i] = entry
        else:
            settings[key] = entries

    def _parent_prerun(self):
        """
        Possibly wait for previous job to finish before adding path
        """
        prevjobs = self.previous_job
        if not isinstance(prevjobs, list):
            prevjobs = [prevjobs]

        # Turn trajectory settings into a list,
        # If no entries yet exist, it will be a list of empty entries
        self._settings_to_list(self.settings.input.TrajectoryInfo, "Trajectory", nentries=len(prevjobs))

        nblocks = len(self.settings.input.TrajectoryInfo.Trajectory)
        msg = "Number of Trajectory blocks must match number of previous_jobs"
        assert nblocks == len(prevjobs) or nblocks == 0, msg

        # Overwrite KFFilename with that of previous_job
        for i, prevjob in enumerate(prevjobs):
            assert prevjob.status != JobStatus.CREATED, "You can only pass in a finished AMSJob"
            self.settings.input.TrajectoryInfo.Trajectory[i].KFFilename = prevjob.results.rkfpath()


class AMSMSDResults(AMSAnalysisResults):
    """Results class for AMSMSDJob"""

    def get_msd(self):
        """
        returns time [fs], msd [ang^2]
        """
        msd_xy = self.get_xy()
        time = np.array(msd_xy.x[0])  # fs
        y = np.array(msd_xy.y) * Units.convert(1.0, "bohr", "angstrom") ** 2

        return time, y

    @requires_optional_package("scipy")
    def get_linear_fit(self, start_time_fit_fs=None, end_time_fit_fs=None):
        """
        Fits the MSD between start_time_fit_fs and end_time_fit_fs

        Returns a 3-tuple LinRegress.result, fit_x (fs), fit_y (ang^2)

        result.slope is given in ang^2/fs

        """
        from scipy.stats import linregress

        time, y = self.get_msd()
        end_time_fit_fs = end_time_fit_fs or max(time)
        start_time_fit_fs = start_time_fit_fs or self.job.start_time_fit_fs

        if start_time_fit_fs >= end_time_fit_fs:
            start_time_fit_fs = end_time_fit_fs / 2

        y = y[time >= start_time_fit_fs]
        time = time[time >= start_time_fit_fs]
        y = y[time <= end_time_fit_fs]
        time = time[time <= end_time_fit_fs]

        result = linregress(time, y)
        fit_x = time
        fit_y = result.slope * fit_x + result.intercept

        return result, fit_x, fit_y

    def get_diffusion_coefficient(self, start_time_fit_fs=None, end_time_fit_fs=None):
        """
        Returns D in m^2/s
        """
        result, _, _ = self.get_linear_fit(start_time_fit_fs=start_time_fit_fs, end_time_fit_fs=end_time_fit_fs)
        D = result.slope * 1e-20 / (6 * 1e-15)  # ang^2/fs -> m^2/s; divide by 2*d = 6 for d = 3 dimensions (Einstein relation)
        return D


class AMSMSDJob(AMSConvenientAnalysisJob):
    """A convenient class wrapping around the trajectory analysis MSD tool"""

    results: AMSMSDResults
    _result_type = AMSMSDResults
    _parent_write_atoms = True
    _task = "MeanSquareDisplacement"

    def __init__(
        self,
        previous_job=None,  # needs to be finished
        max_correlation_time_fs: float = 10000,
        start_time_fit_fs: float = 2000,
        atom_indices: List[int] = None,
        **kwargs,
    ):
        """
        previous_job: AMSJob
            An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.

        max_correlation_time_fs: float
            Maximum correlation time in femtoseconds

        start_time_fit_fs : float
            Smallest correlation time for the linear fit

        atom_indices : List[int]
            If None, use all atoms. Otherwise use the specified atom indices (starting with 1)

        kwargs: dict
            Other options to AMSAnalysisJob

        """
        AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)

        self.max_correlation_time_fs = max_correlation_time_fs
        self.start_time_fit_fs = start_time_fit_fs

    def get_input(self):
        """
        Generate the input file
        """
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.MeanSquareDisplacement:
            if not self._has_settings_entry(settings, "Property"):
                settings.Property = "DiffusionCoefficient"
            if not self._has_settings_entry(settings, "StartTimeSlope"):
                settings.StartTimeSlope = self.start_time_fit_fs
        return super().get_input()

    def prerun(self):  # noqa F811
        """
        Constructs the final settings
        """
        self._parent_prerun()  # trajectory and atom_indices handled
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.MeanSquareDisplacement:
            if not self._has_settings_entry(settings, "MaxFrame"):
                max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
                settings.MaxFrame = max_dt_frames

    def postrun(self):
        """
        Creates msd.txt, fit_msd.txt, and D.txt
        """
        time, msd = self.results.get_msd()
        with open(self.path + "/msd.txt", "w") as f:
            f.write("#Time(fs) MSD(ang^2)")
            for x, y in zip(time, msd):
                f.write(f"{x} {y}\n")

        _, fit_x, fit_y = self.results.get_linear_fit()
        with open(self.path + "/fit_msd.txt", "w") as f:
            f.write("#Time(fs), LinearFitToMSD(ang^2)")
            for x, y in zip(fit_x, fit_y):
                f.write(f"{x} {y}\n")

        D = self.results.get_diffusion_coefficient()
        with open(self.path + "/D.txt", "w") as f:
            f.write(f"{D}\n")


def viscosity_double_exponential(x, A, lam, tau1, tau2):
    return A * (lam * (1 - np.exp(-x / tau1)) + (1 - lam) * (1 - np.exp(-x / tau2)))


class AMSViscosityFromBinLogResults(AMSAnalysisResults):
    """Results class for AMSViscosityFromBinLogJob"""

    def get_viscosity_integral(self):
        """Extract the running viscosity integral.

        Returns a 2-tuple with 1D numpy arrays ``time, viscosity_integral``, with time in fs and viscosity_integral in Pa*s.
        """

        xy = self.get_xy("Integral")
        time = np.array(xy.x[0])  # fs
        y = np.array(xy.y)  # Pa s

        return time, y

    @requires_optional_package("scipy")
    def get_double_exponential_fit(self):
        """Perform a double exponential fit to the viscosity integral.

        The fitted function is of the form

        ``A * (lam * (1 - np.exp(-x / tau1)) + (1 - lam) * (1 - np.exp(-x / tau2)))``

        where ``A`` is the limiting value in the infinite time limit.

        Returns: a 3-tuple ``popt, time, prediction``, where

        - ``popt`` is a 4-tuple containing A, lam, tau1, and tau2,

        - ``time`` is in fs, and

        - ``prediction`` is the value of the above function vs time.
        """

        from scipy.optimize import curve_fit

        x, y = self.get_viscosity_integral()

        # The fit
        f, p0 = viscosity_double_exponential, (y[-1], 0.5, 0.1 * x[-1], 0.9 * x[-1])
        popt, _ = curve_fit(f, x, y, p0=p0)
        prediction = f(x, *popt)

        return popt, x, prediction


class AMSViscosityFromBinLogJob(AMSConvenientAnalysisJob):
    """A convenient class wrapping around the trajectory analysis ViscosityFromBinLog tool. Only runs with default input options (max correlation time 10% of the total trajectory)."""

    results: AMSViscosityFromBinLogResults
    _result_type = AMSViscosityFromBinLogResults
    _parent_write_atoms = False
    _task = "AutoCorrelation"

    def __init__(
        self,
        previous_job=None,  # needs to be finished
        **kwargs,
    ):
        """
        previous_job: AMSJob
            An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts. It must have been run with the BinLog%PressureTensor option set.

        kwargs: dict
            Other options to AMSAnalysisJob

        """
        AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, **kwargs)

    def get_input(self):
        """
        Generate the input file
        """
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.AutoCorrelation:
            settings.Property = "ViscosityFromBinLog"
        return super().get_input()

    def prerun(self):  # noqa F811
        """
        Constructs the final settings
        """
        self._parent_prerun()  # trajectory and atom_indices handled

    def postrun(self):
        """
        Creates viscosity_integral.txt, fit_viscosity_integral.txt, and viscosity.txt
        """
        time, integral = self.results.get_viscosity_integral()
        with open(self.path + "/viscosity_integral.txt", "w") as f:
            f.write("#Time(fs) Integral(Pa*s)")
            for x, y in zip(time, integral):
                f.write(f"{x} {y}\n")

        popt, fit_x, fit_y = self.results.get_double_exponential_fit()
        with open(self.path + "/fit_viscosity_integral.txt", "w") as f:
            f.write("#Time(fs), DoubleExponentialFitToViscosityIntegral(Pa*s)")
            for x, y in zip(fit_x, fit_y):
                f.write(f"{x} {y}\n")

        eta = popt[0]
        with open(self.path + "/viscosity.txt", "w") as f:
            f.write(f"{eta}\n")


class AMSVACFResults(AMSAnalysisResults):
    """Results class for AMSVACFJob"""

    def get_vacf(self):
        """Extract the velocity autocorrelation function vs. time.

        Returns a 2-tuple ``time, y`` with ``time`` in fs.
        """
        xy = self.get_xy()
        time = np.array(xy.x[0])  # fs
        y = np.array(xy.y)

        return time, y

    def get_power_spectrum(self, max_freq=None):
        """Calculate the power spectrum as the Fourier transform of the velocity autocorrelation function.

        max_freq: float, optional
            The maximum frequency in cm^-1.

        Returns: A 2-tuple ``freq, y`` with ``freq`` in cm^-1 and ``y`` unitless.
        """

        max_freq = max_freq or self.job.max_freq or 5000
        xy = self.get_xy("Spectrum")
        freq = np.array(xy.x[0])
        y = np.array(xy.y)

        y = y[freq < max_freq]
        freq = freq[freq < max_freq]

        return freq, y

    def get_acf(self):
        return self.get_vacf()

    def get_spectrum(self, max_freq=None):
        return self.get_power_spectrum(max_freq=max_freq)


class AMSVACFJob(AMSConvenientAnalysisJob):
    """A class for calculating the velocity autocorrelation function and its power spectrum"""

    _result_type = AMSVACFResults
    _parent_write_atoms = True
    _task = "AutoCorrelation"

    def __init__(
        self,
        previous_job=None,  # needs to be finished
        max_correlation_time_fs=5000,  # fs
        max_freq=5000,  # cm^-1
        atom_indices=None,
        **kwargs,
    ):
        """
        previous_job: AMSJob
            An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.

        max_correlation_time_fs: float
            Maximum correlation time in femtoseconds

        max_freq: float
            The maximum frequency for the power spectrum in cm^-1

        atom_indices: List[int]
            Atom indices (starting with 1). If None, use all atoms.

        """
        AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)

        self.max_correlation_time_fs = max_correlation_time_fs
        self.max_freq = max_freq

    def get_input(self):
        """
        Generate the input file
        """
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.AutoCorrelation:
            settings.Property = "Velocities"
        return super().get_input()

    def prerun(self):  # noqa F811
        """
        Creates final settings
        """
        self._parent_prerun()  # trajectory and atom_indices handled
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.AutoCorrelation:
            if not self._has_settings_entry(settings, "MaxFrame"):
                max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
                settings.MaxFrame = max_dt_frames

    def postrun(self):
        """
        Creates vacf.txt and power_spectrum.txt
        """
        try:
            time, vacf = self.results.get_vacf()
            with open(self.path + "/vacf.txt", "w") as f:
                f.write("#Time(fs) VACF\n")
                for x, y in zip(time, vacf):
                    f.write(f"{x} {y}\n")

            freq, intens = self.results.get_power_spectrum()
            with open(self.path + "/power_spectrum.txt", "w") as f:
                f.write("#Frequency(cm^-1) Intensity(arb.units)\n")
                for x, y in zip(freq, intens):
                    f.write(f"{x} {y}\n")
        except Exception:
            # the analysis may fail (e.g. if the trajectory lacks velocities); do not fail the job
            pass


class AMSDipoleDerivativeACFResults(AMSAnalysisResults):
    """Results class for AMSVACFJob"""

    def get_acf(self):
        xy = self.get_xy()
        time = np.array(xy.x[0])  # fs
        y = np.array(xy.y)

        return time, y

    def get_ir_spectrum(self, max_freq=None):
        max_freq = max_freq or self.job.max_freq or 5000
        xy = self.get_xy("Spectrum")
        freq = np.array(xy.x[0])
        y = np.array(xy.y)

        y = y[freq < max_freq]
        freq = freq[freq < max_freq]

        return freq, y

    def get_spectrum(self, max_freq=None):
        return self.get_ir_spectrum(max_freq=max_freq)


class AMSDipoleDerivativeACFJob(AMSConvenientAnalysisJob):
    """A class for calculating the dipole derivative autocorrelation function and the resulting IR spectrum"""

    _result_type = AMSDipoleDerivativeACFResults
    _parent_write_atoms = True
    _task = "AutoCorrelation"

    def __init__(
        self,
        previous_job=None,  # needs to be finished
        max_correlation_time_fs=5000,  # fs
        max_freq=5000,  # cm^-1
        atom_indices=None,
        **kwargs,
    ):
        """
        previous_job: AMSJob
            An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.

        max_correlation_time_fs: float
            Maximum correlation time in femtoseconds

        max_freq: float
            The maximum frequency for the power spectrum in cm^-1

        atom_indices: List[int]
            Atom indices (starting with 1). If None, use all atoms.

        """
        AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)

        self.max_correlation_time_fs = max_correlation_time_fs
        self.max_freq = max_freq

    def get_input(self):
        """
        Generate the input file
        """
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.AutoCorrelation:
            settings.Property = "DipoleDerivativeFromCharges"
        return super().get_input()

    def prerun(self):  # noqa F811
        """
        Creates final settings
        """
        self._parent_prerun()  # trajectory and atom_indices handled
        self._settings_to_list(self.settings.input, self._task)

        for settings in self.settings.input.AutoCorrelation:
            if not self._has_settings_entry(settings, "MaxFrame"):
                max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
                settings.MaxFrame = max_dt_frames

    def postrun(self):
        """
        Creates dipolederivativeacf.txt and ir_spectrum.txt
        """
        try:
            time, acf = self.results.get_acf()
            with open(self.path + "/dipolederivativeacf.txt", "w") as f:
                f.write("#Time(fs) DipoleDerivativeACF\n")
                for x, y in zip(time, acf):
                    f.write(f"{x} {y}\n")

            freq, intens = self.results.get_ir_spectrum()
            with open(self.path + "/ir_spectrum.txt", "w") as f:
                f.write("#Frequency(cm^-1) Intensity(arb.units)\n")
                for x, y in zip(freq, intens):
                    f.write(f"{x} {y}\n")
        except Exception:
            pass


class AMSRDFResults(AMSAnalysisResults):
    """Results class for AMSRDFJob"""

    def get_rdf(self):
        """
        Returns a 2-tuple r, rdf.

        r: numpy array of float (angstrom)
        rdf: numpy array of float
        """
        xy = self.get_xy()
        r = np.array(xy.x[0])
        y = np.array(xy.y)

        return r, y


class AMSRDFJob(AMSConvenientAnalysisJob):
    _result_type = AMSRDFResults
    _parent_write_atoms = False
    _task = "RadialDistribution"

    def __init__(
        self, previous_job=None, atom_indices=None, atom_indices_to=None, rmin=0.5, rmax=6.0, rstep=0.1, **kwargs
    ):
        """
        previous_job: AMSJob
            AMSJob with finished MD trajectory.

        atom_indices: List[int]
            Atom indices (starting with 1). If None, calculate RDF *from* all atoms.

        atom_indices_to: List[int]
            Atom indices (starting with 1). If None, calculate RDF *to* all atoms.

        rmin: float
            Minimum distance (angstrom)

        rmax: float
            Maximum distance (angstrom)

        rstep: float
            Bin width for the histogram (angstrom)
        """

        AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
        self.atom_indices_to = atom_indices_to
        self.rmin = rmin
        self.rmax = rmax
        self.rstep = rstep

    def get_input(self):
        """
        Generate the input file
        """
        self._settings_to_list(self.settings.input, self._task)

        if not self._settings_updated:
            prevjobs = self.previous_job
            if not isinstance(prevjobs, list):
                prevjobs = [prevjobs]
            main_mol = prevjobs[0].results.get_main_molecule()

            for settings in self.settings.input.RadialDistribution:
                # If no atom indices were provided, and there is nothing in the settings at all, add them
                atom_indices = self.atom_indices
                atom_indices_to = self.atom_indices_to
                if not atom_indices:
                    if not self._has_settings_entry(settings, "AtomsFrom"):
                        atom_indices = list(range(1, len(main_mol) + 1))
                if not atom_indices_to:
                    if not self._has_settings_entry(settings, "AtomsTo"):
                        atom_indices_to = list(range(1, len(main_mol) + 1))

                # Add the atom indices only if there were no atom indices in the settings (if elements are present, add indices)
                if atom_indices:
                    atoms_from_present = self._has_settings_entry(settings, "AtomsFrom")
                    atom_present = atoms_from_present and self._has_settings_entry(settings.AtomsFrom, "Atom")
                    if not atom_present:
                        self._add_nonunique_settings_entries(settings.AtomsFrom, "Atom", atom_indices)
                    else:
                        msg = "Atom indices cannot be supplied as argument if already present in settings"
                        raise MDAnalysisSettingsError(msg)
                if atom_indices_to:
                    atoms_to_present = self._has_settings_entry(settings, "AtomsTo")
                    atom_present = atoms_to_present and self._has_settings_entry(settings.AtomsTo, "Atom")
                    if not atom_present:
                        self._add_nonunique_settings_entries(settings.AtomsTo, "Atom", atom_indices_to)
                    else:
                        msg = "Atom indices cannot be supplied as argument if already present in settings"
                        raise MDAnalysisSettingsError(msg)
                if not self._has_settings_entry(settings, "Range"):
                    if isinstance(settings, Settings):
                        settings.Range = f"{self.rmin} {self.rmax} {self.rstep}"
                    else:
                        settings.Range = [self.rmin, self.rmax, self.rstep]
        return super().get_input()

    def prerun(self):  # noqa F811
        """
        Creates the final settings. Do not call or override this method.
        """
        self._parent_prerun()
        self._settings_to_list(self.settings.input, self._task)

        # Atom_indices will be adjusted when get_input is called (again).

    def postrun(self):
        """
        Creates rdf.txt. Do not call or override this method.
        """
        try:
            r, gr = self.results.get_rdf()
            with open(self.path + "/rdf.txt", "w") as f:
                f.write("#r(angstrom) g(r)\n")
                for x, y in zip(r, gr):
                    f.write(f"{x} {y}\n")
        except Exception:
            pass


class AMSConvenientAnalysisPerRegionResults(Results):
    def _getter(self, analysis_job_type, method, kwargs):
        assert (
            self.job.analysis_job_type is analysis_job_type
        ), f"{method}() can only be called for {analysis_job_type}, tried for type {self.job.analysis_job_type}"
        ret = {}
        for name, job in self.job.children.items():
            ret[name] = getattr(job.results, method)(**kwargs)
        return ret

    def get_diffusion_coefficient(self, **kwargs):
        return self._getter(AMSMSDJob, "get_diffusion_coefficient", kwargs)

    def get_msd(self, **kwargs):
        return self._getter(AMSMSDJob, "get_msd", kwargs)

    def get_linear_fit(self, **kwargs):
        return self._getter(AMSMSDJob, "get_linear_fit", kwargs)

    def get_vacf(self, **kwargs):
        return self._getter(AMSVACFJob, "get_vacf", kwargs)

    def get_power_spectrum(self, **kwargs):
        return self._getter(AMSVACFJob, "get_power_spectrum", kwargs)


class AMSConvenientAnalysisPerRegionJob(MultiJob):
    _result_type = AMSConvenientAnalysisPerRegionResults

    def __init__(self, previous_job, analysis_job_type, name=None, regions=None, per_element=False, **kwargs):
        MultiJob.__init__(self, children=OrderedDict(), name=name or "analysis_per_region")
        self.previous_job = previous_job
        self.analysis_job_type = analysis_job_type
        self.analysis_job_kwargs = kwargs
        self.regions_dict = regions
        self.per_element = per_element

    @staticmethod
    def get_regions_dict(molecule, per_element: bool = False):
        regions_dict = defaultdict(lambda: [])
        for i, at in enumerate(molecule, 1):
            regions = set([at.properties.region]) if isinstance(at.properties.region, str) else at.properties.region
            if len(regions) == 0:
                region_name = "NoRegion" if not per_element else f"NoRegion_{at.symbol}"
                regions_dict[region_name].append(i)
            for region in regions:
                region_name = region if not per_element else f"{region}_{at.symbol}"
                regions_dict[region_name].append(i)
            regions_dict["All"].append(i)
            if per_element:
                regions_dict[f"All_{at.symbol}"].append(i)

        return regions_dict

    def prerun(self):  # noqa F811
        regions_dict = self.regions_dict or self.get_regions_dict(
            self.previous_job.results.get_main_molecule(), per_element=self.per_element
        )

        for region in regions_dict:
            self.children[region] = self.analysis_job_type(
                previous_job=self.previous_job,
                name=region,
                atom_indices=regions_dict[region],
                **self.analysis_job_kwargs,
            )

    @staticmethod
    def get_mean_std_per_region(list_of_jobs, function_name, **kwargs):
        """
        list_of_jobs: List[AMSConvenientAnalysisPerRegionJob]
            List of jobs over which to average

        function_name: str
            e.g. 'get_msd', 'get_power_spectrum'

        """

        if isinstance(list_of_jobs, dict):
            list_of_jobs = [x for x in list_of_jobs.values()]

        all_x = defaultdict(lambda: [])
        all_y = defaultdict(lambda: [])
        for vacfjob in list_of_jobs:
            # let's calculate mean and std for each region
            results_dict = getattr(vacfjob.results, function_name)(**kwargs)
            for region_name, ret_value in results_dict.items():
                if np.isscalar(ret_value):
                    all_x[region_name].append(np.atleast_1d(ret_value))
                elif len(ret_value) == 2:
                    all_x[region_name].append(np.atleast_1d(ret_value[0]))
                    all_y[region_name].append(np.atleast_1d(ret_value[1]))

        mean_x = {}
        std_x = {}
        mean_y = {}
        std_y = {}
        for region_name, values in all_x.items():
            mean_x[region_name] = np.mean(values, axis=0)
            std_x[region_name] = np.std(values, axis=0)

            if len(all_y) > 0:
                mean_y[region_name] = np.mean(all_y[region_name], axis=0)
                std_y[region_name] = np.std(all_y[region_name], axis=0)

        if len(mean_y) > 0:
            return mean_x, std_x, mean_y, std_y
        else:
            return mean_x, std_x

ADF Fragment

Run fragment-based ADF analysis as a dedicated multi-job workflow.

Overview

This recipe orchestrates two fragment calculations plus one full-system ADF calculation, and exposes the energy decomposition and related properties through a dedicated results class.

API

API reference
class ADFFragmentJob(fragment1=None, fragment2=None, fragment1_opt=None, fragment2_opt=None, full_settings=None, frag1_settings=None, frag2_settings=None, **kwargs)[source]

Subclass of MultiJob for ADF fragment calculations.

Parameters:
_result_type

alias of ADFFragmentResults

prerun()[source]

Prepare the fragments and the full calculation.

classmethod load_external(path, jobname=None)[source]

Load the results of the ADFFragmentJob job from an external path.

Parameters:
  • path (str) – The path to the job. It should at least have the subfolders ‘frag1’, ‘frag2’ and ‘full’.

  • jobname (str, optional) – The name of the job. Defaults to None.

Returns:

The job with the loaded results.

Return type:

ADFFragmentJob

class ADFFragmentResults(job)[source]
Parameters:

job (Job) –

check()[source]

Check if the calculation finished successfully.

Overrides the method of MultiJob, which only checks whether the job finished, not how it finished.

get_properties()[source]

Redirect to AMSResults of the full calculation.

get_main_molecule()[source]

Redirect to AMSResults of the full calculation.

get_input_molecule()[source]

Redirect to AMSResults of the full calculation.

get_energy(unit='au')[source]

Redirect to AMSResults of the full calculation.

get_dipole_vector(unit='au')[source]

Redirect to AMSResults of the full calculation.

get_energy_decomposition(unit='au')[source]

Get the energy decomposition of the fragment calculation.

Parameters:

unit (str, optional) – The unit of the energy. Defaults to ‘au’.

Returns:

The energy decomposition.

Return type:

Dict[str, float]
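The preparation and bonding energies in the returned dictionary follow a simple bookkeeping rule: each fragment's preparation energy is its energy at the in-complex geometry minus its energy at the optimized geometry, and the bond energy adds the total preparation energy to the interaction energy. A sketch with hypothetical energies (the numbers are made up purely to show the arithmetic):

```python
# Hypothetical fragment energies (hartree), chosen only to illustrate the bookkeeping.
E_1, E_1_opt = -1.150, -1.155  # fragment 1: in-complex geometry vs optimized geometry
E_2, E_2_opt = -2.300, -2.304  # fragment 2
E_int = -0.050                 # interaction energy from the full calculation

E_prep_1 = E_1 - E_1_opt       # deformation cost of fragment 1, ~0.005
E_prep_2 = E_2 - E_2_opt       # deformation cost of fragment 2, ~0.004
E_prep = E_prep_1 + E_prep_2   # total preparation energy, ~0.009
E_bond = E_int + E_prep        # bond energy = interaction + preparation, ~-0.041

print(round(E_prep, 3), round(E_bond, 3))
```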

get_nocv_eigenvalues()[source]

Get the NOCV eigenvalues of the full calculation.

Return type:

List[float] | Tuple[List[float], List[float]]

check_nocv_eigenvalues()[source]

Basic NOCV check.

Eigenvalue i should equal the negative of its mirror partner, eigenvalue N-1-i.

Returns:

True if the check passes, False otherwise.

Return type:

bool
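The symmetry this check enforces can be demonstrated without an ADF run. The following is a pure-Python sketch of the pairing rule, not the PLAMS implementation:

```python
def eigenvalues_paired(eigenvalues, atol=1e-4):
    # NOCV eigenvalues come in +/- pairs: eigenvalue i must cancel
    # its mirror partner at index len(eigenvalues) - 1 - i.
    n = len(eigenvalues) // 2
    return all(abs(eigenvalues[i] + eigenvalues[-i - 1]) <= atol for i in range(n))

print(eigenvalues_paired([-0.31, -0.05, 0.05, 0.31]))  # True
print(eigenvalues_paired([-0.31, -0.05, 0.06, 0.31]))  # False: 0.06 has no -0.06 partner
```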

get_nocv_orbital_interaction(unit='kcal/mol')[source]

Get the NOCV orbital interactions of the full calculation.

Return type:

List[float] | Tuple[List[float], List[float]]

Code

Recipe module source: adffragment.py
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.tools.units import Units
from scm.plams.core.errors import FileError
from scm.plams.mol.molecule import Molecule
from scm.plams.core.enums import JobStatus

import numpy as np

from os.path import join as opj
from os.path import abspath, isdir, basename
from typing import Dict, List, Tuple, Union, Optional

__all__ = ["ADFFragmentJob", "ADFFragmentResults"]


class ADFFragmentResults(Results):

    def check(self):
        """Check if the calculation finished successfully.

        Overrides the method of |MultiJob|, which only checks whether the job
        finished, not how it finished.
        """
        return all(job.check() for job in self.job.children)

    def get_properties(self):
        """Redirect to |AMSResults| of the full calculation."""
        return self.job.full.results.get_properties()

    def get_main_molecule(self):
        """Redirect to |AMSResults| of the full calculation."""
        return self.job.full.results.get_main_molecule()

    def get_input_molecule(self):
        """Redirect to |AMSResults| of the full calculation."""
        return self.job.full.results.get_input_molecule()

    def get_energy(self, unit="au"):
        """Redirect to |AMSResults| of the full calculation."""
        return self.job.full.results.get_energy(unit)

    def get_dipole_vector(self, unit="au"):
        """Redirect to |AMSResults| of the full calculation."""
        return self.job.full.results.get_dipole_vector(unit)

    def get_energy_decomposition(self, unit="au") -> Dict[str, float]:
        """Get the energy decomposition of the fragment calculation.

        Args:
            unit (str, optional): The unit of the energy. Defaults to 'au'.

        Returns:
            Dict[str, float]: The energy decomposition.
        """
        energy_section = self.job.full.results.read_rkf_section("Energy", file="adf")
        ret = {}
        for k in [
            "Electrostatic Energy",
            "Kinetic Energy",
            "Elstat Interaction",
            "XC Energy",
        ]:
            ret[k] = Units.convert(energy_section[k], "au", unit)

        # most information not available from the KF file
        res = self.job.full.results
        res1 = self.job.f1.results
        res2 = self.job.f2.results

        # remaining information on the output file
        def try_grep_result(key, pattern, match_position=-1, split_position=4):
            match = res.grep_output(pattern)
            if match:
                match = Units.convert(float(match[match_position].split()[split_position]), "au", unit)
                ret[key] = match

        try_grep_result("E_int", "Total Bonding Energy:", match_position=-2)
        try_grep_result("E_int_disp", "Dispersion Energy:")
        try_grep_result("E_Pauli", "Pauli Repulsion (Delta")
        try_grep_result("E_elstat", "Electrostatic Interaction:")
        try_grep_result("E_orb", "Total Orbital Interactions:")

        ret["E_1"] = res1.get_energy(unit=unit)
        ret["E_2"] = res2.get_energy(unit=unit)

        if hasattr(self.job, "f1_opt"):
            ret["E_1_opt"] = self.job.f1_opt.results.get_energy(unit=unit)
            ret["E_prep_1"] = ret["E_1"] - ret["E_1_opt"]
        if hasattr(self.job, "f2_opt"):
            ret["E_2_opt"] = self.job.f2_opt.results.get_energy(unit=unit)
            ret["E_prep_2"] = ret["E_2"] - ret["E_2_opt"]

        if ("E_1_opt" in ret) and ("E_2_opt" in ret):
            ret["E_prep"] = ret["E_prep_1"] + ret["E_prep_2"]
            ret["E_bond"] = ret["E_int"] + ret["E_prep"]

        return ret

    def get_nocv_eigenvalues(self) -> Union[List[float], Tuple[List[float], List[float]]]:
        """Get the NOCV eigenvalues of the full calculation."""
        keys = self.job.full.results.get_rkf_skeleton(file="adf")
        if "NOCV" not in keys:
            raise KeyError("NOCV section not found in the rkf file")
        keys = keys["NOCV"]
        # restricted calculation
        if "NOCV_eigenvalues_restricted" in keys:
            return self.job.full.results.readrkf("NOCV", "NOCV_eigenvalues_restricted", file="adf")
        # unrestricted calculation
        elif "NOCV_eigenvalues_alpha" in keys:
            alpha = self.job.full.results.readrkf("NOCV", "NOCV_eigenvalues_alpha", file="adf")
            beta = self.job.full.results.readrkf("NOCV", "NOCV_eigenvalues_beta", file="adf")
            tpl = (
                [float(x) for x in alpha],
                [float(x) for x in beta],  #  make mypy happy by ensuring return is tuple of list of floats
            )
            return tpl
        else:
            raise KeyError("NOCV eigenvalues not found in the rkf file")

    def check_nocv_eigenvalues(self) -> bool:
        """Basic NOCV check.

        Eigenvalue i should equal the negative of its mirror partner, eigenvalue N-1-i.

        Returns:
            bool: True if the check passes, False otherwise.
        """

        def _check_list(eigenvalues):
            for i in range(int(len(eigenvalues) / 2)):
                if not np.isclose(eigenvalues[i], -1 * eigenvalues[-i - 1], atol=1e-4):
                    return False
            return True

        NOCV_eigenvalues = self.get_nocv_eigenvalues()
        # spin-restricted calculation
        if isinstance(NOCV_eigenvalues, list):
            return _check_list(NOCV_eigenvalues)
        elif isinstance(NOCV_eigenvalues, tuple):
            for eigenvalues in NOCV_eigenvalues:
                if not _check_list(eigenvalues):
                    return False
            return True
        else:
            raise TypeError("Unexpected type for NOCV eigenvalues")

    def get_nocv_orbital_interaction(self, unit="kcal/mol") -> Union[List[float], Tuple[List[float], List[float]]]:
        """Get the NOCV orbital interactions of the full calculation."""

        def _calc_energies(oi: List[float]) -> List[float]:
            # sum contribution i with its mirror partner at index len(oi) - 1 - i
            ret = []
            for i in range(int(len(oi) / 2)):
                a = oi[i]
                b = oi[len(oi) - i - 1]
                ret.append(a + b)
            return ret

        keys = self.job.full.results.get_rkf_skeleton(file="adf")
        if "NOCV" not in keys:
            raise KeyError("NOCV section not found in the rkf file")
        keys = keys["NOCV"]
        # restricted calculation
        if "NOCV_oi_restricted" in keys:
            oi = self.job.full.results.readrkf("NOCV", "NOCV_oi_restricted", file="adf")
        # unrestricted calculation: read alpha and beta contributions as a tuple
        elif "NOCV_oi_alpha" in keys:
            oi = (
                self.job.full.results.readrkf("NOCV", "NOCV_oi_alpha", file="adf"),
                self.job.full.results.readrkf("NOCV", "NOCV_oi_beta", file="adf"),
            )
        else:
            raise KeyError("NOCV orbital interaction not found in rkf file")
        energies: Union[List[float], Tuple[List[float], List[float]]]  # make mypy happy
        if isinstance(oi, tuple):
            energies = (_calc_energies(oi[0]), _calc_energies(oi[1]))
        else:
            energies = _calc_energies(oi)
        return Units.convert(energies, "kcal/mol", unit)


class ADFFragmentJob(MultiJob):
    """Subclass of |MultiJob| for ADF fragment calculations."""

    _result_type = ADFFragmentResults

    def __init__(
        self,
        fragment1: Optional[Molecule] = None,
        fragment2: Optional[Molecule] = None,
        fragment1_opt: Optional[Molecule] = None,
        fragment2_opt: Optional[Molecule] = None,
        full_settings: Optional[Settings] = None,
        frag1_settings: Optional[Settings] = None,
        frag2_settings: Optional[Settings] = None,
        **kwargs,
    ):
        """Create an ADFFragmentJob with the given fragments.

        The optimized fragment structures can be given as arguments.
        Single point calculations on these structures will then be performed
        to obtain the preparation energy.

        Subclass of |MultiJob|.

        Args:
            fragment1 (|Molecule|): The first fragment.
            fragment2 (|Molecule|): The second fragment.
            fragment1_opt (|Molecule|, optional): The optimized first fragment.
            fragment2_opt (|Molecule|, optional): The optimized second fragment.
            full_settings (Settings): The settings for the full calculation.
            frag1_settings (Settings): Specific settings for the first fragment calculation.
            frag2_settings (Settings): Specific settings for the second fragment calculation.
            **kwargs: Further keyword arguments for |MultiJob|.
        """
        MultiJob.__init__(self, **kwargs)
        self.fragment1 = fragment1.copy() if isinstance(fragment1, Molecule) else fragment1
        self.frag1_settings = frag1_settings or Settings()
        self.fragment2 = fragment2.copy() if isinstance(fragment2, Molecule) else fragment2
        self.frag2_settings = frag2_settings or Settings()
        self.full_settings = full_settings or Settings()

        if isinstance(fragment1_opt, Molecule):
            self.fragment1_opt = fragment1_opt.copy()
        elif fragment1_opt is not None:
            raise TypeError("fragment1_opt should be a Molecule object or None")
        if isinstance(fragment2_opt, Molecule):
            self.fragment2_opt = fragment2_opt.copy()
        elif fragment2_opt is not None:
            raise TypeError("fragment2_opt should be a Molecule object or None")

    def prerun(self):  # noqa F811
        """Prepare the fragments and the full calculation."""
        self.f1 = AMSJob(
            name="frag1",
            molecule=self.fragment1,
            settings=self.settings + self.frag1_settings,
        )
        self.f2 = AMSJob(
            name="frag2",
            molecule=self.fragment2,
            settings=self.settings + self.frag2_settings,
        )

        for at in self.fragment1:
            at.properties.suffix = "adf.f=subsystem1"
        for at in self.fragment2:
            at.properties.suffix = "adf.f=subsystem2"

        self.full = AMSJob(
            name="full",
            molecule=self.fragment1 + self.fragment2,
            settings=self.settings + self.full_settings,
        )

        self.full.settings.input.adf.fragments.subsystem1 = (self.f1, "adf")
        self.full.settings.input.adf.fragments.subsystem2 = (self.f2, "adf")
        self.children = [self.f1, self.f2, self.full]

        if hasattr(self, "fragment1_opt"):
            self.f1_opt = AMSJob(
                name="frag1_opt",
                molecule=self.fragment1_opt,
                settings=self.settings + self.frag1_settings,
            )
            self.children.append(self.f1_opt)
        if hasattr(self, "fragment2_opt"):
            self.f2_opt = AMSJob(
                name="frag2_opt",
                molecule=self.fragment2_opt,
                settings=self.settings + self.frag2_settings,
            )
            self.children.append(self.f2_opt)

    @classmethod
    def load_external(cls, path: str, jobname: Optional[str] = None) -> "ADFFragmentJob":
        """Load the results of the ADFFragmentJob job from an external path.

        Args:
            path (str): The path to the job. It should at least have the
                        subfolders 'frag1', 'frag2' and 'full'.
            jobname (str, optional): The name of the job. Defaults to None.

        Returns:
            ADFFragmentJob: The job with the loaded results.
        """
        if not isdir(path):
            raise FileError("Path {} does not exist, cannot load from it.".format(path))
        path = abspath(path)
        jobname = basename(path) if jobname is None else str(jobname)

        job = cls(name=jobname)
        job.path = path
        job.status = JobStatus.COPIED

        job.f1 = AMSJob.load_external(opj(path, "frag1"))
        job.f2 = AMSJob.load_external(opj(path, "frag2"))
        job.full = AMSJob.load_external(opj(path, "full"))
        job.children = [job.f1, job.f2, job.full]

        if isdir(opj(path, "frag1_opt")):
            job.f1_opt = AMSJob.load_external(opj(path, "frag1_opt"))
            job.children.append(job.f1_opt)
        if isdir(opj(path, "frag2_opt")):
            job.f2_opt = AMSJob.load_external(opj(path, "frag2_opt"))
            job.children.append(job.f2_opt)

        return job
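The mirror pairing used by _calc_energies inside get_nocv_orbital_interaction can be reproduced standalone. The values below are hypothetical orbital-interaction contributions, chosen only to show how each +/- NOCV pair collapses to one energy:

```python
def calc_energies_sketch(oi):
    # Pair contribution i with its mirror partner at index len(oi) - 1 - i,
    # so each NOCV +/- pair yields a single orbital-interaction energy.
    return [oi[i] + oi[len(oi) - 1 - i] for i in range(len(oi) // 2)]

print(calc_energies_sketch([-5.0, -1.0, -1.0, -5.0]))  # [-10.0, -2.0]
```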

BAND Fragment

Run BAND energy decomposition workflows for periodic systems with optional NOCV analysis.

Overview

This recipe extends the fragment approach to periodic BAND systems and supports NOCV workflows via NOCVBandFragmentJob.

API

API reference
class BANDFragmentJob(fragment1=None, fragment2=None, fragment1_opt=None, fragment2_opt=None, full_settings=None, frag1_settings=None, frag2_settings=None, **kwargs)[source]
Parameters:
_result_type

alias of BANDFragmentResults

prerun()[source]

Creates the fragment jobs.

Return type:

None

new_children()[source]

After the first round, add the full job to the children list.

Return type:

None | List[AMSJob]
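The atommapping blocks that new_children builds map each fragment's own atom numbering onto the combined system: fragment 1 keeps indices 1..n1, while fragment 2 is shifted by n1. A standalone sketch with hypothetical fragment sizes (in the recipe these come from len(self.fragment1) and len(self.fragment2)):

```python
# Hypothetical fragment sizes for illustration.
n1, n2 = 3, 2

# The same dict comprehensions as in BANDFragmentJob.new_children:
map1 = {str(i + 1): str(i + 1) for i in range(n1)}
map2 = {str(i + 1): str(i + 1 + n1) for i in range(n2)}

print(map1)  # {'1': '1', '2': '2', '3': '3'}
print(map2)  # {'1': '4', '2': '5'}
```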

classmethod load_external(path, jobname=None)[source]

Load the results of the BANDFragmentJob job from an external path.

Parameters:
  • path (str) – The path to the job. It should at least have the subfolders ‘frag1’, ‘frag2’ and ‘full’.

  • jobname (str, optional) – The name of the job. Defaults to None.

Returns:

The job with the loaded results.

Return type:

BANDFragmentJob

class BANDFragmentResults(job)[source]

Subclass of ADFFragmentResults for BAND calculations.

Parameters:

job (Job) –

get_energy_decomposition(unit='au')[source]

Get the energy decomposition of the fragment calculation.

Parameters:

unit (str, optional) – The unit of the energy. Defaults to ‘au’.

Returns:

The energy decomposition.

Return type:

Dict[str, float]

class NOCVBandFragmentJob(nocv_settings=None, **kwargs)[source]
_result_type

alias of BANDFragmentResults

new_children()[source]

After the first round, add the full job to the children list. After the second round, add the NOCV job to the children list.

Return type:

None | List[AMSJob]

classmethod load_external(path, jobname=None)[source]

Load the results of the BANDFragmentJob job from an external path.

Parameters:
  • path (str) – The path to the job. It should at least have the subfolders ‘frag1’, ‘frag2’, ‘full’ and ‘NOCV’.

  • jobname (str, optional) – The name of the job. Defaults to None.

Returns:

The job with the loaded results.

Return type:

NOCVBandFragmentJob

Code

Recipe module source: bandfragment.py
from scm.plams.core.functions import log
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.core.settings import Settings
from scm.plams.recipes.adffragment import ADFFragmentJob, ADFFragmentResults
from scm.plams.tools.units import Units
from scm.plams.core.errors import FileError
from scm.plams.core.functions import add_to_instance
from scm.plams.core.enums import JobStatus


from os.path import join as opj
from os.path import abspath, isdir, basename, relpath
from os import symlink
from typing import Dict, List, Union, Optional

__all__ = ["BANDFragmentJob", "BANDFragmentResults"]


class BANDFragmentResults(ADFFragmentResults):
    """Subclass of |ADFFragmentResults| for BAND calculations."""

    def get_energy_decomposition(self, unit="au") -> Dict[str, float]:
        """Get the energy decomposition of the fragment calculation.

        Args:
            unit (str, optional): The unit of the energy. Defaults to 'au'.

        Returns:
            Dict[str, float]: The energy decomposition.
        """
        res = self.job.full.results
        res1 = self.job.f1.results
        res2 = self.job.f2.results
        ret = {}

        def try_grep_result(key, pattern, match_position=-1, split_position=2):
            match = res.grep_output(pattern)
            if match:
                match = Units.convert(float(match[match_position].split()[split_position]), "au", unit)
                ret[key] = match

        try_grep_result("E_int", "E_int", match_position=-2)
        try_grep_result("E_int_disp", "E_disp")
        try_grep_result("E_Pauli", "E_Pauli")
        try_grep_result("E_elstat", "E_elstat")
        try_grep_result("E_orb", "E_orb")

        ret["E_1"] = res1.get_energy(unit=unit)
        ret["E_2"] = res2.get_energy(unit=unit)

        if hasattr(self.job, "f1_opt"):
            ret["E_1_opt"] = self.job.f1_opt.results.get_energy(unit=unit)
            ret["E_prep_1"] = ret["E_1"] - ret["E_1_opt"]
        if hasattr(self.job, "f2_opt"):
            ret["E_2_opt"] = self.job.f2_opt.results.get_energy(unit=unit)
            ret["E_prep_2"] = ret["E_2"] - ret["E_2_opt"]

        if ("E_1_opt" in ret) and ("E_2_opt" in ret):
            ret["E_prep"] = ret["E_prep_1"] + ret["E_prep_2"]
            ret["E_bond"] = ret["E_int"] + ret["E_prep"]

        return ret


class BANDFragmentJob(ADFFragmentJob):
    _result_type = BANDFragmentResults

    def prerun(self) -> None:
        """Creates the fragment jobs."""
        self.f1 = AMSJob(name="frag1", molecule=self.fragment1, settings=self.settings)
        self.f2 = AMSJob(name="frag2", molecule=self.fragment2, settings=self.settings)
        self.children += [self.f1, self.f2]

    def new_children(self) -> Union[None, List[AMSJob]]:
        """After the first round, add the full job to the children list."""
        if hasattr(self, "full"):
            return None
        else:
            # create the correct mapping settings for the full job
            if "fragment" in self.full_settings.input.band:
                log(
                    "Fragment already present in full_settings. Assuming that the user has already set up the mapping. Skipping the mapping setup.",
                    level=1,
                )
                return
            # first fragment 1 then fragment 2
            set1 = Settings()
            set1.atommapping = {str(i + 1): str(i + 1) for i in range(len(self.fragment1))}
            set2 = Settings()
            set2.atommapping = {str(i + 1): str(i + 1 + len(self.fragment1)) for i in range(len(self.fragment2))}
            set1.filename = (self.f1, "band")
            set2.filename = (self.f2, "band")
            self.full_settings.input.band.fragment = [set1, set2]

            # create the full job
            self.full = AMSJob(
                name="full", molecule=self.fragment1 + self.fragment2, settings=self.settings + self.full_settings
            )

            return [self.full]

    @classmethod
    def load_external(cls, path: str, jobname: Optional[str] = None) -> "BANDFragmentJob":
        """Load the results of the BANDFragmentJob job from an external path.

        Args:
            path (str): The path to the job. It should at least have the
                        subfolders 'frag1', 'frag2' and 'full'.
            jobname (str, optional): The name of the job. Defaults to None.

        Returns:
            BANDFragmentJob: The job with the loaded results.
        """
        if not isdir(path):
            raise FileError("Path {} does not exist, cannot load from it.".format(path))
        path = abspath(path)
        jobname = basename(path) if jobname is None else str(jobname)

        job = cls(name=jobname)
        job.path = path
        job.status = JobStatus.COPIED

        job.f1 = AMSJob.load_external(opj(path, "frag1"))
        job.f2 = AMSJob.load_external(opj(path, "frag2"))
        job.full = AMSJob.load_external(opj(path, "full"))
        job.children = [job.f1, job.f2, job.full]

        if isdir(opj(path, "frag1_opt")):
            job.f1_opt = AMSJob.load_external(opj(path, "frag1_opt"))
            job.children.append(job.f1_opt)
        if isdir(opj(path, "frag2_opt")):
            job.f2_opt = AMSJob.load_external(opj(path, "frag2_opt"))
            job.children.append(job.f2_opt)

        return job


class NOCVBandFragmentJob(BANDFragmentJob):
    _result_type = BANDFragmentResults

    def __init__(self, nocv_settings=None, **kwargs) -> None:
        """Create a BANDFragmentJob with final NOCV calculation.

        Args:
            nocv_settings (Settings, optional): Settings for the NOCV calculation. Defaults to None.

        """
        super().__init__(**kwargs)
        self.nocv_settings = nocv_settings or Settings()

    def new_children(self) -> Union[None, List[AMSJob]]:
        """After the first round, add the full job to the children list.
        After the second round, add the NOCV job to the children list."""
        # new_children of BANDFragmentJob creates the full job
        ret = super().new_children()
        if ret is not None:
            return ret
        if hasattr(self, "nocv"):
            return None
        else:
            # add NOCV run
            # settings for the NOCV calculation
            nocv_set = self.settings + self.full_settings + self.nocv_settings
            # set the restart file
            nocv_set.input.band.restart.file = "full.rkf"
            # copy the fragment settings
            nocv_set.input.band.fragment = [fset.copy() for fset in self.full.settings.input.band.fragment]

            self.nocv = AMSJob(name="NOCV", molecule=self.fragment1 + self.fragment2, settings=nocv_set)

            self.nocv.frag_paths = []
            for job in [self.f1, self.f2, self.full]:
                self.nocv.frag_paths.append(job.path)

            # edit NOCV prerun to create symlinks
            @add_to_instance(self.nocv)
            def prerun(self):
                """Create symlinks for the restart files."""
                for i, job in enumerate(["frag1", "frag2", "full"]):
                    rel_path = relpath(self.frag_paths[i], self.path)
                    symlink(opj(rel_path, "band.rkf"), opj(self.path, f"{job}.rkf"))

            return [self.nocv]

    @classmethod
    def load_external(cls, path: str, jobname: Optional[str] = None) -> "NOCVBandFragmentJob":
        """Load the results of the BANDFragmentJob job from an external path.

        Args:
            path (str): The path to the job. It should at least have the
                        subfolders 'frag1', 'frag2', 'full' and 'NOCV'.
            jobname (str, optional): The name of the job. Defaults to None.

        Returns:
            NOCVBandFragmentJob: The job with the loaded results.
        """
        job = super().load_external(path, jobname)
        job.nocv = AMSJob.load_external(opj(path, "NOCV"))
        job.children.append(job.nocv)
        return job
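
The atom-mapping logic in `BANDFragmentJob.new_children` is the part most worth understanding: atoms of fragment 1 map onto themselves in the supersystem, while atoms of fragment 2 are shifted by the size of fragment 1. A minimal standalone sketch of the same mapping construction, using plain dicts and hypothetical fragment sizes (no PLAMS objects):

```python
# Rebuild the AtomMapping dictionaries written into the full BAND job,
# for hypothetical fragments of 3 and 2 atoms.
def fragment_atom_mappings(n_frag1, n_frag2):
    # Fragment 1: atom i of the fragment is atom i of the supersystem.
    map1 = {str(i + 1): str(i + 1) for i in range(n_frag1)}
    # Fragment 2: atom i of the fragment is atom i + n_frag1 of the supersystem.
    map2 = {str(i + 1): str(i + 1 + n_frag1) for i in range(n_frag2)}
    return map1, map2

map1, map2 = fragment_atom_mappings(3, 2)
print(map1)  # {'1': '1', '2': '2', '3': '3'}
print(map2)  # {'1': '4', '2': '5'}
```

This matches the convention used when the full job is built from `self.fragment1 + self.fragment2`: fragment 1 atoms come first, fragment 2 atoms follow.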

Reorganization Energy

Compute Marcus reorganization energies using four coordinated AMS calculations.

Overview

The workflow evaluates energies at optimized geometries of two electronic states and combines them into the reorganization energy used in Marcus theory.

API

API reference
class ReorganizationEnergyJob(molecule, common_settings, settings_state_A, settings_state_B, **kwargs)[source]

A class for calculating the reorganization energy using AMS. Given two states, A and B, the reorganization energy is defined as follows:

reorganization energy =
    E(state B at optimal geometry for state A)
  - E(state A at optimal geometry for state A)
  + E(state A at optimal geometry for state B)
  - E(state B at optimal geometry for state B)

This job will run two geometry optimizations and two single point calculations.

_result_type

alias of ReorganizationEnergyResults

class ReorganizationEnergyResults(job)[source]

Results class for reorganization energy.

Parameters:

job (Job) –

Code

Recipe module source: reorganization_energy.py
from collections import OrderedDict

from scm.plams.core.basejob import MultiJob
from scm.plams.core.functions import add_to_instance
from scm.plams.core.results import Results
from scm.plams.interfaces.adfsuite.ams import AMSJob

__all__ = ["ReorganizationEnergyJob", "ReorganizationEnergyResults"]


# using this function to pass a molecule inside a MultiJob ensures proper parallel execution
def pass_molecule(source, target):
    @add_to_instance(target)
    def prerun(self):  # noqa F811
        self.molecule = source.results.get_main_molecule()


class ReorganizationEnergyResults(Results):
    """Results class for reorganization energy."""

    def reorganization_energy(self, unit="au"):
        energies = self.get_all_energies(unit)

        reorganization_energy = (
            energies["state B geo A"]
            - energies["state A geo A"]
            + energies["state A geo B"]
            - energies["state B geo B"]
        )
        return reorganization_energy

    def get_all_energies(self, unit="au"):
        energies = {}
        energies["state A geo A"] = self.job.children["go_A"].results.get_energy(unit=unit)
        energies["state B geo B"] = self.job.children["go_B"].results.get_energy(unit=unit)
        energies["state A geo B"] = self.job.children["sp_A_for_geo_B"].results.get_energy(unit=unit)
        energies["state B geo A"] = self.job.children["sp_B_for_geo_A"].results.get_energy(unit=unit)

        return energies


class ReorganizationEnergyJob(MultiJob):
    """A class for calculating the reorganization energy using AMS.
    Given two states, A and B, the reorganization energy is defined as follows:

    reorganization energy =
    E(state B at optimal geometry for state A) -
    E(state A at optimal geometry for state A) +
    E(state A at optimal geometry for state B) -
    E(state B at optimal geometry for state B)

    This job will run two geometry optimizations and two single point calculations.
    """

    _result_type = ReorganizationEnergyResults

    def __init__(self, molecule, common_settings, settings_state_A, settings_state_B, **kwargs):
        """
        molecule: the molecule
        common_settings: a setting object for an AMSJob containing the shared settings for all the calculations
        settings_state_A: Settings object for an AMSJob containing exclusively the options defining state A (e.g. charge and spin)
        settings_state_B: Settings object for an AMSJob containing exclusively the options defining state B (e.g. charge and spin)
        kwargs: other options to be passed to the MultiJob constructor
        """
        MultiJob.__init__(self, children=OrderedDict(), **kwargs)

        go_settings = common_settings.copy()
        go_settings.input.ams.task = "GeometryOptimization"

        sp_settings = common_settings.copy()
        sp_settings.input.ams.task = "SinglePoint"

        # copy the settings so that we won't modify the ones provided as input by the user
        settings_state_A = settings_state_A.copy()
        settings_state_B = settings_state_B.copy()
        # In case the charge key is not specified, explicitly set the value to 0.
        # This prevents the charge in molecule.properties.charge (set by get_main_molecule())
        # from being used in case of neutral systems
        for s in [settings_state_A, settings_state_B]:
            if "charge" not in s.input.ams.system:
                s.input.ams.system.charge = 0

        self.children["go_A"] = AMSJob(molecule=molecule, settings=go_settings + settings_state_A, name="go_A")
        self.children["go_B"] = AMSJob(molecule=molecule, settings=go_settings + settings_state_B, name="go_B")
        self.children["sp_A_for_geo_B"] = AMSJob(settings=sp_settings + settings_state_A, name="sp_A_geo_B")
        self.children["sp_B_for_geo_A"] = AMSJob(settings=sp_settings + settings_state_B, name="sp_B_geo_A")
        pass_molecule(self.children["go_A"], self.children["sp_B_for_geo_A"])
        pass_molecule(self.children["go_B"], self.children["sp_A_for_geo_B"])

ADFNBO

Extend ADF runs with NBO6/gennbo execution through an AMS job wrapper.

Overview

ADFNBOJob extends the regular AMS/ADF runscript to execute adfnbo and gennbo6 while ensuring required ADF input is present.

API

API reference
class ADFNBOJob(molecule=None, *args, **kwargs)[source]
Parameters:
prerun()[source]

Actions to take before the actual job execution.

This method is initially empty, it can be defined in subclasses or directly added to either the whole class or a single instance using Binding decorators.

Code

Recipe module source: adfnbo.py
import os

from scm.plams.core.functions import log
from scm.plams.interfaces.adfsuite.ams import AMSJob

__all__ = ["ADFNBOJob"]


class ADFNBOJob(AMSJob):

    def prerun(self):  # noqa F811
        s = self.settings.input.ADF
        s.fullfock = True
        s.aomat2file = True
        s.symmetry = "NoSym"
        s.basis.core = "None"
        if "save" in s:
            if isinstance(s.save, str):
                s.save += " TAPE15"
            elif isinstance(s.save, list):
                s.save.append("TAPE15")
            else:
                log(
                    "WARNING: 'SAVE TAPE15' could not be added to the input settings of {}. Make sure (thisjob).settings.input.save is a string or a list.".format(
                        self.name
                    ),
                    1,
                )
        else:
            s.save = "TAPE15"

        if isinstance(self.settings.adfnbo, list):
            adfnbo_input = self.settings.adfnbo
        else:
            adfnbo_input = ["write", "spherical", "fock"]
            log(
                "WARNING: (thisjob).settings.adfnbo should be a list. Using default settings: write, spherical, fock", 1
            )

        self.settings.runscript.post = (
            'cp "' + os.path.join(self.path, "adf.rkf") + '" TAPE21\n'
            "$AMSBIN/adfnbo <<eor\n" + "\n".join(adfnbo_input) + "\neor\n\n$AMSBIN/gennbo6 FILE47\n"
        )
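
The SAVE-key handling in `prerun` above has three branches: TAPE15 is appended whether the existing `save` value is a string, a list, or absent. The same branching can be sketched standalone with plain values instead of a Settings object (`add_tape15` is a hypothetical helper introduced here for illustration):

```python
# Mirror ADFNBOJob.prerun's handling of the ADF SAVE key: TAPE15 must end up
# in the SAVE list whether the current value is a string, a list, or unset.
def add_tape15(save=None):
    if save is None:
        return "TAPE15"
    if isinstance(save, str):
        return save + " TAPE15"
    if isinstance(save, list):
        return save + ["TAPE15"]
    raise TypeError("save must be a string or a list")

print(add_tape15())            # 'TAPE15'
print(add_tape15("TAPE21"))    # 'TAPE21 TAPE15'
print(add_tape15(["TAPE21"]))  # ['TAPE21', 'TAPE15']
```

The recipe itself logs a warning instead of raising when the value has an unexpected type, since it cannot safely modify it in place.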

pyAHFCDOS

Compute vibronic density of states from two vibronic spectra at AH-FC level.

Overview

The FCFDOS class combines absorption and emission vibronic data to build a DOS profile with configurable broadening.

API

API reference
class FCFDOS(absrkf, emirkf, absele=0.0, emiele=0.0)[source]

A class for calculating the convolution of two FCF spectra

dos(lineshape='GAU', HWHM=100.0)[source]

Calculate density of states by computing the overlap of the two FCF spectra The two spectra are broadened using the chosen lineshape and Half-Width at Half-Maximum in cm-1

convolute(spc, stick, lineshape=None, HWHM=None)[source]

Convolute stick spectrum with the chosen width and lineshape lineshape : Can be Gaussian or Lorentzian HWHM : expressed in cm-1

trapezoid(spc)[source]

Integrate spectrum using the trapezoid rule

Code

Recipe module source: fcf_dos.py
# from .scmjob import SCMJob, SCMResults
import math

import numpy
from scm.plams.tools.kftools import KFFile

__all__ = ["FCFDOS"]


class FCFDOS:
    """
    A class for calculating the convolution of two FCF spectra
    """

    # Conversion factor to cm-1 == ( me^2 * c * alpha^2 ) / ( 100 * 2pi * amu * hbar )
    G2F = 120.399494933
    # J2Ha = 1.0 / (scipy.constants.m_e * scipy.constants.speed_of_light**2 * 0.0072973525693**2)
    J2Ha = 2.293712278400752e17

    def __init__(self, absrkf, emirkf, absele=0.0, emiele=0.0):
        """
        absrkf : KFFile from an FCF absorption calculation for the acceptor, as name, path, or KFFile object
        emirkf : KFFile from an FCF emission calculation for the donor, as name, path, or KFFile object
        absele : Acceptor absorption electronic energy in cm-1
        emiele : Donor emission electronic energy in cm-1
        """
        if isinstance(absrkf, str):
            self.absrkf = KFFile(absrkf)
        else:
            self.absrkf = absrkf
        if isinstance(emirkf, str):
            self.emirkf = KFFile(emirkf)
        else:
            self.emirkf = emirkf
        self.absele = absele
        self.emiele = emiele
        self.spc = None

    def _getstick(self, source):
        spclen = source.read("Fcf", "nspectrum")
        rawspc = source.read("Fcf", "spectrum")
        stick = numpy.reshape(numpy.array(rawspc), (2, spclen)).transpose()
        # Reorder spectrum if decreasing
        if stick[0, 0] > stick[-1, 0]:
            for ix in range(numpy.size(stick, 0) // 2):
                XX = numpy.copy(stick[ix, :])
                stick[ix, :] = stick[-ix - 1, :]
                stick[-ix - 1, :] = XX
        # Get frequencies in cm-1
        frq1 = numpy.array(source.read("Fcf", "gamma1")) * self.G2F
        frq2 = numpy.array(source.read("Fcf", "gamma2")) * self.G2F
        # Calculate energy in Joules
        # factor = scipy.constants.pi * scipy.constants.hbar * scipy.constants.speed_of_light * 100.0
        factor = 9.932229285744643e-24
        viben1 = sum(frq1) * factor
        viben2 = sum(frq2) * factor
        # Convert to Hartree
        viben1 = viben1 * self.J2Ha
        viben2 = viben2 * self.J2Ha
        # Add ZPE and electronic energy
        stick[:, 0] = stick[:, 0] + viben2 - viben1 + self.absele + self.emiele
        return stick

    def _spcinit(self):
        # Get absorption and emission spectra
        absspc = self._getstick(self.absrkf)
        emispc = self._getstick(self.emirkf)
        # Find spectrum bounds
        absmin = numpy.amin(absspc[:, 0])
        absmax = numpy.amax(absspc[:, 0])
        emimin = numpy.amin(emispc[:, 0])
        emimax = numpy.amax(emispc[:, 0])
        spcmin = min(absmin, emimin)
        spcmax = max(absmax, emimax)
        # Find spectrum length and grain
        absdlt = abs(absspc[0, 0] - absspc[1, 0])
        emidlt = abs(emispc[0, 0] - emispc[1, 0])
        spcgrn = min(absdlt, emidlt)
        spclen = math.floor((spcmax - spcmin) / spcgrn) + 1
        # Initialize spectrum
        self.spc = self.newspc(spcmin, spcmax, spclen)
        return None

    def dos(self, lineshape="GAU", HWHM=100.0):
        """
        Calculate density of states by computing the overlap of the two FCF spectra
        The two spectra are broadened using the chosen lineshape and Half-Width at Half-Maximum in cm-1
        """
        # Initialize spectrum
        self._spcinit()
        # Get stick spectra
        absstick = self._getstick(self.absrkf)
        emistick = self._getstick(self.emirkf)
        # Convolute with gaussians
        absspc = numpy.copy(self.spc)
        emispc = numpy.copy(self.spc)
        absspc = self.convolute(absspc, absstick, lineshape, HWHM)
        emispc = self.convolute(emispc, emistick, lineshape, HWHM)
        # Integrate
        # Calculate DOS
        self.spc[:, 1] = absspc[:, 1] * emispc[:, 1]
        dos = self.trapezoid(self.spc)
        return dos

    def newspc(self, spcmin, spcmax, spclen=1000):
        # Dimensions of the spectrum
        delta = (spcmax - spcmin) / (spclen - 1)
        spc = numpy.zeros((spclen, 2), dtype=float)
        for ix in range(spclen):
            spc[ix, 0] = spcmin + delta * ix
        return spc

    def convolute(self, spc, stick, lineshape=None, HWHM=None):
        """
        Convolute stick spectrum with the chosen width and lineshape
        lineshape : Can be Gaussian or Lorentzian
        HWHM      : expressed in cm-1
        """
        if HWHM is None:
            raise ValueError("HWHM not defined")
        if lineshape is None:
            raise ValueError("Lineshape not defined")
        # Data for the convolution
        delta = spc[1, 0] - spc[0, 0]
        if lineshape[0:3].upper() == "GAU":
            # Gaussian lineshape
            idline = 1
            # This includes the Gaussian prefactor and the factor to account for the reduced lineshape width
            # factor = 1. / scipy.special.erf(2*math.sqrt(math.log(2)))
            factor = 1.0188815852036244
            factA = math.sqrt(numpy.log(2.0) / math.pi) * factor / HWHM
            factB = -numpy.log(2.0) / HWHM**2
            # We only convolute between -2HWHM and +2HWHM which accounts for 98.1% of the area
            ishft = math.floor(2 * HWHM / delta)
        elif lineshape[0:3].upper() == "LOR":
            # Lorentzian lineshape
            idline = 2
            # This includes the Lorentzian prefactor and the factor to account for the reduced lineshape width
            factA = (math.pi / math.atan(12) / 2) * HWHM / math.pi
            factB = 0.0
            # We only convolute between -12HWHM and +12HWHM which accounts for 94.7% of the area
            ishft = math.floor(12 * HWHM / delta)
        else:
            raise ValueError("invalid lineshape")
        # Loop over peaks in the stick spectrum
        spclen = numpy.size(spc, 0)
        for ix in range(numpy.size(stick[:, 0])):
            # Find peak position in the convoluted spectrum
            peakpos = 1 + math.floor((stick[ix, 0] - spc[0, 0]) / delta)
            # Convolution interval, limited to save computational time
            i1 = max(peakpos - ishft, 1)
            i2 = min(peakpos + ishft, spclen)
            factor = factA * stick[ix, 1]
            if idline == 1:  # Gaussian
                for i in range(i1, i2):
                    spc[i, 1] = spc[i, 1] + factor * math.exp(factB * (spc[i, 0] - stick[ix, 0]) ** 2)
            elif idline == 2:  # Lorentzian
                for i in range(i1, i2):
                    spc[i, 1] = spc[i, 1] + factor / (HWHM + (spc[i, 0] - stick[ix, 0]) ** 2)
        return spc

    def trapezoid(self, spc):
        """
        Integrate spectrum using the trapezoid rule
        """
        value = (spc[0, 1] + spc[-1, 1]) / 2
        value = value + sum(spc[1:-1, 1])
        # The abscissas must be equally spaced for this to work
        value = value * (spc[1, 0] - spc[0, 0])
        return value

    def __str__(self):
        string = f"Absorption from {self.absrkf.path}\nEmission from {self.emirkf.path}\nAcceptor absorption electronic energy = {self.absele} cm-1\nDonor emission electronic energy = {self.emiele} cm-1"
        return string
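
The overlap integral computed by `dos` reduces to three steps: broaden each stick spectrum on a common grid, multiply the two broadened spectra pointwise, and integrate with the trapezoid rule. A simplified, self-contained numpy sketch of those steps, using an area-normalized Gaussian lineshape and two toy one-stick spectra (it omits the recipe's ±2·HWHM truncation and its compensating prefactor):

```python
import numpy as np

def gaussian(x, center, hwhm):
    # Area-normalized Gaussian parametrized by its half-width at half-maximum.
    return np.sqrt(np.log(2) / np.pi) / hwhm * np.exp(-np.log(2) * ((x - center) / hwhm) ** 2)

def trapezoid(y, x):
    # Trapezoid rule for equally spaced abscissas, as in FCFDOS.trapezoid.
    return ((y[0] + y[-1]) / 2 + y[1:-1].sum()) * (x[1] - x[0])

x = np.linspace(-2000.0, 2000.0, 4001)  # common grid in cm-1, 1 cm-1 spacing
hwhm = 100.0
absspc = gaussian(x, -50.0, hwhm)       # toy one-stick "absorption" spectrum
emispc = gaussian(x, 50.0, hwhm)        # toy one-stick "emission" spectrum
dos = trapezoid(absspc * emispc, x)
print(dos)  # ~ 2.35e-3: overlap of two unit-area Gaussians 100 cm-1 apart
```

For two unit-area Gaussians separated by d = HWHM the overlap has the closed form sqrt(ln2/(2π))/HWHM · exp(−ln2/2), which is a convenient sanity check on the grid and broadening.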

ADFVibronicDOS

Practical ADF workflow that uses AH-FC vibronic spectra to build a DOS profile.

Overview

This workflow runs the vibronic calculations and then applies FCFDOS to produce and analyze the vibronic density of states.

API and Code

The API reference and the fcf_dos.py module source for this recipe are identical to those of the pyAHFCDOS recipe above; see that section for the FCFDOS class documentation and code.