Recipes¶
These recipe workflows are shipped with PLAMS and can be imported directly. Each recipe includes details, API documentation, and source code.
COSMO-RS Compound¶
Generate .coskf files equivalent to AMS GUI task “COSMO-RS Compound”.
Overview¶
Use this recipe when you want an easy Python interface for generating COSMO-RS compound files
(.coskf) from one or many input structures.
API¶
API reference
- class ADFCOSMORSCompoundJob(molecule, coskf_name=None, coskf_dir=None, preoptimization=None, singlepoint=False, settings=None, mol_info=None, hbc_from_MESP=False, **kwargs)[source]
A class for performing the equivalent of Task: COSMO-RS Compound in the AMS GUI
- Parameters:
- Keyword Arguments:
coskf_name – A name for the generated .coskf file. If nothing is specified, the name of the job will be used.
coskf_dir – The directory in which to place the generated .coskf file. If nothing is specified, the file will be put in the plams directory corresponding to the job.
preoptimization – If None, do not preoptimize with a fast engine priori to the optimization with ADF. Otherwise, it can be one of ‘UFF’, ‘GAFF’, ‘GFNFF’, ‘GFN1-xTB’, ‘ANI-2x’, ‘M3GNet-UP-2022’. Note that you need valid licenses for ForceField or DFTB or MLPotential to use these preoptimizers.
singlepoint (bool) – Run a singlepoint in gasphase and with solvation to generate the .coskf file on the given Molecule. (no geometry optimization). Cannot be combined with
preoptimization.settings (Settings) – A
Settingsobject. settings.runscript.nproc, settings.input.adf.custom_options. If ‘adf’ is in settings.input it should be provided without the solvation block.mol_info (dict) – an optional dictionary containing information will be written to the Compound Data section within the COSKF file.
hbc_from_MESP (bool) – Defaults to False. Performs DENSF analysis to determine the hydrogen bond center (HBC) used in COSMOSAC-DHB-MESP.
name – an optional name for the calculation directory
Example
mol = from_smiles('O') job = ADFCOSMORSCompoundJob( molecule = mol, preoptimization = 'UFF', coskf_dir = 'coskfs', coskf_name = 'Water', name = 'H2O', mol_info = {'CAS':'7732-18-5'} ) job.run() print(job.results.coskfpath())
- _result_type
alias of
ADFCOSMORSCompoundResults
- static _get_radii()[source]
Method to get the atomic radii from solvent.txt (for some elements the radii are instead the Klamt radii)
- static adf_settings(solvation, settings=None, elements=None, atomic_ion=False)[source]
Returns ADF settings with or without solvation
If solvation == True, then also include the solvation block.
- static convert_to_coskf(rkf_path, coskf_name, plams_dir, coskf_dir=None, mol_info=None, densf_path=None)[source]
Convert an adf.rkf file into a .coskf file
- Parameters:
rkf_path (str) – absolute path to adf.rkf
coskf_name (str) – the name of the .coskf file
plams_dir (str) – plamsjob path to write out the .coskf file
coskf_dir (Optional[str]) – additional path to store the .coskf file
mol_info (Optional[Dict[str, Union[float, int, str]]]) – Optional information to write out in the “Compound Data” section of the .coskf file
densf_path (Optional[str]) – path to the densf output .t41 file
- Return type:
None
Code¶
Recipe module source: adfcosmorscompound.py
import os, shutil
from collections import OrderedDict
from typing import List, Optional, Union, Dict, Literal, Tuple
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.interfaces.adfsuite.crs import CRSJob
from scm.plams.interfaces.adfsuite.densf import DensfJob
from scm.plams.tools.periodic_table import PeriodicTable
from scm.plams.mol.molecule import Molecule
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.functions import add_to_instance, requires_optional_package
from scm.plams.interfaces.adfsuite.quickjobs import model_to_settings
from scm.plams.tools.hbc_utilities import parse_mesp, write_HBC_to_COSKF, view_HBC
import numpy as np
__all__ = ["ADFCOSMORSCompoundJob", "ADFCOSMORSCompoundResults"]
class ADFCOSMORSCompoundResults(Results):
"""Results class for ADFCOSMORSCompoundJob"""
def coskfpath(self):
"""
Returns the path to the resulting .coskf
"""
return os.path.join(self.job.path, self.job.coskf_name)
def get_main_molecule(self):
"""
Returns the optimized molecule
"""
return self.job.children["solv"].results.get_main_molecule()
def get_input_molecule(self):
"""
Returns the input molecule
"""
for job in self.job.children.values():
return job.results.get_input_molecule()
def get_sigma_profile(self, subsection: str = "profil"):
"""
Returns the sigma profile of the molecule. For more details see `CRSResults.get_sigma_profile`.
"""
return self.job.children["crs"].results.get_sigma_profile(subsection=subsection)
class ADFCOSMORSCompoundJob(MultiJob):
"""
A class for performing the equivalent of Task: COSMO-RS Compound in the AMS GUI
Args:
molecule : a PLAMS |Molecule|
Keyword Args:
coskf_name : A name for the generated .coskf file. If nothing is specified, the name of the job will be used.
coskf_dir : The directory in which to place the generated .coskf file. If nothing is specified, the file will be put in the plams directory corresponding to the job.
preoptimization : If None, do not preoptimize with a fast engine priori to the optimization with ADF. Otherwise, it can be one of 'UFF', 'GAFF', 'GFNFF', 'GFN1-xTB', 'ANI-2x', 'M3GNet-UP-2022'. Note that you need valid licenses for ForceField or DFTB or MLPotential to use these preoptimizers.
singlepoint (bool) : Run a singlepoint in gasphase and with solvation to generate the .coskf file on the given Molecule. (no geometry optimization). Cannot be combined with ``preoptimization``.
settings (Settings) : A |Settings| object. settings.runscript.nproc, settings.input.adf.custom_options. If 'adf' is in settings.input it should be provided without the solvation block.
mol_info (dict) : an optional dictionary containing information will be written to the Compound Data section within the COSKF file.
hbc_from_MESP (bool) : Defaults to False. Performs DENSF analysis to determine the hydrogen bond center (HBC) used in COSMOSAC-DHB-MESP.
name : an optional name for the calculation directory
Example:
.. code-block:: python
mol = from_smiles('O')
job = ADFCOSMORSCompoundJob(
molecule = mol,
preoptimization = 'UFF',
coskf_dir = 'coskfs',
coskf_name = 'Water',
name = 'H2O',
mol_info = {'CAS':'7732-18-5'}
)
job.run()
print(job.results.coskfpath())
"""
_result_type = ADFCOSMORSCompoundResults
def __init__(
self,
molecule: Union[Molecule, None],
coskf_name: Optional[str] = None,
coskf_dir: Optional[str] = None,
preoptimization: Optional[str] = None,
singlepoint: bool = False,
settings: Optional[Settings] = None,
mol_info: Optional[Dict[str, Union[float, int, str]]] = None,
hbc_from_MESP: bool = False,
**kwargs,
):
"""
Class for running the equivalent of "COSMO-RS Compound" in the AMS
GUI. Note that these are ADF calculations, not COSMO-RS
calculations!
Initialize two or three jobs:
1. (Optional): Preoptimization with force field or semi-empirical method ('UFF', 'GAFF', 'GFNFF', 'GFN1-xTB', 'ANI-2x' or 'M3GNet-UP-2022')
Note: A valid license for ForceField or DFTB or MLPotential is required.
2. Gasphase optimization or single-point calculation (BP86, TZP, BeckeGrid Quality Good)
3. Take optimized structure and run singlepoint with implicit solvation
Access the result .coskf file with ``job.results.coskfpath()``.
Note: this file will be called jobname.coskf, where jobname is the
name of the ADFCOSMORSCompoundJob.
"""
if preoptimization and singlepoint:
raise ValueError("Cannot combine preoptimization with singlepoint")
MultiJob.__init__(self, children=OrderedDict(), **kwargs)
self.input_molecule = molecule
self.mol_info = dict()
if mol_info is not None:
self.mol_info.update(mol_info)
self.settings = settings or Settings()
self.coskf_name = coskf_name
self.coskf_dir = coskf_dir
self.hbc_from_MESP = hbc_from_MESP
if self.coskf_dir is not None and not os.path.exists(self.coskf_dir):
os.mkdir(self.coskf_dir)
if self.coskf_name is None:
self.coskf_name = f"{self.name}.coskf"
elif isinstance(self.coskf_name, str) and not self.coskf_name.endswith(".coskf"):
self.coskf_name += ".coskf"
gas_s = Settings()
gas_s += ADFCOSMORSCompoundJob.adf_settings(solvation=False, settings=self.settings)
gas_job = AMSJob(settings=gas_s, name="gas")
if not singlepoint:
gas_job.settings.input.ams.Task = "GeometryOptimization"
if preoptimization:
preoptimization_s = Settings()
preoptimization_s.runscript.nproc = 1
preoptimization_s.input.ams.Task = "GeometryOptimization"
preoptimization_s += model_to_settings(preoptimization)
preoptimization_job = AMSJob(
settings=preoptimization_s, name="preoptimization", molecule=self.input_molecule
)
self.children["preoptimization"] = preoptimization_job
elif singlepoint:
gas_job.settings.input.ams.Task = "SinglePoint"
@add_to_instance(gas_job)
def prerun(self): # noqa: F811
if not singlepoint and preoptimization:
self.molecule = self.parent.children["preoptimization"].results.get_main_molecule()
else:
self.molecule = self.parent.input_molecule
self.parent.mol_info, self.parent.atomic_ion = ADFCOSMORSCompoundJob.get_compound_properties(
self.molecule, self.parent.mol_info
)
self.children["gas"] = gas_job
if self.hbc_from_MESP:
densf_job = DensfJob(settings=ADFCOSMORSCompoundJob.densf_settings(), name="densf")
self.children["densf"] = densf_job
@add_to_instance(densf_job)
def prerun(self): # noqa: F811
gas_job.results.wait()
self.inputjob = f"../gas/adf.rkf #{self.parent.name}"
solv_s = Settings()
solv_s.input.ams.Task = "SinglePoint"
solv_job = AMSJob(settings=solv_s, name="solv")
@add_to_instance(solv_job)
def prerun(self): # noqa: F811
gas_job.results.wait()
self.settings.input.ams.EngineRestart = "../gas/adf.rkf"
self.settings.input.ams.LoadSystem.File = "../gas/ams.rkf"
molecule_charge = gas_job.results.get_main_molecule().properties.get("charge", 0)
self.settings.input.ams.LoadSystem._1 = f"# {self.parent.name}"
self.settings.input.ams.LoadSystem._2 = f"# charge {molecule_charge}"
self.settings += ADFCOSMORSCompoundJob.adf_settings(
solvation=True,
settings=self.parent.settings,
elements=list(set(at.symbol for at in self.parent.input_molecule)),
atomic_ion=self.parent.atomic_ion,
)
@add_to_instance(solv_job)
def postrun(self):
if self.parent.hbc_from_MESP:
densf_job.results.wait()
densf_path = densf_job.results.kfpath()
else:
densf_path = None
ADFCOSMORSCompoundJob.convert_to_coskf(
rkf_path=self.results.rkfpath(file="adf"),
coskf_name=self.parent.coskf_name,
plams_dir=self.parent.path,
coskf_dir=self.parent.coskf_dir,
mol_info=self.parent.mol_info,
densf_path=densf_path,
)
self.children["solv"] = solv_job
sigma_s = Settings()
sigma_s.input.property._h = "PURESIGMAPROFILE"
compounds = [Settings()]
sigma_s.input.compound = compounds
crsjob = CRSJob(settings=sigma_s, name="sigma")
@add_to_instance(crsjob)
def prerun(self): # noqa F811
self.parent.children["solv"].results.wait()
self.settings.input.compound[0]._h = os.path.join(self.parent.path, self.parent.coskf_name)
self.children["crs"] = crsjob
@staticmethod
def get_compound_properties(
mol: Molecule, mol_info: Optional[Dict[str, Union[float, int, str]]] = None
) -> Tuple[Dict[str, Union[float, int, str]], bool]:
if mol_info is None:
mol_info = dict()
mol_info["Molar Mass"] = mol.get_mass()
mol_info["Formula"] = mol.get_formula()
try:
rings = mol.locate_rings()
flatten_atoms = [atom for subring in rings for atom in subring]
nring = len(set(flatten_atoms))
mol_info["Nring"] = int(nring)
except:
pass
atomic_ion = len(mol.atoms) == 1
return mol_info, atomic_ion
@staticmethod
def _get_radii() -> Dict[str, float]:
"""Method to get the atomic radii from solvent.txt (for some elements the radii are instead the Klamt radii)"""
with open(os.path.expandvars("$AMSHOME/data/gui/solvent.txt"), "r") as f:
mod_allinger_radii = [float(x) for i, x in enumerate(f) if i > 0]
radii = {PeriodicTable.get_symbol(i): r for i, r in enumerate(mod_allinger_radii, 1) if i <= 118}
klamt_radii = {
"H": 1.30,
"C": 2.00,
"N": 1.83,
"O": 1.72,
"F": 1.72,
"Si": 2.48,
"P": 2.13,
"S": 2.16,
"Cl": 2.05,
"Br": 2.16,
"I": 2.32,
}
radii.update(klamt_radii)
return radii
@staticmethod
def solvation_settings(elements: Optional[List[str]] = None, atomic_ion: bool = False) -> Settings:
sett = Settings()
radii = {
"H": 1.3,
"He": 1.275,
"Li": 2.125,
"Be": 1.858,
"B": 1.792,
"C": 2.0,
"N": 1.83,
"O": 1.72,
"F": 1.72,
"Ne": 1.333,
"Na": 2.25,
"Mg": 2.025,
"Al": 1.967,
"Si": 2.48,
"P": 2.13,
"S": 2.16,
"Cl": 2.05,
"Ar": 1.658,
"K": 2.575,
"Ca": 2.342,
"Sc": 2.175,
"Ti": 1.992,
"V": 1.908,
"Cr": 1.875,
"Mn": 1.867,
"Fe": 1.858,
"Co": 1.858,
"Ni": 1.85,
"Cu": 1.883,
"Zn": 1.908,
"Ga": 2.05,
"Ge": 2.033,
"As": 1.967,
"Se": 1.908,
"Br": 2.16,
"Kr": 1.792,
"Rb": 2.708,
"Sr": 2.5,
"Y": 2.258,
"Zr": 2.117,
"Nb": 2.025,
"Mo": 1.992,
"Tc": 1.967,
"Ru": 1.95,
"Rh": 1.95,
"Pd": 1.975,
"Ag": 2.025,
"Cd": 2.083,
"In": 2.2,
"Sn": 2.158,
"Sb": 2.1,
"Te": 2.033,
"I": 2.32,
"Xe": 1.9,
"Cs": 2.867,
"Ba": 2.558,
"La": 2.317,
"Ce": 2.283,
"Pr": 2.275,
"Nd": 2.275,
"Pm": 2.267,
"Sm": 2.258,
"Eu": 2.45,
"Gd": 2.258,
"Tb": 2.25,
"Dy": 2.242,
"Ho": 2.225,
"Er": 2.225,
"Tm": 2.225,
"Yb": 2.325,
"Lu": 2.208,
"Hf": 2.108,
"Ta": 2.025,
"W": 1.992,
"Re": 1.975,
"Os": 1.958,
"Ir": 1.967,
"Pt": 1.992,
"Au": 2.025,
"Hg": 2.108,
"Tl": 2.158,
"Pb": 2.283,
"Bi": 2.217,
"Po": 2.158,
"At": 2.092,
"Rn": 2.025,
"Fr": 3.033,
"Ra": 2.725,
"Ac": 2.567,
"Th": 2.283,
"Pa": 2.2,
"U": 2.1,
"Np": 2.1,
"Pu": 2.1,
"Am": 2.1,
"Cm": 2.1,
"Bk": 2.1,
"Cf": 2.1,
"Es": 2.1,
"Fm": 2.1,
"Md": 2.1,
"No": 2.1,
"Lr": 2.1,
"Rf": 2.1,
"Db": 2.1,
"Sg": 2.1,
"Bh": 2.1,
"Hs": 2.1,
"Mt": 2.1,
"Ds": 2.1,
"Rg": 2.1,
"Cn": 2.1,
"Nh": 2.1,
"Fl": 2.1,
"Mc": 2.1,
"Lv": 2.1,
"Ts": 2.1,
"Og": 2.1,
} # from _get_radii()
if elements:
radii = {k: radii[k] for k in sorted(elements)}
if atomic_ion:
charge_method = "method=atom corr"
else:
charge_method = "method=Conj corr"
sett.input.adf.solvation = {
"surf": "Delley",
"solv": "name=CRS cav0=0.0 cav1=0.0",
"charged": charge_method,
"c-mat": "Exact",
"scf": "Var All",
"radii": radii,
}
return sett
@staticmethod
def adf_settings(
solvation: bool, settings=None, elements: Optional[List[str]] = None, atomic_ion: bool = False
) -> Settings:
"""
Returns ADF settings with or without solvation
If solvation == True, then also include the solvation block.
"""
s = Settings()
if settings:
s = settings.copy()
if "basis" not in s.input.adf and "xc" not in s.input.adf:
s.input.adf.Basis.Type = "TZP"
s.input.adf.Basis.Core = "Small"
s.input.adf.XC.GGA = "BP86"
s.input.adf.Symmetry = "NOSYM"
s.input.adf.BeckeGrid.Quality = "Good"
if solvation:
s += ADFCOSMORSCompoundJob.solvation_settings(elements=elements, atomic_ion=atomic_ion)
return s
@staticmethod
def densf_settings(grid: Literal["Medium", "Fine"] = "Medium") -> Settings:
s = Settings()
s.input.GRID = f"{grid}\nEnd"
s.input.Density = "SCF"
s.input.Potential = "COUL SCF"
return s
@staticmethod
@requires_optional_package("scm.base")
def convert_to_coskf(
rkf_path: str,
coskf_name: str,
plams_dir: str,
coskf_dir: Optional[str] = None,
mol_info: Optional[Dict[str, Union[float, int, str]]] = None,
densf_path: Optional[str] = None,
) -> None:
"""
Convert an adf.rkf file into a .coskf file
Args:
rkf_path (str) : absolute path to adf.rkf
coskf_name (str) : the name of the .coskf file
plams_dir (str) : plamsjob path to write out the .coskf file
coskf_dir (Optional[str]) :additional path to store the .coskf file
mol_info (Optional[Dict[str, Union[float, int, str]]]) : Optional information to write out in the "Compound Data" section of the .coskf file
densf_path (Optional[str]) : path to the densf output .t41 file
"""
from scm.base import KFFile
with KFFile(rkf_path) as rkf:
cosmo = rkf.read_section("COSMO")
coskf_path = os.path.join(plams_dir, coskf_name)
with KFFile(coskf_path, autosave=False) as rkf:
for key, value in cosmo.items():
rkf.write("COSMO", key, float(value) if isinstance(value, np.float64) else value)
for key, value in mol_info.items():
rkf.write("Compound Data", key, float(value) if isinstance(value, np.float64) else value)
if densf_path is not None:
HBC_xyz, HBC_atom, HBC_angle, HBC_info = parse_mesp(densf_path, coskf_path)
write_HBC_to_COSKF(coskf_path, HBC_xyz, HBC_atom, HBC_angle, HBC_info)
if coskf_dir is not None:
shutil.copy2(coskf_path, os.path.join(coskf_dir, coskf_name))
@staticmethod
def update_hbc_to_coskf(coskf: str, visulization: bool = False) -> None:
"""
Determine the hydrogen bond center for existing COSKF file
Args:
coskf (str) : Existing COSKF file
visulization (bool) : Visulization of hydrogen bond center
"""
molecule = Molecule(coskf)
coskf_name = os.path.basename(coskf).replace(".coskf", "")
atomic_ion = len(molecule.atoms) == 1
gas_settings = ADFCOSMORSCompoundJob.adf_settings(solvation=False, atomic_ion=atomic_ion)
gas_settings.input.ams.Task = "SinglePoint"
gas_job = AMSJob(molecule=molecule, settings=gas_settings, name=f"gas_{coskf_name}")
gas_job.run()
gas_rkf = gas_job.results.rkfpath(file="adf")
densf_settings = ADFCOSMORSCompoundJob.densf_settings()
densf_job = DensfJob(gas_rkf, settings=densf_settings, name=f"densf_{coskf_name}")
densf_job.run()
t41 = densf_job.results.kfpath()
HBC_xyz, HBC_atom, HBC_angle, HBC_info = parse_mesp(t41, coskf)
write_HBC_to_COSKF(coskf, HBC_xyz, HBC_atom, HBC_angle, HBC_info)
if visulization:
view_HBC(coskf)
COSMO-RS Conformers¶
Build customizable conformer workflows that end in ADF/COSMO calculations for COSMO-RS.
Overview¶
This recipe builds a configurable conformer-generation pipeline and produces .coskf files for
the selected conformers.
API¶
API reference
- class ADFCOSMORSConfJob(molecule, conf_gen=None, first_filter=None, additional=None, final_filter=None, adf_singlepoint=False, initial_conformers=500, coskf_dir=None, coskf_name=None, mol_info=None, hbc_from_MESP=False, **kwargs)[source]
Multi-step conformer workflow for generating COSMO-RS .coskf files:
job_0: initial conformer generation (default RDKit generator or user-provided conf_gen) |-- apply first_filter (if provided) v job_i: optional refinement stages from additional (for each (Settings, Filter) tuple, e.g. DFTB) |-- apply its filter after each stage v adf_gas_job: gas-phase ADF conformer job |-- if adf_singlepoint=True: single-point only v final_filter_job: apply final_filter (if provided) v Branch on hbc_from_MESP: |-- False: replay_job -> COSMO results (.coskf) `-- True: per conformer job_cosmo + job_densf `-> COSMO + hydrogen bond center (.coskf)
- Parameters:
molecule (Molecule) – Input PLAMS molecule.
conf_gen (ConformersJob | None) – User-provided job for initial conformer generation.
initial_conformers (int) – Number of conformers generated by the default conf_gen (RDKit). Defaults to 500.
first_filter (ADFCOSMORSConfFilter | None) – Filter applied after initial conformer generation.
additional (List[Tuple[Settings, ADFCOSMORSConfFilter | None]] | None) – Optional extra conformer-refinement stages and their filters (e.g., with a fast engine such as DFTB).
final_filter (ADFCOSMORSConfFilter | None) – Filter applied after gas-phase ADF conformer generation.
adf_singlepoint (bool) – If True, run gas-phase ADF conformer jobs without optimization. Defaults to False.
hbc_from_MESP (bool) – If True, determine the hydrogen-bond center (hbc) via DENSF analysis (COSMOSAC-DHB-MESP). Defaults to False.
coskf_dir (str | None) – Output directory for generated .coskf files.
coskf_name (str | None) – Base filename for conformers (e.g., {coskf_name}_{i}.coskf). Default to conformer.
mol_info (Dict[str, float | int | str] | None) – Extra metadata written to “Compound Data” (e.g. IUPAC, CAS, SMILES and Other Name).
name – Optional PLAMS job name. Default to plams.
- _result_type
alias of
ADFCOSMORSConfResults
- new_children()[source]
Generate new children jobs.
This method is useful when some of children jobs are not known beforehand and need to be generated based on other children jobs, like for example in any kind of self-consistent procedure.
The goal of this method is to produce a new portion of children jobs. Newly created jobs should be returned in a container compatible with
self.children(e.g. list for list, dict for dict). No adjustment of newly created jobs’parentattribute is needed. This method cannot modify_active_childrenattribute.The method defined here is a default template, returning
None, which means no new children jobs are generated and the entire execution of the parent job consists only of running jobs initially found inself.children. To modify this behavior you can override this method in aMultiJobsubclass or you can use one of Binding decorators, just like with Prerun and postrun methods.
- prerun()[source]
Actions to take before the actual job execution.
This method is initially empty, it can be defined in subclasses or directly added to either the whole class or a single instance using Binding decorators.
- postrun()[source]
Actions to take just after the actual job execution.
This method is initially empty, it can be defined in subclasses or directly added to either the whole class or a single instance using Binding decorators.
- class ADFCOSMORSConfFilter(max_num_confs=None, max_energy_range=None)[source]
This class allows the user to specify criteria on which to filter sets of conformers.
- Parameters:
max_num_confs (int | None) – The maximum number of conformers to bring forward to the next step. These are automatically ordered by energy, so the first n lowest energy structures are carried carried forward to the next step.
max_energy_range (float | None) – The maximum allowable difference (in kcal/mol) between the lowest energy conformer and the highest energy conformer that will be carried forward. For example, for a max_energy_range of 2 kcal/mol and a lowest energy structure of 1 kcal/mol, any structure with an energy above 3 (1+2) kcal/mol will be filtered out.
Code¶
Recipe module source: adfcosmorsconformers.py
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.interfaces.adfsuite.densf import DensfJob
from scm.plams.mol.molecule import Molecule
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.functions import log
from scm.plams.recipes.adfcosmorscompound import ADFCOSMORSCompoundJob
from scm.conformers import ConformersJob
from scm.base import KFFile, Units
from typing import List, Optional, Union, Dict, Tuple
from collections import OrderedDict
import os, re
from pathlib import Path
import numpy as np
class ADFCOSMORSConfFilter:
"""
This class allows the user to specify criteria on which to filter sets of conformers.
Args:
max_num_confs : The maximum number of conformers to bring forward to the next step. These are automatically ordered by energy, so the first *n* lowest energy structures are carried carried forward to the next step.
max_energy_range : The maximum allowable difference (in kcal/mol) between the lowest energy conformer and the highest energy conformer that will be carried forward. For example, for a max_energy_range of 2 kcal/mol and a lowest energy structure of 1 kcal/mol, any structure with an energy above 3 (1+2) kcal/mol will be filtered out.
"""
def __init__(self, max_num_confs: Optional[int] = None, max_energy_range: Optional[float] = None):
self.max_num_confs = max_num_confs
self.max_energy_range = max_energy_range
class ADFCOSMORSConfResults(Results):
pass
class ADFCOSMORSConfJob(MultiJob):
"""
Multi-step conformer workflow for generating COSMO-RS `.coskf` files:
.. code-block:: text
job_0: initial conformer generation
(default RDKit generator or user-provided conf_gen)
|-- apply first_filter (if provided)
v
job_i: optional refinement stages from additional
(for each (Settings, Filter) tuple, e.g. DFTB)
|-- apply its filter after each stage
v
adf_gas_job: gas-phase ADF conformer job
|-- if adf_singlepoint=True: single-point only
v
final_filter_job: apply final_filter (if provided)
v
Branch on hbc_from_MESP:
|-- False: replay_job -> COSMO results (.coskf)
`-- True: per conformer job_cosmo + job_densf
`-> COSMO + hydrogen bond center (.coskf)
Args:
molecule: Input PLAMS molecule.
conf_gen: User-provided job for initial conformer generation.
initial_conformers: Number of conformers generated by the default `conf_gen` (RDKit). Defaults to `500`.
first_filter: Filter applied after initial conformer generation.
additional: Optional extra conformer-refinement stages and their filters (e.g., with a fast engine such as DFTB).
final_filter: Filter applied after gas-phase ADF conformer generation.
adf_singlepoint: If `True`, run gas-phase ADF conformer jobs without optimization. Defaults to `False`.
hbc_from_MESP: If `True`, determine the hydrogen-bond center (hbc) via DENSF analysis (COSMOSAC-DHB-MESP). Defaults to `False`.
coskf_dir: Output directory for generated `.coskf` files.
coskf_name: Base filename for conformers (e.g., `{coskf_name}_{i}.coskf`). Default to `conformer`.
mol_info: Extra metadata written to "Compound Data" (e.g. `IUPAC`, `CAS`, `SMILES` and `Other Name`).
name: Optional PLAMS job name. Default to `plams`.
"""
_result_type = ADFCOSMORSConfResults
def __init__(
self,
molecule: Molecule,
conf_gen: Optional[ConformersJob] = None,
first_filter: Optional[ADFCOSMORSConfFilter] = None,
additional: Optional[List[Tuple[Settings, Optional[ADFCOSMORSConfFilter]]]] = None,
final_filter: Optional[ADFCOSMORSConfFilter] = None,
adf_singlepoint: bool = False,
initial_conformers: int = 500,
coskf_dir: Optional[str] = None,
coskf_name: Optional[str] = None,
mol_info: Optional[Dict[str, Union[float, int, str]]] = None,
hbc_from_MESP: bool = False,
**kwargs,
):
MultiJob.__init__(self, children=OrderedDict(), **kwargs)
self.mol = molecule
self.mol_info, self.atomic_ion = ADFCOSMORSCompoundJob.get_compound_properties(molecule, mol_info)
self.initial_conformers = initial_conformers
self.adf_singlepoint = adf_singlepoint
self.coskf_dir = coskf_dir
self.coskf_name = coskf_name if coskf_name is not None else "conformer"
if self.coskf_dir is not None and not os.path.exists(self.coskf_dir):
os.makedirs(self.coskf_dir, exist_ok=True)
self._has_valid_conf_gen = conf_gen is None or isinstance(conf_gen, ConformersJob)
if conf_gen is None or self._has_valid_conf_gen:
self.job_0 = conf_gen if conf_gen is not None else self._make_default_confgen_job()
else:
self.job_0 = self._make_default_confgen_job()
self.job_settings = [self.job_0.settings]
self.filters = [None, first_filter]
if additional is not None:
for js, f in additional:
self.job_settings.append(js)
self.filters.append(f)
self.final_filter = final_filter
self.filters.append(final_filter)
self.hbc_from_MESP = hbc_from_MESP
self.children["job_0"] = self.job_0
@staticmethod
def _get_compound_properties(
mol: Molecule, mol_info: Optional[Dict[str, Union[float, int, str]]] = None
) -> Tuple[Dict[str, Union[float, int, str]], bool]:
if mol_info is None:
mol_info = dict()
mol_info["Molar Mass"] = mol.get_mass()
mol_info["Formula"] = mol.get_formula()
try:
rings = mol.locate_rings()
flatten_atoms = [atom for subring in rings for atom in subring]
nring = len(set(flatten_atoms))
mol_info["Nring"] = int(nring)
except Exception as exc:
log(f"[ADFCOSMORSConfJob] Nring can not be located correctly: {exc}")
atomic_ion = len(mol.atoms) == 1
return mol_info, atomic_ion
def new_children(self):
# job_1 ... job_N: optional intermediate conformer refinement jobs
for job_id in range(1, len(self.job_settings)):
key = f"job_{job_id}"
if key not in self.children:
return {key: self._make_intermediate_job(job_id)}
# ADF gas-phase conformer job
adf_job_id = len(self.job_settings)
if "adf_gas_job" not in self.children:
return {"adf_gas_job": self._make_adf_job(adf_job_id)}
# Optional final filtering step on ADF conformers
if self.final_filter is not None and "final_filter_job" not in self.children:
return {"final_filter_job": self._make_filter_job(adf_job_id + 1)}
# In the MESP branch, jobs_cosmo/jobs_densf are created in postrun():
# the final number of conformers is only known after all filtering is complete.
# In the non-MESP branch, COSMO results for all conformers are generated via one replay job.
if not self.hbc_from_MESP and "replay_job" not in self.children:
replay_id = adf_job_id + (2 if self.final_filter is not None else 1)
return {"replay_job": self._make_replay_job(replay_id)}
return None
def prerun(self):
if not self._has_valid_conf_gen:
msg = (
"[ADFCOSMORSConfJob] Wrong type for argument conf_gen. Expected ConformersJob instance. "
"Using the default conformer generator."
)
log(msg)
if not self._has_valid_filter_settings():
msg = "[ADFCOSMORSConfJob] Wrong type for filter or settings"
log(msg)
raise TypeError(msg)
def postrun(self):
self.adf_gas_job = self.children["adf_gas_job"]
self.final_adf_job = self._get_final_adf_job()
self._iterate_conformers()
def _get_final_adf_job(self):
if self.final_filter is not None and "final_filter_job" in self.children:
return self.children["final_filter_job"]
return self.children["adf_gas_job"]
def _make_default_confgen_job(self):
sett = Settings()
sett.input.AMS.Generator.RDKit.InitialNConformers = self.initial_conformers
return ConformersJob(name="job0_conformers_uff", molecule=self.mol, settings=sett)
def _make_intermediate_job(self, job_id):
settings = self.job_settings[job_id].copy()
settings.input.AMS.InputConformersSet = self.children[f"job_{job_id - 1}"].results
self._add_filter(settings, job_id)
return ConformersJob(name=f"job{job_id}_additional", settings=settings)
def _make_adf_job(self, job_id):
sett = ADFCOSMORSCompoundJob.adf_settings(False, atomic_ion=self.atomic_ion)
if not self.adf_singlepoint:
sett.input.AMS.Task = "Optimize"
sett.input.AMS.GeometryOptimization.UseAMSWorker = "False"
# sett.input.ams.GeometryOptimization.ConvergenceQuality = 'Good'
else:
sett.input.AMS.Task = "Score"
sett.input.AMS.InputConformersSet = self.children[f"job_{job_id - 1}"].results
if self.hbc_from_MESP:
sett.input.AMS.GeometryOptimization.Keep = "All"
sett.input.AMS.Output.KeepWorkDir = "Yes"
self._add_filter(sett, job_id)
return ConformersJob(name=f"job{job_id}_adf", settings=sett)
def _make_filter_job(self, job_id):
sett = Settings()
sett.input.AMS.Task = "Filter"
sett.input.AMS.InputConformersSet = self.children["adf_gas_job"].results
self._add_filter(sett, job_id)
return ConformersJob(name=f"job{job_id}_adf_filter", settings=sett)
def _iterate_conformers(self):
if self.hbc_from_MESP:
self._iterate_conformers_mesp()
else:
self._iterate_conformers_replay()
def _iterate_conformers_replay(self):
if self.coskf_dir is None:
self.coskf_dir = self.final_adf_job.path
self.replay_job = self.children["replay_job"]
self.replay_job.results.wait()
conformer_E = self.final_adf_job.results.get_energies("Ha")
for i, conf_E in enumerate(conformer_E):
frame_name = f"Frame{i+1}"
cosmo_section = self.replay_job.results.read_rkf_section("COSMO", frame_name)
cosmo_section["Gas Phase Bond Energy"] = conf_E
coskfname = f"{self.coskf_name}_{i}.coskf"
fullname = Path(self.coskf_dir) / coskfname
fullname.unlink(missing_ok=True)
coskf = KFFile(str(fullname), autosave=False)
for key, val in cosmo_section.items():
coskf.write("COSMO", key, val)
for key, value in self.mol_info.items():
coskf.write("Compound Data", key, value)
coskf.save()
def _iterate_conformers_mesp(self):
if self.coskf_dir is None:
self.coskf_dir = self.final_adf_job.path
# prepare gas phase rkf files for per-conformer COSMO + DENSF reruns
rkf_path = Path(self.adf_gas_job.results.rkfpath()).parent
gas_rkf_path = list(rkf_path.glob("go*"))
gas_rkf_path = sorted(gas_rkf_path, key=lambda p: int(re.search(r"go(\d+)", p.as_posix()).group(1)))
au2ha = Units.conversion_factor("au", "hartree")
gas_rkf_energy = []
for rkf_path in gas_rkf_path:
rkf = KFFile(str(rkf_path / "adf.rkf"))
egas = rkf.read_real("AMSResults", "Energy") * au2ha
gas_rkf_energy.append(egas)
conformer_E = self.final_adf_job.results.get_energies("Ha")
num_conformer = len(conformer_E)
self.jobs_cosmo = [None] * num_conformer
self.jobs_densf = [None] * num_conformer
conformer_names = [f"{self.coskf_name}_{i}.coskf" for i in range(num_conformer)]
used_rkf_idx = set()
for i, (conf_E, coskfname) in enumerate(zip(conformer_E, conformer_names)):
Path(self.coskf_dir, coskfname).unlink(missing_ok=True)
matches = [
idx
for idx, ref_E in enumerate(gas_rkf_energy)
if idx not in used_rkf_idx and np.isclose(conf_E, ref_E, rtol=1e-12, atol=1e-8)
]
if not matches:
msg = f"[ADFCOSMORSConfJob] No match found for conf_E={conf_E:.12f}"
log(msg)
raise FloatingPointError(msg)
if len(matches) > 1:
rkf_idx = min(matches, key=lambda j: abs(conf_E - gas_rkf_energy[j]))
msg = (
f"[ADFCOSMORSConfJob] Multiple matches for conf_E={conf_E:.12f}, "
f"choose idx={rkf_idx} with min |dE|={abs(conf_E - gas_rkf_energy[rkf_idx]):.3e}"
)
log(msg)
else:
rkf_idx = matches[0]
used_rkf_idx.add(rkf_idx)
log(f"[ADFCOSMORSConfJob] matched gas_energy index {rkf_idx} for conformer {i}")
self.jobs_cosmo[i] = self._make_cosmo_job(gas_rkf_path[rkf_idx], i)
self.jobs_cosmo[i].run()
self.jobs_densf[i] = self._make_densf_job(gas_rkf_path[rkf_idx], i)
self.jobs_densf[i].run()
self._make_conformer_coskf(i, coskfname)
def _make_conformer_coskf(self, i, name):
self.jobs_cosmo[i].results.wait()
t41 = None
if self.hbc_from_MESP:
self.jobs_densf[i].results.wait()
t41 = self.jobs_densf[i].results.kfpath()
ADFCOSMORSCompoundJob.convert_to_coskf(
rkf_path=self.jobs_cosmo[i].results.rkfpath(file="adf"),
coskf_name=name,
plams_dir=self.jobs_cosmo[i].path,
coskf_dir=self.coskf_dir,
mol_info=self.mol_info,
densf_path=t41,
)
def _make_replay_job(self, job_id):
sett = ADFCOSMORSCompoundJob.adf_settings(
True, elements=list(set(at.symbol for at in self.mol)), atomic_ion=self.atomic_ion
)
sett.input.AMS.Task = "Replay"
self._get_final_adf_job().results.wait()
sett.input.AMS.Replay.File = self._get_final_adf_job().results["conformers.rkf"]
sett.input.AMS.Replay.StoreAllResultFiles = "True"
return AMSJob(name=f"job{job_id}_replay", settings=sett)
def _make_cosmo_job(self, rkf_path, i):
sett = ADFCOSMORSCompoundJob.adf_settings(
True, elements=list(set(at.symbol for at in self.mol)), atomic_ion=self.atomic_ion
)
sett.input.AMS.Task = "SinglePoint"
sett.input.AMS.LoadSystem.File = str(rkf_path / "ams.rkf")
sett.input.AMS.EngineRestart = str(rkf_path / "adf.rkf")
return AMSJob(name=f"job_cosmo_c{i}", settings=sett)
def _make_densf_job(self, rkf_path, i):
densf_job = DensfJob(settings=ADFCOSMORSCompoundJob.densf_settings())
densf_job.inputjob = str(rkf_path / "adf.rkf")
densf_job.name = f"job_densf_c{i}"
return densf_job
def _add_filter(self, sett, job_id):
if job_id < len(self.filters):
filt = self.filters[job_id]
else:
filt = self.final_filter
if filt is not None:
if filt.max_num_confs is not None:
sett.input.AMS.InputMaxConfs = filt.max_num_confs
if filt.max_energy_range is not None:
sett.input.AMS.InputMaxEnergy = filt.max_energy_range
def _has_valid_filter_settings(self):
for js, f in zip(self.job_settings, self.filters):
if js is not None and not isinstance(js, Settings):
log(f"[ADFCOSMORSConfJob] Wrong type for job_settings : {js}")
return False
if f is not None and not isinstance(f, ADFCOSMORSConfFilter):
log(f"[ADFCOSMORSConfJob] Wrong type for job_settings : {f}")
return False
return True
MD Jobs¶
Convenience job classes for MD simulation setup, restart/spawning, and common trajectory analyses.
Overview¶
These classes wrap common AMS MolecularDynamics patterns (NVE/NVT/NPT), restarts and spawned ensembles, plus trajectory analysis jobs.
API¶
API reference
- class AMSMDJob(velocities=None, timestep=None, samplingfreq=None, nsteps=None, checkpointfrequency=None, engineresultsfreq=None, writevelocities=None, writebonds=None, writemolecules=None, writecharges=None, writeenginegradients=None, calcpressure=None, molecule=None, temperature=None, thermostat=None, tau=None, thermostat_region=None, pressure=None, barostat=None, barostat_tau=None, scale=None, equal=None, constantvolume=None, binlog_time=None, binlog_pressuretensor=None, binlog_dipolemoment=None, _enforce_thermostat=False, _enforce_barostat=False, **kwargs)[source]
- molecule: Molecule
The initial structure
- name: str
The name of the job
- settings: Settings
Settings for the job. You should normally not populate neither settings.input.ams.MolecularDynamics nor settings.input.ams.Task
- velocities: float or AMSJob or str (path/to/ams.rkf) or 2-tuple (path/to/ams.rkf, frame-number)
If float, it is taken as the temperature. If AMSJob or str, the velocities are taken from the EndVelocities section of the corresponding ams.rkf file. If 2-tuple, the first item must be a path to an ams.rkf, and the second item an integer specifying the frame number - the velocities are then read from History%Velocities(framenumber).
- timestep: float
Timestep
- samplingfreq: int
Sampling frequency
- nsteps: int
Length of simulation
Trajectory options:
- checkpointfrequency: int
How frequently to write MDStep*.rkf checkpoint files
- writevelocitiesbool
Whether to save velocities to ams.rkf (needed for example to restart from individual frames or to calculate velocity autocorrelation functions)
- writebonds: bool
Whether to save bonds to ams.rkf
- writemolecules: bool
Whether to write molecules to ams.rkf
- writeenginegradients: bool
Whether to save engine gradients (negative of atomic forces) to ams.rkf
Thermostat (NVT, NPT) options:
- thermostat: str
‘Berendsen’ or ‘NHC’
- tau: float
Thermostat time constant (fs)
- temperature: float or tuple of floats
Temperature (K). If a tuple/list of floats, the Thermostat.Duration option will be set to evenly divided intervals.
- thermostat_region: str
Region for which to apply the thermostat
Barostat (NPT) options:
- barostat: str
‘Berendsen’ or ‘MTK’
- barostat_tau: float
Barostat time constant (fs)
- pressure: float
Barostat pressure (pascal)
- equal: str
‘XYZ’ etc.
- scale: str
‘XYZ’ etc.
- constantvolume: bool
Constant volume?
Other options:
- calcpressure: bool
Whether to calculate pressure for each frame.
- binlog_time: bool
Whether to log the time at every timestep in the BinLog section on ams.rkf
- binlog_pressuretensor: bool
Whether to log the pressure tensor at every timestep in the BinLog section on ams.rkf
- Parameters:
- get_velocities_from(other_job, frame=None, update_molecule=True)[source]
Function to update the InitialVelocities block in self. It is normally not needed, instead use the e.g. AMSNVEJob.restart_from() function.
This function can be called in prerun() methods for MultiJobs
- classmethod restart_from(other_job, frame=None, settings=None, use_prerun=False, **kwargs)[source]
- other_job: AMSJob
The job to restart from.
- frame: int
Which frame to read the structure and velocities from. If None, the final structure and end velocities will be used (section Molecule and MDResults%EndVelocities)
- settings: Settings
Settings that override any other settings. All settings from other_job (e.g. the engine settings) are inherited by default but they can be overridden here.
- use_prerun: bool
If True, the molecule and velocities will only be read from other_job inside the prerun() method. Set this to True to prevent PLAMS from waiting for other_job to finish as soon as the new job is defined.
- kwargs: many options
See the docstring for AMSMDJob.
- class AMSNVEJob(**kwargs)[source]
A class for running NVE MD simulations
- class AMSNVTJob(**kwargs)[source]
A class for running NVT MD simulations
- class AMSNPTJob(**kwargs)[source]
A class for running NPT MD simulations
- class AMSNPTResults(*args, **kwargs)[source]
-
- get_equilibrated_molecule(equilibration_fraction=0.667, return_index=False)[source]
Discards the first equilibration_fraction of the trajectory. Calculates the average density of the rest. Returns the molecule with the closest density to the average density among the remaining trajectory.
- class AMSNVESpawnerJob(previous_job, n_nve=1, name='nvespawnerjob', **kwargs)[source]
A class for running multiple NVE simulations with initial structures/velocities taken from an NVT trajectory. The NVT trajectory must contain the velocities!
- __init__(previous_job, n_nve=1, name='nvespawnerjob', **kwargs)[source]
- previous_job: AMSJob
An AMSJob with an MD trajectory. Must contain velocities (WriteVelocities Yes). Note that the trajectory should have been equilibrated before it starts.
- n_nveint
The number of NVE simulations to spawn
All other settings can be set as for an AMSNVEJob (e.g.
nsteps).
- prerun()[source]
Constructs the children jobs
- class AMSMDScanDensityJob(molecule, scan_density_upper=1.4, startstep=None, **kwargs)[source]
A class for scanning the density using MD Deformations
- class AMSMSDJob(previous_job=None, max_correlation_time_fs=10000, start_time_fit_fs=2000, atom_indices=None, **kwargs)[source]
A convenient class wrapping around the trajectory analysis MSD tool
- Parameters:
- __init__(previous_job=None, max_correlation_time_fs=10000, start_time_fit_fs=2000, atom_indices=None, **kwargs)[source]
- previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
- max_correlation_time_fs: float
Maximum correlation time in femtoseconds
- start_time_fit_fsfloat
Smallest correlation time for the linear fit
- atom_indicesList[int]
If None, use all atoms. Otherwise use the specified atom indices (starting with 1)
- kwargs: dict
Other options to AMSAnalysisJob
- get_input()[source]
Generate the input file
- prerun()[source]
Constructs the final settings
- postrun()[source]
Creates msd.txt, fit_msd.txt, and D.txt
- class AMSRDFJob(previous_job=None, atom_indices=None, atom_indices_to=None, rmin=0.5, rmax=6.0, rstep=0.1, **kwargs)[source]
- __init__(previous_job=None, atom_indices=None, atom_indices_to=None, rmin=0.5, rmax=6.0, rstep=0.1, **kwargs)[source]
- previous_job: AMSJob
AMSJob with finished MD trajectory.
- atom_indices: List[int]
Atom indices (starting with 1). If None, calculate RDF from all atoms.
- atom_indices_to: List[int]
Atom indices (starting with 1). If None, calculate RDF to all atoms.
- rmin: float
Minimum distance (angstrom)
- rmax: float
Maximum distance (angstrom)
- rstep: float
Bin width for the histogram (angstrom)
- get_input()[source]
Generate the input file
- prerun()[source]
Creates the final settings. Do not call or override this method.
- postrun()[source]
Creates rdf.txt. Do not call or override this method.
- class AMSVACFJob(previous_job=None, max_correlation_time_fs=5000, max_freq=5000, atom_indices=None, **kwargs)[source]
A class for calculating the velocity autocorrelation function and its power spectrum
- __init__(previous_job=None, max_correlation_time_fs=5000, max_freq=5000, atom_indices=None, **kwargs)[source]
- previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
- max_correlation_time_fs: float
Maximum correlation time in femtoseconds
- max_freq: float
The maximum frequency for the power spectrum in cm^-1
- atom_indices: List[int]
Atom indices (starting with 1). If None, use all atoms.
- get_input()[source]
Generate the input file
- prerun()[source]
Creates final settings
- postrun()[source]
Creates vacf.txt and power_spectrum.txt
- class AMSViscosityFromBinLogJob(previous_job=None, **kwargs)[source]
A convenient class wrapping around the trajectory analysis ViscosityFromBinLog tool. Only runs with default input options (max correlation time 10% of the total trajectory).
- __init__(previous_job=None, **kwargs)[source]
- previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts. It must have been run with the BinLog%PressureTensor option set.
- kwargs: dict
Other options to AMSAnalysisJob
- get_input()[source]
Generate the input file
- prerun()[source]
Constructs the final settings
- postrun()[source]
Creates viscosity_integral, fit_viscosity_integral, and viscosity.txt
Code¶
Recipe module source: amsmdjob.py
from scm.plams.core.functions import add_to_instance
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSJob, AMSResults
import numpy as np
from scm.plams.tools.kftools import KFFile
from scm.plams.tools.units import Units
from typing import Union
import scm.plams as plams
__all__ = ["AMSMDJob", "AMSNVEJob", "AMSNVTJob", "AMSNPTJob"]
class AMSMDJob(AMSJob):
"""
molecule: Molecule
The initial structure
name: str
The name of the job
settings: Settings
Settings for the job. You should normally not populate neither settings.input.ams.MolecularDynamics nor settings.input.ams.Task
velocities: float or AMSJob or str (path/to/ams.rkf) or 2-tuple (path/to/ams.rkf, frame-number)
If float, it is taken as the temperature. If AMSJob or str, the velocities are taken from the EndVelocities section of the corresponding ams.rkf file. If 2-tuple, the first item must be a path to an ams.rkf, and the second item an integer specifying the frame number - the velocities are then read from History%Velocities(framenumber).
timestep: float
Timestep
samplingfreq: int
Sampling frequency
nsteps: int
Length of simulation
**Trajectory options**:
checkpointfrequency: int
How frequently to write MDStep*.rkf checkpoint files
writevelocities : bool
Whether to save velocities to ams.rkf (needed for example to restart from individual frames or to calculate velocity autocorrelation functions)
writebonds: bool
Whether to save bonds to ams.rkf
writemolecules: bool
Whether to write molecules to ams.rkf
writeenginegradients: bool
Whether to save engine gradients (negative of atomic forces) to ams.rkf
**Thermostat (NVT, NPT) options**:
thermostat: str
'Berendsen' or 'NHC'
tau: float
Thermostat time constant (fs)
temperature: float or tuple of floats
Temperature (K). If a tuple/list of floats, the Thermostat.Duration option will be set to evenly divided intervals.
thermostat_region: str
Region for which to apply the thermostat
**Barostat (NPT) options**:
barostat: str
'Berendsen' or 'MTK'
barostat_tau: float
Barostat time constant (fs)
pressure: float
Barostat pressure (pascal)
equal: str
'XYZ' etc.
scale: str
'XYZ' etc.
constantvolume: bool
Constant volume?
**Other options**:
calcpressure: bool
Whether to calculate pressure for each frame.
binlog_time: bool
Whether to log the time at every timestep in the BinLog section on ams.rkf
binlog_pressuretensor: bool
Whether to log the pressure tensor at every timestep in the BinLog section on ams.rkf
"""
default_nsteps = 1000
default_timestep = 0.25
default_samplingfreq = 100
default_thermostat = "NHC"
default_temperature = 300
default_tau_multiplier = 400 # get tau by multiplying the timestep with this number
default_thermostat_region = None
default_barostat = "MTK"
default_pressure = 1e5
default_barostat_tau_multiplier = 4000 # get barostat_tau by multiplying the timestep with this number
default_scale = "XYZ"
default_equal = "None"
default_constantvolume = "False"
default_checkpointfrequency = 1000
default_writevelocities = "True"
default_writebonds = "True"
default_writecharges = "True"
default_writemolecules = "True"
default_writeenginegradients = "False"
default_calcpressure = "False"
default_binlog_time = "False"
default_binlog_pressuretensor = "False"
default_binlog_dipolemoment = "False"
def __init__(
self,
velocities=None,
timestep=None,
samplingfreq: int = None,
nsteps: int = None,
checkpointfrequency=None,
engineresultsfreq=None,
writevelocities=None,
writebonds=None,
writemolecules=None,
writecharges=None,
writeenginegradients=None,
calcpressure=None,
molecule: Union[plams.Molecule, AMSJob, AMSResults] = None,
temperature: float = None,
thermostat=None,
tau=None,
thermostat_region=None,
pressure=None,
barostat=None,
barostat_tau=None,
scale=None,
equal=None,
constantvolume=None,
binlog_time=None,
binlog_pressuretensor=None,
binlog_dipolemoment=None,
_enforce_thermostat=False,
_enforce_barostat=False,
**kwargs,
):
""" """
if isinstance(molecule, AMSJob):
molecule = molecule.results.get_main_molecule()
if isinstance(molecule, AMSResults):
molecule = molecule.get_main_molecule()
AMSJob.__init__(self, molecule=molecule, **kwargs)
self.settings.input.ams.Task = "MolecularDynamics"
mdsett = self.settings.input.ams.MolecularDynamics
mdsett.TimeStep = timestep or mdsett.TimeStep or self.default_timestep
mdsett.Trajectory.SamplingFreq = samplingfreq or mdsett.Trajectory.SamplingFreq or self.default_samplingfreq
if engineresultsfreq is not None:
# keyword was introduced in AMS2025, so avoid always setting this input option in order
# to be able to use the AMSMDJob class with older AMS versions.
mdsett.Trajectory.EngineResultsFreq = engineresultsfreq
mdsett.NSteps = nsteps or mdsett.NSteps or self.default_nsteps
mdsett.Trajectory.WriteVelocities = (
str(writevelocities)
if writevelocities is not None
else mdsett.Trajectory.WriteVelocities or self.default_writevelocities
)
mdsett.Trajectory.WriteBonds = (
str(writebonds) if writebonds is not None else mdsett.Trajectory.WriteBonds or self.default_writebonds
)
mdsett.Trajectory.WriteMolecules = (
str(writemolecules)
if writemolecules is not None
else mdsett.Trajectory.WriteMolecules or self.default_writemolecules
)
mdsett.Trajectory.WriteCharges = (
str(writecharges)
if writecharges is not None
else mdsett.Trajectory.WriteCharges or self.default_writecharges
)
mdsett.Trajectory.WriteEngineGradients = (
str(writeenginegradients)
if writeenginegradients is not None
else mdsett.Trajectory.WriteEngineGradients or self.default_writeenginegradients
)
mdsett.CalcPressure = (
str(calcpressure) if calcpressure is not None else mdsett.CalcPressure or self.default_calcpressure
)
mdsett.Checkpoint.Frequency = (
checkpointfrequency or mdsett.Checkpoint.Frequency or self.default_checkpointfrequency
)
mdsett.BinLog.Time = (
str(binlog_time) if binlog_time is not None else mdsett.BinLog.Time or self.default_binlog_time
)
mdsett.BinLog.PressureTensor = (
str(binlog_pressuretensor)
if binlog_pressuretensor is not None
else mdsett.BinLog.PressureTensor or self.default_binlog_pressuretensor
)
mdsett.BinLog.DipoleMoment = (
str(binlog_dipolemoment)
if binlog_dipolemoment is not None
else mdsett.BinLog.DipoleMoment or self.default_binlog_dipolemoment
)
if velocities is None and temperature is not None:
velocities = self._get_initial_temperature(temperature)
self.settings += self._velocities2settings(velocities)
if temperature or thermostat or _enforce_thermostat:
self.settings.update(
self._get_thermostat_settings(
thermostat=thermostat,
temperature=temperature,
tau=tau,
thermostat_region=thermostat_region,
nsteps=int(mdsett.NSteps),
)
)
if pressure or barostat or _enforce_barostat:
self.settings.update(
self._get_barostat_settings(
pressure=pressure,
barostat=barostat,
barostat_tau=barostat_tau,
scale=scale,
equal=equal,
constantvolume=constantvolume,
)
)
def remove_blocks(self, blocks=None):
if blocks:
for block in blocks:
if block in self.settings.input.ams.MolecularDynamics:
del self.settings.input.ams.MolecularDynamics[block]
@staticmethod
def _get_initial_temperature(temperature):
try:
return temperature[0]
except TypeError:
return temperature
@staticmethod
def _velocities2settings(velocities):
s = Settings()
if isinstance(velocities, int) or isinstance(velocities, float) or velocities is None or velocities is False:
s.input.ams.MolecularDynamics.InitialVelocities.Type = "Random"
s.input.ams.MolecularDynamics.InitialVelocities.Temperature = velocities or AMSMDJob.default_temperature
elif isinstance(velocities, tuple):
# file and frame number
f = velocities[0]
frame = velocities[1]
vels = KFFile(f).read("MDHistory", f"Velocities({frame})")
vels = np.array(vels).reshape(-1, 3) * Units.convert(1.0, "bohr", "angstrom") # angstrom/fs
s.input.ams.MolecularDynamics.InitialVelocities.Type = "Input"
values = ""
for x in vels:
values += 6 * " " + " ".join(str(y) for y in x) + "\n"
s.input.ams.MolecularDynamics.InitialVelocities.Values._h = " # From {} frame {}".format(f, frame)
s.input.ams.MolecularDynamics.InitialVelocities.Values._1 = values
else:
s.input.ams.MolecularDynamics.InitialVelocities.Type = "FromFile"
s.input.ams.MolecularDynamics.InitialVelocities.File = velocities
return s
def get_velocities_from(self, other_job, frame=None, update_molecule=True):
"""
Function to update the InitialVelocities block in self. It is normally not needed, instead use the e.g. AMSNVEJob.restart_from() function.
This function can be called in prerun() methods for MultiJobs
"""
_, velocities, molecule, _ = self._get_restart_job_velocities_molecule(other_job, frame=frame)
del self.settings.input.ams.MolecularDynamics.InitialVelocities
self.settings += self._velocities2settings(velocities)
if update_molecule:
self.molecule = molecule
@staticmethod
def _get_restart_job_velocities_molecule(other_job, frame=None, settings=None, get_velocities_molecule=True):
"""
other_job: str or some AMSMdJob
get_velocities_molecule : bool
Whether to get the velocities and molecule right now. Set to False to not access other_job.results (for use with MultiJob prerun() methods)
Returns: (other_job [AMSJob], velocities, molecule, extra_settings [Settings])
"""
if isinstance(other_job, str):
other_job = AMSJob.load_external(other_job)
if get_velocities_molecule:
if frame:
velocities = (other_job.results.rkfpath(), frame)
molecule = other_job.results.get_history_molecule(frame)
else:
velocities = other_job
molecule = other_job.results.get_main_molecule()
else:
velocities = None
molecule = None
extra_settings = other_job.settings.copy()
if settings:
extra_settings.update(settings)
if "InitialVelocities" in extra_settings.input.ams.MolecularDynamics:
del extra_settings.input.ams.MolecularDynamics.InitialVelocities
if "System" in extra_settings.input.ams:
del extra_settings.input.ams.System
return other_job, velocities, molecule, extra_settings
def _get_thermostat_settings(self, thermostat, temperature, tau, thermostat_region, nsteps: int):
s = Settings()
prev_thermostat_settings = self.settings.input.ams.MolecularDynamics.Thermostat
if isinstance(prev_thermostat_settings, list):
prev_thermostat_settings = prev_thermostat_settings[0]
s.input.ams.MolecularDynamics.Thermostat.Type = (
thermostat or prev_thermostat_settings.Type or self.default_thermostat
)
try:
n_temperatures = len(temperature)
my_temperature = " ".join(str(x) for x in temperature)
if n_temperatures > 1:
nsteps_per_temperature = nsteps // (len(temperature) - 1)
s.input.ams.MolecularDynamics.Thermostat.Duration = " ".join(
[str(nsteps_per_temperature)] * (n_temperatures - 1)
)
else:
s.input.ams.MolecularDynamics.Thermostat.Duration = None
except TypeError:
my_temperature = temperature
s.input.ams.MolecularDynamics.Thermostat.Duration = None
s.input.ams.MolecularDynamics.Thermostat.Region = (
thermostat_region or prev_thermostat_settings.get("Region", None) or self.default_thermostat_region
)
s.input.ams.MolecularDynamics.Thermostat.Temperature = (
my_temperature or prev_thermostat_settings.Temperature or self.default_temperature
)
s.input.ams.MolecularDynamics.Thermostat.Tau = (
tau
or prev_thermostat_settings.Tau
or float(self.settings.input.ams.MolecularDynamics.TimeStep) * AMSMDJob.default_tau_multiplier
)
return s
def _get_barostat_settings(self, pressure, barostat, barostat_tau, scale, equal, constantvolume):
s = Settings()
self.settings.input.ams.MolecularDynamics.Barostat.Type = (
barostat or self.settings.input.ams.MolecularDynamics.Barostat.Type or self.default_barostat
)
self.settings.input.ams.MolecularDynamics.Barostat.Pressure = (
pressure
if pressure is not None
else self.settings.input.ams.MolecularDynamics.Barostat.Pressure or self.default_pressure
)
self.settings.input.ams.MolecularDynamics.Barostat.Tau = (
barostat_tau
or self.settings.input.ams.MolecularDynamics.Barostat.Tau
or float(self.settings.input.ams.MolecularDynamics.TimeStep) * AMSMDJob.default_barostat_tau_multiplier
)
self.settings.input.ams.MolecularDynamics.Barostat.Scale = (
scale or self.settings.input.ams.MolecularDynamics.Barostat.Scale or self.default_scale
)
self.settings.input.ams.MolecularDynamics.Barostat.Equal = (
equal or self.settings.input.ams.MolecularDynamics.Barostat.Equal or self.default_equal
)
self.settings.input.ams.MolecularDynamics.Barostat.ConstantVolume = (
str(constantvolume)
if constantvolume is not None
else self.settings.input.ams.MolecularDynamics.Barostat.ConstantVolume or self.default_constantvolume
)
return s
@classmethod
def restart_from(cls, other_job, frame=None, settings=None, use_prerun=False, **kwargs):
"""
other_job: AMSJob
The job to restart from.
frame: int
Which frame to read the structure and velocities from. If None, the final structure and end velocities will be used (section Molecule and MDResults%EndVelocities)
settings: Settings
Settings that override any other settings. All settings from other_job (e.g. the engine settings) are inherited by default but they can be overridden here.
use_prerun: bool
If True, the molecule and velocities will only be read from other_job inside the prerun() method. Set this to True to prevent PLAMS from waiting for other_job to finish as soon as the new job is defined.
kwargs: many options
See the docstring for AMSMDJob.
"""
other_job, velocities, molecule, extra_settings = cls._get_restart_job_velocities_molecule(
other_job, frame, settings, get_velocities_molecule=not use_prerun
)
job = cls(settings=extra_settings, velocities=velocities, molecule=molecule, **kwargs)
if use_prerun:
@add_to_instance(job)
def prerun(self): # noqa F811
self.get_velocities_from(other_job, frame=frame, update_molecule=True)
return job
class AMSNVEJob(AMSMDJob):
"""
A class for running NVE MD simulations
"""
def __init__(self, **kwargs):
AMSMDJob.__init__(self, **kwargs)
self.remove_blocks(["thermostat", "barostat", "deformation", "nanoreactor"])
class AMSNVTJob(AMSMDJob):
"""
A class for running NVT MD simulations
"""
def __init__(self, **kwargs):
AMSMDJob.__init__(self, _enforce_thermostat=True, **kwargs)
self.remove_blocks(["barostat", "deformation", "nanoreactor"])
class AMSNPTResults(AMSResults):
def get_equilibrated_molecule(self, equilibration_fraction=0.667, return_index=False):
"""
Discards the first equilibration_fraction of the trajectory.
Calculates the average density of the rest. Returns the molecule
with the closest density to the average density among the remaining
trajectory.
"""
densities = self.job.results.get_history_property("Density", "MDHistory")
analyze_from = int(len(densities) * equilibration_fraction)
# take structure closest to the target density
avg_density = np.mean(densities[analyze_from:]) # amu/bohr^3
delta = np.array(densities[analyze_from:]) - avg_density
delta = np.abs(delta)
min_index = np.argmin(delta)
min_index += analyze_from
mol = self.job.results.get_history_molecule(min_index + 1)
if return_index:
return mol, min_index + 1
return mol
class AMSNPTJob(AMSMDJob):
"""
A class for running NPT MD simulations
"""
_result_type = AMSNPTResults
def __init__(self, **kwargs):
AMSMDJob.__init__(self, _enforce_thermostat=True, _enforce_barostat=True, **kwargs)
self.settings.input.ams.MolecularDynamics.CalcPressure = "True"
self.remove_blocks(["deformation", "nanoreactor"])
Recipe module source: nvespawner.py
from collections import OrderedDict
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.enums import JobStatus
import numpy as np
from scm.plams.recipes.md.amsmdjob import AMSNVEJob
__all__ = ["AMSNVESpawnerJob", "AMSNVESpawnerResults"]
class AMSNVESpawnerResults(Results):
"""Results class for AMSNVESpawnerJob"""
def get_all_dipole_derivatives_acf(
self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, x=True, y=True, z=True, normalize=False
):
ret_x, ret_y = [], []
for job in self.job.children.values():
xv, yv = job.results.get_dipole_derivatives_acf(
start_fs=start_fs,
end_fs=end_fs,
every_fs=every_fs,
max_dt_fs=max_dt_fs,
x=x,
y=y,
z=z,
normalize=normalize,
)
ret_x.append(xv)
ret_y.append(yv)
return ret_x, ret_y
def get_dipole_derivatives_acf(
self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, x=True, y=True, z=True, normalize=False
):
all_x, all_y = self.get_all_dipole_derivatives_acf(
start_fs=start_fs, end_fs=end_fs, every_fs=every_fs, max_dt_fs=max_dt_fs, x=x, y=y, z=z, normalize=normalize
)
return np.mean(all_x, axis=0), np.mean(all_y, axis=0)
def get_mean_temperature(self) -> float:
avg_T = np.mean([job.results.readrkf("MDResults", "MeanTemperature") for job in self.job.children.values()])
return avg_T
class AMSNVESpawnerJob(MultiJob):
"""A class for running multiple NVE simulations with initial structures/velocities taken from an NVT trajectory. The NVT trajectory must contain the velocities!"""
_result_type = AMSNVESpawnerResults
def _default_settings(self):
s = Settings()
s.input.ForceField.Type = "GAFF"
s.input.ForceField.AnteChamberIntegration = "Yes"
return s
def __init__(self, previous_job, n_nve=1, name="nvespawnerjob", **kwargs): # needs to be finished
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Must contain velocities (WriteVelocities Yes). Note that the trajectory should have been equilibrated before it starts.
n_nve : int
The number of NVE simulations to spawn
All other settings can be set as for an AMSNVEJob (e.g. ``nsteps``).
"""
MultiJob.__init__(self, children=OrderedDict(), name=name)
self.previous_job = previous_job
self.n_nve = n_nve
self.nve_constructor_settings = kwargs
self.nve_jobs = []
def prerun(self): # noqa F811
"""
Constructs the children jobs
"""
# use previously run previous_job
assert self.previous_job.status != JobStatus.CREATED, "You can only pass in a finished AMSJob"
try:
self.previous_job.results.readrkf("MDHistory", "Velocities(1)")
except KeyError:
raise KeyError("Couldn't read velocities from {}".format(self.previous_job.results.rkfpath()))
nframes_in_history = self.previous_job.results.readrkf("History", "nEntries")
if self.n_nve > 0:
interval = nframes_in_history // self.n_nve
frames = np.linspace(interval, nframes_in_history, self.n_nve, dtype=int)
for i, frame in enumerate(frames):
# the nve jobs only get different inside prerun, so need to do something to make them different to prevent the PLAMS rerun prevention
name = f"nve{i+1}"
self.settings.input.ams["#"] = name # add a comment line
self.children[name] = AMSNVEJob.restart_from(
self.previous_job, frame=frame, name=name, use_prerun=True, **self.nve_constructor_settings
)
self.children[name].from_frame = frame
self.nve_jobs.append(self.children[name])
Recipe module source: scandensity.py
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSResults
import numpy as np
from scm.plams.recipes.md.amsmdjob import AMSNVTJob
__all__ = ["AMSMDScanDensityJob", "AMSMDScanDensityResults"]
class AMSMDScanDensityResults(AMSResults):
"""Results class for AMSMDScanDensityJob"""
def get_lowest_energy_index(self, variable="Energy", history_section="History"):
"""
Returns the 1-based index of the lowest energy molecule
"""
energies = self.get_history_property(variable, history_section)
minindex = np.argmin(energies) + 1
return minindex
def get_lowest_energy_molecule(self, variable="TotalEnergy"):
return self.get_history_molecule(self.get_lowest_energy_index(variable, "MDHistory"))
class AMSMDScanDensityJob(AMSNVTJob):
"""A class for scanning the density using MD Deformations"""
_result_type = AMSMDScanDensityResults
def __init__(self, molecule, scan_density_upper=1.4, startstep=None, **kwargs):
AMSNVTJob.__init__(self, molecule=molecule, **kwargs)
from_density = self.molecule.get_density() * 1e-3
orig_length = self.molecule.cell_lengths()
density_ratio = from_density / scan_density_upper
new_length = [x * density_ratio**0.333333 for x in orig_length]
self.settings.input.ams.MolecularDynamics.NSteps
self.scan_density_upper = scan_density_upper
self.startstep = startstep or 1
s = Settings()
s.input.ams.MolecularDynamics.Deformation.TargetLength = " ".join([str(x) for x in new_length])
s.input.ams.MolecularDynamics.Deformation.StartStep = self.startstep
self.settings += s
Recipe module source: trajectoryanalysis.py
import copy
from collections import OrderedDict, defaultdict
from collections.abc import Iterable
from typing import List
import numpy as np
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.core.functions import requires_optional_package
from scm.plams.interfaces.adfsuite.amsanalysis import AMSAnalysisJob, AMSAnalysisResults
from scm.plams.tools.units import Units
from scm.plams.core.enums import JobStatus
__all__ = ["AMSRDFJob", "AMSMSDJob", "AMSMSDResults", "AMSVACFJob", "AMSVACFResults"]
class MDAnalysisSettingsError(Exception):
"""
Error in supplied settings object
"""
class AMSConvenientAnalysisJob(AMSAnalysisJob):
_task = "None"
def __init__(self, previous_job=None, atom_indices=None, **kwargs): # needs to be finished
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
All other settings can be set as for AMS
"""
AMSAnalysisJob.__init__(self, **kwargs)
if not self._has_settings_entry(self.settings.input, "Task"):
self.settings.input.Task = self._task
self.previous_job = previous_job
self.atom_indices = atom_indices
self._settings_updated = False
def _get_max_dt_frames(self, max_correlation_time_fs):
if max_correlation_time_fs is None:
return None
# Adjust for multiple previous_jobs
prevjobs = self.previous_job
if not isinstance(prevjobs, list):
prevjobs = [prevjobs]
# Read history and time step
# FIXME: Does not account for the presence of Range of StepSize (which the user may have provided)
historylength = 0
for prevjob in prevjobs:
historylength += prevjob.results.readrkf("History", "nEntries")
timestep = prevjobs[0].results.get_time_step()
max_dt_frames = int(max_correlation_time_fs / timestep)
max_dt_frames = min(max_dt_frames, historylength // 2)
return max_dt_frames
def get_input(self):
"""
Generate the input file
"""
self._settings_to_list(self.settings.input, self._task)
if not self._settings_updated:
if self.atom_indices and self._parent_write_atoms:
section = getattr(self.settings.input, self._task)
for entry in section:
if not (self._has_settings_entry(entry, "Atoms") and self._has_settings_entry(entry.Atoms, "Atom")):
self._add_nonunique_settings_entries(entry.Atoms, "Atom", self.atom_indices)
else:
msg = "Atom indices cannot be supplied as argument if already present in settings"
raise MDAnalysisSettingsError(msg)
self._settings_updated = True
return super().get_input()
@staticmethod
def _settings_to_list(settings, entry, nentries=1):
"""
Convert the main settings object for this Task to a list of settings objects
"""
if isinstance(settings, Settings):
# This is not a PISA object
if AMSConvenientAnalysisJob._has_settings_entry(settings, entry):
# If this is not a list-type object, make it into one
if isinstance(settings[entry], dict) or not (isinstance(settings[entry], Iterable)):
settings[entry] = [settings[entry]]
else:
settings[entry] = [Settings() for i in range(nentries)]
else:
# Workaround for a PISA bug, where the object can have updated values but length 0
block = getattr(settings, entry)
if len(block) == 0:
if block.value_changed:
s = copy.deepcopy(block)
block[0] = s
else:
# I need there to be a single entry
for i in range(nentries):
s = block[i]
@staticmethod
def _has_settings_entry(settings, entry):
"""
Check if this Settings or PISA object contains this entry already
"""
if isinstance(settings, Settings):
return entry in settings
else:
return getattr(settings, entry).value_changed
@staticmethod
def _add_nonunique_settings_entries(settings, key, entries):
"""
For non-default entries, multiple entries can be supplied
* ``settings`` -- Settings or PISA object to which the entries should be added
* ``entries`` -- Iterator for multiple settings entries
"""
if not isinstance(settings, Settings):
for i, entry in enumerate(entries):
subsettings = getattr(settings, key)
subsettings[i] = entry
else:
settings[key] = entries
def _parent_prerun(self):
"""
Possibly wait for previous job to finish before adding path
"""
prevjobs = self.previous_job
if not isinstance(prevjobs, list):
prevjobs = [prevjobs]
# Turn trajectory settings into a list,
# If no entries yet exist, it will be a list of empty entries
self._settings_to_list(self.settings.input.TrajectoryInfo, "Trajectory", nentries=len(prevjobs))
nblocks = len(self.settings.input.TrajectoryInfo.Trajectory)
msg = "Number of Trajectory blocks must match number of previous_jobs"
assert nblocks == len(prevjobs) or nblocks == 0, msg
# Overwrite KFFilename with that of previous_job
for i, prevjob in enumerate(prevjobs):
assert prevjob.status != JobStatus.CREATED, "You can only pass in a finished AMSJob"
self.settings.input.TrajectoryInfo.Trajectory[i].KFFilename = prevjob.results.rkfpath()
class AMSMSDResults(AMSAnalysisResults):
"""Results class for AMSMSDJob"""
def get_msd(self):
"""
returns time [fs], msd [ang^2]
"""
msd_xy = self.get_xy()
time = np.array(msd_xy.x[0]) # fs
y = np.array(msd_xy.y) * Units.convert(1.0, "bohr", "angstrom") ** 2
return time, y
@requires_optional_package("scipy")
def get_linear_fit(self, start_time_fit_fs=None, end_time_fit_fs=None):
"""
Fits the MSD between start_time_fit_fs and end_time_fit_fs
Returns a 3-tuple LinRegress.result, fit_x (fs), fit_y (ang^2)
result.slope is given in ang^2/fs
"""
from scipy.stats import linregress
time, y = self.get_msd()
end_time_fit_fs = end_time_fit_fs or max(time)
start_time_fit_fs = start_time_fit_fs or self.job.start_time_fit_fs
if start_time_fit_fs >= end_time_fit_fs:
start_time_fit_fs = end_time_fit_fs / 2
y = y[time >= start_time_fit_fs]
time = time[time >= start_time_fit_fs]
y = y[time <= end_time_fit_fs]
time = time[time <= end_time_fit_fs]
result = linregress(time, y)
fit_x = time
fit_y = result.slope * fit_x + result.intercept
return result, fit_x, fit_y
def get_diffusion_coefficient(self, start_time_fit_fs=None, end_time_fit_fs=None):
"""
Returns D in m^2/s
"""
result, _, _ = self.get_linear_fit(start_time_fit_fs=start_time_fit_fs, end_time_fit_fs=end_time_fit_fs)
D = result.slope * 1e-20 / (6 * 1e-15) # convert from ang^2/fs to m^2/s, divide by 6 because 3-dimensional (2d)
return D
class AMSMSDJob(AMSConvenientAnalysisJob):
"""A convenient class wrapping around the trajectory analysis MSD tool"""
results: AMSMSDResults
_result_type = AMSMSDResults
_parent_write_atoms = True
_task = "MeanSquareDisplacement"
def __init__(
self,
previous_job=None, # needs to be finished
max_correlation_time_fs: float = 10000,
start_time_fit_fs: float = 2000,
atom_indices: List[int] = None,
**kwargs,
):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
max_correlation_time_fs: float
Maximum correlation time in femtoseconds
start_time_fit_fs : float
Smallest correlation time for the linear fit
atom_indices : List[int]
If None, use all atoms. Otherwise use the specified atom indices (starting with 1)
kwargs: dict
Other options to AMSAnalysisJob
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.max_correlation_time_fs = max_correlation_time_fs
self.start_time_fit_fs = start_time_fit_fs
def get_input(self):
"""
Generate the input file
"""
self._settings_to_list(self.settings.input, self._task)
for settings in self.settings.input.MeanSquareDisplacement:
if not self._has_settings_entry(settings, "Property"):
settings.Property = "DiffusionCoefficient"
if not self._has_settings_entry(settings, "StartTimeSlope"):
settings.StartTimeSlope = self.start_time_fit_fs
return super().get_input()
def prerun(self): # noqa F811
"""
Constructs the final settings
"""
self._parent_prerun() # trajectory and atom_indices handled
self._settings_to_list(self.settings.input, self._task)
for settings in self.settings.input.MeanSquareDisplacement:
if not self._has_settings_entry(settings, "MaxFrame"):
max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
settings.MaxFrame = max_dt_frames
def postrun(self):
"""
Creates msd.txt, fit_msd.txt, and D.txt
"""
time, msd = self.results.get_msd()
with open(self.path + "/msd.txt", "w") as f:
f.write("#Time(fs) MSD(ang^2)")
for x, y in zip(time, msd):
f.write(f"{x} {y}\n")
_, fit_x, fit_y = self.results.get_linear_fit()
with open(self.path + "/fit_msd.txt", "w") as f:
f.write("#Time(fs), LinearFitToMSD(ang^2)")
for x, y in zip(fit_x, fit_y):
f.write(f"{x} {y}\n")
D = self.results.get_diffusion_coefficient()
with open(self.path + "/D.txt", "w") as f:
f.write(f"{D}\n")
def viscosity_double_exponential(x, A, lam, tau1, tau2):
return A * (lam * (1 - np.exp(-x / tau1)) + (1 - lam) * (1 - np.exp(-x / tau2)))
class AMSViscosityFromBinLogResults(AMSAnalysisResults):
"""Results class for AMSViscosityFromBinLogJob"""
def get_viscosity_integral(self):
"""Extract the running viscosity integral.
Returns a 2-tuple with 1D numpy arrays ``time, viscosity_integral``, with time in fs and viscosity_integral in Pa*s.
"""
xy = self.get_xy("Integral")
time = np.array(xy.x[0]) # fs
y = np.array(xy.y) # Pa s
return time, y
@requires_optional_package("scipy")
def get_double_exponential_fit(self):
"""Perform a double exponential fit to the viscosity integral.
The fitted function is of the form
``A * (lam * (1 - np.exp(-x / tau1)) + (1 - lam) * (1 - np.exp(-x / tau2)))``
where ``A`` is the limiting value in the infinite time limit.
Returns: a 3-tuple ``popt, time, prediction``, where
- ``popt`` is a 4-tuple containing A, lam, tau1, and tau2,
- ``time`` is in fs, and
- ``prediction`` is the value of the above function vs time.
"""
from scipy.optimize import curve_fit
x, y = self.get_viscosity_integral()
# The fit
f, p0 = viscosity_double_exponential, (y[-1], 0.5, 0.1 * x[-1], 0.9 * x[-1])
popt, _ = curve_fit(f, x, y, p0=p0)
prediction = f(x, *popt)
return popt, x, prediction
class AMSViscosityFromBinLogJob(AMSConvenientAnalysisJob):
"""A convenient class wrapping around the trajectory analysis ViscosityFromBinLog tool. Only runs with default input options (max correlation time 10% of the total trajectory)."""
results: AMSViscosityFromBinLogResults
_result_type = AMSViscosityFromBinLogResults
_parent_write_atoms = False
_task = "AutoCorrelation"
def __init__(
self,
previous_job=None, # needs to be finished
**kwargs,
):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts. It must have been run with the BinLog%PressureTensor option set.
kwargs: dict
Other options to AMSAnalysisJob
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, **kwargs)
def get_input(self):
"""
Generate the input file
"""
self._settings_to_list(self.settings.input, self._task)
for settings in self.settings.input.AutoCorrelation:
settings.Property = "ViscosityFromBinLog"
return super().get_input()
def prerun(self): # noqa F811
"""
Constructs the final settings
"""
self._parent_prerun() # trajectory and atom_indices handled
def postrun(self):
"""
Creates viscosity_integral, fit_viscosity_integral, and viscosity.txt
"""
time, integral = self.results.get_viscosity_integral()
with open(self.path + "/viscosity_integral.txt", "w") as f:
f.write("#Time(fs) Integral(Pa*s)")
for x, y in zip(time, integral):
f.write(f"{x} {y}\n")
popt, fit_x, fit_y = self.results.get_double_exponential_fit()
with open(self.path + "/fit_viscosity_integral.txt", "w") as f:
f.write("#Time(fs), DoubleExponentialFitToViscosityIntegral(Pa*s)")
for x, y in zip(fit_x, fit_y):
f.write(f"{x} {y}\n")
eta = popt[0]
with open(self.path + "/viscosity.txt", "w") as f:
f.write(f"{eta}\n")
class AMSVACFResults(AMSAnalysisResults):
"""Results class for AMSVACFJob"""
def get_vacf(self):
"""Extract the velocity autocorrelation function vs. time.
Returns a 2-tuple ``time, y`` with ``time`` in fs.
"""
xy = self.get_xy()
time = np.array(xy.x[0]) # fs
y = np.array(xy.y)
return time, y
def get_power_spectrum(self, max_freq=None):
"""Calculate the power spectrum as the Fourier transform of the velocity autocorrelation function.
max_freq: float, optional
The maximum frequency in cm^-1.
Returns: A 2-tuple ``freq, y`` with ``freq`` in cm^-1 and ``y`` unitless.
"""
max_freq = max_freq or self.job.max_freq or 5000
xy = self.get_xy("Spectrum")
freq = np.array(xy.x[0])
y = np.array(xy.y)
y = y[freq < max_freq]
freq = freq[freq < max_freq]
return freq, y
def get_acf(self):
return self.get_vacf()
def get_spectrum(self, max_freq=None):
return self.get_power_spectrum(max_freq=max_freq)
class AMSVACFJob(AMSConvenientAnalysisJob):
"""A class for calculating the velocity autocorrelation function and its power spectrum"""
_result_type = AMSVACFResults
_parent_write_atoms = True
_task = "AutoCorrelation"
def __init__(
self,
previous_job=None, # needs to be finished
max_correlation_time_fs=5000, # fs
max_freq=5000, # cm^-1
atom_indices=None,
**kwargs,
):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
max_correlation_time_fs: float
Maximum correlation time in femtoseconds
max_freq: float
The maximum frequency for the power spectrum in cm^-1
atom_indices: List[int]
Atom indices (starting with 1). If None, use all atoms.
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.max_correlation_time_fs = max_correlation_time_fs
self.max_freq = max_freq
def get_input(self):
"""
Generate the input file
"""
self._settings_to_list(self.settings.input, self._task)
for settings in self.settings.input.AutoCorrelation:
settings.Property = "Velocities"
return super().get_input()
def prerun(self): # noqa F811
"""
Creates final settings
"""
self._parent_prerun() # trajectory and atom_indices handled
self._settings_to_list(self.settings.input, self._task)
for settings in self.settings.input.AutoCorrelation:
if not self._has_settings_entry(settings, "MaxFrame"):
max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
settings.MaxFrame = max_dt_frames
def postrun(self):
"""
Creates vacf.txt and power_spectrum.txt
"""
try:
time, vacf = self.results.get_vacf()
with open(self.path + "/vacf.txt", "w") as f:
f.write("#Time(fs) VACF")
for x, y in zip(time, vacf):
f.write(f"{x} {y}\n")
freq, intens = self.results.get_power_spectrum()
with open(self.path + "/power_spectrum.txt", "w") as f:
f.write("#Frequency(cm^-1) Intensity(arb.units)")
for x, y in zip(freq, intens):
f.write(f"{x} {y}\n")
except:
pass
class AMSDipoleDerivativeACFResults(AMSAnalysisResults):
"""Results class for AMSVACFJob"""
def get_acf(self):
xy = self.get_xy()
time = np.array(xy.x[0]) # fs
y = np.array(xy.y)
return time, y
def get_ir_spectrum(self, max_freq=None):
max_freq = max_freq or self.job.max_freq or 5000
xy = self.get_xy("Spectrum")
freq = np.array(xy.x[0])
y = np.array(xy.y)
y = y[freq < max_freq]
freq = freq[freq < max_freq]
return freq, y
def get_spectrum(self, max_freq=None):
return self.get_ir_spectrum(max_freq=max_freq)
class AMSDipoleDerivativeACFJob(AMSConvenientAnalysisJob):
"""A class for calculating the velocity autocorrelation function and its power spectrum"""
_result_type = AMSDipoleDerivativeACFResults
_parent_write_atoms = True
_task = "AutoCorrelation"
def __init__(
self,
previous_job=None, # needs to be finished
max_correlation_time_fs=5000, # fs
max_freq=5000, # cm^-1
atom_indices=None,
**kwargs,
):
"""
previous_job: AMSJob
An AMSJob with an MD trajectory. Note that the trajectory should have been equilibrated before it starts.
max_correlation_time_fs: float
Maximum correlation time in femtoseconds
max_freq: float
The maximum frequency for the power spectrum in cm^-1
atom_indices: List[int]
Atom indices (starting with 1). If None, use all atoms.
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.max_correlation_time_fs = max_correlation_time_fs
self.max_freq = max_freq
def get_input(self):
"""
Generate the input file
"""
self.settings_to_list()
for settings in self.settings.input.AutoCorrelation:
settings.Property = "DipoleDerivativeFromCharges"
return super().get_input()
def prerun(self): # noqa F811
"""
Creates final settings
"""
self._parent_prerun() # trajectory and atom_indices handled
self.settings_to_list()
for settings in self.settings.input.AutoCorrelation:
if not self._has_settings_entry(settings, "MaxFrame"):
max_dt_frames = self._get_max_dt_frames(self.max_correlation_time_fs)
settings.MaxFrame = max_dt_frames
def postrun(self):
"""
Creates dipolederivativeacf.txt and ir_spectrum.txt
"""
try:
time, acf = self.results.get_acf()
with open(self.path + "/dipolederivativeacf.txt", "w") as f:
f.write("#Time(fs) DipoleDerivativeACF")
for x, y in zip(time, acf):
f.write(f"{x} {y}\n")
freq, intens = self.results.get_ir_spectrum()
with open(self.path + "/ir_spectrum.txt", "w") as f:
f.write("#Frequency(cm^-1) Intensity(arb.units)")
for x, y in zip(freq, intens):
f.write(f"{x} {y}\n")
except:
pass
class AMSRDFResults(AMSAnalysisResults):
"""Results class for AMSRDFJob"""
def get_rdf(self):
"""
Returns a 2-tuple r, rdf.
r: numpy array of float (angstrom)
rdf: numpy array of float
"""
xy = self.get_xy()
r = np.array(xy.x[0])
y = np.array(xy.y)
return r, y
class AMSRDFJob(AMSConvenientAnalysisJob):
_result_type = AMSRDFResults
_parent_write_atoms = False
_task = "RadialDistribution"
def __init__(
self, previous_job=None, atom_indices=None, atom_indices_to=None, rmin=0.5, rmax=6.0, rstep=0.1, **kwargs
):
"""
previous_job: AMSJob
AMSJob with finished MD trajectory.
atom_indices: List[int]
Atom indices (starting with 1). If None, calculate RDF *from* all atoms.
atom_indices_to: List[int]
Atom indices (starting with 1). If None, calculate RDF *to* all atoms.
rmin: float
Minimum distance (angstrom)
rmax: float
Maximum distance (angstrom)
rstep: float
Bin width for the histogram (angstrom)
"""
AMSConvenientAnalysisJob.__init__(self, previous_job=previous_job, atom_indices=atom_indices, **kwargs)
self.atom_indices_to = atom_indices_to
self.rmin = rmin
self.rmax = rmax
self.rstep = rstep
def get_input(self):
"""
Generate the input file
"""
self._settings_to_list(self.settings.input, self._task)
if not self._settings_updated:
prevjobs = self.previous_job
if not isinstance(prevjobs, list):
prevjobs = [prevjobs]
main_mol = prevjobs[0].results.get_main_molecule()
for settings in self.settings.input.RadialDistribution:
# If no atom iindices were provided, and there is nothing in the settings at all, add them
atom_indices = self.atom_indices
atom_indices_to = self.atom_indices_to
if not atom_indices:
if not self._has_settings_entry(settings, "AtomsFrom"):
atom_indices = list(range(1, len(main_mol) + 1))
if not atom_indices_to:
if not self._has_settings_entry(settings, "AtomsTo"):
atom_indices_to = list(range(1, len(main_mol) + 1))
# Add the atom indices only if there were no atom indices in the settings (if elements are present, add indices)
if atom_indices:
atoms_from_present = self._has_settings_entry(settings, "AtomsFrom")
atom_present = atoms_from_present and self._has_settings_entry(settings.AtomsFrom, "Atom")
if not atom_present:
self._add_nonunique_settings_entries(settings.AtomsFrom, "Atom", atom_indices)
else:
msg = "Atom indices cannot be supplied as argument if already present in settings"
raise MDAnalysisSettingsError(msg)
if atom_indices_to:
atoms_to_present = self._has_settings_entry(settings, "AtomsTo")
atom_present = atoms_to_present and self._has_settings_entry(settings.AtomsTo, "Atom")
if not atom_present:
self._add_nonunique_settings_entries(settings.AtomsTo, "Atom", atom_indices_to)
else:
msg = "Atom indices cannot be supplied as argument if already present in settings"
raise MDAnalysisSettingsError(msg)
if not self._has_settings_entry(settings, "Range"):
if isinstance(settings, Settings):
settings.Range = f"{self.rmin} {self.rmax} {self.rstep}"
else:
settings.Range = [self.rmin, self.rmax, self.rstep]
return super().get_input()
def prerun(self): # noqa F811
"""
Creates the final settings. Do not call or override this method.
"""
self._parent_prerun()
self._settings_to_list(self.settings.input, self._task)
# Atom_indices will be adjusted when get_input is called (again).
def postrun(self):
"""
Creates rdf.txt. Do not call or override this method.
"""
try:
r, gr = self.results.get_rdf()
with open(self.path + "/rdf.txt", "w") as f:
f.write("#r(angstrom) g(r)")
for x, y in zip(r, gr):
f.write(f"{x} {y}\n")
except:
pass
class AMSConvenientAnalysisPerRegionResults(Results):
def _getter(self, analysis_job_type, method, kwargs):
assert (
self.job.analysis_job_type is analysis_job_type
), f"{method}() can only be called for {analysis_job_type}, tried for type {self.job.analysis_job_type}"
ret = {}
for name, job in self.job.children.items():
ret[name] = getattr(job.results, method)(**kwargs)
return ret
def get_diffusion_coefficient(self, **kwargs):
return self._getter(AMSMSDJob, "get_diffusion_coefficient", kwargs)
def get_msd(self, **kwargs):
return self._getter(AMSMSDJob, "get_msd", kwargs)
def get_linear_fit(self, **kwargs):
return self._getter(AMSMSDJob, "get_linear_fit", kwargs)
def get_vacf(self, **kwargs):
return self._getter(AMSVACFJob, "get_vacf", kwargs)
def get_power_spectrum(self, **kwargs):
return self._getter(AMSVACFJob, "get_power_spectrum", kwargs)
class AMSConvenientAnalysisPerRegionJob(MultiJob):
_result_type = AMSConvenientAnalysisPerRegionResults
def __init__(self, previous_job, analysis_job_type, name=None, regions=None, per_element=False, **kwargs):
MultiJob.__init__(self, children=OrderedDict(), name=name or "analysis_per_region")
self.previous_job = previous_job
self.analysis_job_type = analysis_job_type
self.analysis_job_kwargs = kwargs
self.regions_dict = regions
self.per_element = per_element
@staticmethod
def get_regions_dict(molecule, per_element: bool = False):
regions_dict = defaultdict(lambda: [])
for i, at in enumerate(molecule, 1):
regions = set([at.properties.region]) if isinstance(at.properties.region, str) else at.properties.region
if len(regions) == 0:
region_name = "NoRegion" if not per_element else f"NoRegion_{at.symbol}"
regions_dict[region_name].append(i)
for region in regions:
region_name = region if not per_element else f"{region}_{at.symbol}"
regions_dict[region_name].append(i)
regions_dict["All"].append(i)
if per_element:
regions_dict[f"All_{at.symbol}"].append(i)
return regions_dict
def prerun(self): # noqa F811
regions_dict = self.regions_dict or self.get_regions_dict(
self.previous_job.results.get_main_molecule(), per_element=self.per_element
)
for region in regions_dict:
self.children[region] = self.analysis_job_type(
previous_job=self.previous_job,
name=region,
atom_indices=regions_dict[region],
**self.analysis_job_kwargs,
)
@staticmethod
def get_mean_std_per_region(list_of_jobs, function_name, **kwargs):
"""
list_of_jobs: List[AMSConvenientAnalysisPerRegionJob]
List of jobs over which to average
function_name: str
e.g. 'get_msd', 'get_power_spectrum'
"""
if isinstance(list_of_jobs, dict):
list_of_jobs = [x for x in list_of_jobs.values()]
all_x = defaultdict(lambda: [])
all_y = defaultdict(lambda: [])
for vacfjob in list_of_jobs:
# let's calculate mean and std for each region
results_dict = getattr(vacfjob.results, function_name)(**kwargs)
for region_name, ret_value in results_dict.items():
if np.isscalar(ret_value):
all_x[region_name].append(np.atleast_1d(ret_value))
elif len(ret_value) == 2:
all_x[region_name].append(np.atleast_1d(ret_value[0]))
all_y[region_name].append(np.atleast_1d(ret_value[1]))
mean_x = {}
std_x = {}
mean_y = {}
std_y = {}
for region_name, values in all_x.items():
mean_x[region_name] = np.mean(values, axis=0)
std_x[region_name] = np.std(values, axis=0)
if len(all_y) > 0:
mean_y[region_name] = np.mean(all_y[region_name], axis=0)
std_y[region_name] = np.std(all_y[region_name], axis=0)
if len(mean_y) > 0:
return mean_x, std_x, mean_y, std_y
else:
return mean_x, std_x
ADF Fragment¶
Run fragment-based ADF analysis as a dedicated multi-job workflow.
Overview¶
This recipe orchestrates two fragment calculations and one full-system ADF calculation and exposes energy-decomposition and related properties through a dedicated results class.
API¶
API reference
- class ADFFragmentJob(fragment1=None, fragment2=None, fragment1_opt=None, fragment2_opt=None, full_settings=None, frag1_settings=None, frag2_settings=None, **kwargs)[source]
Subclass of
MultiJobfor ADF fragment calculations.- Parameters:
- _result_type
alias of
ADFFragmentResults
- prerun()[source]
Prepare the fragments and the full calculation.
- classmethod load_external(path, jobname=None)[source]
Load the results of the ADFFragmentJob job from an external path.
- class ADFFragmentResults(job)[source]
- Parameters:
job (Job) –
- check()[source]
Check if the calculation finished successfully.
Overwriting the method of
MultiJobbecause it only checks if the job finished, not how it finished.
- get_properties()[source]
Redirect to
AMSResultsof the full calculation.
- get_main_molecule()[source]
Redirect to
AMSResultsof the full calculation.
- get_input_molecule()[source]
Redirect to
AMSResultsof the full calculation.
- get_energy(unit='au')[source]
Redirect to
AMSResultsof the full calculation.
- get_dipole_vector(unit='au')[source]
Redirect to
AMSResultsof the full calculation.
- get_energy_decomposition(unit='au')[source]
Get the energy decomposition of the fragment calculation.
- get_nocv_eigenvalues()[source]
Get the NOCV eigenvalues of the full calculation.
- check_nocv_eigenvalues()[source]
Basic NOCV check.
The eigenvalue #i should be equal to -eigenvalue #-i.
- Returns:
True if the check passes, False otherwise.
- Return type:
Code¶
Recipe module source: adffragment.py
from scm.plams.core.basejob import MultiJob
from scm.plams.core.results import Results
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.tools.units import Units
from scm.plams.core.errors import FileError
from scm.plams.mol.molecule import Molecule
from scm.plams.core.enums import JobStatus
import numpy as np
from os.path import join as opj
from os.path import abspath, isdir, basename
from typing import Dict, List, Tuple, Union, Optional
__all__ = ["ADFFragmentJob", "ADFFragmentResults"]
class ADFFragmentResults(Results):
def check(self):
"""Check if the calculation finished successfully.
Overwriting the method of |MultiJob| because it only checks if the job
finished, not how it finished.
"""
return all(job.check() for job in self.job.children)
def get_properties(self):
"""Redirect to |AMSResults| of the full calculation."""
return self.job.full.results.get_properties()
def get_main_molecule(self):
"""Redirect to |AMSResults| of the full calculation."""
return self.job.full.results.get_main_molecule()
def get_input_molecule(self):
"""Redirect to |AMSResults| of the full calculation."""
return self.job.full.results.get_input_molecule()
def get_energy(self, unit="au"):
"""Redirect to |AMSResults| of the full calculation."""
return self.job.full.results.get_energy(unit)
def get_dipole_vector(self, unit="au"):
"""Redirect to |AMSResults| of the full calculation."""
return self.job.full.results.get_dipole_vector(unit)
def get_energy_decomposition(self, unit="au") -> Dict[str, float]:
"""Get the energy decomposition of the fragment calculation.
Args:
unit (str, optional): The unit of the energy. Defaults to 'au'.
Returns:
Dict[str, float]: The energy decomposition.
"""
energy_section = self.job.full.results.read_rkf_section("Energy", file="adf")
ret = {}
for k in [
"Electrostatic Energy",
"Kinetic Energy",
"Elstat Interaction",
"XC Energy",
]:
ret[k] = Units.convert(energy_section[k], "au", unit)
# most information not available from the KF file
res = self.job.full.results
res1 = self.job.f1.results
res2 = self.job.f2.results
# remaining information on the output file
def try_grep_result(key, pattern, match_position=-1, split_position=4):
match = res.grep_output(pattern)
if match:
match = Units.convert(float(match[match_position].split()[split_position]), "au", unit)
ret[key] = match
try_grep_result("E_int", "Total Bonding Energy:", match_position=-2)
try_grep_result("E_int_disp", "Dispersion Energy:")
try_grep_result("E_Pauli", "Pauli Repulsion (Delta")
try_grep_result("E_elstat", "Electrostatic Interaction:")
try_grep_result("E_orb", "Total Orbital Interactions:")
ret["E_1"] = res1.get_energy(unit=unit)
ret["E_2"] = res2.get_energy(unit=unit)
if hasattr(self.job, "f1_opt"):
ret["E_1_opt"] = self.job.f1_opt.results.get_energy(unit=unit)
ret["E_prep_1"] = ret["E_1"] - ret["E_1_opt"]
if hasattr(self.job, "f2_opt"):
ret["E_2_opt"] = self.job.f2_opt.results.get_energy(unit=unit)
ret["E_prep_2"] = ret["E_2"] - ret["E_2_opt"]
if ("E_1_opt" in ret) and ("E_2_opt" in ret):
ret["E_prep"] = ret["E_prep_1"] + ret["E_prep_2"]
ret["E_bond"] = ret["E_int"] + ret["E_prep"]
return ret
def get_nocv_eigenvalues(self) -> Union[List[float], Tuple[List[float], List[float]]]:
"""Get the NOCV eigenvalues of the full calculation."""
keys = self.job.full.results.get_rkf_skeleton(file="adf")
if "NOCV" not in keys:
raise KeyError("NOCV section not found in the rkf file")
keys = keys["NOCV"]
# restricted calculation
if "NOCV_eigenvalues_restricted" in keys:
return self.job.full.results.readrkf("NOCV", "NOCV_eigenvalues_restricted", file="adf")
# unrestricted calculation
elif "NOCV_eigenvalues_alpha" in keys:
alpha = self.job.full.results.readrkf("NOCV", "NOCV_eigenvalues_alpha", file="adf")
beta = self.job.full.results.readrkf("NOCV", "NOCV_eigenvalues_beta", file="adf")
tpl = (
[float(x) for x in alpha],
[float(x) for x in beta], # make mypy happy by ensuring return is tuple of list of floats
)
return tpl
else:
raise KeyError("NOCV eigenvalues not found in the rkf file")
def check_nocv_eigenvalues(self) -> bool:
"""Basic NOCV check.
The eigenvalue #i should be equal to -eigenvalue #-i.
Returns:
bool: True if the check passes, False otherwise.
"""
def _check_list(eigenvalues):
for i in range(int(len(eigenvalues) / 2)):
if not np.isclose(eigenvalues[i], -1 * eigenvalues[-i - 1], atol=1e-4):
return False
return True
NOCV_eigenvalues = self.job.results.get_nocv_eigenvalues()
# spin-restricted calculation
if isinstance(NOCV_eigenvalues, list):
return _check_list(NOCV_eigenvalues)
elif isinstance(NOCV_eigenvalues, tuple):
for eigenvalues in NOCV_eigenvalues:
if not _check_list(eigenvalues):
return False
return True
else:
raise TypeError("Unexpected type for NOCV eigenvalues")
def get_nocv_orbital_interaction(self, unit="kcal/mol") -> Union[List[float], Tuple[List[float]]]:
"""Get the NOCV orbital interactions of the full calculation."""
def _calc_energies(oi: List[float]) -> List[float]:
ret = []
for i in range(int(len(oi) / 2)):
a = oi[i]
b = oi[len(oi) - i - 1]
ret.append(a + b)
return ret
keys = self.job.full.results.get_rkf_skeleton(file="adf")
if "NOCV" not in keys:
raise KeyError("NOCV section not found in the rkf file")
keys = keys["NOCV"]
# restricted calculation
if "NOCV_oi_restricted" in keys:
oi = self.job.full.results.readrkf("NOCV", "NOCV_oi_restricted", file="adf")
# unrestricted calculation
elif "NOCV_oi_alpha" in keys:
tpl = (
self.job.full.results.readrkf("NOCV", "NOCV_oi_alpha", file="adf"),
self.job.full.results.readrkf("NOCV", "NOCV_oi_beta", file="adf"),
)
else:
raise KeyError("NOCV orbital interaction not found in rkf file")
energies: Union[List[float], Tuple[List[float], List[float]]] # make mypy happy
if isinstance(oi, tuple):
energies = (_calc_energies(tpl[0]), _calc_energies(tpl[1]))
elif isinstance(oi, list):
energies = _calc_energies(oi)
return Units.convert(energies, "kcal/mol", unit)
class ADFFragmentJob(MultiJob):
"""Subclass of |MultiJob| for ADF fragment calculations."""
_result_type = ADFFragmentResults
def __init__(
self,
fragment1: Optional[Molecule] = None,
fragment2: Optional[Molecule] = None,
fragment1_opt: Optional[Molecule] = None,
fragment2_opt: Optional[Molecule] = None,
full_settings: Optional[Settings] = None,
frag1_settings: Optional[Settings] = None,
frag2_settings: Optional[Settings] = None,
**kwargs,
):
"""Create an ADFFragmentJob with the given fragments.
The optimized fragment structures can be given as arguments.
Single point calculations on these structures will then be performed
to obtain the preparation energy.
Subclass of |MultiJob|.
Args:
fragment1 (|Molecule|): The first fragment.
fragment2 (|Molecule|): The second fragment.
fragment1_opt (|Molecule|, optional): The optimized first fragment.
fragment2_opt (|Molecule|, optional): The optimized second fragment.
full_settings (Settings): The settings for the full calculation.
frag1_settings (Settings): Specific settings for the first fragment calculation.
frag2_settings (Settings): Specific settings for the second fragment calculation.
**kwargs: Further keyword arguments for |MultiJob|.
"""
MultiJob.__init__(self, **kwargs)
self.fragment1 = fragment1.copy() if isinstance(fragment1, Molecule) else fragment1
self.frag1_settings = frag1_settings or Settings()
self.fragment2 = fragment2.copy() if isinstance(fragment2, Molecule) else fragment2
self.frag2_settings = frag2_settings or Settings()
self.full_settings = full_settings or Settings()
if isinstance(fragment1_opt, Molecule):
self.fragment1_opt = fragment1_opt.copy()
elif fragment1_opt is not None:
raise TypeError("fragment1_opt should be a Molecule object or None")
if isinstance(fragment2_opt, Molecule):
self.fragment2_opt = fragment2_opt.copy()
elif fragment2_opt is not None:
raise TypeError("fragment2_opt should be a Molecule object or None")
def prerun(self): # noqa F811
"""Prepare the fragments and the full calculation."""
self.f1 = AMSJob(
name="frag1",
molecule=self.fragment1,
settings=self.settings + self.frag1_settings,
)
self.f2 = AMSJob(
name="frag2",
molecule=self.fragment2,
settings=self.settings + self.frag2_settings,
)
for at in self.fragment1:
at.properties.suffix = "adf.f=subsystem1"
for at in self.fragment2:
at.properties.suffix = "adf.f=subsystem2"
self.full = AMSJob(
name="full",
molecule=self.fragment1 + self.fragment2,
settings=self.settings + self.full_settings,
)
self.full.settings.input.adf.fragments.subsystem1 = (self.f1, "adf")
self.full.settings.input.adf.fragments.subsystem2 = (self.f2, "adf")
self.children = [self.f1, self.f2, self.full]
if hasattr(self, "fragment1_opt"):
self.f1_opt = AMSJob(
name="frag1_opt",
molecule=self.fragment1_opt,
settings=self.settings + self.frag1_settings,
)
self.children.append(self.f1_opt)
if hasattr(self, "fragment2_opt"):
self.f2_opt = AMSJob(
name="frag2_opt",
molecule=self.fragment2_opt,
settings=self.settings + self.frag2_settings,
)
self.children.append(self.f2_opt)
@classmethod
def load_external(cls, path: str, jobname: Optional[str] = None) -> "ADFFragmentJob":
"""Load the results of the ADFFragmentJob job from an external path.
Args:
path (str): The path to the job. It should at least have the
subfolders 'frag1', 'frag2' and 'full'.
jobname (str, optional): The name of the job. Defaults to None.
Returns:
ADFFragmentJob: The job with the loaded results.
"""
if not isdir(path):
raise FileError("Path {} does not exist, cannot load from it.".format(path))
path = abspath(path)
jobname = basename(path) if jobname is None else str(jobname)
job = cls(name=jobname)
job.path = path
job.status = JobStatus.COPIED
job.f1 = AMSJob.load_external(opj(path, "frag1"))
job.f2 = AMSJob.load_external(opj(path, "frag2"))
job.full = AMSJob.load_external(opj(path, "full"))
job.children = [job.f1, job.f2, job.full]
if isdir(opj(path, "frag1_opt")):
job.f1_opt = AMSJob.load_external(opj(path, "frag1_opt"))
job.children.append(job.f1_opt)
if isdir(opj(path, "frag2_opt")):
job.f2_opt = AMSJob.load_external(opj(path, "frag2_opt"))
job.children.append(job.f2_opt)
return job
BAND Fragment¶
Run BAND energy decomposition workflows for periodic systems with optional NOCV analysis.
Overview¶
This recipe extends the fragment approach to periodic BAND systems and supports NOCV workflows via
NOCVBandFragmentJob.
API¶
API reference
- class BANDFragmentJob(fragment1=None, fragment2=None, fragment1_opt=None, fragment2_opt=None, full_settings=None, frag1_settings=None, frag2_settings=None, **kwargs)[source]
- Parameters:
- _result_type
alias of
BANDFragmentResults
- prerun()[source]
Creates the fragment jobs.
- Return type:
None
- new_children()[source]
After the first round, add the full job to the children list.
- classmethod load_external(path, jobname=None)[source]
Load the results of the BANDFragmentJob job from an external path.
- class BANDFragmentResults(job)[source]
Subclass of
ADFFragmentResultsfor BAND calculations.- Parameters:
job (Job) –
- class NOCVBandFragmentJob(nocv_settings=None, **kwargs)[source]
- _result_type
alias of
BANDFragmentResults
- new_children()[source]
After the first round, add the full job to the children list. After the second round, add the NOCV job to the children list.
- classmethod load_external(path, jobname=None)[source]
Load the results of the BANDFragmentJob job from an external path.
Code¶
Recipe module source: bandfragment.py
from scm.plams.core.functions import log
from scm.plams.interfaces.adfsuite.ams import AMSJob
from scm.plams.core.settings import Settings
from scm.plams.recipes.adffragment import ADFFragmentJob, ADFFragmentResults
from scm.plams.tools.units import Units
from scm.plams.core.errors import FileError
from scm.plams.core.functions import add_to_instance
from scm.plams.core.enums import JobStatus
from os.path import join as opj
from os.path import abspath, isdir, basename, relpath
from os import symlink
from typing import Dict, List, Union, Optional
__all__ = ["BANDFragmentJob", "BANDFragmentResults"]
class BANDFragmentResults(ADFFragmentResults):
"""Subclass of |ADFFragmentResults| for BAND calculations."""
def get_energy_decomposition(self, unit="au") -> Dict[str, float]:
"""Get the energy decomposition of the fragment calculation.
Args:
unit (str, optional): The unit of the energy. Defaults to 'au'.
Returns:
Dict[str, float]: The energy decomposition.
"""
res = self.job.full.results
res1 = self.job.f1.results
res2 = self.job.f2.results
ret = {}
def try_grep_result(key, pattern, match_position=-1, split_position=2):
match = res.grep_output(pattern)
if match:
match = Units.convert(float(match[match_position].split()[split_position]), "au", unit)
ret[key] = match
try_grep_result("E_int", "E_int", match_position=-2)
try_grep_result("E_int_disp", "E_disp")
try_grep_result("E_Pauli", "E_Pauli")
try_grep_result("E_elstat", "E_elstat")
try_grep_result("E_orb", "E_orb")
ret["E_1"] = res1.get_energy(unit=unit)
ret["E_2"] = res2.get_energy(unit=unit)
if hasattr(self.job, "f1_opt"):
ret["E_1_opt"] = self.job.f1_opt.results.get_energy(unit=unit)
ret["E_prep_1"] = ret["E_1"] - ret["E_1_opt"]
if hasattr(self.job, "f2_opt"):
ret["E_2_opt"] = self.job.f2_opt.results.get_energy(unit=unit)
ret["E_prep_2"] = ret["E_2"] - ret["E_2_opt"]
if ("E_1_opt" in ret) and ("E_2_opt" in ret):
ret["E_prep"] = ret["E_prep_1"] + ret["E_prep_2"]
ret["E_bond"] = ret["E_int"] + ret["E_prep"]
return ret
class BANDFragmentJob(ADFFragmentJob):
_result_type = BANDFragmentResults
def prerun(self) -> None:
"""Creates the fragment jobs."""
self.f1 = AMSJob(name="frag1", molecule=self.fragment1, settings=self.settings)
self.f2 = AMSJob(name="frag2", molecule=self.fragment2, settings=self.settings)
self.children += [self.f1, self.f2]
def new_children(self) -> Union[None, List[AMSJob]]:
"""After the first round, add the full job to the children list."""
if hasattr(self, "full"):
return None
else:
# create the correct mapping settings for the full job
if "fragment" in self.full_settings.input.band:
log(
"Fragment already present in full_settings. Assuming that the user has already set up the mapping. Skipping the mapping setup.",
level=1,
)
return
# first fragment 1 then fragment 2
set1 = Settings()
set1.atommapping = {str(i + 1): str(i + 1) for i in range(len(self.fragment1))}
set2 = Settings()
set2.atommapping = {str(i + 1): str(i + 1 + len(self.fragment1)) for i in range(len(self.fragment2))}
set1.filename = (self.f1, "band")
set2.filename = (self.f2, "band")
self.full_settings.input.band.fragment = [set1, set2]
# create the full job
self.full = AMSJob(
name="full", molecule=self.fragment1 + self.fragment2, settings=self.settings + self.full_settings
)
return [self.full]
@classmethod
def load_external(cls, path: str, jobname: Optional[str] = None) -> "BANDFragmentJob":
"""Load the results of the BANDFragmentJob job from an external path.
Args:
path (str): The path to the job. It should at least have the
subfolders 'frag1', 'frag2' and 'full'.
jobname (str, optional): The name of the job. Defaults to None.
Returns:
BANDFragmentJob: The job with the loaded results.
"""
if not isdir(path):
raise FileError("Path {} does not exist, cannot load from it.".format(path))
path = abspath(path)
jobname = basename(path) if jobname is None else str(jobname)
job = cls(name=jobname)
job.path = path
job.status = JobStatus.COPIED
job.f1 = AMSJob.load_external(opj(path, "frag1"))
job.f2 = AMSJob.load_external(opj(path, "frag2"))
job.full = AMSJob.load_external(opj(path, "full"))
job.children = [job.f1, job.f2, job.full]
if isdir(opj(path, "frag1_opt")):
job.f1_opt = AMSJob.load_external(opj(path, "frag1_opt"))
job.children.append(job.f1_opt)
if isdir(opj(path, "frag2_opt")):
job.f2_opt = AMSJob.load_external(opj(path, "frag2_opt"))
job.children.append(job.f2_opt)
return job
class NOCVBandFragmentJob(BANDFragmentJob):
_result_type = BANDFragmentResults
def __init__(self, nocv_settings=None, **kwargs) -> None:
"""Create a BANDFragmentJob with final NOCV calculation.
Args:
nocv_settings (Settings, optional): Settings for the NOCV calculation. Defaults to None.
"""
super().__init__(**kwargs)
self.nocv_settings = nocv_settings or Settings()
def new_children(self) -> Union[None, List[AMSJob]]:
"""After the first round, add the full job to the children list.
After the second round, add the NOCV job to the children list."""
# new_children of BANDFragmentJob creates the full job
ret = super().new_children()
if ret is not None:
return ret
if hasattr(self, "nocv"):
return None
else:
# add NOCV run
# settings for the NOCV calculation
set = self.settings + self.full_settings + self.nocv_settings
# set the restart file
set.input.band.restart.file = "full.rkf"
# copy the fragment settings
set.input.band.fragment = [fset.copy() for fset in self.full.settings.input.band.fragment]
self.nocv = AMSJob(name="NOCV", molecule=self.fragment1 + self.fragment2, settings=set)
self.nocv.frag_paths = []
for job in [self.f1, self.f2, self.full]:
self.nocv.frag_paths.append(job.path)
# edit NOCV prerun to create symlinks
@add_to_instance(self.nocv)
def prerun(self):
"""Create symlinks for the restart files."""
for i, job in enumerate(["frag1", "frag2", "full"]):
rel_path = relpath(self.frag_paths[i], self.path)
symlink(opj(rel_path, "band.rkf"), opj(self.path, f"{job}.rkf"))
return [self.nocv]
@classmethod
def load_external(cls, path: str, jobname: Optional[str] = None) -> "NOCVBandFragmentJob":
"""Load the results of the BANDFragmentJob job from an external path.
Args:
path (str): The path to the job. It should at least have the
subfolders 'frag1', 'frag2', 'full' and 'NOCV'.
jobname (str, optional): The name of the job. Defaults to None.
Returns:
NOCVBandFragmentJob: The job with the loaded results.
"""
job = super().load_external(path, jobname)
job.nocv = AMSJob.load_external(opj(path, "NOCV"))
job.children.append(job.nocv)
return job
Reorganization Energy¶
Compute Marcus reorganization energies using four coordinated AMS calculations.
Overview¶
The workflow evaluates energies at optimized geometries of two electronic states and combines them into the reorganization energy used in Marcus theory.
API¶
API reference
- class ReorganizationEnergyJob(molecule, common_settings, settings_state_A, settings_state_B, **kwargs)[source]
A class for calculating the reorganization energy using AMS. Given two states, A and B, the reorganization energy is defined as follows:
reorganization energy = E(state B at optimal geometry for state A) - E(state A at optimal geometry for state A) + E(state A at optimal geometry for state B) - E(state B at optimal geometry for state B)
This job will run two geometry optimizations and two single point calculations.
- _result_type
alias of
ReorganizationEnergyResults
Code¶
Recipe module source: reorganization_energy.py
from collections import OrderedDict
from scm.plams.core.basejob import MultiJob
from scm.plams.core.functions import add_to_instance
from scm.plams.core.results import Results
from scm.plams.interfaces.adfsuite.ams import AMSJob
__all__ = ["ReorganizationEnergyJob", "ReorganizationEnergyResults"]
# using this function to pass a molecule inside a MultiJob ensures proper parallel execution
def pass_molecule(source, target):
@add_to_instance(target)
def prerun(self): # noqa F811
self.molecule = source.results.get_main_molecule()
class ReorganizationEnergyResults(Results):
"""Results class for reorganization energy."""
def reorganization_energy(self, unit="au"):
energies = self.get_all_energies(unit)
reorganization_energy = (
energies["state B geo A"]
- energies["state A geo A"]
+ energies["state A geo B"]
- energies["state B geo B"]
)
return reorganization_energy
def get_all_energies(self, unit="au"):
energies = {}
energies["state A geo A"] = self.job.children["go_A"].results.get_energy(unit=unit)
energies["state B geo B"] = self.job.children["go_B"].results.get_energy(unit=unit)
energies["state A geo B"] = self.job.children["sp_A_for_geo_B"].results.get_energy(unit=unit)
energies["state B geo A"] = self.job.children["sp_B_for_geo_A"].results.get_energy(unit=unit)
return energies
class ReorganizationEnergyJob(MultiJob):
"""A class for calculating the reorganization energy using AMS.
Given two states, A and B, the reorganization energy is defined as follows:
reorganization energy =
E(state B at optimal geometry for state A) -
E(state A at optimal geometry for state A) +
E(state A at optimal geometry for state B) -
E(state B at optimal geometry for state B)
This job will run two geometry optimizations and two single point calculations.
"""
_result_type = ReorganizationEnergyResults
def __init__(self, molecule, common_settings, settings_state_A, settings_state_B, **kwargs):
"""
molecule: the molecule
common_settings: a setting object for an AMSJob containing the shared settings for all the calculations
settings_state_A: Setting object for an AMSJob containing exclusivelt the options defining the state A (e.g. charge and spin)
settings_state_B: Setting object for an AMSJob containing exclusivelt the options defining the state B (e.g. charge and spin)
kwargs: other options to be passed to the MultiJob constructor
"""
MultiJob.__init__(self, children=OrderedDict(), **kwargs)
go_settings = common_settings.copy()
go_settings.input.ams.task = "GeometryOptimization"
sp_settings = common_settings.copy()
sp_settings.input.ams.task = "SinglePoint"
# copy the settings so that we wont modify the ones provided as input by the user
settings_state_A = settings_state_A.copy()
settings_state_B = settings_state_B.copy()
# In case the charge key is not specified, excplicitely set the value to 0.
# This is to prevent the charge in molecule.properties.charge (set by get_main_molecule())
# to be used in case of neutral systems
for s in [settings_state_A, settings_state_B]:
if not "charge" in s.input.ams.system:
s.input.ams.system.charge = 0
self.children["go_A"] = AMSJob(molecule=molecule, settings=go_settings + settings_state_A, name="go_A")
self.children["go_B"] = AMSJob(molecule=molecule, settings=go_settings + settings_state_B, name="go_B")
self.children["sp_A_for_geo_B"] = AMSJob(settings=sp_settings + settings_state_A, name="sp_A_geo_B")
self.children["sp_B_for_geo_A"] = AMSJob(settings=sp_settings + settings_state_B, name="sp_B_geo_A")
pass_molecule(self.children["go_A"], self.children["sp_B_for_geo_A"])
pass_molecule(self.children["go_B"], self.children["sp_A_for_geo_B"])
ADFNBO¶
Extend ADF runs with NBO6/gennbo execution through an AMS job wrapper.
Overview¶
ADFNBOJob extends the regular AMS/ADF runscript to execute adfnbo and gennbo6 while
ensuring required ADF input is present.
API¶
API reference
- class ADFNBOJob(molecule=None, *args, **kwargs)[source]
- Parameters:
- prerun()[source]
Actions to take before the actual job execution.
This method is initially empty, it can be defined in subclasses or directly added to either the whole class or a single instance using Binding decorators.
Code¶
Recipe module source: adfnbo.py
import os
from scm.plams.core.functions import log
from scm.plams.interfaces.adfsuite.ams import AMSJob
__all__ = ["ADFNBOJob"]
class ADFNBOJob(AMSJob):
def prerun(self): # noqa F811
s = self.settings.input.ADF
s.fullfock = True
s.aomat2file = True
s.symmetry = "NoSym"
s.basis.core = "None"
if "save" in s:
if isinstance(s.save, str):
s.save += " TAPE15"
elif isinstance(s.save, list):
s.save.append("TAPE15")
else:
log(
"WARNING: 'SAVE TAPE15' could not be added to the input settings of {}. Make sure (thisjob).settings.input.save is a string or a list.".format(
self.name
),
1,
)
else:
s.save = "TAPE15"
if isinstance(self.settings.adfnbo, list):
adfnbo_input = self.settings.adfnbo
else:
adfnbo_input = ["write", "spherical", "fock"]
log(
"WARNING: (thisjob).settings.adfnbo should be a list. Using default settings: write, fock, spherical", 1
)
self.settings.runscript.post = (
'cp "' + os.path.join(self.path, "adf.rkf") + '" TAPE21\n'
"$AMSBIN/adfnbo <<eor\n" + "\n".join(adfnbo_input) + "\neor\n\n$AMSBIN/gennbo6 FILE47\n"
)
pyAHFCDOS¶
Compute vibronic density of states from two vibronic spectra at AH-FC level.
Overview¶
The FCFDOS class combines absorption and emission vibronic data to build a DOS profile with
configurable broadening.
API¶
API reference
- class FCFDOS(absrkf, emirkf, absele=0.0, emiele=0.0)[source]
A class for calculating the convolution of two FCF spectra
- dos(lineshape='GAU', HWHM=100.0)[source]
Calculate density of states by computing the overlap of the two FCF spectra The two spectra are broadened using the chosen lineshape and Half-Width at Half-Maximum in cm-1
- convolute(spc, stick, lineshape=None, HWHM=None)[source]
Convolute stick spectrum with the chosen width and lineshape lineshape : Can be Gaussian or Lorentzian HWHM : expressed in cm-1
- trapezoid(spc)[source]
Integrate spectrum using the trapezoid rule
Code¶
Recipe module source: fcf_dos.py
# from .scmjob import SCMJob, SCMResults
import math
import numpy
from scm.plams.tools.kftools import KFFile
__all__ = ["FCFDOS"]
class FCFDOS:
"""
A class for calculating the convolution of two FCF spectra
"""
# Conversion factor to cm-1 == ( me^2 * c * alpha^2 ) / ( 100 * 2pi * amu * hbar )
G2F = 120.399494933
# J2Ha = 1.0 / (scipy.constants.m_e * scipy.constants.speed_of_light**2 * 0.0072973525693**2)
J2Ha = 2.293712278400752e17
def __init__(self, absrkf, emirkf, absele=0.0, emiele=0.0):
"""
absrkf : KFFile from an FCF absorption calculation for the acceptor, as name, path, or KFFile object
emirkf : KFFile from an FCF emission calculation for the donor, as name, path, or KFFile object
absele : Acceptor absorption electronic energy cm-1
emiele : Donor emission electronic energy in cm-1
"""
if isinstance(absrkf, str):
self.absrkf = KFFile(absrkf)
else:
self.absrkf = absrkf
if isinstance(emirkf, str):
self.emirkf = KFFile(emirkf)
else:
self.emirkf = emirkf
self.absele = absele
self.emiele = emiele
self.spc = None
def _getstick(self, source):
spclen = source.read("Fcf", "nspectrum")
rawspc = source.read("Fcf", "spectrum")
stick = numpy.reshape(numpy.array(rawspc), (2, spclen)).transpose()
# Reorder spectrum if decreasing
if stick[0, 0] > stick[-1, 0]:
for ix in range(numpy.size(stick, 0) // 2):
XX = numpy.copy(stick[ix, :])
stick[ix, :] = stick[-ix - 1, :]
stick[-ix - 1, :] = XX
# Get frequencies in cm-1
frq1 = numpy.array(source.read("Fcf", "gamma1")) * self.G2F
frq2 = numpy.array(source.read("Fcf", "gamma2")) * self.G2F
# Calculate energy in Joules
# factor = scipy.constants.pi * scipy.constants.hbar * scipy.constants.speed_of_light * 100.0
factor = 9.932229285744643e-24
viben1 = sum(frq1) * factor
viben2 = sum(frq2) * factor
# Convert to Hartree
viben1 = viben1 * self.J2Ha
viben2 = viben2 * self.J2Ha
# Add ZPE and electronic energy
stick[:, 0] = stick[:, 0] + viben2 - viben1 + self.absele + self.emiele
return stick
def _spcinit(self):
# Get absorption and emission spectra
absspc = self._getstick(self.absrkf)
emispc = self._getstick(self.emirkf)
# Find spectrum bounds
absmin = numpy.amin(absspc[:, 0])
absmax = numpy.amax(absspc[:, 0])
emimin = numpy.amin(emispc[:, 0])
emimax = numpy.amax(emispc[:, 0])
spcmin = min(absmin, emimin)
spcmax = max(absmax, emimax)
# Find spectrum length and grain
absdlt = abs(absspc[0, 0] - absspc[1, 0])
emidlt = abs(absspc[0, 0] - absspc[1, 0])
spcgrn = min(absdlt, emidlt)
spclen = math.floor((spcmax - spcmin) / spcgrn) + 1
# Initialize spectrum
self.spc = self.newspc(spcmin, spcmax, spclen)
return None
def dos(self, lineshape="GAU", HWHM=100.0):
"""
Calculate density of states by computing the overlap of the two FCF spectra
The two spectra are broadened using the chosen lineshape and Half-Width at Half-Maximum in cm-1
"""
# Initialize spectrum
self._spcinit()
# Get stick spectra
absstick = self._getstick(self.absrkf)
emistick = self._getstick(self.emirkf)
# Convolute with gaussians
absspc = numpy.copy(self.spc)
emispc = numpy.copy(self.spc)
absspc = self.convolute(absspc, absstick, lineshape, HWHM)
emispc = self.convolute(emispc, emistick, lineshape, HWHM)
# Integrate
# Calculate DOS
self.spc[:, 1] = absspc[:, 1] * emispc[:, 1]
dos = self.trapezoid(self.spc)
return dos
def newspc(self, spcmin, spcmax, spclen=1000):
# Dimensions of the spectrum
delta = (spcmax - spcmin) / (spclen - 1)
spc = numpy.zeros((spclen, 2), dtype=float)
for ix in range(spclen):
spc[ix, 0] = spcmin + delta * ix
return spc
def convolute(self, spc, stick, lineshape=None, HWHM=None):
"""
Convolute stick spectrum with the chosen width and lineshape
lineshape : Can be Gaussian or Lorentzian
HWHM : expressed in cm-1
"""
if HWHM is None:
raise ValueError("HWHM not defined")
if lineshape is None:
raise ValueError("Lineshape not defined")
# Data for the convolution
delta = spc[1, 0] - spc[0, 0]
if lineshape[0:3].upper() == "GAU":
# Gaussian lineshape
idline = 1
# This includes the Gaussian prefactor and the factor to account for the reduced lineshape width
# factor = 1. / scipy.special.erf(2*math.sqrt(math.log(2)))
factor = 1.0188815852036244
factA = math.sqrt(numpy.log(2.0) / math.pi) * factor / HWHM
factB = -numpy.log(2.0) / HWHM**2
# We only convolute between -2HWHM and +2HWHM which accounts for 98.1% of the area
ishft = math.floor(2 * HWHM / delta)
elif lineshape[0:3].upper() == "LOR":
# Lorentzian lineshape
idline = 2
# This includes the Lorentzian prefactor and the factor to account for the reduced lineshape width
factA = (math.pi / math.atan(12) / 2) * HWHM / math.pi
factB = 0.0
# We only convolute between -12HWHM and +12HWHM which accounts for 94.7% of the area
ishft = math.floor(12 * HWHM / delta)
else:
raise ValueError("invalid lineshape")
# Loop over peaks in the stick spectrum
spclen = numpy.size(spc, 0)
for ix in range(numpy.size(stick[:, 0])):
# Find peak position in the convoluted spectrum
peakpos = 1 + math.floor((stick[ix, 0] - spc[0, 0]) / delta)
# Convolution interval, limited to save computational time
i1 = max(peakpos - ishft, 1)
i2 = min(peakpos + ishft, spclen)
factor = factA * stick[ix, 1]
if idline == 1: # Gaussian
for i in range(i1, i2):
spc[i, 1] = spc[i, 1] + factor * math.exp(factB * (spc[i, 0] - stick[ix, 0]) ** 2)
elif idline == 2: # Lorentzian
for i in range(i1, i2):
spc[i, 1] = spc[i, 1] + factor / (HWHM + (spc[i, 0] - stick[ix, 0]) ** 2)
return spc
def trapezoid(self, spc):
"""
Integrate spectrum using the trapezoid rule
"""
value = (spc[0, 1] + spc[-1, 1]) / 2
value = value + sum(spc[1:-1, 1])
# The abscissas must be equally spaced for this to work
value = value * (spc[1, 0] - spc[0, 0])
return value
def __str__(self):
string = f"Absorption from {self.absrkf.path}\nEmission from {self.emirkf.path}\nAcceptor absorption electronic energy = {self.absele} cm-1\nDonor emission electronic energy = {self.emiele} cm-1"
return string
ADFVibronicDOS¶
Practical ADF workflow that uses AH-FC vibronic spectra to build a DOS profile.
Overview¶
This workflow runs the vibronic calculations and then applies FCFDOS to produce and analyze the
vibronic density of states.
API¶
API reference
- class FCFDOS(absrkf, emirkf, absele=0.0, emiele=0.0)[source]
A class for calculating the convolution of two FCF spectra
- dos(lineshape='GAU', HWHM=100.0)[source]
Calculate density of states by computing the overlap of the two FCF spectra The two spectra are broadened using the chosen lineshape and Half-Width at Half-Maximum in cm-1
- convolute(spc, stick, lineshape=None, HWHM=None)[source]
Convolute stick spectrum with the chosen width and lineshape lineshape : Can be Gaussian or Lorentzian HWHM : expressed in cm-1
- trapezoid(spc)[source]
Integrate spectrum using the trapezoid rule
Code¶
Recipe module source: fcf_dos.py
# from .scmjob import SCMJob, SCMResults
import math
import numpy
from scm.plams.tools.kftools import KFFile
__all__ = ["FCFDOS"]
class FCFDOS:
"""
A class for calculating the convolution of two FCF spectra
"""
# Conversion factor to cm-1 == ( me^2 * c * alpha^2 ) / ( 100 * 2pi * amu * hbar )
G2F = 120.399494933
# J2Ha = 1.0 / (scipy.constants.m_e * scipy.constants.speed_of_light**2 * 0.0072973525693**2)
J2Ha = 2.293712278400752e17
def __init__(self, absrkf, emirkf, absele=0.0, emiele=0.0):
"""
absrkf : KFFile from an FCF absorption calculation for the acceptor, as name, path, or KFFile object
emirkf : KFFile from an FCF emission calculation for the donor, as name, path, or KFFile object
absele : Acceptor absorption electronic energy cm-1
emiele : Donor emission electronic energy in cm-1
"""
if isinstance(absrkf, str):
self.absrkf = KFFile(absrkf)
else:
self.absrkf = absrkf
if isinstance(emirkf, str):
self.emirkf = KFFile(emirkf)
else:
self.emirkf = emirkf
self.absele = absele
self.emiele = emiele
self.spc = None
def _getstick(self, source):
spclen = source.read("Fcf", "nspectrum")
rawspc = source.read("Fcf", "spectrum")
stick = numpy.reshape(numpy.array(rawspc), (2, spclen)).transpose()
# Reorder spectrum if decreasing
if stick[0, 0] > stick[-1, 0]:
for ix in range(numpy.size(stick, 0) // 2):
XX = numpy.copy(stick[ix, :])
stick[ix, :] = stick[-ix - 1, :]
stick[-ix - 1, :] = XX
# Get frequencies in cm-1
frq1 = numpy.array(source.read("Fcf", "gamma1")) * self.G2F
frq2 = numpy.array(source.read("Fcf", "gamma2")) * self.G2F
# Calculate energy in Joules
# factor = scipy.constants.pi * scipy.constants.hbar * scipy.constants.speed_of_light * 100.0
factor = 9.932229285744643e-24
viben1 = sum(frq1) * factor
viben2 = sum(frq2) * factor
# Convert to Hartree
viben1 = viben1 * self.J2Ha
viben2 = viben2 * self.J2Ha
# Add ZPE and electronic energy
stick[:, 0] = stick[:, 0] + viben2 - viben1 + self.absele + self.emiele
return stick
def _spcinit(self):
# Get absorption and emission spectra
absspc = self._getstick(self.absrkf)
emispc = self._getstick(self.emirkf)
# Find spectrum bounds
absmin = numpy.amin(absspc[:, 0])
absmax = numpy.amax(absspc[:, 0])
emimin = numpy.amin(emispc[:, 0])
emimax = numpy.amax(emispc[:, 0])
spcmin = min(absmin, emimin)
spcmax = max(absmax, emimax)
# Find spectrum length and grain
absdlt = abs(absspc[0, 0] - absspc[1, 0])
emidlt = abs(absspc[0, 0] - absspc[1, 0])
spcgrn = min(absdlt, emidlt)
spclen = math.floor((spcmax - spcmin) / spcgrn) + 1
# Initialize spectrum
self.spc = self.newspc(spcmin, spcmax, spclen)
return None
def dos(self, lineshape="GAU", HWHM=100.0):
"""
Calculate density of states by computing the overlap of the two FCF spectra
The two spectra are broadened using the chosen lineshape and Half-Width at Half-Maximum in cm-1
"""
# Initialize spectrum
self._spcinit()
# Get stick spectra
absstick = self._getstick(self.absrkf)
emistick = self._getstick(self.emirkf)
# Convolute with gaussians
absspc = numpy.copy(self.spc)
emispc = numpy.copy(self.spc)
absspc = self.convolute(absspc, absstick, lineshape, HWHM)
emispc = self.convolute(emispc, emistick, lineshape, HWHM)
# Integrate
# Calculate DOS
self.spc[:, 1] = absspc[:, 1] * emispc[:, 1]
dos = self.trapezoid(self.spc)
return dos
def newspc(self, spcmin, spcmax, spclen=1000):
# Dimensions of the spectrum
delta = (spcmax - spcmin) / (spclen - 1)
spc = numpy.zeros((spclen, 2), dtype=float)
for ix in range(spclen):
spc[ix, 0] = spcmin + delta * ix
return spc
def convolute(self, spc, stick, lineshape=None, HWHM=None):
"""
Convolute stick spectrum with the chosen width and lineshape
lineshape : Can be Gaussian or Lorentzian
HWHM : expressed in cm-1
"""
if HWHM is None:
raise ValueError("HWHM not defined")
if lineshape is None:
raise ValueError("Lineshape not defined")
# Data for the convolution
delta = spc[1, 0] - spc[0, 0]
if lineshape[0:3].upper() == "GAU":
# Gaussian lineshape
idline = 1
# This includes the Gaussian prefactor and the factor to account for the reduced lineshape width
# factor = 1. / scipy.special.erf(2*math.sqrt(math.log(2)))
factor = 1.0188815852036244
factA = math.sqrt(numpy.log(2.0) / math.pi) * factor / HWHM
factB = -numpy.log(2.0) / HWHM**2
# We only convolute between -2HWHM and +2HWHM which accounts for 98.1% of the area
ishft = math.floor(2 * HWHM / delta)
elif lineshape[0:3].upper() == "LOR":
# Lorentzian lineshape
idline = 2
# This includes the Lorentzian prefactor and the factor to account for the reduced lineshape width
factA = (math.pi / math.atan(12) / 2) * HWHM / math.pi
factB = 0.0
# We only convolute between -12HWHM and +12HWHM which accounts for 94.7% of the area
ishft = math.floor(12 * HWHM / delta)
else:
raise ValueError("invalid lineshape")
# Loop over peaks in the stick spectrum
spclen = numpy.size(spc, 0)
for ix in range(numpy.size(stick[:, 0])):
# Find peak position in the convoluted spectrum
peakpos = 1 + math.floor((stick[ix, 0] - spc[0, 0]) / delta)
# Convolution interval, limited to save computational time
i1 = max(peakpos - ishft, 1)
i2 = min(peakpos + ishft, spclen)
factor = factA * stick[ix, 1]
if idline == 1: # Gaussian
for i in range(i1, i2):
spc[i, 1] = spc[i, 1] + factor * math.exp(factB * (spc[i, 0] - stick[ix, 0]) ** 2)
elif idline == 2: # Lorentzian
for i in range(i1, i2):
spc[i, 1] = spc[i, 1] + factor / (HWHM + (spc[i, 0] - stick[ix, 0]) ** 2)
return spc
def trapezoid(self, spc):
"""
Integrate spectrum using the trapezoid rule
"""
value = (spc[0, 1] + spc[-1, 1]) / 2
value = value + sum(spc[1:-1, 1])
# The abscissas must be equally spaced for this to work
value = value * (spc[1, 0] - spc[0, 0])
return value
def __str__(self):
string = f"Absorption from {self.absrkf.path}\nEmission from {self.emirkf.path}\nAcceptor absorption electronic energy = {self.absele} cm-1\nDonor emission electronic energy = {self.emiele} cm-1"
return string