# Source code for scm.plams.interfaces.adfsuite.ams

import os
import subprocess
import numpy as np
import re

from os.path import join as opj

from ...core.basejob import SingleJob
from ...core.errors import FileError, JobError, ResultsError, PTError, PlamsError
from ...core.functions import config, log, parse_heredoc
from ...core.private import sha256, UpdateSysPath
from ...core.results import Results
from ...core.settings import Settings
from ...mol.molecule import Molecule
from ...mol.atom import Atom
from ...mol.bond import Bond
from ...tools.kftools import KFFile
from ...tools.units import Units
from ...trajectories.rkffile import RKFTrajectoryFile
from ...tools.converters import vasp_output_to_ams, qe_output_to_ams, gaussian_output_to_ams

try:
    from watchdog.observers import Observer
    from watchdog.events import PatternMatchingEventHandler, FileModifiedEvent
    _has_watchdog = True

    class AMSJobLogTailHandler(PatternMatchingEventHandler):

        def __init__(self, job, jobmanager):
            super().__init__(patterns=[os.path.join(jobmanager.workdir, f"{job.name}*", "ams.log")],
                             ignore_patterns=["*.rkf", "*.out"],
                             ignore_directories=True,
                             case_sensitive=True)
            self._job = job
            self._seekto = 0

        def on_any_event(self, event):
            if self._job.path is not None \
            and event.src_path == os.path.join(self._job.path, "ams.log") \
            and isinstance(event, FileModifiedEvent):
                try:
                    with open(event.src_path, 'r') as f:
                        f.seek(self._seekto)
                        while True:
                            line = f.readline()
                            if not line: break
                            log(f"{self._job.name}: "+line[25:-1])
                        self._seekto = f.tell()
                except FileNotFoundError:
                    self._seekto = 0

except ImportError:
    _has_watchdog = False
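
# A minimal sketch of wiring the handler up by hand, assuming watchdog is
# installed (PLAMS normally schedules this internally when log tailing is
# enabled; `job` and `jobmanager` are placeholders for objects from a regular
# PLAMS session):
#
#   observer = Observer()
#   observer.schedule(AMSJobLogTailHandler(job, jobmanager), jobmanager.workdir, recursive=True)
#   observer.start()
#   ...                      # run the job; new ams.log lines are echoed to the PLAMS log
#   observer.stop()
#   observer.join()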



__all__ = ['AMSJob', 'AMSResults']


class AMSResults(Results):
    """A specialized |Results| subclass for accessing the results of |AMSJob|."""

    def __init__(self, *args, **kwargs):
        Results.__init__(self, *args, **kwargs)
        self.rkfs = {}
    def collect(self):
        """Collect files present in the job folder. Use parent method from |Results|, then create an instance of |KFFile| for each ``.rkf`` file present in the job folder. Collect these files in ``rkfs`` dictionary, with keys being filenames without ``.rkf`` extension.

        The information about ``.rkf`` files generated by engines is taken from the main ``ams.rkf`` file.

        This method is called automatically during the final part of the job execution and there is no need to call it manually.
        """
        Results.collect(self)
        self.collect_rkfs()
    def collect_rkfs(self):
        rkfname = 'ams.rkf'
        if rkfname in self.files:
            main = KFFile(opj(self.job.path, rkfname))
            n = main[('EngineResults', 'nEntries')]
            for i in range(1, n+1):
                title = main[('EngineResults', 'Title({})'.format(i))]
                files = main[('EngineResults', 'Files({})'.format(i))].split('\x00')
                if files[0].endswith('.rkf'):
                    key = files[0][:-4]
                    self.rkfs[key] = KFFile(opj(self.job.path, files[0]))
            self.rkfs['ams'] = main
        else:
            log('WARNING: Main KF file {} not present in {}'.format(rkfname, self.job.path), 1)

    def _copy_to(self, newresults):
        super()._copy_to(newresults)
        newresults.rkfs = {}
        newresults.collect_rkfs()
    def refresh(self):
        """Refresh the contents of the ``files`` list.

        Use the parent method from |Results|, then look at |KFFile| instances present in the ``rkfs`` dictionary and check if they point to existing files. If not, try to reinstantiate them with the current job path (that can happen while loading a pickled job after the entire job folder was moved).
        """
        Results.refresh(self)
        to_remove = []
        for key, val in self.rkfs.items():
            if not os.path.isfile(val.path):
                if os.path.dirname(val.path) != self.job.path:
                    guessnewpath = opj(self.job.path, os.path.basename(val.path))
                    if os.path.isfile(guessnewpath):
                        self.rkfs[key] = KFFile(guessnewpath)
                    else:
                        to_remove.append(key)
                else:
                    to_remove.append(key)
        for i in to_remove:
            del self.rkfs[i]
    def engine_names(self):
        """Return a list of all names of engine specific ``.rkf`` files. The identifier of the main result file (``'ams'``) is not present in the returned list, only engine specific names are listed."""
        self.refresh()
        ret = list(self.rkfs.keys())
        ret.remove('ams')
        return ret
    def rkfpath(self, file='ams'):
        """Return the absolute path of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
        """
        return self._access_rkf(lambda x: x.path, file)
    def readrkf(self, section, variable, file='ams'):
        """Read data from *section*/*variable* of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.

        The type of the returned value depends on the type of *variable* defined inside the KF file. It can be: single int, list of ints, single float, list of floats, single boolean, list of booleans or string.

        .. note::

            If arguments *section* or *variable* are incorrect (not present in the chosen file), the returned value is ``None``. Please mind the fact that KF files are case sensitive.
        """
        return self._access_rkf(lambda x: x.read(section, variable), file)
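    # Example usage (a sketch; `results` is an AMSResults instance of a
    # finished job, and the variables read here exist for typical AMS runs):
    #
    #   energy = results.readrkf('AMSResults', 'Energy', file='engine')  # Hartree
    #   nsteps = results.readrkf('History', 'nEntries')                  # from ams.rkf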
    def read_rkf_section(self, section, file='ams'):
        """Return a dictionary with all variables from a given *section* of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.

        .. note::

            If *section* is not present in the chosen file, the returned value is an empty dictionary. Please mind the fact that KF files are case sensitive.
        """
        return self._access_rkf(lambda x: x.read_section(section), file)
    def get_rkf_skeleton(self, file='ams'):
        """Return a dictionary with the structure of a chosen ``.rkf`` file. Each key corresponds to a section name with the value being a set of variable names present in that section.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.
        """
        return self._access_rkf(lambda x: x.get_skeleton(), file)
    def get_molecule(self, section, file='ams'):
        """Return a |Molecule| instance stored in a given *section* of a chosen ``.rkf`` file.

        The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file.

        All data used by this method is taken from the chosen ``.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        sectiondict = self.read_rkf_section(section, file)
        if sectiondict:
            return Molecule._mol_from_rkf_section(sectiondict)
    def get_ase_atoms(self, section, file='ams'):
        from ase import Atoms
        sectiondict = self.read_rkf_section(section, file)
        bohr2angstrom = 0.529177210903
        nLatticeVectors = sectiondict.get('nLatticeVectors', 0)
        pbc = [True] * nLatticeVectors + [False] * (3 - nLatticeVectors)
        if nLatticeVectors > 0:
            cell = np.zeros((3, 3))
            lattice = np.array(sectiondict.get('LatticeVectors')).reshape(-1, 3)
            cell[:lattice.shape[0], :lattice.shape[1]] = lattice * bohr2angstrom
        else:
            cell = None
        atomsymbols = sectiondict['AtomSymbols'].split()
        positions = np.array(sectiondict['Coords']).reshape(-1, 3) * bohr2angstrom
        return Atoms(symbols=atomsymbols, positions=positions, pbc=pbc, cell=cell)
    def get_input_molecule(self):
        """Return a |Molecule| instance with the initial coordinates.

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        return self.get_molecule('InputMolecule', 'ams')
    def get_main_molecule(self):
        """Return a |Molecule| instance with the final coordinates.

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        return self.get_molecule('Molecule', 'ams')
    def get_main_ase_atoms(self, get_results=False):
        """Return an ase.Atoms instance with the final coordinates. An alternative is to call toASE(results.get_main_molecule()) to convert a Molecule to ASE Atoms.

        get_results : bool
            If True, the returned Atoms will have a SinglePointCalculator as their calculator, allowing you to call .get_potential_energy(), .get_forces(), and .get_stress() on the returned Atoms object.
        """
        from ase.calculators.singlepoint import SinglePointCalculator
        atoms = self.get_ase_atoms('Molecule', 'ams')
        if get_results:
            atoms.set_calculator(SinglePointCalculator(atoms))
            try:
                energy = self.get_energy(unit='eV')
                atoms.calc.results['energy'] = energy
            except KeyError:
                pass
            try:
                forces = (-1) * np.array(self.get_gradients(dist_unit='angstrom', energy_unit='eV')).reshape(-1, 3)
                atoms.calc.results['forces'] = forces
            except KeyError:
                pass
            try:
                stress = np.array(self.get_stresstensor()).ravel() * Units.convert(1.0, 'hartree/bohr^3', 'eV/ang^3')
                if len(stress) == 9:
                    stress = stress.reshape(3, 3)
                    # Voigt order: xx, yy, zz, yz, xz, xy
                    atoms.calc.results['stress'] = np.array([stress[0][0], stress[1][1], stress[2][2], stress[1][2], stress[0][2], stress[0][1]])
            except KeyError:
                pass
        return atoms
    def get_history_molecule(self, step):
        """Return a |Molecule| instance with coordinates taken from a particular *step* in the ``History`` section of the ``ams.rkf`` file.

        All data used by this method is taken from the ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored.
        """
        if 'ams' in self.rkfs:
            main = self.rkfs['ams']
            if not self.is_valid_stepnumber(main, step):
                return
            coords = main.read('History', f'Coords({step})')
            coords = [coords[i:i+3] for i in range(0, len(coords), 3)]
            if ('History', f'SystemVersion({step})') in main:
                system = self.get_system_version(main, step)
                mol = self.get_molecule(f'ChemicalSystem({system})')
                molsrc = f'ChemicalSystem({system})'
            else:
                mol = self.get_main_molecule()
                molsrc = 'Molecule'
            if len(mol) != len(coords):
                raise ResultsError(f'Coordinates taken from "History%Coords({step})" have incompatible length with molecule from {molsrc} section')
            for at, c in zip(mol, coords):
                at.move_to(c, unit='bohr')
            if ('History', f'LatticeVectors({step})') in main:
                lattice = Units.convert(main.read('History', f'LatticeVectors({step})'), 'bohr', 'angstrom')
                mol.lattice = [tuple(lattice[j:j+3]) for j in range(0, len(lattice), 3)]
            # Bonds from the reference molecule are probably outdated. Let us never use them ...
            mol.delete_all_bonds()
            # ... but instead use bonds from the history section if they are available:
            if all(('History', i) in main for i in [f'Bonds.Index({step})', f'Bonds.Atoms({step})', f'Bonds.Orders({step})']):
                index = main.read('History', f'Bonds.Index({step})')
                if not isinstance(index, list): index = [index]
                atoms = main.read('History', f'Bonds.Atoms({step})')
                if not isinstance(atoms, list): atoms = [atoms]
                orders = main.read('History', f'Bonds.Orders({step})')
                if not isinstance(orders, list): orders = [orders]
                for i in range(len(index)-1):
                    for j in range(index[i], index[i+1]):
                        mol.add_bond(mol[i+1], mol[atoms[j-1]], orders[j-1])
                if ('History', f'Bonds.CellShifts({step})') in main:
                    cellShifts = main.read('History', f'Bonds.CellShifts({step})')
                    for i, b in enumerate(mol.bonds):
                        b.properties.suffix = f"{cellShifts[3*i]} {cellShifts[3*i+1]} {cellShifts[3*i+2]}"
            return mol
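    # Example (a sketch): walk over all frames stored in the History section
    # of a finished geometry optimization. Step numbering is 1-based.
    #
    #   nsteps = results.readrkf('History', 'nEntries')
    #   for step in range(1, nsteps+1):
    #       mol = results.get_history_molecule(step)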
    def is_valid_stepnumber(self, main, step):
        """Check if the requested step number is in the results file."""
        if 'History' not in main:
            raise KeyError("'History' section not present in {}".format(main.path))
        n = main.read('History', 'nEntries')
        if step > n:
            raise KeyError("Step {} not present in 'History' section of {}".format(step, main.path))
        return True
    def get_system_version(self, main, step):
        """Determine which Molecule version is requested."""
        if ('History', f'SystemVersion({step})') in main:
            version = main.read('History', f'SystemVersion({step})')
            if 'SystemVersionHistory' in main:
                if ('SystemVersionHistory', 'blockSize') in main:
                    blockSize = main.read('SystemVersionHistory', 'blockSize')
                else:
                    blockSize = 1
                block = (version - 1) // blockSize + 1
                offset = (version - 1) % blockSize
                system = main.read('SystemVersionHistory', f'SectionNum({block})', return_as_list=True)[offset]
            else:
                system = version
            return system
    def get_history_variables(self, history_section='History'):
        """Return a set of keynames stored in the specified history section of the ``ams.rkf`` file.

        The *history_section* argument should be a string representing the name of the history section (``History`` or ``MDHistory``).
        """
        if not 'ams' in self.rkfs:
            return
        main = self.rkfs['ams']
        keylist = [var for sec, var in main if sec == history_section]
        # Now throw out all the last parts
        return set([key.split('(')[0] for key in keylist if len(key.split('(')) > 1])
    def get_history_property(self, varname, history_section='History'):
        """Return the values of *varname* in the history section *history_section*."""
        if not 'ams' in self.rkfs:
            return
        main = self.rkfs['ams']
        if history_section not in main:
            raise KeyError(f"The requested section '{history_section}' does not exist in {main.path}")
        if (history_section, 'nScanCoord') in main:  # PESScan
            nentries = main.read(history_section, 'nScanCoord')
            as_block = False
        elif (history_section, 'nEntries') in main:
            nentries = main.read(history_section, 'nEntries')
            as_block = self._values_stored_as_blocks(main, varname, history_section)
        if as_block:
            nblocks = main.read(history_section, 'nBlocks')
            values = [main.read(history_section, f"{varname}({iblock})", return_as_list=True) for iblock in range(1, nblocks+1)]
            values = [val for blockvals in values for val in blockvals if isinstance(blockvals, list)]
        else:
            values = [main.read(history_section, f"{varname}({step})") for step in range(1, nentries+1)]
        return values
    def get_property_at_step(self, step, varname, history_section='History'):
        """Return the value of *varname* in the history section *history_section* at step *step*."""
        if not 'ams' in self.rkfs:
            return
        main = self.rkfs['ams']
        as_block = self._values_stored_as_blocks(main, varname, history_section)
        if as_block:
            blocksize = main.read(history_section, 'blockSize')
            iblock = int(np.ceil(step/blocksize))
            value = main.read(history_section, f"{varname}({iblock})")[(step % blocksize)-1]
        else:
            value = main.read(history_section, f"{varname}({step})")
        return value
    def _values_stored_as_blocks(self, main, varname, history_section):
        """Determine whether the values of *varname* in a trajectory rkf file are stored in blocks."""
        nentries = main.read(history_section, 'nEntries')
        as_block = False
        # This is extremely slow, because looping over main is very slow.
        # This is because the (sec,var) tuples are first stored in a set, then sorted, and only then yielded.
        keylist = [var for sec, var in main if sec == history_section]
        if 'nBlocks' in keylist:
            if not f"{varname}({nentries})" in keylist:
                as_block = True
        return as_block
    def get_atomic_temperatures_at_step(self, step, history_section='MDHistory'):
        """Get all the atomic temperatures for step *step*.

        Note: Numbering of steps starts at 1.
        """
        if not 'ams' in self.rkfs:
            return
        main = self.rkfs['ams']
        if not self.is_valid_stepnumber(main, step):
            return
        # Read the masses
        molname = 'Molecule'
        if ('History', f'SystemVersion({step})') in main:
            system = self.get_system_version(main, step)
            molname = 'ChemicalSystem(%i)' % (system)
        masses = np.array(main.read(molname, 'AtomMasses'))
        nats = len(masses)
        # Read the velocities
        velocities = np.array(self.get_property_at_step(step, 'Velocities', history_section)).reshape((nats, 3))
        # Convert to SI units and compute temperatures
        m = masses * 1.e-3 / Units.constants['NA']
        vels = velocities * Units.conversion_ratio('Bohr', 'Angstrom') * 1.e5
        temperatures = (m.reshape((nats, 1)) * vels**2).sum(axis=1)
        temperatures /= 3 * Units.constants['k_B']
        return temperatures
    def get_band_structure(self, bands=None, unit='hartree', only_high_symmetry_points=False):
        """Extracts the electronic band structure from DFTB or BAND calculations. The returned data can be plotted with ``plot_band_structure``.

        Note: for unrestricted calculations bands 0, 2, 4, ... are spin-up and bands 1, 3, 5, ... are spin-down.

        bands : list of int or None
            If None, all bands are returned. Note: the band indices start with 0.

        unit : str
            Unit of the returned band energies.

        only_high_symmetry_points : bool
            Return only the first point of each edge.

        Returns: ``x``, ``y_spin_up``, ``y_spin_down``, ``labels``, ``fermi_energy``
            ``x``: 1D array of float.
            ``y_spin_up``: 2D array of shape (len(x), len(bands)). Every column is a separate band. In units of ``unit``.
            ``y_spin_down``: 2D array of shape (len(x), len(bands)). Every column is a separate band. In units of ``unit``. If the calculation was restricted this is identical to ``y_spin_up``.
            ``labels``: 1D list of str of length len(x). If a point is not a high-symmetry point then the label is an empty string.
            ``fermi_energy``: float. The Fermi energy (in units of ``unit``).
        """
        read_labels = True
        nBands = self.readrkf('band_curves', 'nBands', file='engine')
        nEdges = self.readrkf('band_curves', 'nEdges', file='engine')
        try:
            nSpin = self.readrkf('band_curves', 'nSpin', file='engine')
        except KeyError:
            nSpin = 1
        if bands is None:
            bands = np.arange(nBands)
        spindown_bands = bands
        if nSpin == 2:
            spindown_bands = np.array(bands) + nBands
        prevmaxx = 0
        complete_spinup_data = []
        complete_spindown_data = []
        labels = []
        x = []
        for i in range(nEdges):
            my_x = self.readrkf('band_curves', f'Edge_{i+1}_xFor1DPlotting', file='engine')
            my_x = np.array(my_x) + prevmaxx
            prevmaxx = np.max(my_x)
            if read_labels:
                my_labels = self.readrkf('band_curves', f'Edge_{i+1}_labels', file='engine').split()
                if len(my_labels) == 2:
                    if only_high_symmetry_points:
                        labels += [my_labels[0]]  # only the first point of the curve
                    else:
                        labels += [my_labels[0]] + ['']*(len(my_x)-2) + [my_labels[1]]
            if only_high_symmetry_points:
                x.append(my_x[0:1])
            else:
                x.append(my_x)
            A = self.readrkf('band_curves', f'Edge_{i+1}_bands', file='engine')
            A = np.array(A).reshape(-1, nBands*nSpin)
            spinup_data = A[:, bands]
            spindown_data = A[:, spindown_bands]
            if only_high_symmetry_points:
                spinup_data = np.reshape(spinup_data[0, :], (-1, len(bands)))
                spindown_data = np.reshape(spindown_data[0, :], (-1, len(spindown_bands)))
            complete_spinup_data.append(spinup_data)
            complete_spindown_data.append(spindown_data)
        complete_spinup_data = np.concatenate(complete_spinup_data)
        complete_spinup_data = Units.convert(complete_spinup_data, 'hartree', unit)
        complete_spindown_data = np.concatenate(complete_spindown_data)
        complete_spindown_data = Units.convert(complete_spindown_data, 'hartree', unit)
        x = np.concatenate(x).ravel()
        fermi_energy = self.readrkf('BandStructure', 'FermiEnergy', file='engine')
        fermi_energy = Units.convert(fermi_energy, 'hartree', unit)
        return x, complete_spinup_data, complete_spindown_data, labels, fermi_energy
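    # Example (a sketch, assuming a finished BAND or DFTB calculation with a
    # band structure; matplotlib is not a PLAMS dependency):
    #
    #   x, y_up, y_down, labels, fermi = results.get_band_structure(unit='eV')
    #   import matplotlib.pyplot as plt
    #   plt.plot(x, y_up - fermi)    # bands relative to the Fermi level
    #   plt.show()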
    def get_engine_results(self, engine=None):
        """Return a dictionary with the contents of the ``AMSResults`` section from an engine results ``.rkf`` file.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read_section('AMSResults'), engine)
    def get_engine_properties(self, engine=None):
        """Return a dictionary with all the entries from the ``Properties`` section from an engine results ``.rkf`` file.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        def properties(kf):
            if not 'Properties' in kf:
                return {}
            # There are two *kinds* of "Properties" sections:
            # - ADF simply has a set of variables in the properties section
            # - DFTB and some other engines have a *scheme* for storing "Properties".
            #   See the fortran 'RKFileModule' (RKFile.f90) for more details on this.
            ret = {}
            if ('Properties', 'nEntries') in kf:
                # This is a 'RKFileModule' properties section:
                n = kf.read('Properties', 'nEntries')
                for i in range(1, n+1):
                    tp = kf.read('Properties', 'Type({})'.format(i)).strip()
                    stp = kf.read('Properties', 'Subtype({})'.format(i)).strip()
                    val = kf.read('Properties', 'Value({})'.format(i))
                    key = stp if stp.endswith(tp) else ('{} {}'.format(stp, tp) if stp else tp)
                    ret[key] = val
            else:
                skeleton = kf.get_skeleton()
                for variable in skeleton['Properties']:
                    ret[variable] = kf.read('Properties', variable)
            return ret
        return self._process_engine_results(properties, engine)
    def get_energy(self, unit='au', engine=None):
        """Return final energy, expressed in *unit*.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read('AMSResults', 'Energy'), engine) * Units.conversion_ratio('au', unit)

    def get_gradients(self, energy_unit='au', dist_unit='au', engine=None):
        """Return the gradients of the final energy, expressed in *energy_unit* / *dist_unit*.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Gradients'), engine)).reshape(-1, 3) * Units.conversion_ratio('au', energy_unit) / Units.conversion_ratio('au', dist_unit)

    def get_stresstensor(self, engine=None):
        """Return the final stress tensor, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'StressTensor'), engine)).reshape(len(self.get_input_molecule().lattice), -1)

    def get_hessian(self, engine=None):
        """Return the Hessian matrix, i.e. the second derivative of the total energy with respect to the nuclear coordinates, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Hessian'), engine)).reshape(3*len(self.get_input_molecule()), -1)

    def get_elastictensor(self, engine=None):
        """Return the elastic tensor, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        et_flat = np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'ElasticTensor'), engine))
        num_latvec = len(self.get_input_molecule().lattice)
        if num_latvec == 1:
            return et_flat.reshape(1, 1)
        elif num_latvec == 2:
            return et_flat.reshape(3, 3)
        else:
            return et_flat.reshape(6, 6)

    def get_frequencies(self, unit='cm^-1', engine=None):
        """Return a numpy array of vibrational frequencies, expressed in *unit*.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        freqs = self._process_engine_results(lambda x: x.read('Vibrations', 'Frequencies[cm-1]'), engine)
        freqs = np.array(freqs) if isinstance(freqs, list) else np.array([freqs])
        return freqs * Units.conversion_ratio('cm^-1', unit)

    def get_force_constants(self, engine=None):
        """Return a numpy array of force constants, expressed in atomic units (Hartree/bohr^2).

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        forceConstants = self._process_engine_results(lambda x: x.read('Vibrations', 'ForceConstants'), engine)
        forceConstants = np.array(forceConstants) if isinstance(forceConstants, list) else np.array([forceConstants])
        return forceConstants

    def get_charges(self, engine=None):
        """Return the atomic charges, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Charges'), engine))

    def get_dipolemoment(self, engine=None):
        """Return the electric dipole moment, expressed in atomic units.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'DipoleMoment'), engine))
    def get_dipolegradients(self, engine=None):
        """Return the nuclear gradients of the electric dipole moment, expressed in atomic units. This is a (3*numAtoms x 3) matrix."""
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'DipoleGradients'), engine)).reshape(-1, 3)

    def get_n_spin(self, engine=None):
        """n_spin is 1 in case of spin-restricted or spin-orbit coupling calculations, and 2 in case of spin-unrestricted calculations.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read('AMSResults', 'nSpin'), engine)

    def get_orbital_energies(self, unit='Hartree', engine=None):
        """Return the orbital energies in a numpy array of shape [nSpin,nOrbitals] (nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted).

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'orbitalEnergies'), engine)).reshape(self.get_n_spin(), -1), 'Hartree', unit)

    def get_orbital_occupations(self, engine=None):
        """Return the orbital occupations in a numpy array of shape [nSpin,nOrbitals]. For spin restricted calculations, the occupations will be between 0 and 2. For spin unrestricted or spin-orbit coupling the values will be between 0 and 1.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'orbitalOccupations'), engine)).reshape(self.get_n_spin(), -1)

    def get_homo_energies(self, unit='Hartree', engine=None):
        """Return the HOMO energies per spin as a numpy array of size [nSpin]. nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted. See also :func:`~are_orbitals_fractionally_occupied`.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'HOMOEnergy'), engine)).reshape(-1), "Hartree", unit)

    def get_lumo_energies(self, unit='Hartree', engine=None):
        """Return the LUMO energies per spin as a numpy array of size [nSpin]. nSpin is 1 in case of spin-restricted or spin-orbit coupling and 2 in case of spin unrestricted. See also :func:`~are_orbitals_fractionally_occupied`.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'LUMOEnergy'), engine)).reshape(-1), "Hartree", unit)

    def get_smallest_homo_lumo_gap(self, unit='Hartree', engine=None):
        """Returns a float containing the smallest HOMO-LUMO gap irrespective of spin (i.e. min(LUMO) - max(HOMO)). See also :func:`~are_orbitals_fractionally_occupied`.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return Units.convert(self._process_engine_results(lambda x: x.read('AMSResults', 'SmallestHOMOLUMOGap'), engine), 'Hartree', unit)

    def are_orbitals_fractionally_occupied(self, engine=None):
        """Returns a boolean indicating whether fractional occupations were detected. If that is the case, then the 'HOMO' and 'LUMO' labels are not well defined, since the demarcation between 'occupied' and 'empty' is somewhat arbitrary. See the AMS driver documentation for more info.

        The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder.
        """
        return self._process_engine_results(lambda x: x.read('AMSResults', 'fractionalOccupation'), engine)
    def get_timings(self):
        """Return a dictionary with timing statistics of the job execution. The returned dictionary contains keys ``cpu``, ``system`` and ``elapsed``. The values are corresponding timings, expressed in seconds."""
        ret = {}
        try:
            # new AMS versions store timings on ams.rkf
            ret['elapsed'] = self.readrkf('General', 'ElapsedTime')
            ret['system'] = self.readrkf('General', 'SysTime')
            ret['cpu'] = self.readrkf('General', 'CPUTime')
        except:
            # fall back to reading output, was needed for old AMS versions
            cpu = self.grep_output('Total cpu time:')
            system = self.grep_output('Total system time:')
            elapsed = self.grep_output('Total elapsed time:')
            ret['elapsed'] = float(elapsed[0].split()[-1])
            ret['system'] = float(system[0].split()[-1])
            ret['cpu'] = float(cpu[0].split()[-1])
        return ret
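    # Example (a sketch):
    #
    #   t = results.get_timings()
    #   print(f"elapsed {t['elapsed']:.1f} s, cpu {t['cpu']:.1f} s, system {t['system']:.1f} s")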
    def get_forcefield_params(self, engine=None):
        """Read all force field data from a forcefield.rkf file into self.

        * ``filename`` -- Name of the RKF file that contains ForceField data
        """
        from .forcefieldparams import forcefield_params_from_kf
        return self._process_engine_results(forcefield_params_from_kf, engine)
    def get_poissonratio(self, engine=None):
        return self._process_engine_results(lambda x: x.read('AMSResults', 'PoissonRatio'), engine)

    def get_youngmodulus(self, unit='au', engine=None):
        return self._process_engine_results(lambda x: x.read('AMSResults', 'YoungModulus'), engine) * Units.conversion_ratio('au', unit)

    def get_shearmodulus(self, unit='au', engine=None):
        return self._process_engine_results(lambda x: x.read('AMSResults', 'ShearModulus'), engine) * Units.conversion_ratio('au', unit)

    def get_bulkmodulus(self, unit='au', engine=None):
        return self._process_engine_results(lambda x: x.read('AMSResults', 'BulkModulus'), engine) * Units.conversion_ratio('au', unit)
    def get_pesscan_results(self, molecules=True):
        """For PESScan jobs, this function extracts information about the scan coordinates and energies.

        molecules : bool
            Whether to return a Molecule at each PES point.

        Returns a dictionary with the following keys:

        'RaveledScanCoords': a list of str with the names of the scan coordinates: ['sc_1_1','sc_1_2','sc_2_1','sc_2_2',...]
        'nRaveledScanCoords': the length of the previous list
        'RaveledUnits': a list of str with the units: ['bohr', 'bohr', 'radian', 'bohr', ...]
        'RaveledPESCoords': a nested list with the values of the scan coordinates: [[val_1_1_1,val_1_1_2,...],[val_1_2_1,val_1_2_2,...],[val_2_1_1,val_2_1_2,...],[val_2_2_1,val_2_2_2]]
        'ScanCoords': a nested list: [['sc_1_1','sc_1_2'],['sc_2_1','sc_2_2'],...]
        'nScanCoords': length of the previous list
        'Units': a nested list with the units: [['bohr', 'bohr'],['radian', 'bohr'], ...]
        'OrigScanCoords': a list of str in the newline-separated format stored on the .rkf file: ['sc_1_1\\nsc_1_2','sc_2_1\\nsc_2_2',...]
        'nPESPoints': int, number of PES points
        'PES': list of float, the energies at each PES point
        'Converged': list of bool, whether the geometry optimization at each PES point converged
        'Molecules': list of Molecule, the structure at each PES point. Only if the argument molecules == True
        'HistoryIndices': list of int, the indices (1-based) in the History section which correspond to the Molecules and PES
        'ConstrainedAtoms': set of int, all atom indices (1-based) that were part of any PESScan scan coordinates (other constrained atoms are not included)
        'Properties': list of dict. The dictionary keys are what can be found on the AMSResults section of the engine .rkf file. These will only be populated if "CalcPropertiesAtPESPoints" is set to Yes when running the PES scan.
        """
        from natsort import natsorted
        import re

        def tolist(x):
            if isinstance(x, list):
                return x
            else:
                return [x]

        nScanCoord = self.readrkf('PESScan', 'nScanCoord')
        nPoints = tolist(self.get_history_property('nPoints', 'PESScan'))
        pes = tolist(self.readrkf('PESScan', 'PES'))
        origscancoords = tolist(self.get_history_property('ScanCoord', history_section='PESScan'))
        # one scan coordinate may have several variables
        scancoords = [x.split('\n') for x in origscancoords]
        units = []
        pescoords = tolist(self.readrkf('PESScan', 'PESCoords'))
        pescoords = np.array(pescoords).reshape(-1, sum(len(x) for x in scancoords))
        pescoords = np.transpose(pescoords)
        w = []
        units = []
        count = 0
        for i in range(nScanCoord):
            units.append([])
            for j in range(len(scancoords[i])):
                if scancoords[i][j] in ['a', 'b', 'c'] or 'Dist' in scancoords[i][j] or 'Coordinate' in scancoords[i][j]:
                    units[-1].append('bohr')
                elif 'Volume' in scancoords[i][j]:
                    units[-1].append('bohr^3')
                elif 'Area' in scancoords[i][j]:
                    units[-1].append('bohr^2')
                else:
                    units[-1].append('radian')
        converged = tolist(self.readrkf('PESScan', 'GOConverged'))
        converged = [bool(x) for x in converged]
        historyindices = tolist(self.readrkf('PESScan', 'HistoryIndices'))
        ret = {}
        raveled_scancoords = [x[i] for x in scancoords for i in range(len(x))]  # 1d list
        raveled_units = [x[i] for x in units for i in range(len(x))]  # 1d list
        constrained_atoms = []
        for sc in raveled_scancoords:
            constrained_atoms.extend(re.findall(r"\((\d+)\)", sc))
        try:
            constrained_atoms = set(int(x) for x in constrained_atoms)
        except ValueError:
            constrained_atoms = set()
        ret['RaveledScanCoords'] = raveled_scancoords
        ret['nRaveledScanCoords'] = len(raveled_scancoords)
        ret['ConstrainedAtoms'] = constrained_atoms
        ret['ScanCoords'] = scancoords
        ret['nScanCoords'] = len(scancoords)
        ret['OrigScanCoords'] = origscancoords  # newline delimiter for joint scan coordinates
        ret['RaveledPESCoords'] = pescoords.tolist()
        ret['Units'] = units
        ret['RaveledUnits'] = raveled_units
        ret['Converged'] = converged
        ret['PES'] = pes
        ret['nPESPoints'] = len(pes)
        ret['HistoryIndices'] = historyindices
        if molecules:
            mols = []
            for ind in historyindices:
                mols.append(self.get_history_molecule(ind))
            ret['Molecules'] = mols
        ret['Properties'] = []
        for key in natsorted(title for title in self.rkfs.keys() if title.startswith('PESPoint')):
            amsresults = self.rkfs[key].read_section("AMSResults")
            ret['Properties'].append(amsresults)
        return ret
    def get_neb_results(self, molecules=True, unit='au'):
        """Returns a dictionary with results from a NEB calculation.

        molecules : bool
            Whether to include the 'Molecules' key in the returned dictionary.

        unit : str
            Energy unit for the Energies, LeftBarrier, RightBarrier, and ReactionEnergy.

        Returns: dict
            'nImages': number of images (excluding end points)
            'nIterations': number of iterations
            'Energies': list of energies (including end points)
            'Climbing': bool, whether climbing image NEB was used
            'LeftBarrier': float, left reaction barrier
            'RightBarrier': float, right reaction barrier
            'ReactionEnergy': float, reaction energy
            'HistoryIndices': list of int, same length as 'Energies', contains indices in the History section
            'Molecules': list of Molecule (including end points)
        """
        def tolist(x):
            if isinstance(x, list):
                return x
            else:
                return [x]

        ret = {}
        conversion_ratio = Units.conversion_ratio('au', unit)
        ret['nImages'] = self.readrkf('NEB', 'nebImages')
        ret['nIterations'] = self.readrkf('NEB', 'nebIterations')
        ret['Climbing'] = bool(self.readrkf('NEB', 'climbing'))
        ret['HighestIndex'] = self.readrkf('NEB', 'highestIndex')
        ret['LeftBarrier'] = self.readrkf('NEB', 'LeftBarrier') * conversion_ratio
        ret['RightBarrier'] = self.readrkf('NEB', 'RightBarrier') * conversion_ratio
        ret['ReactionEnergy'] = self.readrkf('NEB', 'ReactionEnergy') * conversion_ratio
        history_dim = tolist(self.readrkf('NEB', 'historyIndex@dim'))  # nimages, randombign
        history_dim.reverse()  # randombign, nimages
        history_indices_matrix = tolist(self.readrkf('NEB', 'historyIndex'))  # this matrix is padded with -1 values
        history_indices_matrix = np.array(history_indices_matrix).reshape(history_dim)
        history_indices = np.max(history_indices_matrix, axis=0, keepdims=False).tolist()
        if any(x == -1 for x in history_indices):
            raise ValueError("Found -1 in the 'converged' part of historyIndex. This should not happen!")
        ret['HistoryIndices'] = history_indices
        ret['Energies'] = [self.get_property_at_step(ind, 'Energy')*conversion_ratio for ind in ret['HistoryIndices']]
        if molecules:
            ret['Molecules'] = [self.get_history_molecule(ind) for ind in ret['HistoryIndices']]
        return ret
    def get_time_step(self):
        """Returns the time step between adjacent frames (NOT the TimeStep in the settings, but Timestep*SamplingFreq) in femtoseconds for MD simulation jobs."""
        time1 = self.get_property_at_step(1, 'Time', history_section='MDHistory')
        time2 = self.get_property_at_step(2, 'Time', history_section='MDHistory')
        time_step = time2 - time1
        return time_step
    def _get_integer_start_end_every_max(self, start_fs, end_fs, every_fs, max_dt_fs):
        time_step = self.get_time_step()
        start_step = round(start_fs / time_step)
        end_step = round(end_fs / time_step) if end_fs is not None else None
        every = round(every_fs / time_step) if every_fs is not None else 1
        max_dt = round((max_dt_fs / time_step) / every) if max_dt_fs is not None else None
        return start_step, end_step, every, max_dt
    def get_velocity_acf(self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, atom_indices=None, x=True, y=True, z=True, normalize=False):
        """Calculate the velocity autocorrelation function. Works only for trajectories with a constant number of atoms.

        start_fs : float
            Start time in femtoseconds. Defaults to 0 (first frame).

        end_fs : float
            End time in femtoseconds. Defaults to the end of the trajectory.

        max_dt_fs : float
            Maximum correlation time in femtoseconds.

        atom_indices : list of int
            Atom indices for which to calculate the velocity autocorrelation function. Defaults to all atoms. The atom indices start with 1.

        x : bool
            Whether to use the velocities along x.

        y : bool
            Whether to use the velocities along y.

        z : bool
            Whether to use the velocities along z.

        normalize : bool
            Whether to normalize the velocity autocorrelation function so that it is 1 at time 0.

        Returns: 2-tuple (times, C)
            ``times`` is a 1D np array with times in femtoseconds. ``C`` is a 1D numpy array with shape (max_dt,) containing the autocorrelation function. If not ``normalize`` then the unit is angstrom^2/fs^2.
        """
        from ...trajectories.analysis import autocorrelation
        nEntries = self.readrkf('MDHistory', 'nEntries')
        time_step = self.get_time_step()
        start_step, end_step, every, max_dt = self._get_integer_start_end_every_max(start_fs, end_fs, every_fs, max_dt_fs)
        data = self.get_history_property('Velocities', history_section='MDHistory')
        data = np.array(data).reshape(nEntries, -1, 3)[start_step:end_step:every]
        if atom_indices is not None:
            zero_based_atom_indices = [i-1 for i in atom_indices]
            data = data[:, zero_based_atom_indices, :]
        n_dimensions = 3
        if not x or not y or not z:
            components = []
            if x: components += [0]
            if y: components += [1]
            if z: components += [2]
            data = data[:, :, components]
            n_dimensions = len(components)
        data *= Units.convert(1.0, 'bohr', 'angstrom')  # convert from bohr/fs to ang/fs
        # multiply by n_dimensions to undo the averaging per component and instead average per atom
        vacf = autocorrelation(data, max_dt=max_dt, normalize=normalize) * n_dimensions
        times = np.arange(len(vacf)) * time_step * every
        return times, vacf
    def get_diffusion_coefficient_from_velocity_acf(self, times=None, acf=None, n_dimensions=3):
        """Diffusion coefficient by integration of the velocity autocorrelation function.

        If ``times`` or ``acf`` is None, then a default velocity autocorrelation function will be calculated.

        times : 1D numpy array
            1D numpy array with the times.

        acf : 1D numpy array
            1D numpy array with the velocity autocorrelation function in ang^2/fs^2.

        n_dimensions : int
            Number of dimensions that were used to calculate the autocorrelation function.

        Returns: 2-tuple (times, diffusion_coefficient)
            ``times`` are the times in femtoseconds. ``diffusion_coefficient`` is a 1D numpy array with the diffusion coefficients in m^2/s.
        """
        from scipy.integrate import cumtrapz
        if times is None or acf is None:
            times, acf = self.get_velocity_acf(normalize=False)
        diffusion_coefficient = (1.0/n_dimensions) * cumtrapz(acf, times)  # ang^2/fs
        diffusion_coefficient *= 1e-20/1e-15  # ang^2/fs to m^2/s
        new_times = times[:-1]
        return np.array(new_times), diffusion_coefficient
    def get_power_spectrum(self, times=None, acf=None, max_freq=None, number_of_points=None):
        """Calculates a power spectrum from the velocity autocorrelation function.

        If ``times`` or ``acf`` is None, then a default velocity autocorrelation function will be calculated.

        times : 1D numpy array of float
            The ``times`` returned by ``AMSResults.get_velocity_acf()``.

        acf : 1D numpy array of float
            The ``vacf`` returned by ``AMSResults.get_velocity_acf()``.

        max_freq : float
            Maximum frequency (in cm^-1) of the returned power spectrum. Defaults to 5000 cm^-1.

        number_of_points : int
            Number of points in the returned power spectrum. Defaults to a number so that the spacing between points is about 1 cm^-1.

        Returns: 2-tuple (frequencies, intensities)
            ``frequencies``: Frequencies in cm^-1 (1D numpy array). ``intensities``: Intensities (1D numpy array).
        """
        from ...trajectories.analysis import power_spectrum
        if times is None or acf is None:
            times, acf = self.get_velocity_acf(normalize=True)
        return power_spectrum(times, acf, max_freq=max_freq, number_of_points=number_of_points)
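    # Example (a sketch, assuming an MD job that wrote velocities to the
    # MDHistory section): compute a VACF and turn it into a power spectrum.
    #
    #   times, vacf = results.get_velocity_acf(max_dt_fs=500, normalize=True)
    #   freqs, intensities = results.get_power_spectrum(times, vacf, max_freq=4000)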
    @staticmethod
    def _get_green_kubo_viscosity(pressuretensor, time_step, max_dt, volume, temperature, xy=True, yz=True, xz=True):
        from ...trajectories.analysis import autocorrelation
        from ...tools.units import Units
        from scipy.integrate import cumtrapz
        data = np.array(pressuretensor)
        components = []
        if yz: components += [3]
        if xz: components += [4]
        if xy: components += [5]
        data = data[:, components]
        acf = autocorrelation(data, max_dt=max_dt)
        times = np.arange(len(acf)) * time_step
        integrated_acf = cumtrapz(acf, times, initial=0)
        integrated_times = np.arange(len(integrated_acf)) * time_step
        k_B = Units.constants['k_B']
        au2Pa = Units.convert(1.0, 'hartree/bohr^3', 'Pa')
        viscosity = (volume*1e-30)/(k_B*temperature) * integrated_acf * au2Pa**2 * 1e-15 * 1e3
        return integrated_times, viscosity
    def get_green_kubo_viscosity(self, start_fs=0, end_fs=None, every_fs=None, max_dt_fs=None, xy=True, yz=True, xz=True, pressuretensor=None):
        """Calculates the viscosity using the Green-Kubo relation (integrating the off-diagonal pressure tensor autocorrelation function).

        start_fs : float
            Start time in femtoseconds. Defaults to 0 (first frame).

        end_fs : float
            End time in femtoseconds. Defaults to the end of the trajectory.

        max_dt_fs : float
            Maximum correlation time in femtoseconds.

        xy : bool
            Whether to use the xy off-diagonal elements.

        yz : bool
            Whether to use the yz off-diagonal elements.

        xz : bool
            Whether to use the xz off-diagonal elements.

        pressuretensor : np.array shape (N,6)
            Pressure tensor in atomic units. If not specified, it will be read from the ams.rkf file.

        Returns: 2-tuple (times, C)
            ``times`` is a 1D np array with times in femtoseconds. ``C`` is a 1D numpy array with shape (max_dt,) containing the viscosity (in mPa*s) integral. It should converge to the viscosity value as the time increases.
        """
        from ...trajectories.analysis import autocorrelation
        from ...tools.units import Units
        from scipy.integrate import cumtrapz
        nEntries = self.readrkf('MDHistory', 'nEntries')
        time_step = self.get_time_step()
        start_step, end_step, every, max_dt = self._get_integer_start_end_every_max(start_fs, end_fs, every_fs, max_dt_fs)
        data = pressuretensor
        if data is None:
            data = self.get_history_property('PressureTensor', 'MDHistory')
            data = [x for x in data if x is not None]  # None might appear in currently running trajectories if the job was loaded with load_external
        data = np.array(data)[start_step:end_step:every]
        components = []
        if yz: components += [3]
        if xz: components += [4]
        if xy: components += [5]
        data = data[:, components]
        acf = autocorrelation(data, max_dt=max_dt)
        times = np.arange(len(acf)) * time_step * every
        integrated_acf = cumtrapz(acf, times)
        integrated_times = np.arange(len(integrated_acf)) * time_step * every
        V = self.get_main_molecule().unit_cell_volume()
        try:
            T = self.get_history_property('Temperature', 'MDHistory')
            T = np.array(T)[start_step:end_step:every]
            mean_T = np.mean(T)
        except KeyError:
            # might be triggered for currently running trajectories, then just use the first temperature
            mean_T = self.get_property_at_step(1, 'Temperature', 'MDHistory')
        k_B = Units.constants['k_B']
        au2Pa = Units.convert(1.0, 'hartree/bohr^3', 'Pa')
        viscosity = (V*1e-30)/(k_B*mean_T) * integrated_acf * au2Pa**2 * 1e-15 * 1e3
        return integrated_times, viscosity
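    # Example (a sketch, assuming an MD job that stored the pressure tensor):
    #
    #   t, eta = results.get_green_kubo_viscosity(max_dt_fs=2000)
    #   # eta[-1] approximates the viscosity in mPa*s once the integral converges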
    def get_density_along_axis(self, axis: str = 'z', density_type: str = 'mass', start_fs=0, end_fs=None, every_fs=None, bin_width=0.1, atom_indices=None):
        """Calculates the density of atoms along a Cartesian coordinate axis. This only works if the axis is perpendicular to the other two axes. The system must be 3D-periodic and the number of atoms cannot change during the trajectory.

        axis : str
            'x', 'y', or 'z'

        density_type : str
            'mass' gives the density in g/cm^3. 'number' gives the number density in ang^-3. 'histogram' gives the number of atoms per frame (so the number depends on the bin size).

        start_fs : float
            Start time in fs.

        end_fs : float
            End time in fs.

        every_fs : float
            Use data every every_fs timesteps.

        bin_width : float
            Bin width of the returned coordinates (in angstrom).

        atom_indices : list of int
            Indices (starting with 1) for the atoms to use for calculating the density.

        Returns: 2-tuple (coordinates, density)
            ``coordinates`` is a 1D array with the coordinates (in angstrom). ``density`` is a 1D array with the densities (in g/cm^3 or ang^-3 depending on ``density_type``).
        """
        assert axis in ['x', 'y', 'z'], f"Unknown axis: {axis}. Must be 'x', 'y', or 'z'"
        assert density_type in ['mass', 'number', 'histogram'], f"Unknown density_type: {density_type}. Must be 'mass', 'number' or 'histogram'"
        main_mol = self.get_main_molecule()
        histogram = density_type == 'histogram'
        bohr2ang = Units.convert(1.0, 'bohr', 'angstrom')
        start_step, end_step, every, _ = self._get_integer_start_end_every_max(start_fs, end_fs, every_fs, None)
        nEntries = self.readrkf('History', 'nEntries')
        coords = np.array(self.get_history_property('Coords')).reshape(nEntries, -1, 3)
        coords = coords[start_step:end_step:every]
        axis2index = {'x': 0, 'y': 1, 'z': 2}
        index = axis2index[axis]
        if not histogram:
            other_indices = [0, 1, 2]
            other_indices.remove(index)
            assert len(main_mol.lattice) == 3, f"get_density_along_axis with density_type='mass' or 'number' can only be called for 3d-periodic systems. Current periodicity: {len(main_mol.lattice)}. Use density_type='histogram' instead."
            assert np.abs(np.dot(main_mol.lattice[other_indices[0]], main_mol.lattice[index])) < 1e-6, f"Axis {axis} must be perpendicular to the other two axes"
            assert np.abs(np.dot(main_mol.lattice[other_indices[1]], main_mol.lattice[index])) < 1e-6, f"Axis {axis} must be perpendicular to the other two axes"
            assert np.abs(main_mol.lattice[index][other_indices[0]]) < 1e-6, f"Density along {axis} requires that lattice vector {index+1} has only 1 non-zero component (along {axis}). The vector is {main_mol.lattice[index]}"
            assert np.abs(main_mol.lattice[index][other_indices[1]]) < 1e-6, f"Density along {axis} requires that lattice vector {index+1} has only 1 non-zero component (along {axis}). The vector is {main_mol.lattice[index]}"
            lattice_vectors = np.array(self.get_history_property('LatticeVectors')).reshape(-1, 3, 3)
            lattice_vectors = lattice_vectors[start_step:end_step:every] * bohr2ang
            vec1s = lattice_vectors[:, other_indices[0], :]
            vec2s = lattice_vectors[:, other_indices[1], :]
            cross_products = np.cross(vec1s, vec2s)
            areas = np.linalg.norm(cross_products, axis=1)
            mean_area = np.mean(areas)
        coords = coords[:, :, index]
        if atom_indices is not None:
            zero_based_atom_indices = [i-1 for i in atom_indices]
            coords = coords[:, zero_based_atom_indices]
        coords *= bohr2ang
        avogadro = Units.constants['NA']
        min_c = np.min(coords)
        max_c = np.max(coords)
        num_bins = round((max_c-min_c) / bin_width + 1)
        bins = np.linspace(min_c, max_c, num_bins, endpoint=True)
        if density_type == 'mass':
            masses = np.array(self.readrkf('Molecule', 'AtomMasses'))
            if atom_indices is not None:
                masses = masses[zero_based_atom_indices]
            masses_broadcasted = np.broadcast_to(masses, coords.shape)
            hist, bin_edges = np.histogram(coords, weights=masses_broadcasted, bins=bins)
            volume_slice_ang3 = mean_area * (bin_edges[1]-bin_edges[0])
            volume_slice_cm3 = volume_slice_ang3*1e-24
            density = (1.0 / nEntries) * (hist / avogadro) / volume_slice_cm3
        elif density_type == 'number':
            hist, bin_edges = np.histogram(coords, bins=bins)
            volume_slice_ang3 = mean_area * (bin_edges[1]-bin_edges[0])
            density = (1.0 / nEntries) * hist / volume_slice_ang3
        elif density_type == 'histogram':
            hist, bin_edges = np.histogram(coords, bins=bins)
            density = (1.0 / nEntries) * hist
        z = (bin_edges[:-1] + bin_edges[1:]) / 2.0
        return z, density
    def recreate_molecule(self):
        """Recreate the input molecule for the corresponding job based on files present in the job folder. This method is used by |load_external|.

        If ``ams.rkf`` is present in the job folder, extract data from the ``InputMolecule`` section.
        """
        if 'ams' in self.rkfs:
            return self.get_input_molecule()
        return None
    def recreate_settings(self):
        """Recreate the input |Settings| instance for the corresponding job based on files present in the job folder. This method is used by |load_external|.

        If ``ams.rkf`` is present in the job folder, extract the user input and parse it back to a |Settings| instance using the ``scm.libbase`` module. Remove the ``system`` branch from that instance.
        """
        if 'ams' in self.rkfs:
            user_input = self.readrkf('General', 'user input')
            try:
                from scm.libbase import InputParser
                inp = InputParser().to_settings('ams', user_input)
            except:
                log('Failed to recreate input settings from {}'.format(self.rkfs['ams'].path), 5)
                return None
            s = Settings()
            s.input = inp
            if 'system' in s.input.ams:
                del s.input.ams.system
            s.soft_update(config.job)
            return s
        return None
    def ok(self):
        """Check if the execution of the associated :attr:`job` was successful or not. See :meth:`Job.ok<scm.plams.core.basejob.Job.ok>` for more information."""
        return self.job.ok()
    def get_errormsg(self):
        """Tries to get an error message for the associated :attr:`job`. This method returns ``None`` if the associated job was successful. See :meth:`Job.get_errormsg<scm.plams.core.basejob.Job.get_errormsg>` for more information."""
        return self.job.get_errormsg()
@property def name(self): """Return the :attr:`job.name` of the job associated with this results instance.""" return self.job.name
class EnergyLandscape: class State: def __init__(self, landscape, engfile, energy, mol, count, isTS, reactantsID=None, productsID=None, prefactorsFromReactant=None, prefactorsFromProduct=None): self._landscape = landscape self.engfile = engfile self.energy = energy self.molecule = mol self.count = count self.isTS = isTS self.reactantsID = reactantsID self.productsID = productsID self.prefactorsFromReactant = prefactorsFromReactant self.prefactorsFromProduct = prefactorsFromProduct @property def id(self): return self._landscape._states.index(self)+1 @property def reactants(self): return self._landscape._states[self.reactantsID-1] @property def products(self): return self._landscape._states[self.productsID-1] def __str__(self): if self.isTS: lines = [f"State {self.id}: {self.molecule.get_formula(False)} transition state @ {self.energy:.8f} Hartree (found {self.count} times"+(f", results on {self.engfile})" if self.engfile is not None else ")")] if self.reactantsID is not None: lines += [f" +- Reactants: {self.reactants}"] if self.productsID is not None: lines += [f" Products: {self.products}"] if self.reactantsID is not None and self.productsID is not None: eV = Units.convert(1.0, 'eV', 'hartree') lines += [f" Prefactors: {self.prefactorsFromReactant:.3E}:{self.prefactorsFromProduct:.3E} s^-1"] dEforward = ( self.energy-self._landscape._states[self.reactantsID-1].energy )/eV dEbackward = ( self.energy-self._landscape._states[self.productsID-1].energy )/eV lines += [f" Barriers: {dEforward:.3f}:{dEbackward:.3f} eV"] elif self.productsID is not None: lines += [f" Prefactors: {self.prefactorsFromReactant:.3E}:?"] else: lines = [f"State {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (found {self.count} times"+(f", results on {self.engfile})" if self.engfile is not None else ")")] return "\n".join(lines)
class Fragment: def __init__(self, landscape, engfile, energy, mol): self._landscape = landscape self.engfile = engfile self.energy = energy self.molecule = mol @property def id(self): return self._landscape._fragments.index(self)+1 def __str__(self): lines = [f"Fragment {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (results on {self.engfile})"] return "\n".join(lines)
class FragmentedState: def __init__(self, landscape, energy, composition, connections=None, adsorptionPrefactors=None, desorptionPrefactors=None): self._landscape = landscape self.energy = energy self.composition = composition self.connections = connections self.adsorptionPrefactors = adsorptionPrefactors self.desorptionPrefactors = desorptionPrefactors @property def id(self): return self._landscape._fstates.index(self)+1 @property def fragments(self): return [ self._landscape._fragments[i] for i in self.composition ] def __str__(self): formula = "" for i,id in enumerate(self.composition): formula += self._landscape._fragments[id].molecule.get_formula(False) if( i != len(self.composition)-1 ): formula += "+" lines = [f"FragmentedState {self.id}: {formula} local minimum @ {self.energy:.8f} Hartree (fragments {[i+1 for i in self.composition]})"] for i,iState in enumerate(self.connections): lines += [f" +- {self._landscape._states[iState]}"] if( i == len(self.connections)-1 ): lines += [f" Prefactors: {self.adsorptionPrefactors[i]:.3E}:{self.desorptionPrefactors[i]:.3E}"] else: lines += [f" | Prefactors: {self.adsorptionPrefactors[i]:.3E}:{self.desorptionPrefactors[i]:.3E}"] return "\n".join(lines)
def __init__(self, results): self._states = [] self._fragments = [] self._fstates = [] if results is None: return sec = results.read_rkf_section("EnergyLandscape") # If there is only 1 state in the 'EnergyLandscape' section, some variables that are normally lists are instead built-in types (e.g. a 'float' instead of a 'list of floats'). # For convenience we make sure here that the following variables are always 'lists': for var in ['energies', 'counts', 'isTS', 'reactants', 'products']: if not isinstance(sec[var], list): sec[var] = [sec[var]] nStates = sec['nStates'] for iState in range(nStates): energy = sec['energies'][iState] resfile = os.path.splitext(sec['fileNames'].split('\0')[iState])[0] mol = results.get_molecule('Molecule',file=resfile) count = sec['counts'][iState] if not sec['isTS'][iState]: self._states.append(AMSResults.EnergyLandscape.State(self, resfile, energy, mol, count, False)) else: reactantsID = sec['reactants'][iState] if sec['reactants'][iState] > 0 else None productsID = sec['products'][iState] if sec['products'][iState] > 0 else None prefactorsFromReactant = sec['prefactorsFromReactant'][iState] if sec['products'][iState] > 0 else None prefactorsFromProduct = sec['prefactorsFromProduct'][iState] if sec['products'][iState] > 0 else None self._states.append(AMSResults.EnergyLandscape.State(self, resfile, energy, mol, count, True, reactantsID, productsID, prefactorsFromReactant, prefactorsFromProduct)) if( 'nFragments' not in sec ): return nFragments = sec['nFragments'] for iFragment in range(nFragments): energy = sec['fragmentsEnergies'][iFragment] resfile = os.path.splitext(sec['fragmentsFileNames'].split('\0')[iFragment])[0] mol = results.get_molecule('Molecule',file=resfile) self._fragments.append(AMSResults.EnergyLandscape.Fragment(self, resfile, energy, mol)) if( 'nFStates' not in sec ): return nFragmentedStates = sec['nFStates'] for iFState in range(nFragmentedStates): iEnergy = sec['fStatesEnergy('+str(iFState+1)+')'] iNFragments = sec['fStatesNFragments('+str(iFState+1)+')'] value = sec['fStatesComposition('+str(iFState+1)+')'] iComposition = [i-1 for i in value] if isinstance(value,list) else [value-1] iNConnections = sec['fStatesNConnections('+str(iFState+1)+')'] value = sec['fStatesConnections('+str(iFState+1)+')'] iConnections = [i-1 for i in value] if isinstance(value,list) else [value-1] value = sec['fStatesAdsorptionPrefactors('+str(iFState+1)+')'] iAdsorptionPrefactors = value if isinstance(value,list) else [value] value = sec['fStatesDesorptionPrefactors('+str(iFState+1)+')'] iDesorptionPrefactors = value if isinstance(value,list) else [value] self._fstates.append(AMSResults.EnergyLandscape.FragmentedState(self, iEnergy, iComposition, iConnections, iAdsorptionPrefactors, iDesorptionPrefactors))
@property def minima(self): return [s for s in self._states if not s.isTS] @property def transition_states(self): return [s for s in self._states if s.isTS] @property def fragments(self): return [f for f in self._fragments] @property def fragmented_states(self): return [fs for fs in self._fstates] def __str__(self): lines = ['All stationary points:'] lines += ['======================'] for s in self._states: lines += [ str(s) ] for f in self._fragments: lines += [ str(f) ] for fs in self._fstates: lines += [ str(fs) ] return "\n".join(lines) def __getitem__(self, i): return self._states[i-1] def __iter__(self): return iter(self._states) def __len__(self): return len(self._states)
[docs] def get_energy_landscape(self): """Returns the energy landscape obtained from a PESExploration job run by the AMS driver. The energy landscape is a set of stationary PES points (local minima and transition states). The returned object is of the type ``AMSResults.EnergyLandscape`` and offers convenient access to the states' energies and geometries, as well as information on which transition states connect which minima.

        .. code-block:: python

            el = results.get_energy_landscape()
            print(el)

            for state in el:
                print(f"Energy = {state.energy}")
                print(f"Is transition state = {state.isTS}")
                print("Geometry:", state.molecule)
                if state.isTS:
                    print(f"Forward barrier: {state.energy - state.reactants.energy}")
                    print(f"Backward barrier: {state.energy - state.products.energy}")
        """ return AMSResults.EnergyLandscape(self)
#========================================================================= def _access_rkf(self, func, file='ams'): """A skeleton method for accessing any of the ``.rkf`` files produced by AMS. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. The *func* argument has to be a function to call on a chosen ``.rkf`` file. It should take one argument, an instance of |KFFile|. """ #Try unique engine: if file == 'engine': names = self.engine_names() if len(names) == 1: return func(self.rkfs[names[0]]) else: raise ValueError("You cannot use 'engine' as 'file' argument if the engine results file is not unique. Please use the real name of the file you wish to read") #Try: if file in self.rkfs: return func(self.rkfs[file]) #Try harder: filename = file + '.rkf' self.refresh() if filename in self.files: self.rkfs[file] = KFFile(opj(self.job.path, filename)) return func(self.rkfs[file]) #Surrender: raise FileError('File {} not present in {}'.format(filename, self.job.path)) def _process_engine_results(self, func, engine=None): """A generic method skeleton for processing any engine results ``.rkf`` file. *func* should be a function that takes one argument (an instance of |KFFile|) and returns arbitrary data. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ names = self.engine_names() if engine is not None: if engine in names: return func(self.rkfs[engine]) else: raise FileError('File {}.rkf not present in {}'.format(engine, self.job.path)) else: if len(names) == 1: return func(self.rkfs[names[0]]) elif len(names) == 0: raise FileError('There is no engine .rkf present in {}'.format(self.job.path)) else: raise ValueError("You need to specify the 'engine' argument when there are multiple engine result files present in the job folder")
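For illustration, reader methods such as ``readrkf`` (used elsewhere in this module) are thin wrappers around these two skeletons. A minimal sketch, assuming a finished job; the ``file=`` keyword is an assumption made by analogy with ``_access_rkf``, and the section/variable names follow the usual AMS conventions:

    .. code-block:: python

        # Read from the main ams.rkf (the default):
        status = results.readrkf('General', 'termination status')

        # Read from the engine .rkf via the 'engine' alias,
        # which only works if there is exactly one engine results file:
        energy = results.readrkf('AMSResults', 'Energy', file='engine')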
#=========================================================================== #=========================================================================== #===========================================================================
[docs]class AMSJob(SingleJob): """A class representing a single computation with the AMS driver. The corresponding results type is |AMSResults|. """ _result_type = AMSResults _command = 'ams'
[docs] def run(self, jobrunner=None, jobmanager=None, watch=False, **kwargs): """Run the job using *jobmanager* and *jobrunner* (or defaults, if ``None``). If *watch* is set to ``True``, the contents of the AMS driver logfile will be forwarded line by line to the PLAMS logfile (and stdout), allowing for easier monitoring of the running job. Note that forwarding the AMS driver logfile makes the call to this method block until the job's execution has finished, even when using a parallel |JobRunner|. Other keyword arguments (*\*\*kwargs*) are stored in the ``run`` branch of the job's settings. The returned value is the |AMSResults| instance associated with this job. """ if _has_watchdog and watch: if jobmanager is None: if 'default_jobmanager' in config: jobmanager = config.default_jobmanager else: raise PlamsError('No default jobmanager found. This probably means that PLAMS init() was not called.') observer = Observer() event_handler = AMSJobLogTailHandler(self, jobmanager) observer.schedule(event_handler, jobmanager.workdir, recursive=True) observer.start() try: results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs) results.wait() finally: observer.stop() observer.join() else: results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs) return results
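A minimal sketch of the *watch* mode; note that the call blocks until the job has finished, even with a parallel |JobRunner| (names are illustrative):

    .. code-block:: python

        job = AMSJob(name='opt', molecule=mol, settings=sett)
        results = job.run(watch=True)   # ams.log lines are forwarded to the PLAMS log while running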
[docs] def get_input(self): """Generate the input file. This method is just a wrapper around :meth:`_serialize_input`. Each instance of |AMSJob| or |AMSResults| present as a value in the ``settings.input`` branch is replaced with an absolute path to the ``ams.rkf`` file of that job. If you need to use a path to some engine-specific ``.rkf`` file rather than the main ``ams.rkf`` file, you can do it by supplying a tuple ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a string with the name of the ``.rkf`` file you want. For example, ``(myjob, 'dftb')`` will transform to the absolute path to the ``dftb.rkf`` file in ``myjob``'s folder, if such a file is present. Instances of |KFFile| are replaced with absolute paths to corresponding files. """ special = { AMSJob: lambda x: x.results.rkfpath(), AMSResults: lambda x: x.rkfpath(), KFFile: lambda x: x.path, tuple: lambda x: AMSJob._tuple2rkf(x) } return self._serialize_input(special)
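For illustration, a sketch of these replacements in ``settings.input``; ``LoadSystem`` is a regular AMS driver block, while ``SomeBlock.SomeFile`` is a made-up key standing in for any engine input that expects an ``.rkf`` path, and ``previous_job`` is assumed to be a finished |AMSJob|:

    .. code-block:: python

        sett = Settings()
        sett.input.ams.Task = 'SinglePoint'
        # An AMSJob (or AMSResults) value is replaced by the absolute path to its ams.rkf:
        sett.input.ams.LoadSystem.File = previous_job
        # A (job, name) tuple selects an engine file instead, here dftb.rkf:
        sett.input.ams.SomeBlock.SomeFile = (previous_job, 'dftb')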
[docs] def get_runscript(self): """Generate the runscript. Returned string is of the form:: unset AMS_SWITCH_LOGFILE_AND_STDOUT AMS_JOBNAME=jobname AMS_RESULTSDIR=. $AMSBIN/ams [-n nproc] --input=jobname.in [>jobname.out] ``-n`` flag is added if ``settings.runscript.nproc`` exists. ``[>jobname.out]`` is used based on ``settings.runscript.stdout_redirect``. If ``settings.runscript.preamble_lines`` exists, those lines will be added to the runscript verbatim before the execution of AMS. If ``settings.runscript.postamble_lines`` exists, those lines will be added to the runscript verbatim after the execution of AMS. """ ret = 'unset AMS_SWITCH_LOGFILE_AND_STDOUT\n' ret += 'unset SCM_LOGFILE\n' if 'nnode' in self.settings.runscript and config.slurm: # Running as a SLURM job step and user specified the number of nodes explicitly. ret += f'export SCM_SRUN_OPTIONS="$SCM_SRUN_OPTIONS -N {self.settings.runscript.nnode}"\n' elif 'nproc' in self.settings.runscript and config.slurm: # Running as a SLURM job step and user asked for a specific number of tasks. # Make sure to use as few nodes as possible to avoid distributing jobs needlessly across nodes. # See: https://stackoverflow.com/questions/71382578 for nnode in range(1,len(config.slurm.tasks_per_node)+1): if sum(config.slurm.tasks_per_node[0:nnode]) >= self.settings.runscript.nproc: break if nnode > 1: nnode = f"1-{nnode}" ret += f'export SCM_SRUN_OPTIONS="$SCM_SRUN_OPTIONS -N {nnode}"\n' if 'preamble_lines' in self.settings.runscript: for line in self.settings.runscript.preamble_lines: ret += f'{line}\n' ret += 'AMS_JOBNAME="{}" AMS_RESULTSDIR=. $AMSBIN/ams'.format(self.name) if 'nproc' in self.settings.runscript: ret += ' -n {}'.format(self.settings.runscript.nproc) ret += ' --input="{}" < /dev/null'.format(self._filename('inp')) if self.settings.runscript.stdout_redirect: ret += ' >"{}"'.format(self._filename('out')) ret += '\n\n' if 'postamble_lines' in self.settings.runscript: for line in self.settings.runscript.postamble_lines: ret += f'{line}\n' ret += '\n' return ret
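The runscript-related settings in one sketch (the module name in the preamble is hypothetical):

    .. code-block:: python

        job.settings.runscript.nproc = 16              # adds '-n 16' to the ams call
        job.settings.runscript.stdout_redirect = True  # appends '> "jobname.out"'
        job.settings.runscript.preamble_lines = ['module load ams/2024.1']
        job.settings.runscript.postamble_lines = ['echo AMS finished']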
[docs] def check(self): """Check if the ``termination status`` variable from the ``General`` section of the main KF file equals ``NORMAL TERMINATION``.""" try: status = self.results.readrkf('General', 'termination status') except: return False if 'NORMAL TERMINATION' in status: if 'errors' in status: log('Job {} reported errors. Please check the output'.format(self._full_name()), 1) return False if 'warnings' in status: log('Job {} reported warnings. Please check the output'.format(self._full_name()), 1) return True return False
[docs] def get_errormsg(self): """Tries to get an error message for a failed job. This method returns ``None`` for successful jobs.""" if self.ok(): return None else: # Something went wrong. The first place to check is the termination status on the ams.rkf. # If the AMS driver stopped with a known error (called StopIt in the Fortran code), the error will be in there. try: msg = self.results.readrkf('General','termination status') if msg == 'NORMAL TERMINATION with errors' or msg is None: # Apparently this wasn't a hard stop in the middle of the job. # Let's look for the last error in the logfile ... msg = self.results.grep_file('ams.log', 'ERROR: ')[-1].partition('ERROR: ')[2] elif msg == 'IN PROGRESS' and '$JN.err' in self.results: # If the status is still "IN PROGRESS", that probably means AMS was shut down hard from the outside. # E.g. it got SIGKILL from the scheduler for exceeding some resource limit. # In this case useful information may be found on stderr. with open(self.results['$JN.err'], 'r') as err: errlines = err.read().splitlines() for el in reversed(errlines): if el != '' and not el.isspace(): msg = "Killed while IN PROGRESS: " + el break except: msg = 'Could not determine error message. Please check the output manually.' return msg
[docs] def hash_input(self): """Calculate the hash of the input file. All instances of |AMSJob| or |AMSResults| present as values in ``settings.input`` branch are replaced with hashes of corresponding job's inputs. Instances of |KFFile| are replaced with absolute paths to corresponding files. """ special = { AMSJob: lambda x: x.hash_input(), AMSResults: lambda x: x.job.hash_input(), KFFile: lambda x: x.path, tuple: lambda x: AMSJob._tuple2rkf(x) } return sha256(self._serialize_input(special))
#========================================================================= def _serialize_input(self, special): """Transform the contents of the ``settings.input`` branch into a string with blocks, keys and values. First, the contents of ``settings.input`` are extended with entries returned by :meth:`_serialize_molecule`. Then the contents of ``settings.input.ams`` are used to generate AMS text input. Finally, every other (than ``ams``) entry in ``settings.input`` is used to generate engine-specific input. Special values can be indicated with the *special* argument, which should be a dictionary having types of objects as keys and functions translating these types to strings as values. """ def unspec(value): """Check if *value* is one of the special types and convert it to a string if it is.""" for spec_type in special: if isinstance(value, spec_type): return special[spec_type](value) return value def serialize(key, value, indent, end='End'): """Given a *key* and its corresponding *value* from the |Settings| instance produce a snippet of the input file representing this pair. If the value is a nested |Settings| instance, use recursive calls to build the snippet for the entire block. Indent the result with *indent* spaces. """ ret = '' if isinstance(value, Settings): # Open block ... ret += ' '*indent + key # ... with potential header if '_h' in value: ret += ' ' + unspec(value['_h']) ret += '\n' # Free block, or explicitly placed entries with _1, _2, ... i = 1 while f'_{i}' in value: if isinstance(value[f'_{i}'], Settings): for ckey in value[f'_{i}']: ret += serialize(ckey, value[f'_{i}'][ckey], indent+2) else: ret += serialize('', value['_'+str(i)], indent+2) i += 1 # Figure out the order in which we should serialize the entries in the block if "_o" in value: # Ordered block: we need to serialize the contents in the order given by "_o" children = [] for v in value["_o"]: ckey, _, idx = v.partition('[') if idx: # Child is a recurring entry, e.g. SubBlock[5] idx = int(idx.rstrip("]")) children.append((ckey,value[ckey][idx-1])) else: # Child is a unique entry children.append((ckey,value[ckey])) else: # Unordered block: normal Python order when iterating over a dict/Settings children = [(ckey,value[ckey]) for ckey in value if not ckey.startswith('_')] # Serialize all children for ckey, cvalue in children: split_key = key.lower().split() split_ckey = ckey.lower().split() if len(split_key) > 0 and split_key[0] == 'engine' and ckey.lower() == 'input': ret += serialize(ckey, cvalue, indent+2, 'EndInput') # REB: For the hybrid engine. How to deal with the space in ckey (Engine DFTB)? Replace by underscore?
elif len(split_ckey) > 0 and split_ckey[0] == 'engine': engine = ' '.join(ckey.split('_')) ret += serialize(engine, cvalue, indent+2, end='EndEngine') + '\n' else: ret += serialize(ckey, cvalue, indent+2) # Close block if key.lower() == 'input': end='endinput' ret += ' '*indent + end+'\n' elif isinstance(value, list): for el in value: ret += serialize(key, el, indent, end) elif value == '' or value is True: ret += ' '*indent + key + '\n' elif value is False or value is None: pass else: ret += ' '*indent + key + ' ' + str(unspec(value)) + '\n' return ret fullinput = self.settings.input.copy() #prepare contents of 'system' block(s) more_systems = self._serialize_molecule() if more_systems: if 'system' in fullinput.ams: #nonempty system block was already present in input.ams system = fullinput.ams.system system_list = system if isinstance(system, list) else [system] system_list_set = Settings({(s._h if '_h' in s else ''):s for s in system_list}) more_systems_set = Settings({(s._h if '_h' in s else ''):s for s in more_systems}) system_list_set += more_systems_set system_list = list(system_list_set.values()) system = system_list[0] if len(system_list) == 1 else system_list fullinput.ams.system = system else: fullinput.ams.system = more_systems[0] if len(more_systems) == 1 else more_systems txtinp = '' ams = fullinput.find_case('ams') #contents of the 'ams' block (AMS input) go first for item in fullinput[ams]: txtinp += serialize(item, fullinput[ams][item], 0) + '\n' #and then engines for engine in fullinput: if engine != ams: txtinp += serialize('Engine '+engine, fullinput[engine], 0, end='EndEngine') + '\n' return txtinp
def _serialize_molecule(self): """Return a list of |Settings| instances containing the information about one or more |Molecule| instances stored in the ``molecule`` attribute. Molecular charge is taken from ``molecule.properties.charge``, if present. Additional atom-specific information to be put in the ``atoms`` block after the XYZ coordinates can be supplied with ``atom.properties.suffix``. If the ``molecule`` attribute is a dictionary, the returned list is of the same length as the size of the dictionary. Keys from the dictionary are used as headers of the returned ``system`` blocks. """ if self.molecule is None: return Settings() moldict = {} if isinstance(self.molecule, Molecule): moldict = {'':self.molecule} elif isinstance(self.molecule, dict): moldict = self.molecule else: raise JobError("Incorrect 'molecule' attribute of job {}. 'molecule' should be a Molecule, a dictionary or None, and not {}".format(self._full_name(), type(self.molecule))) ret = [] for name, molecule in moldict.items(): newsystem = Settings() if name: newsystem._h = name if len(molecule.lattice) in [1,2] and molecule.align_lattice(): log("The lattice of {} Molecule supplied for job {} did not follow the convention required by AMS. I rotated the whole system for you. You're welcome".format(name if name else 'main', self._full_name()), 3) newsystem.Atoms._1 = [atom.str(symbol=self._atom_symbol(atom), space=18, decimal=10, suffix=self._atom_suffix(atom)) for atom in molecule] if molecule.lattice: newsystem.Lattice._1 = ['{:16.10f} {:16.10f} {:16.10f}'.format(*vec) for vec in molecule.lattice] if len(molecule.bonds)>0: lines = ['{} {} {}'.format(molecule.index(b.atom1), molecule.index(b.atom2), b.order) for b in molecule.bonds] # Add bond properties if they are defined newsystem.BondOrders._1 = ['{} {}'.format(text,molecule.bonds[i].properties.suffix) if 'suffix' in molecule.bonds[i].properties else text for i,text in enumerate(lines)] if 'charge' in molecule.properties: newsystem.Charge = molecule.properties.charge if 'regions' in molecule.properties : #newsystem.Region = [Settings({'_h':name, '_1':['%s=%s'%(k,str(v)) for k,v in data.items()]}) for name, data in molecule.properties.regions.items()] newsystem.Region = [Settings({'_h':name, 'Properties':Settings({'_1':[line for line in data]})}) for name, data in molecule.properties.regions.items() if isinstance(data,list)] ret.append(newsystem) return ret #=========================================================================
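Putting the two serializers together: a sketch of what ``get_input`` produces for a small job. The coordinate file name is hypothetical and the rendered text shown in the comments is approximate; note that AMS input is case-insensitive, so the lowercase ``system`` and ``dftb`` keys are fine. Supplying a dictionary as ``job.molecule`` would instead produce one named ``system`` block per dictionary entry.

    .. code-block:: python

        from scm.plams import AMSJob, Settings, Molecule

        sett = Settings()
        sett.input.ams.Task = 'GeometryOptimization'
        sett.input.dftb.Model = 'GFN1-xTB'

        water = Molecule('water.xyz')   # hypothetical coordinate file
        water.properties.charge = -1    # serialized as 'Charge -1' in the system block

        job = AMSJob(name='opt', molecule=water, settings=sett)
        print(job.get_input())
        # Task GeometryOptimization
        #
        # system
        #   Atoms
        #     O      0.0000000000 ...
        #   End
        #   Charge -1
        # End
        #
        # Engine dftb
        #   Model GFN1-xTB
        # EndEngine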
[docs] def get_task(self): """ Returns the AMS Task from the job's settings. If it does not exist, returns None. """ if isinstance(self.settings, Settings) and 'input' in self.settings and 'ams' in self.settings.input and 'task' in self.settings.input.ams: return self.settings.input.ams.task return None
@staticmethod def _atom_suffix(atom): """Return the suffix of an atom. Combines both ``properties.suffix`` as well as the other properties in a format that is suitable for inclusion in the AMSuite input.""" # Build key-value dictionary from properties keyval_dict = {} def serialize(sett, prefix=''): for key, val in sett.items(): if prefix == '' and key.lower() in ['suffix', 'ghost', 'name']: # Special atomic properties that are handled by _atom_symbol() already. # Suffix handled explicitly below ... continue if isinstance(val, Settings): # Recursively serialize nested Settings object serialize(val, prefix+key+'.') continue elif isinstance(val, list): # Lists are converted to a comma separated string of elements val = ",".join( str(v) for v in val ) elif isinstance(val, set): # Sets are converted to a *sorted* comma separated string of elements val = ",".join( str(v) for v in sorted(val) ) key = prefix+key if not isinstance(val, str): val = str(val) if '\n' in val: raise ValueError(f"String representation of atomic property {key} may not include line breaks") if '{' in val or '}' in val: raise ValueError(f"String representation of atomic property {key} may not include curly brackets: {val}") if ' ' in val: # Ensure that space containing values are quoted if '"' not in val: val = '"'+val+'"' elif "'" not in val: val = "'"+val+"'" else: raise ValueError(f"Atomic property {key} can not include both single and double quotes: {val}") keyval_dict[key] = str(val) serialize(atom.properties) # Assemble and return complete suffix string ret = " ".join( f"{key}={val}" for key,val in keyval_dict.items() ) if 'suffix' in atom.properties: ret += " "+str(atom.properties.suffix) return ret.strip() @staticmethod def _atom_symbol(atom): """Return the atomic symbol of *atom*. Ensure proper formatting for AMSuite input taking into account ``ghost`` and ``name`` entries in ``properties`` of *atom*.""" smb = atom.symbol if 'ghost' in atom.properties and atom.properties.ghost: smb = ('Gh.'+smb).rstrip('.') if 'name' in atom.properties: smb = (smb+'.'+str(atom.properties.name)).lstrip('.') return smb @staticmethod def _tuple2rkf(arg): """Transform a pair ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a name of ``.rkf`` file (``ams`` or engine) to an absolute path to that ``.rkf`` file.""" if len(arg) == 2 and isinstance(arg[1], str): if isinstance(arg[0], AMSJob): return arg[0].results.rkfpath(arg[1]) if isinstance(arg[0], AMSResults): return arg[0].rkfpath(arg[1]) return str(arg)
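How ``ghost``, ``name`` and other atomic properties end up in the input, following the two helpers above (``ghost`` and ``name`` are consumed by ``_atom_symbol``, everything else by ``_atom_suffix``):

    .. code-block:: python

        from scm.plams import AMSJob, Atom

        at = Atom(symbol='C', coords=(0.0, 0.0, 0.0))
        at.properties.ghost = True
        at.properties.name = 'C1'
        AMSJob._atom_symbol(at)    # 'Gh.C.C1'

        at.properties.region = {'surface'}
        AMSJob._atom_suffix(at)    # 'region=surface'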
[docs] @classmethod def load_external(cls, path, settings=None, molecule=None, finalize=False, fmt='ams'): """Load an external job from *path*. In this context an "external job" is an execution of some external binary that was not managed by PLAMS, and hence does not have a ``.dill`` file. It can also be used in situations where the execution was started with PLAMS, but the Python process was terminated before the execution finished, resulting in steps 9-12 of :ref:`job-life-cycle` not happening. All the files produced by your computation should be placed in one folder and *path* should be the path to this folder or a file in this folder. The name of the folder is used as a job name. Input, output, error and runscript files, if present, should have names defined in the ``_filenames`` class attribute (usually ``[jobname].in``, ``[jobname].out``, ``[jobname].err`` and ``[jobname].run``). It is not required to supply all these files, but in most cases one would like to use at least the output file, in order to use methods like :meth:`~scm.plams.core.results.Results.grep_output` or :meth:`~scm.plams.core.results.Results.get_output_chunk`. If *path* is an instance of an AMSJob, that instance is returned. This method is a class method, so it is called via the class object and it returns an instance of that class::

            >>> a = AMSJob.load_external(path='some/path/jobname')
            >>> type(a)
            scm.plams.interfaces.adfsuite.ams.AMSJob

        You can supply |Settings| and |Molecule| instances as *settings* and *molecule* parameters, they will end up attached to the returned job instance. If you don't do this, PLAMS will try to recreate them automatically using methods :meth:`~scm.plams.core.results.Results.recreate_settings` and :meth:`~scm.plams.core.results.Results.recreate_molecule` of the corresponding |Results| subclass. If no |Settings| instance is obtained in either way, the defaults from ``config.job`` are copied. You can set the *finalize* parameter to ``True`` if you wish to run the whole :meth:`~Job._finalize` on the newly created job. In that case PLAMS will perform the usual :meth:`~Job.check` to determine the job status (*successful* or *failed*), followed by cleaning of the job folder (|cleaning|), |postrun| and pickling (|pickling|). If *finalize* is ``False``, the status of the returned job is *copied*.

        fmt : str
            One of 'ams', 'qe', 'vasp', 'gaussian', or 'any'.
            'ams': load a finished AMS job.
            'qe': convert a Quantum ESPRESSO .out file to ams.rkf and qe.rkf and load from those files.
            'vasp': convert a VASP OUTCAR file to ams.rkf and vasp.rkf and load from those files (see more below).
            'gaussian': convert a Gaussian output file to AMS-style .rkf files and load from those.
            'any': auto-detect the format.

        This method can also be used to convert a finished VASP job to an AMSJob. If you supply the path to a folder containing OUTCAR, then a subdirectory will be created in this folder called AMSJob. In the AMSJob subdirectory, two files will be created: ams.rkf and vasp.rkf, which contain some of the results from the VASP calculation. If the AMSJob subdirectory already exists, the existing ams.rkf and vasp.rkf files will be reused. NOTE: the purpose of loading VASP data this way is to let you call for example job.results.get_energy() etc., not to run new VASP calculations!
""" if isinstance(path, cls): return path preferred_name = None fmt = fmt.lower() # first check if path is a VASP output # in which case call cls._vasp_to_ams, which will return a NEW path # containing ams.rkf and vasp.rkf (the .rkf files will be created if they do not exist) if os.path.isdir(path): preferred_name = os.path.basename(os.path.abspath(path)) if not os.path.exists(os.path.join(path,'ams.rkf')) \ and os.path.exists(os.path.join(path,'OUTCAR')) \ and (fmt == 'vasp' or fmt == 'any'): path = vasp_output_to_ams(path, overwrite=False) elif os.path.exists(path) and os.path.basename(path) == 'OUTCAR' and (fmt == 'vasp' or fmt == 'any'): preferred_name = os.path.basename(os.path.dirname(os.path.abspath(path))) path = vasp_output_to_ams(os.path.dirname(path), overwrite=False) elif os.path.exists(path): from ase.io.formats import filetype try: ft = filetype(path) # check if path is a Quantum ESPRESSO .out file # qe_output_to_ams returns a NEW path containng the ams.rkf and qe.rkf files (will be created if # they do not exist) if fmt == 'qe': assert ft == 'espresso-out', f"The file {path} does not seem to be a Quantum ESPRESSO output file" if fmt == 'gaussian': assert ft == 'gaussian-out', f"The file {path} does not seem to be a Gaussian output file" if ft == 'espresso-out' and (fmt =='qe' or (fmt == 'any' and not path.endswith('rkf'))): path = qe_output_to_ams(path, overwrite=False) elif ft == 'gaussian-out' and (fmt == 'gaussian' or (fmt == 'any' and not path.endswith('rkf'))): path = gaussian_output_to_ams(path, overwrite=False) except Exception as e: # several types of exceptions can be raised, e.g. StopIteration and UnicodeDecodeError # assume that any exception means that the file was not a QE output file pass if not os.path.isdir(path): if os.path.exists(path): path = os.path.dirname(os.path.abspath(path)) elif os.path.isdir(path+'.results'): path=path+'.results' elif os.path.isdir(path+'results'): path=path+'results' else: raise FileError('Path {} does not exist, cannot load from it.'.format(path)) job = super(AMSJob, cls).load_external( path, settings, molecule, finalize) if preferred_name is not None: job.name = preferred_name if job.name.endswith('.results') and len(job.name) > 8: job.name = job.name[:-8] return job
[docs] @classmethod def from_input(cls, text_input:str, **kwargs) -> 'AMSJob': """ Creates an AMSJob from AMS-style text input. This function requires that the SCM Python package is installed (if not, it will raise an ImportError). text_input : a multi-line string Returns: An AMSJob instance

        Example::

            text = '''
            Task GeometryOptimization
            Engine DFTB
                Model GFN1-xTB
            EndEngine
            '''
            job = AMSJob.from_input(text)

        .. note:: If *molecule* is included in the keyword arguments to this method, the *text_input* may not contain any System blocks. In other words, the molecules to be used either need to come from the *text_input*, or the keyword argument, but not both. If *settings* is included in the keyword arguments to this method, the |Settings| created from the *text_input* will be soft-updated with the settings from the keyword argument. In other words, the *text_input* takes precedence over the *settings* keyword argument. """ from scm.libbase import InputParser sett = Settings() sett.input = InputParser().to_settings(cls._command, text_input) mol = cls.settings_to_mol(sett) if mol: if 'molecule' in kwargs: raise JobError('AMSJob.from_input(): molecule passed in both text_input and as keyword argument') else: mol = kwargs.pop('molecule', None) if 'settings' in kwargs: sett.soft_update(kwargs.pop('settings')) return cls(molecule=mol, settings=sett, **kwargs)
[docs] @classmethod def from_inputfile(cls, filename:str, heredoc_delimit:str='eor', **kwargs) -> 'AMSJob': """Construct an :class:`AMSJob` instance from an AMS input file or runscript. If a runscript is provided, this method will attempt to extract the input file based on the heredoc delimiter (see *heredoc_delimit*). """ with open(filename, 'r') as f: inp_file = parse_heredoc(f.read(), heredoc_delimit) return cls.from_input(inp_file, **kwargs)
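For example, reading either a plain input file or a runscript whose input is embedded as a heredoc (file names are illustrative):

    .. code-block:: python

        job = AMSJob.from_inputfile('jobname.in')
        job = AMSJob.from_inputfile('jobname.run')   # input extracted from the 'eor' heredoc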
[docs] @staticmethod def settings_to_mol(s: Settings) -> dict: """Pop the `s.input.ams.system` block from a settings instance and convert it into a dictionary of molecules. The provided settings should be in the same style as the ones produced by the SCM input parser. Dictionary keys are taken from the header of each system block. The existing `s.input.ams.system` block is removed in the process, assuming it was present in the first place. """ from ...tools.units import Units def get_list(s): if '_1' in s and not '_2' in s and isinstance(s._1, list): return s._1 else: i = 1 l = [] while '_' + str(i) in s: l.append(s['_' + str(i)]) i += 1 return l def read_mol(settings_block: Settings) -> Molecule: """Retrieve single molecule from a single `s.input.ams.system` block.""" if 'geometryfile' in settings_block: mol = Molecule(settings_block.geometryfile) else: mol = Molecule() if "_h" in settings_block.atoms: conv = Units.conversion_ratio(settings_block.atoms._h.strip("[]"), "Angstrom") else: conv = 1.0 for atom in get_list(settings_block.atoms): # Extract arguments for Atom() symbol, x, y, z, *suffix = atom.split(maxsplit=4) coords = conv*float(x), conv*float(y), conv*float(z) try: at = Atom(symbol=symbol, coords=coords) except PTError: # It's either a ghost atom and/or an atom with a custom name kwargs = {} if symbol.startswith('Gh.'): # Ghost atom kwargs['ghost'] = True _, symbol = symbol.split('.', maxsplit=1) if '.' in symbol: # Atom with a custom name symbol, kwargs['name'] = symbol.split('.', maxsplit=1) at = Atom(symbol=symbol, coords=coords, **kwargs) if suffix: at.properties.soft_update(AMSJob._atom_suffix_to_settings(suffix[0])) mol.add_atom(at) # Set the lattice vector if applicable if get_list(settings_block.lattice): if "_h" in settings_block.lattice: conv = Units.conversion_ratio(settings_block.lattice._h.strip("[]"), "Angstrom") else: conv = 1.0 mol.lattice = [tuple(conv*float(j) for j in i.split()) for i in get_list(settings_block.lattice)] # Add bonds for bond in get_list(settings_block.bondorders): _at1, _at2, _order, *suffix = bond.split(maxsplit=3) at1, at2, order = mol[int(_at1)], mol[int(_at2)], float(_order) plams_bond = Bond(at1,at2,order=order) if suffix: plams_bond.properties.suffix = suffix[0] mol.add_bond(plams_bond) # Set the molecular charge if settings_block.charge: mol.properties.charge = settings_block.charge # Set the region info (used in ACErxn) if settings_block.region : for s_reg in settings_block.region : if not '_h' in s_reg.keys() : raise JobError ('Region block requires a header!') if s_reg.properties : mol.properties.regions[s_reg._h] = s_reg.properties._1 mol.properties.name = str(settings_block._h) return mol # Raises a KeyError if the `system` key is absent with s.suppress_missing(): try: settings_list = s.input.ams.pop('system') if not isinstance(settings_list, list): settings_list = [settings_list] except KeyError: # The block s.input.ams.system is absent return None # Create a new dictionary with system headers as keys and molecules as values moldict = {} for settings_block in settings_list: key = str(settings_block._h) if ('_h' in settings_block) else '' # Empty string used as default system name. if key in moldict: raise KeyError(f"Duplicate system headers found in s.input.ams.system: {repr(key)}") moldict[key] = read_mol(settings_block) return moldict
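A minimal sketch of the conversion, using parser-style settings built by hand (coordinates are illustrative); note that the ``system`` block is popped from the settings in the process:

    .. code-block:: python

        from scm.plams import AMSJob, Settings

        sett = Settings()
        sett.input.ams.system.atoms._1 = ['O 0.000 0.000 0.000',
                                          'H 0.000 0.000 0.950',
                                          'H 0.900 0.000 -0.300']
        moldict = AMSJob.settings_to_mol(sett)
        mol = moldict['']   # unnamed systems end up under the empty-string key
        assert 'system' not in sett.input.ams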
@staticmethod def _atom_suffix_to_settings(suffix) -> Settings: def is_int(s): if '.' in s: return False try: return int(s) == float(s) except ValueError: return False def is_float(s): try: float(s) return True except ValueError: return False import shlex tokens = shlex.split(suffix) # Remove possible spaces around = signs i = 0 while i < len(tokens): if tokens[i].endswith('='): tokens[i] += tokens.pop(i+1) elif tokens[i].startswith('='): tokens[i-1] += tokens.pop(i) else: i += 1 properties = Settings() for i,t in enumerate(tokens): key, _, val = t.partition('=') if not val: # Token that is not a key=value pair? Let's accumulate those in the plain text suffix. if 'suffix' in properties: properties.suffix += ' '+key else: properties.suffix = key continue else: # We have a value. Let's make an educated guess for its type. if val.lower() == 'true': val = True elif val.lower() == 'false': val = False elif is_int(val): val = int(val) elif is_float(val): val = float(val) elif ',' in val: elem = val.split(',') if key.lower() == 'region': # For regions we will output a set instead of a list val = set(elem) elif all(is_int(i) for i in elem): val = [ int(i) for i in elem ] elif all(is_float(f) for f in elem): val = [ float(f) for f in elem ] else: # Anything else will come out as a list of strings val = elem properties.set_nested(key.split('.'), val) return properties @staticmethod def _add_region(atom:Atom, name:str): """ Converts atom.properties.region to a set if it isn't already a set. Adds the name to the set. If name is None, then only the conversion of atom.properties.region is performed. """ if 'region' not in atom.properties: atom.properties.region = set() if isinstance(atom.properties.region, str): atom.properties.region = set([atom.properties.region]) if not isinstance(atom.properties.region, set): atom.properties.region = set(atom.properties.region) if name is None: return atom.properties.region.add(name)
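The suffix parser and region helper in action, following the code above (the property names are illustrative):

    .. code-block:: python

        from scm.plams import AMSJob, Atom

        props = AMSJob._atom_suffix_to_settings('region=surface,top adf.f=0.5 some free text')
        props.region    # {'surface', 'top'}  (a 'region' list becomes a set)
        props.adf.f     # 0.5                 (dotted keys produce nested Settings)
        props.suffix    # 'some free text'    (tokens that are not key=value pairs)

        at = Atom(symbol='O', coords=(0.0, 0.0, 0.0))
        AMSJob._add_region(at, 'QM')
        at.properties.region    # {'QM'}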