Source code for scm.plams.interfaces.thirdparty.dirac

import os

from os.path import join as opj

from ...core.basejob import SingleJob
from ...core.private import saferun
from ...core.results import Results
from ...core.settings import Settings

__all__ = ['DiracJob', 'DiracResults']

[docs]class DiracResults(Results): """A class for result of computation done with DIRAC.""" _rename_map = {'DFCOEF':'$JN.dfcoef', 'GRIDOUT':'$JN.grid', 'dirac.xml':'$JN.xml',}
[docs] def collect(self): """After collecting the files produced by job execution with parent method :meth:`Results.collect<scm.plams.core.results.Results.collect>` append the ``pam`` output to the regular output file. """ Results.collect(self) pamfile = self.job._filename('out') process = saferun(['grep', 'output file', pamfile], cwd=self.job.path) output = process.stdout.decode() diracfile = output.split(':')[-1].strip() if diracfile in self.files: pampath = opj(self.job.path, pamfile) diracpath = opj(self.job.path, diracfile) with open(pampath, 'r') as f: pamoutput = f.readlines() with open(diracpath, 'r') as f: diracoutput = f.readlines() with open(pampath, 'w') as f: f.writelines(diracoutput) f.write('\n\n '+'*'*74+'\n') f.write(' '+'*'*30+' pam output '+'*'*30+'\n') f.write(' '+'*'*74+'\n\n') f.writelines(pamoutput) os.remove(diracpath) self.refresh()
[docs]class DiracJob(SingleJob): """A class representing a single computational job with DIRAC.""" _result_type = DiracResults _top = ['dirac'] _filenames = {'inp':'$JN.inp', 'run':'$', 'out':'$JN.out', 'err': '$JN.err'}
[docs] def __init__(self, **kwargs): SingleJob.__init__(self, **kwargs) self.settings.runscript.pam.noarch = True self.settings.runscript.pam.get = ['DFCOEF', 'GRIDOUT', 'dirac.xml']
[docs] def _get_ready(self): """Before generating runscript and input with parent method :meth:`SingleJob._get_ready<scm.plams.core.basejob.SingleJob._get_ready>` add proper ``mol`` and ``inp`` entries to ``self.settings.runscript.pam``. If already present there, ``mol`` will not be added. """ s = self.settings.runscript.pam if 'mol' not in s: s.mol ='.xyz' with open(opj(self.path,'.xyz'), 'w') as f: f.write(str(len(self.molecule)) + '\n\n') for atom in self.molecule: suffix = 'b={block}' if hasattr(atom,'block') else '' f.write(atom.str(suffix=suffix)+'\n') s.inp = self._filename('inp') SingleJob._get_ready(self)
[docs] def get_input(self): """Transform all contents of ``input`` branch of ``settings`` into string with blocks, subblocks, keys and values. On the highest level alphabetic order of iteration is modified: keys occuring in class attribute ``_top`` are printed first. See :ref:`dirac-input` for details. """ is_empty = lambda x: isinstance(x, Settings) and len(x) == 0 def parse_key(key, value): ret = '.' + key.upper() + '\n' if not (value is True or is_empty(value)): if isinstance(value, list): for i in value: ret += str(i) + '\n' else: ret += str(value) + '\n' return ret def parse_block(block): enabler = '_en' ret = '**' + block.upper() + '\n' s = self.settings.input[block] for k,v in s.items(): if not isinstance(v, Settings) or is_empty(v): ret += parse_key(k, v) for k,v in s.items(): if isinstance(v, Settings) and enabler in v: ret += parse_key(k, v[enabler]) for k,v in s.items(): if isinstance(v, Settings) and len(v) > 0: ret += '*' + k.upper() + '\n' for kk,vv in v.items(): if kk != enabler: ret += parse_key(kk, vv) return ret inp = '' for block in self._top: if block in self.settings.input: inp += parse_block(block) for block in self.settings.input: if block not in self._top: inp += parse_block(block) inp += '*END OF INPUT\n' return inp
[docs] def get_runscript(self): """Generate a runscript. Returned string is a ``pam`` call followed by option flags generated based on ``self.settings.runscript.pam`` contents. See :ref:`dirac-runscript` for details.""" r = self.settings.runscript.pam ret = 'pam' for k,v in r.items(): ret += ' --%s'%k if v is not True: if isinstance(v, list): ret += '="%s"' % ' '.join(v) else: ret += '='+str(v) if self.settings.runscript.stdout_redirect: ret += ' >'+self._filename('out') ret += '\n\n' return ret
[docs] def check(self): """Check if the calculation was successful by examining the last line of ``pam`` output.""" s = self.results.grep_output('exit :')[0] status = s.split(':')[-1].strip() return status == 'normal'