from collections import Counter import re from typing import Any, Optional import pandas as pd def molecule_counts_from_ams_results(results: Any) -> pd.DataFrame: """Build a species-count table from the AMS molecule history. The AMS molecule analysis stored in ``ams.rkf`` assigns an integer type to each molecule in every saved frame. This function maps those type IDs back to their molecular formulas and counts how many times each formula appears in each frame. Parameters ---------- results PLAMS results object for an AMS MD job with molecule history present. Returns ------- pandas.DataFrame Table with one row per saved frame. The ``frame`` column stores the frame index and the remaining columns store species counts. """ n_molecule_types = int(results.readrkf("Molecules", "Num molecules")) names = { i: results.readrkf("Molecules", f"Molecule name {i}") for i in range(1, n_molecule_types + 1) } type_history = results.get_history_property("Mols.Type") rows = [] for frame, mol_types in enumerate(type_history, start=1): counter = Counter(int(x) for x in mol_types) row = {"frame": frame} for mol_type_id, name in names.items(): row[name] = int(counter[mol_type_id]) rows.append(row) return pd.DataFrame(rows).fillna(0) def species_family(species: str) -> str: """Assign a molecular formula to one analysis family. Parameters ---------- species Molecular formula as reported by AMS, for example ``"O2"`` or ``"C3H6O2"``. Returns ------- str Name of the product family used throughout the notebook. """ text = str(species).upper().replace(" ", "") counts = {el: int(n or 1) for el, n in re.findall(r"([A-Z][a-z]?)([0-9]*)", str(species))} carbon = counts.get("C", 0) oxygen = counts.get("O", 0) if text == "O2": return "unreacted_oxygen" if text == "CO2": return "co2" if text in {"CO", "H2O", "H2"}: return "small_neutral" if carbon > 0 and oxygen > 0 and carbon <= 4: return "small_oxygenated" if carbon >= 8: return "parent_or_large_fragment" if carbon > 0 and oxygen > 0: return "medium_oxygenated_fragment" if oxygen > 0: return "oxygenated_fragment" return "hydrocarbon_fragment" def summarize_product_families(counts: pd.DataFrame) -> pd.DataFrame: """Aggregate per-species counts into broader product-family populations. Parameters ---------- counts Species-count table produced from the AMS molecule history. Returns ------- pandas.DataFrame Copy of the frame/time columns together with one column per product family defined by :func:`species_family`. """ value_cols = [c for c in counts.columns if c not in {"frame", "time_ps"}] out = counts[[c for c in counts.columns if c in {"frame", "time_ps"}]].copy() families = [ "unreacted_oxygen", "co2", "small_neutral", "small_oxygenated", "medium_oxygenated_fragment", "oxygenated_fragment", "parent_or_large_fragment", "hydrocarbon_fragment", ] for family in families: members = [c for c in value_cols if species_family(c) == family] out[family] = counts[members].sum(axis=1) if members else 0 return out def add_degradation_metrics(family_summary: pd.DataFrame) -> pd.DataFrame: """Add coarse degradation descriptors to a family-summary table. The main derived quantity is ``scission_proxy``, which measures the increase in fragment-like populations relative to the initial frame. Its cumulative version is used in the notebook as a simple monotonic indicator of fragmentation activity. Parameters ---------- family_summary Table containing the product-family populations as a function of frame or time. Returns ------- pandas.DataFrame Copy of the input table with additional degradation-proxy columns. """ out = family_summary.copy() fragment_cols = [ "small_oxygenated", "medium_oxygenated_fragment", "oxygenated_fragment", "hydrocarbon_fragment", "parent_or_large_fragment", ] total = sum(out[col].astype(float) for col in fragment_cols if col in out) baseline = float(total.iloc[0]) if len(out) else 0.0 out["scission_proxy"] = (total - baseline).clip(lower=0) out["cumulative_scission_proxy"] = out["scission_proxy"].cummax() out["cumulative_co2"] = out["co2"].cummax() if "co2" in out else 0 return out def find_representative_molecule(results: Any, target_formula: str) -> Optional[Any]: """Return one fragment from the trajectory matching a target formula. Parameters ---------- results PLAMS results object for an AMS MD job. target_formula Molecular formula to search for in the separated fragments of each stored frame. Returns ------- optional The first matching molecule fragment found in the trajectory, or ``None`` if the formula never appears. """ n_steps = int(results.readrkf("History", "nEntries")) for step in range(1, n_steps + 1): frame = results.get_history_molecule(step) for fragment in frame.separate(): if fragment.get_formula() == target_formula: return fragment return None def prepare_molecule_for_display(molecule: Any) -> Any: """Prepare a molecule fragment for cleaner notebook visualization. The function reconnects periodic fragments when needed, removes lattice information, centers the molecule, and aligns it with its principal axes so representative-family images are easier to compare. Parameters ---------- molecule Molecule object to prepare for display. Returns ------- Any Copy of the input molecule with visualization-oriented cleanup applied. """ display_molecule = molecule.copy() if display_molecule.lattice: display_molecule.map_atoms_to_bonds() display_molecule.lattice = [] for bond in display_molecule.bonds: if "suffix" in bond.properties: del bond.properties.suffix center = display_molecule.get_center_of_mass() display_molecule.translate(tuple(-value for value in center)) if len(display_molecule) > 1: _, principal_axes = display_molecule.get_moments_of_inertia(eigen_vectors=True) display_molecule.rotate(principal_axes) return display_molecule