Source code for scm.plams.interfaces.adfsuite.ams

import os
import subprocess
import numpy as np
import re
from natsort import natsorted

from os.path import join as opj

from ...core.basejob import SingleJob
from ...core.errors import FileError, JobError, ResultsError, PTError, PlamsError, JobError
from ...core.functions import config, log, parse_heredoc
from ...core.private import sha256, UpdateSysPath
from ...core.results import Results
from ...core.settings import Settings
from ...mol.molecule import Molecule
from ...mol.atom import Atom
from ...mol.bond import Bond
from ...tools.kftools import KFFile
from ...tools.units import Units
from ...trajectories.rkffile import RKFTrajectoryFile
from ...tools.converters import vasp_output_to_ams, qe_output_to_ams

try:
    from watchdog.observers import Observer
    from watchdog.events import PatternMatchingEventHandler, FileModifiedEvent
    _has_watchdog = True

    class AMSJobLogTailHandler(PatternMatchingEventHandler):

        def __init__(self, job, jobmanager):
            super().__init__(patterns=[os.path.join(jobmanager.workdir, f"{job.name}*", "ams.log")],
                             ignore_patterns=["*.rkf", "*.out"],
                             ignore_directories=True,
                             case_sensitive=True)
            self._job = job
            self._seekto = 0

        def on_any_event(self, event):
            if self._job.path is not None \
            and event.src_path == os.path.join(self._job.path, "ams.log") \
            and isinstance(event, FileModifiedEvent):
                try:
                    with open(event.src_path, 'r') as f:
                        f.seek(self._seekto)
                        while True:
                            line = f.readline()
                            if not line: break
                            log(f"{self._job.name}: "+line[25:-1])
                        self._seekto = f.tell()
                except FileNotFoundError:
                    self._seekto = 0

except ImportError:
    _has_watchdog = False



__all__ = ['AMSJob', 'AMSResults']


