from typing import Dict, Union, Optional, KeysView, List, Any, Tuple, NoReturn, TYPE_CHECKING, cast
from typing_extensions import LiteralString
if TYPE_CHECKING:
from scm.plams.tools.kftools import KFFile, TRead
from scm.plams.mol.molecule import Atom
import matplotlib.pyplot as plt
from scm.plams.core.errors import FileError, PlamsError
from scm.plams.interfaces.adfsuite.scmjob import SCMJob, SCMResults
from scm.plams.interfaces.adfsuite.inputparser import input_to_settings
from scm.plams.core.settings import Settings
from scm.plams.mol.molecule import Molecule
from scm.plams.core.functions import log, requires_optional_package
__all__ = ["AMSAnalysisJob", "AMSAnalysisResults", "convert_to_unicode"]
try:
from scm.pisa.block import DriverBlock
_has_scm_pisa = True
except ImportError:
_has_scm_pisa = False
class AMSAnalysisPlot:
"""
Class representing a plot of 2D or higher
* ``x`` -- A list of lists containing the values in each of the multiple x-axes
* ``y`` -- A list containing the values along the y-axis
* ``y_sigma`` -- A list containing the standard deviation of the values on the y-axis
* ``name`` -- The name of the plot
The most important method is the write method, which returns a string containing all the plot info,
and can also write a corresponding file if a filename is provided as argument.
This file can be read by e.g. gnuplot.
"""
def __init__(self) -> None:
"""
Initiate an instance of the plot class
"""
self.x: List[List[float]] = []
self.x_units: List[str] = []
self.x_names: List[str] = []
self.y: Optional[List[float]] = None
self.y_units: Optional[str] = None
self.y_name: Optional[str] = None
self.y_sigma: Optional[List[float]] = None
self.properties: Optional[Dict[str, TRead]] = None
self.name: Optional[str] = None
self.section: Optional[str] = None
def read_data(self, kf: "KFFile", sec: str) -> None:
"""
Read the xy data for a section from the kf file
"""
# Read all the x-values. There can be multiple axes for ND plots (n=3,4,....)
sections = kf.get_skeleton()
xkeys = [k for k in sections[sec] if "x(" in k and ")-axis" in k]
xnums = sorted([int(k.split("(")[1].split(")")[0]) for k in xkeys])
xnums = sorted([xnum for xnum in set(xnums)])
for i in xnums:
xkey = f"x({i})-axis"
self.x.append(kf.read_reals(sec, xkey))
x_name = kf.read_string(sec, f"{xkey}(label)")
self.x_names.append(convert_to_unicode(x_name))
self.x_units.append(convert_to_unicode(kf.read_string(sec, f"{xkey}(units)")))
# Read the y-values
ykey = "y-axis"
y_name = kf.read_string(sec, f"{ykey}(label)")
self.y = kf.read_reals(sec, ykey)
self.y_name = convert_to_unicode(y_name)
self.y_units = convert_to_unicode(kf.read_string(sec, f"{ykey}(units)"))
self.y_sigma = kf.read_reals(sec, "sigma")
self.read_properties(kf, sec)
self.section = sec.split("(")[0] + "_" + sec.split("(")[1].split(")")[0]
self.name = self.section
def read_properties(self, kf: "KFFile", sec: str) -> None:
"""
Read properties from the KF file
"""
counter = 0
properties: Dict[str, TRead] = {}
while 1:
counter += 1
try:
propname = kf.read_string(sec, f"Property({counter})").strip()
except:
break
prop = kf.read(sec, propname)
if isinstance(prop, str):
prop = convert_to_unicode(prop.strip())
properties[propname] = prop
# Now set the instance variables
self.properties = properties
if "Legend" in properties:
self.name = cast(str, properties["Legend"])
def get_dimensions(self) -> int:
"""
Get the dimensonality of the plot
"""
return len(self.x)
@requires_optional_package("matplotlib")
def plot(
self,
ax: Optional["plt.Axes"] = None,
include_sigma: bool = True,
xlim: Optional[Tuple[float, float]] = None,
ylim: Optional[Tuple[float, float]] = None,
title: Optional[str] = None,
) -> "plt.Axes":
"""
Plot this data with matplotlib and return the axis.
For 1D data, this plots ``y`` versus ``x`` (with optional error bars from ``y_sigma``).
For 2D data, this shows a colored scatter plot where color corresponds to ``y``.
``include_sigma`` controls whether ``y_sigma`` is shown (if available).
``xlim`` and ``ylim`` can be used to override axis ranges.
``title`` overrides the default title (``self.name``).
"""
import matplotlib.pyplot as plt
def _axis_label(name: Optional[str], unit: Optional[str]) -> str:
if name is None:
return ""
if unit:
return f"{name} ({unit})"
return name
if self.y is None or len(self.x) == 0:
raise PlamsError("AMSAnalysisPlot.plot(): no plot data available.")
if ax is None:
_, ax = plt.subplots()
ndims = self.get_dimensions()
y_label = _axis_label(self.y_name, self.y_units)
has_sigma = include_sigma and self.y_sigma is not None and len(self.y_sigma) == len(self.y)
if ndims == 1:
x_label = _axis_label(self.x_names[0], self.x_units[0])
if has_sigma:
ax.errorbar(self.x[0], self.y, yerr=self.y_sigma, linestyle="-")
else:
ax.plot(self.x[0], self.y, "-")
ax.set_xlabel(x_label)
ax.set_ylabel(y_label)
elif ndims == 2:
x_label = _axis_label(self.x_names[0], self.x_units[0])
y2_label = _axis_label(self.x_names[1], self.x_units[1])
mappable = ax.scatter(self.x[0], self.x[1], c=self.y)
ax.set_xlabel(x_label)
ax.set_ylabel(y2_label)
ax.figure.colorbar(mappable, ax=ax, label=y_label)
else:
raise PlamsError(
f"AMSAnalysisPlot.plot(): plotting dimensionality {ndims} is not supported by matplotlib helper."
)
if title is not None:
ax.set_title(title)
elif self.name is not None:
ax.set_title(self.name)
if xlim is not None:
ax.set_xlim(*xlim)
if ylim is not None:
ax.set_ylim(*ylim)
return ax
def write(self, outfilename: Optional[str] = None) -> LiteralString:
"""
Print this plot to a text file
"""
# Place property string
parts = []
properties = self.properties if self.properties is not None else {}
for propname, prop in properties.items():
parts.append(f"{propname:<30} {prop}\n")
# Place the string with the column names
x_name = ""
for xname, xunit in zip(self.x_names, self.x_units):
x_str = f"{xname}({xunit})"
x_name += f"{x_str:>30} "
y_name = f"{self.y_name}({self.y_units})"
parts.append(f"{x_name} {y_name:>30} {'sigma':>30}\n")
# Determine the number of values per axis
ndims = len(self.x)
axis_length = int(len(self.x[0]) ** (1.0 / ndims))
# Place the values
value_lists = self.x + [self.y] + [self.y_sigma]
for i, values in enumerate(zip(*value_lists)):
v_str = ""
for v in values:
v_str += f"{v:30.10e} "
v_str += "\n"
if (i + 1) % axis_length == 0:
v_str += "\n"
parts.append(v_str)
block = "".join(parts)
if outfilename is not None:
outfile = open(outfilename, "w", encoding="utf8")
outfile.write(block)
outfile.close()
return block
@classmethod
def from_kf(cls, kf: "KFFile", section: str, i: int = 1) -> "AMSAnalysisPlot":
xy = cls()
# Find the correct section in the KF file
sections = kf.sections()
matches = [s for s in sections if s.lower() == f"{section.lower()}({i})"]
if len(matches) == 0:
print("Sections: ", list(sections))
raise PlamsError(
'AMSAnalysisResults.get_xy(section,i): section must be one of the above. You specified "{}"'.format(
section
)
)
sec = matches[0]
# Get the data
xy.read_data(kf, sec)
return xy
class AMSAnalysisResults(SCMResults):
_kfext = ".kf"
_rename_map = {"plot.kf": "$JN" + _kfext}
def get_molecule(self, *args: Any, **kwargs: Any) -> NoReturn: # type: ignore[override]
raise PlamsError("AMSAnalysisResults does not support the get_molecule() method.")
def get_sections(self) -> KeysView[str]:
"""
Read the sections available to make xy plots
"""
if not self._kfpresent():
raise FileError("File {} not present in {}".format(self.job.name + self.__class__._kfext, self.job.path))
if self._kf.reader._sections is None: # type: ignore[union-attr]
self._kf.reader._create_index() # type: ignore[union-attr]
return self._kf.reader._sections.keys() # type: ignore[union-attr]
def get_xy(self, section: str = "", i: int = 1) -> AMSAnalysisPlot:
"""
Get the AMSAnalysisPlot object for a specific section of the plot KFFile
"""
if isinstance(self.job.settings.input, Settings):
task = self.job.settings.input.Task
else:
task = self.job.settings.input.Task.val
if section == "":
section = task
if not self._kfpresent():
raise FileError("File {} not present in {}".format(self.job.name + self.__class__._kfext, self.job.path))
xy = AMSAnalysisPlot.from_kf(self._kf, section, i)
return xy
def get_all_plots(self) -> List[AMSAnalysisPlot]:
"""
Get a list of all the plot objects created by the analysis jobs
"""
sections = self.get_sections()
plots: List[AMSAnalysisPlot] = []
for section in sections:
if section == "General":
continue
if "History" in section:
continue
name_part = section.split("(")[0]
num_part = int(section.split("(")[1].split(")")[0])
xy = self.get_xy(name_part, num_part)
plots.append(xy)
return plots
def write_all_plots(self) -> None:
"""
Write all the plots created by the analysis job to file
"""
plots = self.get_all_plots()
for xy in plots:
xy.write(f"{xy.section}.dat")
def get_D(self, i: int = 1) -> Tuple[Optional[float], Optional[str]]:
"""returns a 2-tuple (D, D_units) from the AutoCorrelation(i) section on the .kf file."""
# If there are multiple, it will read the first one
sections = [sec for sec in self.get_sections() if "Integral" in sec]
if len(sections) < i:
return None, None
section = sections[i - 1]
plot = self.get_xy(section.split("(")[0], i)
if not plot.properties or "Final" not in plot.properties.keys():
return None, None
if "Legend" not in plot.properties.keys() or not isinstance(plot.properties["Legend"], str):
return None, None
if "Diffusion" not in plot.properties["Legend"]:
return None, None
D = cast(float, plot.properties["Final"])
D_units = plot.y_units
return D, D_units
def recreate_settings(self) -> Optional[Settings]:
"""Recreate the input |Settings| instance for the corresponding job based on files present in the job folder. This method is used by |load_external|.
Extract user input from the kf file and parse it back to a |Settings| instance using ``scm.base`` module. Remove the ``system`` branch from that instance.
"""
user_input = self._kf.read_string("General", "user input")
try:
inp = input_to_settings(user_input, program="analysis")
except:
log(
"Failed to recreate input settings from {}".format(
str(self.job.get_path() / (self.job.name + self.__class__._kfext))
)
)
return None
s = Settings()
s.input = inp
return s
def recreate_molecule(self) -> Union[None, Molecule, Dict[str, Molecule]]:
"""Recreate the input molecule(s) for the corresponding job based on files present in the job folder.
This method is used by |load_external|.
It extracts data from the ``InputMolecule`` and ``InputMolecule(*)`` sections.
"""
from scm.plams import AMSJob
if "system" not in self.job.settings.input:
return None
self.job.settings.input.ams.system = self.job.settings.input.system
del self.job.settings.input.system
molecule = AMSJob.settings_to_mol(self.job.settings)
del self.job.settings.input.ams
return molecule
[docs]class AMSAnalysisJob(SCMJob):
"""A class for analyzing molecular dynamics trajectories using the ``analysis`` program."""
results: AMSAnalysisResults
_result_type = AMSAnalysisResults
_command = "analysis"
_subblock_end = "end"
[docs] def __init__(self, **kwargs: Any):
SCMJob.__init__(self, **kwargs)
[docs] def _serialize_mol(self) -> None:
"""
Use the method from AMSJob to move the molecule to the settings object
"""
from scm.plams import AMSJob
systems = AMSJob._serialize_molecule(self) # type: ignore[arg-type]
if len(systems) > 0:
if _has_scm_pisa and isinstance(self.settings.input, DriverBlock):
self.settings.system = systems
else:
self.settings.input.system = systems
[docs] def _remove_mol(self) -> None:
"""
Remove the molecule from the system block again
"""
if hasattr(self.settings.input, "system"):
del self.settings.input.system
elif "system" in self.settings:
del self.settings.system
[docs] @staticmethod
def _atom_suffix(atom: "Atom") -> str:
"""
Return the suffix of an atom.
"""
from scm.plams import AMSJob
return AMSJob._atom_suffix(atom)
[docs] def check(self) -> bool:
try:
grep = self.results.grep_file("$JN.err", "NORMAL TERMINATION")
except:
return False
return len(grep) > 0
def convert_to_unicode(k: str) -> str:
"""
Convert a string with ascii symbols representing unicode symbols
Example k: 'abc\\u03c9def'
"""
parts = k.split("\\u")
# Collect the hexadecimals
symbols = [chr(int(part[:4], 16)) for part in parts[1:]]
# Now repair the parts
parts = parts[:1] + ["".join([s, part[4:]]) for s, part in zip(symbols, parts[1:])]
key = "".join(parts)
return key