import os
import inspect
import subprocess
from itertools import cycle
import numpy as np
try:
import pandas as pd
PANDAS = True
except ImportError:
PANDAS = False
from .scmjob import (SCMJob, SCMResults)
from ...tools.units import Units
__all__ = ['CRSResults', 'CRSJob']
[docs]class CRSResults(SCMResults):
"""A |SCMResults| subclass for accessing results of |CRSJob|."""
_kfext = '.crskf'
_rename_map = {'CRSKF': '$JN.crskf'}
@property
def section(self) -> str:
try: # Return the cached value if possible
return self._section
except AttributeError:
self._section = self.job.settings.input.property._h.upper()
return self._section
[docs] def get_energy(self, energy_type: str = "deltag", compound_idx: int = 0, unit: str = 'kcal/mol') -> float:
"""Returns the solute solvation energy from an Activity Coefficients calculation."""
E = self.readkf(self.section, energy_type)[compound_idx]
return Units.convert(E, 'kcal/mol', unit)
[docs] def get_activity_coefficient(self, compound_idx: int = 0) -> float:
"""Return the solute activity coefficient from an Activity Coefficients calculation."""
return self.readkf(self.section, 'gamma')[compound_idx]
[docs] def get_sigma_profile(self, subsection: str = 'profil', as_df: bool = False) -> dict:
r"""Grab all sigma profiles, returning a dictionary of Numpy Arrays.
Values of :math:`\sigma` are stored under the ``"σ (e/A**2)"`` key.
Results can be returned as a Pandas DataFrame by settings *as_df* to ``True``.
The returned results can be plotted by passing them to the :meth:`CRSResults.plot` method.
.. note::
*as_df* = ``True`` requires the Pandas_ package.
Plotting requires the `matplotlib <https://matplotlib.org/index.html>`__ package.
.. _Pandas: https://pandas.pydata.org/
"""
args = (subsection, 'σ (e/A**2)', 'chdval')
try:
return self._get_array_dict('SIGMAPROFILE', *args, as_df=as_df)
except KeyError:
return self._get_array_dict('PURESIGMAPROFILE', *args, as_df=as_df)
[docs] def get_sigma_potential(self, subsection: str = 'mu', unit: str = 'kcal/mol',
as_df: bool = False) -> dict:
r"""Grab all sigma profiles, expressed in *unit*, and return a dictionary of Numpy Arrays.
Values of :math:`\sigma` are stored under the ``"σ (e/A**2)"`` key.
Results can be returned as a Pandas DataFrame by settings *as_df* to ``True``.
The returned results can be plotted by passing them to the :meth:`CRSResults.plot` method.
.. note::
*as_df* = ``True`` requires the Pandas_ package.
Plotting requires the `matplotlib <https://matplotlib.org/index.html>`__ package.
.. _Pandas: https://pandas.pydata.org/
"""
args = (subsection, 'σ (e/A**2)', 'chdval')
try:
return self._get_array_dict('SIGMAPOTENTIAL', *args, unit=unit, as_df=as_df)
except KeyError:
return self._get_array_dict('PURESIGMAPOTENTIAL', *args, unit=unit, as_df=as_df)
[docs] def get_prop_names(self, section = None) -> list:
r"""Read the section of the .crskf file and return a list of the properties that were calculated. The section argument can be supplied to look at previously-calculated results. If no section name is supplied, the function defaults to using the most recent property that was calculated.
"""
if section == None:
section = self.section
try:
return self._kf.get_skeleton()[section]
except KeyError:
raise KeyError("Cannot find section name: " + str(section))
[docs] def get_results(self, section = None) -> dict:
r"""Read the section from the most recent calculation type and return the result as a dictionary.
"""
if section == None:
section = self.section
output = getattr(self, '_prop_dict', False)
if output and output["section"] == section:
return output
props = self.get_prop_names()
try:
props.remove("ncomp")
props.remove("nitems")
except ValueError:
raise ValueError("Results object is missing or incomplete.")
# first get the two ranges for the indices
ncomp = self.readkf(section, 'ncomp')
nitems = self.readkf(section, 'nitems')
np_dict = { "section" : section }
for prop in props:
tmp = self.readkf(section,prop)
if prop == "filename":
np_dict[prop] = [str(x).strip() for x in tmp.split()]
continue
if not isinstance(tmp,list):
np_dict[prop] = tmp
else:
np_dict[prop] = np.array(tmp)
if len(tmp) == ncomp*nitems:
np_dict[prop].shape = (ncomp,nitems)
setattr(self, '_prop_dict', np_dict)
return np_dict
[docs] def plot(self, *arrays: np.ndarray, x_axis: str = None, plot_fig: bool = True, x_label = None, y_label = None):
"""Plot, show and return a series of COSMO-RS results as a matplotlib Figure instance.
Accepts the output of, *e.g.*, :meth:`CRSResults.get_sigma_profile`:
A dictionary of Numpy arrays or a Pandas DataFrame.
Returns a matplotlib Figure_ instance which can be further modified to the users liking.
Automatic plotting of the resulting figure can be disabled with the *plot_fig* argument.
.. note::
This method requires the `matplotlib <https://matplotlib.org/index.html>`__ package.
.. note::
The name of the dictionary/DataFrame key containing the index (*i.e.* the x-axis) can,
and should, be manually specified in *x_axis* if a custom *x_axis* is passed
to :meth:`CRSResults._get_array_dict`.
This argument can be ignored otherwise.
.. _Figure: https://matplotlib.org/api/_as_gen/matplotlib.figure.Figure.html#matplotlib.figure.Figure
""" # noqa
def get_x_axis(array, x_axis):
"""Find and return the index and its name."""
if x_axis is None:
return np.arange(array.shape[1])
if isinstance(x_axis, str):
ret = self._prop_dict[x_axis]
else:
ret = np.array(x_axis, copy=False)
ret = ret.ravel() # Flatten it
return ret[:array.shape[1]]
# Check if matplotlib is installed
try:
import matplotlib
matplotlib.use('TkAgg') if plot_fig else matplotlib.use('Agg')
import matplotlib.pyplot as plt
except ImportError:
method = self.__class__.__name__ + '.plot'
raise ImportError("{}: this method requires the 'matplotlib' package".format(method))
self.get_results()
# Create a dictionary of 1d arrays
array_dict = {}
for array in arrays:
name = None
if isinstance(array, str): # Array refers to a section in the kf file
name = array
array = self._prop_dict[array]
# Ensure it's a 2D array
array = np.array(array, ndmin=2, dtype=float, copy=False)
# Fill the array dict with 1d arrays
base_key = '' if name is None else name + ' '
iterator = enumerate(array, 1) if array.shape[0] != 1 else zip(cycle(' '), array)
for i, array_1d in iterator:
key = f'{base_key}{i}'
array_dict[key] = array_1d
# Retrieve the index and its name
index = get_x_axis(array, x_axis)
# print ("INDEX::::", index)
if x_label is None:
if isinstance(x_axis,str):
x_label = x_axis
else:
x_label = ""
if y_label is None:
y_label = ""
# Assign various series to the plot
fig, ax = plt.subplots()
for k, v in array_dict.items():
ax.plot(index, v, label=k)
# Add the legend and x-label
ax.legend()
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
# Show and return
if plot_fig:
plt.show()
return fig
[docs] def _get_array_dict(self, section: str, subsection: str, x_axis: str, index_subsection: str,
unit: str = 'kcal/mol', as_df: bool = False) -> dict:
"""Create dictionary or DataFrame containing all values in *section*/*subsection*.
Takes the following arguments:
* The *section*/*subsection* of the desired quantity.
* The desired name of the index (*x_axis*).
* The name of subsection containing the index (*index_subsection*).
* The *unit* of the output quanty (ignore this keyword if not applicable).
* If the result should be returned as Pandas DataFrame (*as_df*).
"""
ret = self._construct_array_dict(section, subsection, unit)
# Create the index
index = self.readarray(section, index_subsection, dtype=float)
if section in ('BINMIXCOEF', 'COMPOSITIONLINE', 'TERNARYMIX'):
ncomponent = 3 if section == 'TERNARYMIX' else 2
index.shape = ncomponent, len(index) // ncomponent
iterator = np.nditer(index.astype(str), flags=['external_loop'], order='F')
ret[x_axis] = np.array([' / '.join(i for i in item) for item in iterator])
else:
ret[x_axis] = index
# Return a dictionary of arrays or a DataFrame
if not as_df:
return ret
else:
return self._dict_to_df(ret, section, x_axis)
[docs] def _construct_array_dict(self, section: str, subsection: str, unit: str = 'kcal/mol') -> dict:
"""Construct dictionary containing all values in *section*/*subsection*."""
# Use filenames as keys
_filenames = self.readkf(section, 'filename').split()
filenames = [_filenames] if not isinstance(_filenames, list) else _filenames
# Grab the keys and the number of items per key
keys = [os.path.basename(key) for key in filenames] + ['Total']
nitems = self.readkf(section, 'nitems')
# Use sigma profiles/potentials as values
ratio = Units.conversion_ratio('kcal/mol', unit)
values = ratio * self.readarray(section, subsection, dtype=float)
values.shape = len(values) // nitems, nitems
ret = dict(zip(keys, values))
try:
ret['Total'] = self.readarray(section, subsection + 'tot', dtype=float)
except KeyError:
pass
return ret
[docs] @staticmethod
def _dict_to_df(array_dict: dict, section: str, x_axis: str):
"""Attempt to convert a dictionary into a DataFrame."""
if not PANDAS:
method = inspect.stack()[2][3]
raise ImportError("{}: as_df=True requires the 'pandas' package".format(method))
index = pd.Index(array_dict.pop(x_axis), name=x_axis)
df = pd.DataFrame(array_dict, index=index)
df.columns.name = section.lower()
return df
[docs]class CRSJob(SCMJob):
"""A |SCMJob| subclass intended for running COSMO-RS jobs."""
_command = 'crs'
_result_type = CRSResults
_subblock_end = 'end'
[docs] def __init__(self, **kwargs) -> None:
"""Initialize a :class:`CRSJob` instance."""
super().__init__(**kwargs)
self.settings.ignore_molecule = True
[docs] @staticmethod
def cos_to_coskf(filename: str) -> str:
"""Convert a .cos file into a .coskf file with the :code:`$AMSBIN/cosmo2kf` command.
Returns the filename of the new .coskf file.
"""
filename_out = filename + 'kf'
try:
amsbin = os.environ['AMSBIN']
except KeyError:
raise EnvironmentError("cos_to_coskf: Failed to load 'cosmo2kf' from '$AMSBIN/'; "
"the 'AMSBIN' environment variable has not been set")
args = [os.path.join(amsbin, 'cosmo2kf'), filename, filename_out]
subprocess.run(args)
return filename_out