Source code for scm.plams.interfaces.adfsuite.amsanalysis

from ...core.errors import PlamsError
from ...core.errors import FileError
from .scmjob import SCMJob, SCMResults

__all__ = ['AMSAnalysisJob', 'AMSAnalysisResults','convert_to_unicode']

class AMSAnalysisPlot:
    """
    Class representing a plot of 2D or higher

    * ``x``       -- A list of lists containing the values in each of the multiple x-axes
    * ``y``       -- A list containing the values along the y-axis
    * ``y_sigma`` -- A list containing the standard deviation of the values onthe y-axis
    * ``name``    -- The name of the plot

    The most important method is the write method, which returns a string containing all the plot info,
    and can also write a corresponding file if a filename is provided as argument.
    This file can be read by e.g. gnuplot.
    """
    def __init__(self):
        """
        Initiate an instance of the plot class
        """
        self.x = []
        self.x_units = []
        self.x_names = []

        self.y = None
        self.y_units = None
        self.y_name = None
        self.y_sigma = None # stadard deviation for y_values

        self.properties = None
        self.name = None
        self.section = None

    def read_data (self, kf, sec) :
        """
        Read the xy data for a section from the kf file
        """
        # Read all the x-values. There can be multiple axes for ND plots (n=3,4,....)
        sections = kf.get_skeleton()
        xkeys = [k for k in sections[sec] if 'x(' in k and ')-axis' in k]
        xnums = sorted([int(k.split('(')[1].split(')')[0]) for k in xkeys])
        xnums = sorted([xnum for xnum in set(xnums)])
        for i in xnums :
            xkey = 'x(%i)-axis'%(i)
            self.x.append(kf.read(sec, xkey))
            x_name = kf.read(sec, '%s(label)'%(xkey))
            self.x_names.append(convert_to_unicode(x_name))
            self.x_units.append(convert_to_unicode(kf.read(sec, '%s(units)'%(xkey))))

        # Read the y-values
        ykey = 'y-axis'
        y_name = kf.read(sec, '%s(label)'%(ykey))
        self.y = kf.read(sec, ykey)
        self.y_name = convert_to_unicode(y_name)
        self.y_units = convert_to_unicode(kf.read(sec, '%s(units)'%(ykey)))

        self.y_sigma = kf.read(sec, 'sigma')

        self.read_properties(kf, sec)
        self.section = sec.split('(')[0] + '_' + sec.split('(')[1].split(')')[0]
        self.name = self.section

    def read_properties (self, kf, sec) :
        """
        Read properties from the KF file
        """
        counter = 0
        properties = {}
        while(1) :
            counter += 1
            try :
                propname = kf.read(sec, 'Property(%i)'%(counter)).strip()
            except :
                break
            properties[propname] = kf.read(sec, propname)
            if isinstance(properties[propname],str) :
                properties[propname] = properties[propname].strip()
                properties[propname] = convert_to_unicode(properties[propname])

        # Now set the instance variables
        self.properties = properties
        if 'Legend' in properties :
            self.name = properties['Legend']

    def get_dimensions (self) :
        """
        Get the dimensonality of the plot
        """ 
        return len(self.x)

    def write (self, outfilename=None) :
        """
        Print this plot to a text file
        """
        # Place property string
        parts = []
        for propname,prop in self.properties.items() :
            parts.append('%-30s %s\n'%(propname, prop))

        # Place the string with the column names
        x_name = ''
        for xname,xunit in zip(self.x_names,self.x_units) :
            x_str = '%s(%s)'%(xname,xunit)
            x_name += '%30s '%(x_str)
        y_name = '%s(%s)'%(self.y_name,self.y_units)
        parts.append('%s %30s %30s\n'%(x_name,y_name,'sigma'))

        # Determine the number of values per axis
        ndims = len(self.x)
        axis_length = int(len(self.x[0])**(1./ndims))

        # Place the values
        value_lists = self.x + [self.y] + [self.y_sigma]
        for i,values in enumerate(zip(*value_lists)) :
            v_str = ''
            for v in values :
                v_str += '%30.10e '%(v)
            v_str += '\n'
            if (i+1)%axis_length==0 :
                v_str += '\n'
            parts.append(v_str)
        block = ''.join(parts)

        if outfilename is not None :
            outfile = open(outfilename,'w',encoding='utf8')
            outfile.write(block)
            outfile.close()

        return block

    @classmethod
    def from_kf(cls, kf, section, i=1):
        xy = cls()

        # Find the correct section in the KF file
        sections= kf.sections()
        matches = [s for s in sections if s.lower()==section.lower()+'(%i)'%(i)]
        if len(matches) == 0 :
                print ('Sections: ',list(sections))
                raise PlamsError('AMSAnalysisResults.get_xy(section,i): section must be one of the above. You specified "{}"'.format(section))
        sec = matches[0]

        # Get the data
        xy.read_data(kf,sec)
        return xy

