Source code for scm.plams.tools.converters

from .units import Units
from ..trajectories.rkffile import RKFTrajectoryFile
from ..core.private import saferun
from .kftools import KFFile
import re
import os
import tempfile

__all__ = ['traj_to_rkf', 'vasp_output_to_ams', 'qe_output_to_ams']

[docs]def traj_to_rkf(trajfile, rkftrajectoryfile): """ Convert ase .traj file to .rkf file. NOTE: The order of atoms (or the number of atoms) cannot change between frames! trajfile : str path to a .traj file rkftrajectoryfile : str path to the output .rkf file (will be created) Returns : 2-tuple (coords, cell) The final coordinates and cell in angstrom """ from ase.io import read, Trajectory import warnings warnings.filterwarnings('ignore', 'Creating an ndarray from ragged nested sequences') traj = Trajectory(trajfile) rkfout = RKFTrajectoryFile(rkftrajectoryfile, mode='wb') rkfout.store_historydata() rkfout.store_mddata() # Store the units properly mdunits = {} mdunits['PotentialEnergy'] = 'hartree' mdunits['KineticEnergy'] = 'hartree' mdunits['TotalEnergy'] = 'hartree' rkfout._set_mdunits(mdunits) energy_converter = Units.convert(1.0, 'eV', 'hartree') gradients_converter = Units.convert(1.0, 'eV/angstrom', 'hartree/bohr') stress_converter = Units.convert(1.0, 'eV/angstrom^3', 'hartree/bohr^3') coords, cell = None, None try: for i,atoms in enumerate(traj): if i == 0: rkfout.set_elements(atoms.get_chemical_symbols()) coords = atoms.get_positions() # angstrom cell = atoms.get_cell() try: gradients = -atoms.get_forces()*gradients_converter except: gradients = None try: stresstensor = atoms.get_stress(voigt=False)*stress_converter except: stresstensor = None mddata = {} try: energy = atoms.get_potential_energy()*energy_converter mddata['PotentialEnergy'] = energy except: energy = None try: kinetic_energy = atoms.get_kinetic_energy()*energy_converter mddata['KineticEnergy'] = kinetic_energy except: kinetic_energy = None if 'PotentialEnergy' in mddata and 'KineticEnergy' in mddata: mddata['TotalEnergy'] = mddata['PotentialEnergy'] + mddata['KineticEnergy'] if len(mddata) == 0: mddata = None # Create a historydata dictionary, to go into the History section historydata = {} if gradients is not None : historydata['Gradients'] = gradients historydata['EngineGradients'] = gradients if stresstensor is not None : historydata['StressTensor'] = stresstensor if len(historydata) == 0 : historydata = {} rkfout.write_next(coords=coords, cell=cell, historydata=historydata, mddata=mddata) finally: rkfout.close() # the below is needed to be able to load the .rkf file with AMSJob.load_external() kf = KFFile(rkftrajectoryfile) kf['EngineResults%nEntries'] = 0 kf['General%user input'] = '\xFF'.join(['Engine External','EndEngine']) if len(traj) == 1: kf['General%task'] = 'singlepoint' elif kinetic_energy is None or kinetic_energy == 0: kf['General%task'] = 'geometryoptimization' else: kf['General%task'] = 'moleculardynamics' return coords, cell
def file_to_traj(outfile, trajfile): """ outfile : str path to existing file (OUTCAR, qe.out, etc.) trajfile : str will be created """ from ase.io import read, write if os.path.exists(trajfile): os.remove(trajfile) atoms = read(outfile, ':') write(trajfile, atoms) if not os.path.exists(trajfile): raise RuntimeError("Couldn't write {}".format(trajfile)) return trajfile def _remove_or_raise(file, overwrite): if os.path.exists(file): if overwrite: os.remove(file) else: raise RuntimeError("{} already exists, specify overwrite=True to overwrite".format(file)) def _write_engine_rkf(kffile, enginefile): kf = KFFile(kffile) enginerkf = KFFile(enginefile) # write engine.rkf # copy General, Molecule, InputMolecule from ams.rkf for sec in ['General','Molecule','InputMolecule']: secdict = kf.read_section(sec) for k, v in secdict.items(): enginerkf[sec+'%'+k] = v nEntries = kf['History%nEntries'] suffix='({})'.format(nEntries) if ('History', 'Energy'+suffix) in kf: enginerkf['AMSResults%Energy'] = kf['History%Energy'+suffix] if ('History', 'Gradients'+suffix) in kf: enginerkf['AMSResults%Gradients'] = kf['History%Gradients'+suffix] if ('History', 'StressTensor'+suffix) in kf: enginerkf['AMSResults%StressTensor'] = kf['History%StressTensor'+suffix] def _postprocess_vasp_amsrkf(kffile, outcar): # add extra info to the kffile kf = KFFile(kffile, autosave=False) try: kf['EngineResults%nEntries'] = 1 kf['EngineResults%Title(1)'] = 'vasp' kf['EngineResults%Description(1)'] = 'Standalone VASP run. Data from {}'.format(os.path.abspath(outcar)) kf['EngineResults%Files(1)'] = 'vasp.rkf' kf['General%user input'] = '!VASP' # read the INCAR incarfile = os.path.join(os.path.dirname(outcar), 'INCAR') userinput = ['!VASP','Engine External', ' Input', ' !INCAR'] if os.path.exists(incarfile): with open(incarfile) as incar: for line in incar: line = line.split('!')[0] line = line.split('#')[0] line = line.strip() if line.lower().startswith('end'): # "End" is reserved to end the block line = '!'+line if len(line) > 0: userinput.append(' '+line) userinput.append(' !EndINCAR') userinput.append(' EndInput') #end of the Free block userinput.append('EndEngine') kf['General%user input'] = '\xFF'.join(userinput) finally: kf.save()
[docs]def vasp_output_to_ams(vasp_folder, wdir=None, overwrite=False, write_engine_rkf=True): """ Converts VASP output (OUTCAR, ...) to AMS output (ams.rkf, vasp.rkf) Returns: a string containing the directory where ams.rkf was written vasp_folder : str path to a directory with an OUTCAR, INCAR, POTCAR etc. files wdir : str or None directory in which to write the ams.rkf and vasp.rkf files If None, a subdirectory "AMSJob" of vasp_folder will be created overwrite : bool if False, first check if wdir already contains ams.rkf and vasp.rkf, in which case do nothing if True, overwrite if exists write_engine_rkf : bool If True, also write vasp.rkf alongside ams.rkf. The vasp.rkf file will only contain an AMSResults section (energy, gradients, stress tensor). It will not contain the DOS or the band structure. """ if not os.path.isdir(vasp_folder): raise ValueError('Directory {} does not exist'.format(vasp_folder)) outcar = os.path.join(vasp_folder, 'OUTCAR') if not os.path.exists(outcar): raise ValueError('File {} does not exist, should be an OUTCAR file.'.format(outcar)) if wdir is None: wdir = os.path.join(os.path.dirname(outcar),'AMSJob') os.makedirs(wdir, exist_ok=True) # exit early if ams.rkf already exists if os.path.exists(os.path.join(wdir, 'ams.rkf')) and not overwrite: return wdir # convert OUTCAR to a .traj file inside wdir trajfile = file_to_traj(outcar, os.path.join(wdir, 'vasp.traj')) # remove the target files first if overwrite kffile = os.path.join(wdir, 'ams.rkf') enginefile = os.path.join(wdir, 'vasp.rkf') _remove_or_raise(kffile, overwrite) _remove_or_raise(enginefile, overwrite) # convert the .traj file to ams.rkf traj_to_rkf(trajfile, kffile) _postprocess_vasp_amsrkf(kffile, outcar) if write_engine_rkf: _write_engine_rkf(kffile, enginefile) if os.path.exists(trajfile): os.remove(trajfile) return wdir
def _postprocess_qe_amsrkf(kffile, qe_outfile): # add extra info to the kffile kf = KFFile(kffile, autosave=False) try: kf['EngineResults%nEntries'] = 1 kf['EngineResults%Title(1)'] = 'qe' kf['EngineResults%Description(1)'] = 'Standalone Quantum ESPRESSO run. Data from {}'.format(os.path.abspath(qe_outfile)) kf['EngineResults%Files(1)'] = 'qe.rkf' userinput = ['!QuantumESPRESSO', 'Engine External', ' Input', ' Unknown Quantum ESPRESSO input', ' EndInput', 'EndEngine'] kf['General%user input'] = '\xFF'.join(userinput) finally: kf.save()
[docs]def qe_output_to_ams(qe_outfile, wdir=None, overwrite=False, write_engine_rkf=True): """ Converts a qe .out file to ams.rkf and qe.rkf. Returns: a string containing the directory where ams.rkf was written If the filename ends in .out, check if a .results directory exists. In that case, place the AMSJob subdirectory in the .results directory. Otherwise, create a new directory called filename.AMSJob qe_outfile : str path to the qe output file """ if not os.path.exists(qe_outfile) or os.path.isdir(qe_outfile): raise FileNotFoundError(qe_outfile) basename = os.path.basename(qe_outfile) basename_no_suffix = basename if basename.endswith('.out'): basename_no_suffix = re.sub('.out$', '', basename) dirname = os.path.abspath(os.path.dirname(qe_outfile)) if wdir is None: if os.path.isdir(os.path.join(dirname, basename_no_suffix+'.results')): wdir = os.path.join(dirname, basename_no_suffix+'.results', 'AMSJob') else: wdir = os.path.join(dirname, basename_no_suffix+'.AMSJob') if os.path.exists(os.path.join(wdir, 'ams.rkf')) and not overwrite: return wdir os.makedirs(wdir, exist_ok=True) # convert to a .traj file inside wdir # first trim the qe_outfile to the first occurrence of "JOB DONE". This is needed because # running standalone QE via the AMS GUI will print multiple jobs into the same output file # i.e. both the geo opt and band structure calculation into the same file, which causes # the ASE qe.out parser to crash with tempfile.NamedTemporaryFile() as tf: with open(qe_outfile, 'r') as instream: for line in instream: tf.write(line.encode()) if 'JOB DONE' in line: break tf.flush() trajfile = file_to_traj(tf.name, os.path.join(wdir, 'out.traj')) # remove the target files first if overwrite kffile = os.path.join(wdir, 'ams.rkf') enginefile = os.path.join(wdir, 'qe.rkf') _remove_or_raise(kffile, overwrite) _remove_or_raise(enginefile, overwrite) # convert the .traj file to ams.rkf traj_to_rkf(trajfile, kffile) _postprocess_qe_amsrkf(kffile, qe_outfile) if write_engine_rkf: # here one could also run $AMSBIN/tokf to get things like the DOS # $AMSBIN/tokf qe qe_outfile enginefile # But that would need to reread the entire trajectory # and one would need to postprocess it with the AMSResults section _write_engine_rkf(kffile, enginefile) if os.path.exists(trajfile): os.remove(trajfile) return wdir