[docs]class AMSResults(Results): """A specialized |Results| subclass for accessing the results of |AMSJob|.""" def __init__(self, *args, **kwargs): Results.__init__(self, *args, **kwargs) self.rkfs = {}
[docs] def collect(self): """Collect files present in the job folder. Use parent method from |Results|, then create an instance of |KFFile| for each ``.rkf`` file present in the job folder. Collect these files in ``rkfs`` dictionary, with keys being filenames without ``.rkf`` extension. The information about ``.rkf`` files generated by engines is taken from the main ``ams.rkf`` file. This method is called automatically during the final part of the job execution and there is no need to call it manually. """ Results.collect(self) rkfname = 'ams.rkf' if rkfname in self.files: main = KFFile(opj(self.job.path, rkfname)) n = main[('EngineResults','nEntries')] for i in range(1, n+1): title = main[('EngineResults','Title({})'.format(i))] files = main[('EngineResults','Files({})'.format(i))].split('\x00') if files[0].endswith('.rkf'): key = files[0][:-4] self.rkfs[key] = KFFile(opj(self.job.path, files[0])) self.rkfs['ams'] = main else: log('WARNING: Main KF file {} not present in {}'.format(rkfname, self.job.path), 1)
[docs] def refresh(self): """Refresh the contents of ``files`` list. Use the parent method from |Results|, then look at |KFFile| instances present in ``rkfs`` dictionary and check if they point to existing files. If not, try to reinstantiate them with current job path (that can happen while loading a pickled job after the entire job folder was moved). """ Results.refresh(self) to_remove = [] for key,val in self.rkfs.items(): if not os.path.isfile(val.path): if os.path.dirname(val.path) != self.job.path: guessnewpath = opj(self.job.path, os.path.basename(val.path)) if os.path.isfile(guessnewpath): self.rkfs[key] = KFFile(guessnewpath) else: to_remove.append(key) else: to_remove.append(key) for i in to_remove: del self.rkfs[i]
[docs] def engine_names(self): """Return a list of all names of engine specific ``.rkf`` files. The identifier of the main result file (``'ams'``) is not present in the returned list, only engine specific names are listed. """ self.refresh() ret = list(self.rkfs.keys()) ret.remove('ams') return ret
[docs] def rkfpath(self, file='ams'): """Return the absolute path of a chosen ``.rkf`` file. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. """ return self._access_rkf(lambda x: x.path, file)
[docs] def readrkf(self, section, variable, file='ams'): """Read data from *section*/*variable* of a chosen ``.rkf`` file. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. The type of the returned value depends on the type of *variable* defined inside KF file. It can be: single int, list of ints, single float, list of floats, single boolean, list of booleans or string. .. note:: If arguments *section* or *variable* are incorrect (not present in the chosen file), the returned value is ``None``. Please mind the fact that KF files are case sensitive. """ return self._access_rkf(lambda x: x.read(section, variable), file)
[docs] def read_rkf_section(self, section, file='ams'): """Return a dictionary with all variables from a given *section* of a chosen ``.rkf`` file. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. .. note:: If *section* is not present in the chosen file, the returned value is an empty dictionary. Please mind the fact that KF files are case sensitive. """ return self._access_rkf(lambda x: x.read_section(section), file)
[docs] def get_rkf_skeleton(self, file='ams'): """Return a dictionary with the structure of a chosen ``.rkf`` file. Each key corresponds to a section name with the value being a set of variable names present in that section. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. """ return self._access_rkf(lambda x: x.get_skeleton(), file)
[docs] def get_molecule(self, section, file='ams'): """Return a |Molecule| instance stored in a given *section* of a chosen ``.rkf`` file. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. All data used by this method is taken from the chosen ``.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored. """ sectiondict = self.read_rkf_section(section, file) if sectiondict: return Molecule._mol_from_rkf_section(sectiondict)
def get_ase_atoms(self, section, file='ams'): from ase import Atoms sectiondict = self.read_rkf_section(section, file) bohr2angstrom = 0.529177210903 nLatticeVectors = sectiondict.get('nLatticeVectors', 0) pbc = [True] * nLatticeVectors + [False] * (3 - nLatticeVectors) if nLatticeVectors > 0: cell = np.zeros((3, 3)) lattice = np.array(sectiondict.get('LatticeVectors')).reshape(-1, 3) cell[:lattice.shape[0], :lattice.shape[1]] = lattice * bohr2angstrom else: cell = None atomsymbols = sectiondict['AtomSymbols'].split() positions = np.array(sectiondict['Coords']).reshape(-1,3) * bohr2angstrom return Atoms(symbols=atomsymbols, positions=positions, pbc=pbc, cell=cell)
[docs] def get_input_molecule(self): """Return a |Molecule| instance with the initial coordinates. All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored. """ return self.get_molecule('InputMolecule', 'ams')
[docs] def get_main_molecule(self): """Return a |Molecule| instance with the final coordinates. All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored. """ return self.get_molecule('Molecule', 'ams')
[docs] def get_main_ase_atoms(self): """Return an ase.Atoms instance with the final coordinates. An alternative is to call toASE(results.get_main_molecule()) to convert a Molecule to ASE Atoms. All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored. """ return self.get_ase_atoms('Molecule', 'ams')
[docs] def get_history_molecule(self, step): """Return a |Molecule| instance with coordinates taken from a particular *step* in the ``History`` section of ``ams.rkf`` file. All data used by this method is taken from ``ams.rkf`` file. The ``molecule`` attribute of the corresponding job is ignored. """ if 'ams' in self.rkfs: main = self.rkfs['ams'] if not self.is_valid_stepnumber(main,step) : return coords = main.read('History', f'Coords({step})') coords = [coords[i:i+3] for i in range(0,len(coords),3)] if ('History', f'SystemVersion({step})') in main: system = self.get_system_version(main, step) mol = self.get_molecule(f'ChemicalSystem({system})') molsrc = f'ChemicalSystem({system})' else: mol = self.get_main_molecule() molsrc = 'Molecule' if len(mol) != len(coords): raise ResultsError(f'Coordinates taken from "History%Coords({step})" have incompatible length with molecule from {molsrc} section') for at, c in zip(mol, coords): at.move_to(c, unit='bohr') if ('History', f'LatticeVectors('+str(step)+')') in main: lattice = Units.convert(main.read('History', f'LatticeVectors('+str(step)+')'), 'bohr', 'angstrom') mol.lattice = [tuple(lattice[j:j+3]) for j in range(0,len(lattice),3)] if all(('History', i) in main for i in [f'Bonds.Index({step})', f'Bonds.Atoms({step})', f'Bonds.Orders({step})']): mol.bonds = [] index = main.read('History', f'Bonds.Index({step})') if not isinstance(index, list): index = [index] atoms = main.read('History', f'Bonds.Atoms({step})') if not isinstance(atoms, list): atoms = [atoms] orders = main.read('History', f'Bonds.Orders({step})') if not isinstance(orders, list): orders = [orders] for i in range(len(index)-1): for j in range(index[i], index[i+1]): mol.add_bond(mol[i+1], mol[atoms[j-1]], orders[j-1]) return mol
[docs] def is_valid_stepnumber (self, main, step) : """ Check if the requested step number is in the results file """ if 'History' not in main: raise KeyError("'History' section not present in {}".format(main.path)) n = main.read('History', 'nEntries') if step > n: raise KeyError("Step {} not present in 'History' section of {}".format(step, main.path)) return True
[docs] def get_system_version (self, main, step) : """ Determine which Molecule version is requested """ if ('History', f'SystemVersion({step})') in main: version = main.read('History', f'SystemVersion({step})') if 'SystemVersionHistory' in main: if ('SystemVersionHistory', 'blockSize') in main: blockSize = main.read('SystemVersionHistory', 'blockSize') else: blockSize = 1 block = (version - 1) // blockSize + 1 offset = (version - 1) % blockSize system = main.read('SystemVersionHistory', f'SectionNum({block})', return_as_list=True)[offset] else: system = version return system
[docs] def get_history_variables(self, history_section='History') : """ Return a set of keynames stored in the specified history section of the ``ams.rkf`` file. The *history_section argument should be a string representing the name of the history section (``History`` or ``MDHistory``)*""" if not 'ams' in self.rkfs: return main = self.rkfs['ams'] keylist = [var for sec,var in main if sec==history_section] # Now throw out all the last parts return set([key.split('(')[0] for key in keylist if len(key.split('('))>1])
[docs] def get_history_property(self, varname, history_section='History') : """ Return the values of *varname* in the history section *history_section*.""" if not 'ams' in self.rkfs:return main = self.rkfs['ams'] if history_section not in main: raise KeyError(f"The requested section '{history_section}' does not exist in {main.path}") if (history_section,'nScanCoord') in main: # PESScan nentries = main.read(history_section,'nScanCoord') as_block = False elif (history_section,'nEntries') in main: nentries = main.read(history_section,'nEntries') as_block = self._values_stored_as_blocks(main, varname, history_section) if as_block : nblocks = main.read(history_section,'nBlocks') values = [main.read(history_section,f"{varname}({iblock})",return_as_list=True) for iblock in range(1,nblocks+1)] values = [val for blockvals in values for val in blockvals if isinstance(blockvals,list)] else : values = [main.read(history_section,f"{varname}({step})") for step in range(1,nentries+1)] return values
[docs] def get_property_at_step(self, step, varname, history_section='History') : """ Return the value of *varname* in the history section *history_section at step *step*.""" if not 'ams' in self.rkfs: return main = self.rkfs['ams'] as_block = self._values_stored_as_blocks(main, varname, history_section) if as_block : import numpy blocksize = main.read(history_section,'blockSize') iblock = int(numpy.ceil(step/blocksize)) value = main.read(history_section,f"{varname}({iblock})")[(step%blocksize)-1] else : value = main.read(history_section,f"{varname}({step})") return value
def _values_stored_as_blocks(self, main, varname, history_section) : """Determines wether the values of varname in a trajectory rkf file are stored in blocks""" nentries = main.read(history_section,'nEntries') as_block = False keylist = [var for sec,var in main if sec==history_section] if 'nBlocks' in keylist : if not f"{varname}({nentries})" in keylist : as_block = True return as_block
[docs] def get_atomic_temperatures_at_step(self, step, history_section='MDHistory') : """ Get all the atomic temperatures for step `step` Note: Numbering of steps starts at 1 """ if not 'ams' in self.rkfs: return main = self.rkfs['ams'] if not self.is_valid_stepnumber(main, step) : return # Read the masses molname = 'Molecule' if ('History', f'SystemVersion({step})') in main: system = self.get_system_version(main, step) molname = 'ChemicalSystem(%i)'%(system) masses = np.array(main.read(molname,'AtomMasses')) nats = len(masses) # Read the velocities velocities = np.array(self.get_property_at_step(step,'Velocities',history_section)).reshape((nats,3)) # Convert to SI units and compute temperatures m = masses * 1.e-3 / Units.constants['NA'] vels = velocities * Units.conversion_ratio('Bohr','Angstrom') * 1.e5 temperatures = (m.reshape((nats,1)) * vels**2).sum(axis=1) temperatures /= 3 * Units.constants['k_B'] return temperatures
[docs] def get_engine_results(self, engine=None): """Return a dictionary with contents of ``AMSResults`` section from an engine results ``.rkf`` file. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return self._process_engine_results(lambda x: x.read_section('AMSResults'), engine)
[docs] def get_engine_properties(self, engine=None): """Return a dictionary with all the entries from ``Properties`` section from an engine results ``.rkf`` file. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ def properties(kf): if not 'Properties' in kf: return {} # There are two *kinds* of "Properties" sections: # - ADF simply has a set of variables in the properties section # - DFTB and some other engines have a *scheme* for storing "Properties". See the fortran 'RKFileModule' (RKFile.f90) # for more details on this. ret = {} if ('Properties', 'nEntries') in kf: # This is a 'RKFileModule' properties section: n = kf.read('Properties', 'nEntries') for i in range(1, n+1): tp = kf.read('Properties', 'Type({})'.format(i)).strip() stp = kf.read('Properties', 'Subtype({})'.format(i)).strip() val = kf.read('Properties', 'Value({})'.format(i)) key = stp if stp.endswith(tp) else ('{} {}'.format(stp, tp) if stp else tp) ret[key] = val else: skeleton = kf.get_skeleton() for variable in skeleton['Properties']: ret[variable] = kf.read('Properties', variable) return ret return self._process_engine_results(properties, engine)
[docs] def get_energy(self, unit='au', engine=None): """Return final energy, expressed in *unit*. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return self._process_engine_results(lambda x: x.read('AMSResults', 'Energy'), engine) * Units.conversion_ratio('au', unit)
# try: # return self._process_engine_results(lambda x: x.read('AMSResults', 'Energy'), engine) * Units.conversion_ratio('au', unit) # except FileError as original_error: # try: # return min(self.readrkf('PESScan', 'PES')) * Units.conversion_ratio('au', unit) # except: # raise original_error
[docs] def get_gradients(self, energy_unit='au', dist_unit='au', engine=None): """Return the gradients of the final energy, expressed in *energy_unit* / *dist_unit*. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Gradients'), engine)).reshape(-1,3) * Units.conversion_ratio('au', energy_unit) / Units.conversion_ratio('au', dist_unit)
[docs] def get_stresstensor(self, engine=None): """Return the final stress tensor, expressed in atomic units. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'StressTensor'), engine)).reshape(len(self.get_input_molecule().lattice),-1)
[docs] def get_hessian(self, engine=None): """Return the Hessian matrix, i.e. the second derivative of the total energy with respect to the nuclear coordinates, expressed in atomic units. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Hessian'), engine)).reshape(3*len(self.get_input_molecule()),-1)
[docs] def get_elastictensor(self, engine=None): """Return the elastic tensor, expressed in atomic units. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ et_flat = np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'ElasticTensor'), engine)) num_latvec = len(self.get_input_molecule().lattice) if num_latvec == 1: return et_flat.reshape(1,1) elif num_latvec == 2: return et_flat.reshape(3,3) else: return et_flat.reshape(6,6)
[docs] def get_frequencies(self, unit='cm^-1', engine=None): """Return a numpy array of vibrational frequencies, expressed in *unit*. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ freqs = self._process_engine_results(lambda x: x.read('Vibrations', 'Frequencies[cm-1]'), engine) freqs = np.array(freqs) if isinstance(freqs,list) else np.array([freqs]) return freqs * Units.conversion_ratio('cm^-1', unit)
[docs] def get_charges(self, engine=None): """Return the atomic charges, expressed in atomic units. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'Charges'), engine))
[docs] def get_dipolemoment(self, engine=None): """Return the electric dipole moment, expressed in atomic units. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'DipoleMoment'), engine))
[docs] def get_dipolegradients(self, engine=None): """Return the nuclear gradients of the electric dipole moment, expressed in atomic units. This is a (3*numAtoms x 3) matrix. """ return np.asarray(self._process_engine_results(lambda x: x.read('AMSResults', 'DipoleGradients'), engine)).reshape(-1,3)
[docs] def get_timings(self): """Return a dictionary with timing statistics of the job execution. Returned dictionary contains keys cpu, system and elapsed. The values are corresponding timings, expressed in seconds. """ ret = {} try: # new AMS versions store timings on ams.rkf ret['elapsed'] = self.readrkf('General', 'ElapsedTime') ret['system'] = self.readrkf('General', 'SysTime') ret['cpu'] = self.readrkf('General', 'CPUTime') except: # fall back to reading output, was needed for old AMS versions cpu = self.grep_output('Total cpu time:') system = self.grep_output('Total system time:') elapsed = self.grep_output('Total elapsed time:') ret['elapsed'] = float(elapsed[0].split()[-1]) ret['system'] = float(system[0].split()[-1]) ret['cpu'] = float(cpu[0].split()[-1]) return ret
def get_poissonratio(self, engine=None): return self._process_engine_results(lambda x: x.read('AMSResults', 'PoissonRatio'), engine) def get_youngmodulus(self, unit='au', engine=None): return self._process_engine_results(lambda x: x.read('AMSResults', 'YoungModulus'), engine) * Units.conversion_ratio('au', unit) def get_shearmodulus(self, unit='au', engine=None): return self._process_engine_results(lambda x: x.read('AMSResults', 'ShearModulus'), engine) * Units.conversion_ratio('au', unit) def get_bulkmodulus(self, unit='au', engine=None): return self._process_engine_results(lambda x: x.read('AMSResults', 'BulkModulus'), engine) * Units.conversion_ratio('au', unit)
[docs] def get_pesscan_results(self, molecules=True): """ For PESScan jobs, this functions extracts information about the scan coordinates and energies. molecules : bool Whether to return a Molecule at each PES point. Returns a dictionary, with the following keys: 'RaveledScanCoords': a list of str with the names of the scan coordinates: ['sc_1_1','sc_1_2','sc_2_1','sc_2_2',...] 'nRaveledScanCoords': the length of the previous list 'RaveledUnits': a list of str with the units: ['bohr', 'bohr', 'radian', 'bohr', ...] 'RaveledPESCoords': a nested list with the values of the scan coordinates: [[val_1_1_1,val_1_1_2,...],[val_1_2_1,val_1_2_2,...],[val_2_1_1,val_2_1_2,...],[val_2_2_1,val_2_2_2]] 'ScanCoords': a nested list: [['sc_1_1','sc_1_2'],['sc_2_1','sc_2_2'],...] 'nScanCoords': length of the previous list 'Units': a nested list with the units: [['bohr', 'bohr'],['radian', 'bohr'], ...] 'OrigScanCoords': a list of str in the newline-separated format stored on the .rkf file: ['sc_1_1\\nsc_1_2','sc_2_1\\nsc_2_2',...] 'nPESPoints': int, number of PES points 'PES': list of float, the energies at each PES point 'Converged': list of bool, whether the geometry optimization at each PES point converged 'Molecules': list of Molecule, the structure at each PES point. Only if the property molecules == True 'HistoryIndices': list of int, the indices (1-based) in the History section which correspond to the Molecules and PES. 'Properties': list of dict. The dictionary keys are what can be found on the AMSResults section of the engine .rkf file. These will only be populated if "CalcPropertiesAtPESPoints" is set to Yes when running the PES scan. """ def tolist(x): if isinstance(x, list): return x else: return [x] nScanCoord = self.readrkf('PESScan', 'nScanCoord') nPoints = tolist(self.get_history_property('nPoints', 'PESScan')) pes = tolist(self.readrkf('PESScan', 'PES')) origscancoords = tolist(self.get_history_property('ScanCoord', history_section='PESScan')) # one scan coordinate may have several variables scancoords = [x.split('\n') for x in origscancoords] units = [] pescoords = tolist(self.readrkf('PESScan', 'PESCoords')) pescoords = np.array(pescoords).reshape(-1,sum(len(x) for x in scancoords)) pescoords = np.transpose(pescoords) w = [] units = [] count = 0 for i in range(nScanCoord): units.append([]) for j in range(len(scancoords[i])): if scancoords[i][j] in ['a', 'b', 'c'] or 'Dist' in scancoords[i][j]: units[-1].append('bohr') elif 'Volume' in scancoords[i][j]: units[-1].append('bohr^3') elif 'Area' in scancoords[i][j]: units[-1].append('bohr^2') else: units[-1].append('radian') converged = tolist(self.readrkf('PESScan', 'GOConverged')) converged = [bool(x) for x in converged] historyindices = tolist(self.readrkf('PESScan', 'HistoryIndices')) ret = {} raveled_scancoords = [x[i] for x in scancoords for i in range(len(x))] # 1d list raveled_units = [x[i] for x in units for i in range(len(x))] # 1d list ret['RaveledScanCoords'] = raveled_scancoords ret['nRaveledScanCoords'] = len(raveled_scancoords) ret['ScanCoords'] = scancoords ret['nScanCoords'] = len(scancoords) ret['OrigScanCoords'] = origscancoords #newline delimiter for joint scan coordinates ret['RaveledPESCoords'] = pescoords.tolist() ret['Units'] = units ret['RaveledUnits'] = raveled_units ret['Converged'] = converged ret['PES'] = pes ret['nPESPoints'] = len(pes) ret['HistoryIndices'] = historyindices if molecules: mols = [] for ind in historyindices: mols.append(self.get_history_molecule(ind)) ret['Molecules'] = mols ret['Properties'] = [] for key in natsorted(title for title in self.rkfs.keys() if title.startswith('PESPoint')): amsresults = self.rkfs[key].read_section("AMSResults") ret['Properties'].append(amsresults) return ret
[docs] def get_neb_results(self, molecules=True, unit='au'): """ Returns a dictionary with results from a NEB calculation. molecules : bool Whether to include the 'Molecules' key in the return result unit : str Energy unit for the Energies, LeftBarrier, RightBarrier, and ReactionEnergy Returns: dict 'nImages': number of images (excluding end points) 'nIterations': number of iterations 'Energies': list of energies (including end points) 'Climbing': bool, whether climbing image NEB was used 'LeftBarrier': float, left reaction barrier 'RightBarrier': float, right reaction barrier 'ReactionEnergy': float, reaction energy 'HistoryIndices': list of int, same length as 'Energies', contains indices in the History section 'Molecules': list of Molecule (including end points) """ def tolist(x): if isinstance(x, list): return x else: return [x] ret = {} conversion_ratio = Units.conversion_ratio('au', unit) ret['nImages'] = self.readrkf('NEB', 'nebImages') ret['nIterations'] = self.readrkf('NEB', 'nebIterations') ret['Climbing'] = bool(self.readrkf('NEB', 'climbing')) ret['HighestIndex'] = self.readrkf('NEB', 'highestIndex') ret['LeftBarrier'] = self.readrkf('NEB', 'LeftBarrier') * conversion_ratio ret['RightBarrier'] = self.readrkf('NEB', 'RightBarrier') * conversion_ratio ret['ReactionEnergy'] = self.readrkf('NEB', 'ReactionEnergy') * conversion_ratio history_dim = tolist(self.readrkf('NEB', 'historyIndex@dim')) # nimages, randombign history_dim.reverse() # randombign, nimages history_indices_matrix = tolist(self.readrkf('NEB', 'historyIndex')) # this matrix is padded with -1 values history_indices_matrix = np.array(history_indices_matrix).reshape(history_dim) history_indices = np.max(history_indices_matrix, axis=0, keepdims=False).tolist() if any(x == -1 for x in history_indices): raise ValueError("Found -1 in the 'converged' part of historyIndex. This should not happen!") ret['HistoryIndices'] = history_indices ret['Energies'] = [self.get_property_at_step(ind, 'Energy')*conversion_ratio for ind in ret['HistoryIndices']] if molecules: ret['Molecules'] = [self.get_history_molecule(ind) for ind in ret['HistoryIndices']] return ret
[docs] def recreate_molecule(self): """Recreate the input molecule for the corresponding job based on files present in the job folder. This method is used by |load_external|. If ``ams.rkf`` is present in the job folder, extract data from the ``InputMolecule`` section. """ if 'ams' in self.rkfs: return self.get_input_molecule() return None
[docs] def recreate_settings(self): """Recreate the input |Settings| instance for the corresponding job based on files present in the job folder. This method is used by |load_external|. If ``ams.rkf`` is present in the job folder, extract user input and parse it back to a |Settings| instance using ``scm.input_parser`` module. Remove the ``system`` branch from that instance. """ if 'ams' in self.rkfs: user_input = self.readrkf('General', 'user input') try: from scm.input_parser import InputParser with InputParser() as parser: inp = parser.to_settings('ams', user_input) except: log('Failed to recreate input settings from {}'.format(self.rkfs['ams'].path, 5)) return None s = Settings() s.input = inp if 'system' in s.input.ams: del s.input.ams.system s.soft_update(config.job) return s return None
[docs] def ok(self): """Check if the execution of the associated :attr:`job` was successful or not. See :meth:`Job.ok<scm.plams.core.basejob.Job.ok>` for more information.""" return self.job.ok()
[docs] def get_errormsg(self): """Tries to get an an error message for a associated :attr:`job`. This method returns ``None`` if the associated job was successful. See :meth:`Job.get_errormsg<scm.plams.core.basejob.Job.errormsg>` for more information.""" return self.job.get_errormsg()
@property def name(self): """Retrun the :attr:`job.name` of the job associated with this results instance.""" return self.job.name class EnergyLandscape: class State: def __init__(self, landscape, engfile, energy, mol, count, isTS, reactantsID=None, productsID=None, prefactorsFromReactant=None, prefactorsFromProduct=None): self._landscape = landscape self.engfile = engfile self.energy = energy self.molecule = mol self.count = count self.isTS = isTS self.reactantsID = reactantsID self.productsID = productsID self.prefactorsFromReactant = prefactorsFromReactant self.prefactorsFromProduct = prefactorsFromProduct @property def id(self): return self._landscape._states.index(self)+1 @property def reactants(self): return self._landscape._states[self.reactantsID-1] @property def products(self): return self._landscape._states[self.productsID-1] def __str__(self): if self.isTS: lines = [f"State {self.id}: {self.molecule.get_formula(False)} transition state @ {self.energy:.8f} Hartree (found {self.count} times, results on {self.engfile})"] if self.reactantsID is not None: lines += [f" +- Reactants: {self.reactants}"] if self.productsID is not None: lines += [f" Products: {self.products}"] if self.reactantsID is not None: lines += [f" Prefactors: {self.prefactorsFromReactant:.3E}:{self.prefactorsFromProduct:.3E}"] else: lines = [f"State {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (found {self.count} times, results on {self.engfile})"] return "\n".join(lines) class Fragment: def __init__(self, landscape, engfile, energy, mol): self._landscape = landscape self.engfile = engfile self.energy = energy self.molecule = mol @property def id(self): return self._landscape._fragments.index(self)+1 def __str__(self): lines = [f"Fragment {self.id}: {self.molecule.get_formula(False)} local minimum @ {self.energy:.8f} Hartree (results on {self.engfile})"] return "\n".join(lines) class FragmentedState: def __init__(self, landscape, energy, composition, connections=None, adsorptionPrefactors=None, desorptionPrefactors=None): self._landscape = landscape self.energy = energy self.composition = composition self.connections = connections self.adsorptionPrefactors = adsorptionPrefactors self.desorptionPrefactors = desorptionPrefactors @property def id(self): return self._landscape._fstates.index(self)+1 @property def fragments(self): return [ self._landscape._fragments[i] for i in self.composition ] def __str__(self): formula = "" for i,id in enumerate(self.composition): formula += self._landscape._fragments[id].molecule.get_formula(False) if( i != len(self.composition)-1 ): formula += "+" lines = [f"FragmentedState {self.id}: {formula} local minimum @ {self.energy:.8f} Hartree (fragments {[i+1 for i in self.composition]})"] for i,iState in enumerate(self.connections): lines += [f" +- {self._landscape._states[iState]}"] if( i == len(self.connections)-1 ): lines += [f" Prefactors: {self.adsorptionPrefactors[iState]:.3E}:{self.desorptionPrefactors[iState]:.3E}"] else: lines += [f" | Prefactors: {self.adsorptionPrefactors[iState]:.3E}:{self.desorptionPrefactors[iState]:.3E}"] return "\n".join(lines) def __init__(self, results): self._states = [] self._fragments = [] self._fstates = [] sec = results.read_rkf_section("EnergyLandscape") # If there is only 1 state in the 'EnergyLandscape' section, some variables that are normally lists are insted be build-in types (e.g. a 'float' instead of a 'list of floats'). # For convenience here we make sure that the following variables are always 'lists': for var in ['energies', 'counts', 'isTS', 'reactants', 'products']: if not isinstance(sec[var], list): sec[var] = [sec[var]] nStates = sec['nStates'] for iState in range(nStates): energy = sec['energies'][iState] resfile = os.path.splitext(sec['fileNames'].split('\0')[iState])[0] mol = results.get_molecule('Molecule',file=resfile) count = sec['counts'][iState] if not sec['isTS'][iState]: self._states.append(AMSResults.EnergyLandscape.State(self, resfile, energy, mol, count, False)) else: reactantsID = sec['reactants'][iState] if sec['reactants'][iState] > 0 else None productsID = sec['products'][iState] if sec['products'][iState] > 0 else None prefactorsFromReactant = sec['prefactorsFromReactant'][iState] if sec['products'][iState] > 0 else None prefactorsFromProduct = sec['prefactorsFromProduct'][iState] if sec['products'][iState] > 0 else None self._states.append(AMSResults.EnergyLandscape.State(self, resfile, energy, mol, count, True, reactantsID, productsID, prefactorsFromReactant, prefactorsFromProduct)) if( 'nFragments' not in sec ): return nFragments = sec['nFragments'] for iFragment in range(nFragments): energy = sec['fragmentsEnergies'][iFragment] resfile = os.path.splitext(sec['fragmentsFileNames'].split('\0')[iFragment])[0] mol = results.get_molecule('Molecule',file=resfile) self._fragments.append(AMSResults.EnergyLandscape.Fragment(self, resfile, energy, mol)) if( 'nFStates' not in sec ): return nFragmentedStates = sec['nFStates'] for iFState in range(nFragmentedStates): iEnergy = sec['fStatesEnergy('+str(iFState+1)+')'] iNFragments = sec['fStatesNFragments('+str(iFState+1)+')'] iComposition = [ i-1 for i in sec['fStatesComposition('+str(iFState+1)+')'] ] iNConnections = sec['fStatesNConnections('+str(iFState+1)+')'] iConnections = [ i-1 for i in sec['fStatesConnections('+str(iFState+1)+')'] ] iAdsorptionPrefactors = sec['fStatesAdsorptionPrefactors('+str(iFState+1)+')'] iDesorptionPrefactors = sec['fStatesDesorptionPrefactors('+str(iFState+1)+')'] self._fstates.append(AMSResults.EnergyLandscape.FragmentedState(self, iEnergy, iComposition, iConnections, iAdsorptionPrefactors, iDesorptionPrefactors)) @property def minima(self): return [s for s in self._states if not s.isTS ] @property def transition_states(self): return [s for s in self._states if s.isTS] @property def fragments(self): return [f for f in self._fragments] @property def fragmented_states(self): return [fs for fs in self._fstates] def __str__(self): lines = [ 'All stationary points:' ] lines += ['======================' ] for s in self._states: lines += [ str(s) ] for f in self._fragments: lines += [ str(f) ] for fs in self._fstates: lines += [ str(fs) ] return "\n".join(lines) def __getitem__(self, i): return self._states[i-1] def __iter__(self): return iter(self._states) def __len__(self): return len(self._states)
[docs] def get_energy_landscape(self): """Returns the energy landscape obtained from a PESExploration job run by the AMS driver. The energy landscape is a set of stationary PES points (local minima and transition states). The returned object is of the type ``AMSResults.EnergyLandscape`` and offers convenient access to the states' energies, geometries as well as the information on which transition states connect which minima. .. code-block:: python el = results.get_energy_landscape() print(el) for state in el: print(f"Energy = {state.energy}") print(f"Is transition state = {state.isTS}") print("Geometry:", state.molecule) if state.isTS: print(f"Forward barrier: {state.energy - state.reactants.energy}") print(f"Backward barrier: {state.energy - state.products.energy}") """ return AMSResults.EnergyLandscape(self)
#========================================================================= def _access_rkf(self, func, file='ams'): """A skeleton method for accessing any of the ``.rkf`` files produced by AMS. The *file* argument should be the identifier of the file to read. It defaults to ``'ams'``. To access a file called ``something.rkf`` you need to call this function with ``file='something'``. If there exists only one engine results ``.rkf`` file, you can call this function with ``file='engine'`` to access this file. The *func* argument has to be a function to call on a chosen ``.rkf`` file. It should take one argument, an instance of |KFFile|. """ #Try unique engine: if file == 'engine': names = self.engine_names() if len(names) == 1: return func(self.rkfs[names[0]]) else: raise ValueError("You cannot use 'engine' as 'file' argument if the engine results file is not unique. Please use the real name of the file you wish to read") #Try: if file in self.rkfs: return func(self.rkfs[file]) #Try harder: filename = file + '.rkf' self.refresh() if filename in self.files: self.rkfs[file] = KFFile(opj(self.job.path, filename)) return func(self.rkfs[file]) #Surrender: raise FileError('File {} not present in {}'.format(filename, self.job.path)) def _process_engine_results(self, func, engine=None): """A generic method skeleton for processing any engine results ``.rkf`` file. *func* should be a function that takes one argument (an instance of |KFFile|) and returns arbitrary data. The *engine* argument should be the identifier of the file you wish to read. To access a file called ``something.rkf`` you need to call this function with ``engine='something'``. The *engine* argument can be omitted if there's only one engine results file in the job folder. """ names = self.engine_names() if engine is not None: if engine in names: return func(self.rkfs[engine]) else: raise FileError('File {}.rkf not present in {}'.format(engine, self.job.path)) else: if len(names) == 1: return func(self.rkfs[names[0]]) elif len(names) == 0: raise FileError('There is no engine .rkf present in {}'.format(self.job.path)) else: raise ValueError("You need to specify the 'engine' argument when there are multiple engine result files present in the job folder")
#=========================================================================== #=========================================================================== #===========================================================================
[docs]class AMSJob(SingleJob): """A class representing a single computation with AMS driver. The corresponding results type is |AMSResults|. """ _result_type = AMSResults _command = 'ams'
[docs] def run(self, jobrunner=None, jobmanager=None, watch=False, **kwargs): """Run the job using *jobmanager* and *jobrunner* (or defaults, if ``None``). If *watch* is set to ``True``, the contents of the AMS driver logfile will be forwarded line by line to the PLAMS logfile (and stdout), allowing for an easier monitoring of the running job. Not that the forwarding of the AMS driver logfile will cause make the call to this method block until the job's execution has finished, even when using a parallel |JobRunner|. Other keyword arguments (*\*\*kwargs*) are stored in ``run`` branch of job's settings. Returned value is the |AMSResults| instance associated with this job. """ if _has_watchdog and watch: if 'default_jobmanager' in config: jobmanager = config.default_jobmanager else: raise PlamsError('No default jobmanager found. This probably means that PLAMS init() was not called.') observer = Observer() event_handler = AMSJobLogTailHandler(self, jobmanager) observer.schedule(event_handler, jobmanager.workdir, recursive=True) observer.start() try: results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs) results.wait() finally: observer.stop() observer.join() else: results = super().run(jobrunner=jobrunner, jobmanager=jobmanager, **kwargs) return results
[docs] def get_input(self): """Generate the input file. This method is just a wrapper around :meth:`_serialize_input`. Each instance of |AMSJob| or |AMSResults| present as a value in ``settings.input`` branch is replaced with an absolute path to ``ams.rkf`` file of that job. If you need to use a path to some engine specific ``.rkf`` file rather than the main ``ams.rkf`` file, you can to it by supplying a tuple ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a string with the name of the ``.rkf`` file you want. For example, ``(myjob, 'dftb')`` will transform to the absolute path to ``dftb.rkf`` file in ``myjob``'s folder, if such a file is present. Instances of |KFFile| are replaced with absolute paths to corresponding files. """ special = { AMSJob: lambda x: x.results.rkfpath(), AMSResults: lambda x: x.rkfpath(), KFFile: lambda x: x.path, tuple: lambda x: AMSJob._tuple2rkf(x) } return self._serialize_input(special)
[docs] def get_runscript(self): """Generate the runscript. Returned string is of the form:: unset AMS_SWITCH_LOGFILE_AND_STDOUT AMS_JOBNAME=jobname AMS_RESULTSDIR=. $AMSBIN/ams [-n nproc] <jobname.in [>jobname.out] ``-n`` flag is added if ``settings.runscript.nproc`` exists. ``[>jobname.out]`` is used based on ``settings.runscript.stdout_redirect``. If ``settings.runscript.preamble_lines`` exists, those lines will be added to the runscript verbatim before the execution of AMS. """ ret = 'unset AMS_SWITCH_LOGFILE_AND_STDOUT\n' ret += 'unset SCM_LOGFILE\n' if 'preamble_lines' in self.settings.runscript: for line in self.settings.runscript.preamble_lines: ret += f'{line}\n' ret += 'AMS_JOBNAME="{}" AMS_RESULTSDIR=. $AMSBIN/ams'.format(self.name) if 'nproc' in self.settings.runscript: ret += ' -n {}'.format(self.settings.runscript.nproc) ret += ' <"{}"'.format(self._filename('inp')) if self.settings.runscript.stdout_redirect: ret += ' >"{}"'.format(self._filename('out')) ret += '\n\n' return AMSJob._slurm_env(self.settings) + ret
[docs] def check(self): """Check if ``termination status`` variable from ``General`` section of main KF file equals ``NORMAL TERMINATION``.""" try: status = self.results.readrkf('General', 'termination status') except: return False if 'NORMAL TERMINATION' in status: if 'errors' in status: log('Job {} reported errors. Please check the the output'.format(self._full_name()), 1) return False if 'warnings' in status: log('Job {} reported warnings. Please check the the output'.format(self._full_name()), 1) return True return False
[docs] def get_errormsg(self): """Tries to get an error message for a failed job. This method returns ``None`` for successful jobs.""" if self.ok(): return None else: # Something went wrong. The first place to check is the termination status on the ams.rkf. # If the AMS driver stopped with a known error (called StopIt in the Fortran code), the error will be in there. try: msg = self.results.readrkf('General','termination status') if msg == 'NORMAL TERMINATION with errors' or msg is None: # Apparently this wasn't a hard stop in the middle of the job. # Let's look for the last error in the logfile ... msg = self.results.grep_file('ams.log', 'ERROR: ')[-1].partition('ERROR: ')[2] elif msg == 'IN PROGRESS' and '$JN.err' in self.results: # If the status is still "IN PROGRESS", that probably means AMS was shut down hard from the outside. # E.g. it got SIGKILL from the scheduler for exceeding some resource limit. # In this case useful information may be found on stderr. with open(self.results['$JN.err'], 'r') as err: errlines = err.read().splitlines() for el in reversed(errlines): if el != '' and not el.isspace(): msg = "Killed while IN PROGRESS: " + el break except: msg = 'Could not determine error message. Please check the output manually.' return msg
[docs] def hash_input(self): """Calculate the hash of the input file. All instances of |AMSJob| or |AMSResults| present as values in ``settings.input`` branch are replaced with hashes of corresponding job's inputs. Instances of |KFFile| are replaced with absolute paths to corresponding files. """ special = { AMSJob: lambda x: x.hash_input(), AMSResults: lambda x: x.job.hash_input(), KFFile: lambda x: x.path, tuple: lambda x: AMSJob._tuple2rkf(x) } return sha256(self._serialize_input(special))
#========================================================================= def _serialize_input(self, special): """Transform the contents of ``settings.input`` branch into string with blocks, keys and values. First, the contents of ``settings.input`` are extended with entries returned by :meth:`_serialize_molecule`. Then the contents of ``settings.input.ams`` are used to generate AMS text input. Finally, every other (than ``ams``) entry in ``settings.input`` is used to generate engine specific input. Special values can be indicated with *special* argument, which should be a dictionary having types of objects as keys and functions translating these types to strings as values. """ def unspec(value): """Check if *value* is one of a special types and convert it to string if it is.""" for spec_type in special: if isinstance(value, spec_type): return special[spec_type](value) return value def serialize(key, value, indent, end='End'): """Given a *key* and its corresponding *value* from the |Settings| instance produce a snippet of the input file representing this pair. If the value is a nested |Settings| instance, use recursive calls to build the snippet for the entire block. Indent the result with *indent* spaces. """ ret = '' if isinstance(value, Settings): ret += ' '*indent + key if '_h' in value: ret += ' ' + unspec(value['_h']) ret += '\n' i = 1 while ('_'+str(i)) in value: ret += serialize('', value['_'+str(i)], indent+2) i += 1 for el in value: if not el.startswith('_'): if key.lower().startswith('engine') and el.lower() == 'input': ret += serialize(el, value[el], indent+2, 'EndInput') # REB: For the hybrid engine. How todeal with the space in el (Engine DFTB)? Replace by underscore? elif key.lower().startswith('engine') and el.lower().startswith('engine') : engine = ' '.join(el.split('_')) ret += serialize(engine, value[el], indent+2, end='EndEngine') + '\n' else: ret += serialize(el, value[el], indent+2) if key.lower() == 'input': end='endinput' ret += ' '*indent + end+'\n' elif isinstance(value, list): for el in value: ret += serialize(key, el, indent) elif value == '' or value is True: ret += ' '*indent + key + '\n' elif value is False or value is None: pass else: ret += ' '*indent + key + ' ' + str(unspec(value)) + '\n' return ret fullinput = self.settings.input.copy() #prepare contents of 'system' block(s) more_systems = self._serialize_molecule() if more_systems: if 'system' in fullinput.ams: #nonempty system block was already present in input.ams system = fullinput.ams.system system_list = system if isinstance(system, list) else [system] system_list_set = Settings({(s._h if '_h' in s else ''):s for s in system_list}) more_systems_set = Settings({(s._h if '_h' in s else ''):s for s in more_systems}) system_list_set += more_systems_set system_list = list(system_list_set.values()) system = system_list[0] if len(system_list) == 1 else system_list fullinput.ams.system = system else: fullinput.ams.system = more_systems[0] if len(more_systems) == 1 else more_systems txtinp = '' ams = fullinput.find_case('ams') #contents of the 'ams' block (AMS input) go first for item in fullinput[ams]: txtinp += serialize(item, fullinput[ams][item], 0) + '\n' #and then engines for engine in fullinput: if engine != ams: txtinp += serialize('Engine '+engine, fullinput[engine], 0, end='EndEngine') + '\n' return txtinp def _serialize_molecule(self): """Return a list of |Settings| instances containing the information about one or more |Molecule| instances stored in the ``molecule`` attribute. Molecular charge is taken from ``molecule.properties.charge``, if present. Additional, atom-specific information to be put in ``atoms`` block after XYZ coordinates can be supplied with ``atom.properties.suffix``. If the ``molecule`` attribute is a dictionary, the returned list is of the same length as the size of the dictionary. Keys from the dictionary are used as headers of returned ``system`` blocks. """ if self.molecule is None: return Settings() moldict = {} if isinstance(self.molecule, Molecule): moldict = {'':self.molecule} elif isinstance(self.molecule, dict): moldict = self.molecule else: raise JobError("Incorrect 'molecule' attribute of job {}. 'molecule' should be a Molecule, a dictionary or None, and not {}".format(self._full_name(), type(self.molecule))) ret = [] for name, molecule in moldict.items(): newsystem = Settings() if name: newsystem._h = name if len(molecule.lattice) in [1,2] and molecule.align_lattice(): log("The lattice of {} Molecule supplied for job {} did not follow the convention required by AMS. I rotated the whole system for you. You're welcome".format(name if name else 'main', self._full_name()), 3) newsystem.Atoms._1 = [atom.str(symbol=self._atom_symbol(atom), space=18, decimal=10, suffix=(atom.properties.suffix if 'suffix' in atom.properties else '')) for atom in molecule] if molecule.lattice: newsystem.Lattice._1 = ['{:16.10f} {:16.10f} {:16.10f}'.format(*vec) for vec in molecule.lattice] if len(molecule.bonds)>0: lines = ['{} {} {}'.format(molecule.index(b.atom1), molecule.index(b.atom2), b.order) for b in molecule.bonds] # Add bond properties if they are defined newsystem.BondOrders._1 = ['{} {}'.format(text,molecule.bonds[i].properties.suffix) if 'suffix' in molecule.bonds[i].properties else text for i,text in enumerate(lines)] if 'charge' in molecule.properties: newsystem.Charge = molecule.properties.charge if 'regions' in molecule.properties : #newsystem.Region = [Settings({'_h':name, '_1':['%s=%s'%(k,str(v)) for k,v in data.items()]}) for name, data in molecule.properties.regions.items()] newsystem.Region = [Settings({'_h':name, 'Properties':Settings({'_1':[line for line in data]})}) for name, data in molecule.properties.regions.items() if isinstance(data,list)] ret.append(newsystem) return ret #=========================================================================
[docs] def get_task(self): """ Returns the AMS Task from the job's settings. If it does not exist, returns None. """ if isinstance(self.settings, Settings) and 'input' in self.settings and 'ams' in self.settings.input and 'task' in self.settings.input.ams: return self.settings.input.ams.task return None
@staticmethod def _atom_symbol(atom): """Return the atomic symbol of *atom*. Ensure proper formatting for AMSuite input taking into account ``ghost`` and ``name`` entries in ``properties`` of *atom*.""" smb = atom.symbol if atom.atnum > 0 else '' #Dummy atom should have '' instead of 'Xx' if 'ghost' in atom.properties and atom.properties.ghost: smb = ('Gh.'+smb).rstrip('.') if 'name' in atom.properties: smb = (smb+'.'+str(atom.properties.name)).lstrip('.') return smb @staticmethod def _tuple2rkf(arg): """Transform a pair ``(x, name)`` where ``x`` is an instance of |AMSJob| or |AMSResults| and ``name`` is a name of ``.rkf`` file (``ams`` or engine) to an absolute path to that ``.rkf`` file.""" if len(arg) == 2 and isinstance(arg[1], str): if isinstance(arg[0], AMSJob): return arg[0].results.rkfpath(arg[1]) if isinstance(arg[0], AMSResults): return arg[0].rkfpath(arg[1]) return str(arg)
[docs] @classmethod def load_external(cls, path, settings=None, molecule=None, finalize=False, fmt='ams'): """Load an external job from *path*. In this context an "external job" is an execution of some external binary that was not managed by PLAMS, and hence does not have a ``.dill`` file. It can also be used in situations where the execution was started with PLAMS, but the Python process was terminated before the execution finished, resulting in steps 9-12 of :ref:`job-life-cycle` not happening. All the files produced by your computation should be placed in one folder and *path* should be the path to this folder or a file in this folder. The name of the folder is used as a job name. Input, output, error and runscript files, if present, should have names defined in ``_filenames`` class attribute (usually ``[jobname].in``, ``[jobname].out``, ``[jobname].err`` and ``[jobname].run``). It is not required to supply all these files, but in most cases one would like to use at least the output file, in order to use methods like :meth:`~scm.plams.core.results.Results.grep_output` or :meth:`~scm.plams.core.results.Results.get_output_chunk`. If *path* is an instance of an AMSJob, that instance is returned. This method is a class method, so it is called via class object and it returns an instance of that class:: >>> a = AMSJob.load_external(path='some/path/jobname') >>> type(a) scm.plams.interfaces.adfsuite.ams.AMSJob You can supply |Settings| and |Molecule| instances as *settings* and *molecule* parameters, they will end up attached to the returned job instance. If you don't do this, PLAMS will try to recreate them automatically using methods :meth:`~scm.plams.core.results.Results.recreate_settings` and :meth:`~scm.plams.core.results.Results.recreate_molecule` of the corresponding |Results| subclass. If no |Settings| instance is obtained in either way, the defaults from ``config.job`` are copied. You can set the *finalize* parameter to ``True`` if you wish to run the whole :meth:`~Job._finalize` on the newly created job. In that case PLAMS will perform the usual :meth:`~Job.check` to determine the job status (*successful* or *failed*), followed by cleaning of the job folder (|cleaning|), |postrun| and pickling (|pickling|). If *finalize* is ``False``, the status of the returned job is *copied*. fmt : str One of 'ams', 'qe', 'vasp', or 'any'. 'ams': load a finished AMS job 'qe': convert a Quantum ESPRESSO .out file to ams.rkf and qe.rkf and load from those files 'vasp': convert a VASP OUTCAR file to ams.rkf and vasp.rkf and load from those files (see more below) 'any': auto-detect the format. This method can also be used to convert a finished VASP job to an AMSJob. If you supply the path to a folder containing OUTCAR, then a subdirectory will be created in this folder called AMSJob. In the AMSJob subdirectory, two files will be created: ams.rkf and vasp.rkf, that contain some of the results from the VASP calculation. If the AMSJob subdirectory already exists, the existing ams.rkf and vasp.rkf files will be reused. NOTE: the purpose of loading VASP data this way is to let you call for example job.results.get_energy() etc., not to run new VASP calculations! """ if isinstance(path, cls): return path preferred_name = None fmt = fmt.lower() # first check if path is a VASP output # in which case call cls._vasp_to_ams, which will return a NEW path # containing ams.rkf and vasp.rkf (the .rkf files will be created if they do not exist) if os.path.isdir(path): preferred_name = os.path.basename(os.path.abspath(path)) if not os.path.exists(os.path.join(path,'ams.rkf')) \ and os.path.exists(os.path.join(path,'OUTCAR')) \ and (fmt == 'vasp' or fmt == 'any'): path = vasp_output_to_ams(path, overwrite=False) elif os.path.exists(path) and os.path.basename(path) == 'OUTCAR' and (fmt == 'vasp' or fmt == 'any'): preferred_name = os.path.basename(os.path.dirname(os.path.abspath(path))) path = vasp_output_to_ams(os.path.dirname(path), overwrite=False) elif os.path.exists(path): # check if path is a Quantum ESPRESSO .out file # qe_output_to_ams returns a NEW path containng the ams.rkf and qe.rkf files (will be created if # they do not exist) if fmt =='qe' or (fmt == 'any' and not path.endswith('rkf')): try: path = qe_output_to_ams(path, overwrite=False) except: # several types of exceptions can be raised, e.g. StopIteration and UnicodeDecodeError # assume that any exception means that the file was not a QE output file pass if not os.path.isdir(path): if os.path.exists(path): path = os.path.dirname(os.path.abspath(path)) elif os.path.isdir(path+'.results'): path=path+'.results' elif os.path.isdir(path+'results'): path=path+'results' else: raise FileError('Path {} does not exist, cannot load from it.'.format(path)) job = super(AMSJob, cls).load_external( path, settings, molecule, finalize) if preferred_name is not None: job.name = preferred_name if job.name.endswith('.results') and len(job.name) > 8: job.name = job.name[:-8] return job
[docs] @classmethod def from_input(cls, text_input:str, **kwargs) -> 'AMSJob': """ Creates an AMSJob from AMS-style text input. This function requires that the SCM Python package is installed (if not, it will raise an ImportError). text_input : a multi-line string Returns: An AMSJob instance Example:: text = ''' Task GeometryOptimization Engine DFTB Model GFN1-xTB EndEngine ''' job = AMSJob.from_input(text) .. note:: If *molecule* is included in the keyword arguments to this method, the *text_input* may not contain any System blocks. In other words, the molecules to be used either need to come from the *text_input*, or the keyword argument, but not both. If *settings* is included in the keyword arguments to this method, the |Settings| created from the *text_input* will be soft updated with the settings from the keyword argument. In other word, the *text_input* takes precedence over the *settings* keyword argument. """ try: from scm.input_parser import InputParser except ImportError: # Try to load the parser from $AMSHOME/scripting with UpdateSysPath(): from scm.input_parser import InputParser sett = Settings() with InputParser() as parser: sett.input = parser.to_settings(cls._command, text_input) mol = cls.settings_to_mol(sett) if mol: if 'molecule' in kwargs: raise JobError('AMSJob.from_input(): molecule passed in both text_input and as keyword argument') else: mol = kwargs.pop('molecule', None) if 'settings' in kwargs: sett.soft_update(kwargs.pop('settings')) return cls(molecule=mol, settings=sett, **kwargs)
[docs] @classmethod def from_inputfile(cls, filename:str, heredoc_delimit:str='eor', **kwargs) -> 'AMSJob': """Construct an :class:`AMSJob` instance from an AMS inputfile or runfile. If a runscript is provide than this method will attempt to extract the input file based on the heredoc delimiter (see *heredoc_delimit*). """ with open(filename, 'r') as f: inp_file = parse_heredoc(f.read(), heredoc_delimit) return cls.from_input(inp_file, **kwargs)
[docs] @staticmethod def settings_to_mol(s: Settings) -> dict: """Pop the `s.input.ams.system` block from a settings instance and convert it into a dictionary of molecules. The provided settings should be in the same style as the ones produced by the SCM input parser. Dictionary keys are taken from the header of each system block. The existing `s.input.ams.system` block is removed in the process, assuming it was present in the first place. """ def read_mol(settings_block: Settings) -> Molecule: """Retrieve single molecule from a single `s.input.ams.system` block.""" if 'geometryfile' in settings_block: mol = Molecule(settings_block.geometryfile) else: mol = Molecule() for atom in settings_block.atoms._1: # Extract arguments for Atom() symbol, x, y, z, *comment = atom.split(maxsplit=4) kwargs = {} if not comment else {'suffix': comment[0]} coords = float(x), float(y), float(z) try: at = Atom(symbol=symbol, coords=coords, **kwargs) except PTError: # It's either a ghost atom and/or an atom with a custom name if symbol.startswith('Gh.'): # Ghost atom kwargs['ghost'], symbol = symbol.split('.', maxsplit=1) if '.' in symbol: # Atom with a custom name symbol, kwargs['name'] = symbol.split('.', maxsplit=1) at = Atom(symbol=symbol, coords=coords, **kwargs) mol.add_atom(at) # Set the lattice vector if applicable if settings_block.lattice._1: mol.lattice = [tuple(float(j) for j in i.split()) for i in settings_block.lattice._1] # Add bonds for bond in settings_block.bondorders._1: _at1, _at2, _order, *bondprops = bond.split() at1, at2, order = mol[int(_at1)], mol[int(_at2)], float(_order) plams_bond = Bond(at1,at2,order=order) # Add bond properties as string if present (used in ACErxn) if len(bondprops) > 0 : suffix = ' '.join(bondprops) plams_bond.properties.suffix = suffix mol.add_bond(plams_bond) # Set the molecular charge if settings_block.charge: mol.properties.charge = float(settings_block.charge) # Set the region info (used in ACErxn) if settings_block.region : for s_reg in settings_block.region : if not '_h' in s_reg.keys() : raise JobError ('Region block requires a header!') if s_reg.properties : mol.properties.regions[s_reg._h] = s_reg.properties._1 mol.properties.name = str(settings_block._h) return mol # Raises a KeyError if the `system` key is absent with s.suppress_missing(): try: settings_list = s.input.ams.pop('system') except KeyError: # The block s.input.ams.system is absent return None # Create a new dictionary with system headers as keys and molecules as values moldict = {} for settings_block in settings_list: key = str(settings_block._h) if ('_h' in settings_block) else '' # Empty string used as default system name. if key in moldict: raise KeyError(f"Duplicate system headers found in s.input.ams.system: {repr(key)}") moldict[key] = read_mol(settings_block) return moldict
@staticmethod def _slurm_env(settings): """Produce a string with environment variables declaration needed for running AMS on a SLURM managed system. If the key ``slurm`` is present in ``settings.run`` and it's ``True`` the returned string is of the form: ``export SCM_MPIOPTIONS="-n X -N Y\n" ``X`` is taken from ``settings.run.cores`` (if not present, falls back to ``settings.runscript.nproc``). ``Y`` is taken from ``settings.run.nodes`` """ if 'slurm' in settings.run and settings.run.slurm is True: slurmenv = '' if 'cores' in settings.run: slurmenv += f'-n {settings.run.cores} ' elif 'nproc' in settings.runscript: slurmenv += f'-n {settings.runscript.nproc} ' if 'nodes' in settings.run: slurmenv += f'-N {settings.run.nodes} ' if slurmenv: return f'export SCM_MPIOPTIONS="{slurmenv}"\n' return ''