class AMSAnalysisResults(SCMResults):
    _kfext = '.kf'
    _rename_map = {'plot.kf':'$JN'+_kfext}

    def get_molecule(self, *args, **kwargs):
        raise PlamsError('AMSAnalysisResults does not support the get_molecule() method.')

    def get_sections(self) :
        """
        Read the sections available to make xy plots
        """
        if not self._kfpresent():
            raise FileError('File {} not present in {}'.format(self.job.name+self.__class__._kfext, self.job.path))
        if self._kf.reader._sections is None :
            self._kf.reader._create_index()
        sections = self._kf.reader._sections.keys()
        return sections

    def get_xy(self, section='', i=1):
        """
        Get the AMSAnalysitPlot object for a specific section of the plot KFFile
        """
        task = self.job.settings.input.Task
        if section == '' :
            section = task

        if not self._kfpresent():
            raise FileError('File {} not present in {}'.format(self.job.name+self.__class__._kfext, self.job.path))
        xy = AMSAnalysisPlot.from_kf(self._kf,section,i)
        return xy

    def get_all_plots (self) :
        """
        Get a list of all the plot objects created by the analysis jobs
        """
        sections = self.get_sections()
        plots = []
        for section in sections :
            if section == 'General': continue
            if 'History' in section: continue
            name_part = section.split('(')[0]
            num_part = int(section.split('(')[1].split(')')[0])
            outfilename = '%s_%i.dat'%(name_part,num_part)
            xy = self.get_xy(name_part,num_part)
            plots.append(xy)
        return plots

    def write_all_plots (self) :
        """
        Write all the plots created by the analysis job to file
        """
        plots = self.get_all_plots()
        for xy in plots :
            xy.write('%s'%(xy.section+'.dat'))

    def get_D(self, i=1):
        """ returns a 2-tuple (D, D_units) from the AutoCorrelation(i) section on the .kf file. """

        # If there are multiple, it will read the first one
        sections = [sec for sec in self.get_sections() if 'Integral' in sec]
        if len(sections) < i : 
            return None,None
        section = sections[i-1]
        plot = self.get_xy(section.split('(')[0],i)
        if not 'DiffusionCoefficient' in plot.properties.keys() :
            return None, None

        D = plot.properties['DiffusionCoefficient']
        D_units = plot.y_units
        return D, D_units


[docs]class AMSAnalysisJob(SCMJob): """A class for analyzing molecular dynamics trajectories using the ``analysis`` program. """ _result_type = AMSAnalysisResults _command = 'analysis' _subblock_end = 'end'
[docs] def __init__(self, **kwargs): SCMJob.__init__(self, **kwargs)
[docs] def _serialize_mol(self): pass
[docs] def _remove_mol(self): pass
[docs] def check(self): try: grep = self.results.grep_file('$JN.err', 'NORMAL TERMINATION') except: return False return len(grep) > 0
def convert_to_unicode (k) : """ Convert a string with ascii symbols representing unicode symbols Example k: 'abc\\u03c9def' """ parts = k.split('\\u') # Collect the hexadecimals symbols = [chr(int(part[:4],16)) for part in parts[1:]] # Now repair the parts parts = parts[:1] + [''.join([s,part[4:]]) for s,part in zip(symbols,parts[1:])] key = ''.join(parts) return key