from scm.plams.core.functions import add_to_instance
from scm.plams.core.settings import Settings
from scm.plams.interfaces.adfsuite.ams import AMSJob, AMSResults
import numpy as np
from scm.plams.tools.kftools import KFFile
from scm.plams.tools.units import Units
from typing import List, Dict, Union, Tuple
import scm.plams as plams
__all__ = ["AMSMDJob", "AMSNVEJob", "AMSNVTJob", "AMSNPTJob"]
[docs]class AMSMDJob(AMSJob):
"""
molecule: Molecule
The initial structure
name: str
The name of the job
settings: Settings
Settings for the job. You should normally not populate neither settings.input.ams.MolecularDynamics nor settings.input.ams.Task
velocities: float or AMSJob or str (path/to/ams.rkf) or 2-tuple (path/to/ams.rkf, frame-number)
If float, it is taken as the temperature. If AMSJob or str, the velocities are taken from the EndVelocities section of the corresponding ams.rkf file. If 2-tuple, the first item must be a path to an ams.rkf, and the second item an integer specifying the frame number - the velocities are then read from History%Velocities(framenumber).
timestep: float
Timestep
samplingfreq: int
Sampling frequency
nsteps: int
Length of simulation
**Trajectory options**:
checkpointfrequency: int
How frequently to write MDStep*.rkf checkpoint files
writevelocities : bool
Whether to save velocities to ams.rkf (needed for example to restart from individual frames or to calculate velocity autocorrelation functions)
writebonds: bool
Whether to save bonds to ams.rkf
writemolecules: bool
Whether to write molecules to ams.rkf
writeenginegradients: bool
Whether to save engine gradients (negative of atomic forces) to ams.rkf
**Thermostat (NVT, NPT) options**:
thermostat: str
'Berendsen' or 'NHC'
tau: float
Thermostat time constant (fs)
temperature: float or tuple of floats
Temperature (K). If a tuple/list of floats, the Thermostat.Duration option will be set to evenly divided intervals.
thermostat_region: str
Region for which to apply the thermostat
**Barostat (NPT) options**:
barostat: str
'Berendsen' or 'MTK'
barostat_tau: float
Barostat time constant (fs)
pressure: float
Barostat pressure (pascal)
equal: str
'XYZ' etc.
scale: str
'XYZ' etc.
constantvolume: bool
Constant volume?
**Other options**:
calcpressure: bool
Whether to calculate pressure for each frame.
binlog_time: bool
Whether to log the time at every timestep in the BinLog section on ams.rkf
binlog_pressuretensor: bool
Whether to log the pressure tensor at every timestep in the BinLog section on ams.rkf
"""
default_nsteps = 1000
default_timestep = 0.25
default_samplingfreq = 100
default_thermostat = "NHC"
default_temperature = 300
default_tau_multiplier = 400 # get tau by multiplying the timestep with this number
default_thermostat_region = None
default_barostat = "MTK"
default_pressure = 1e5
default_barostat_tau_multiplier = 4000 # get barostat_tau by multiplying the timestep with this number
default_scale = "XYZ"
default_equal = "None"
default_constantvolume = "False"
default_checkpointfrequency = 1000000
default_writevelocities = "True"
default_writebonds = "True"
default_writecharges = "True"
default_writemolecules = "True"
default_writeenginegradients = "False"
default_calcpressure = "False"
default_binlog_time = "False"
default_binlog_pressuretensor = "False"
default_binlog_dipolemoment = "False"
[docs] def __init__(
self,
velocities=None,
timestep=None,
samplingfreq: int = None,
nsteps: int = None,
checkpointfrequency=None,
writevelocities=None,
writebonds=None,
writemolecules=None,
writecharges=None,
writeenginegradients=None,
calcpressure=None,
molecule: Union[plams.Molecule, AMSJob, AMSResults] = None,
temperature: float = None,
thermostat=None,
tau=None,
thermostat_region=None,
pressure=None,
barostat=None,
barostat_tau=None,
scale=None,
equal=None,
constantvolume=None,
binlog_time=None,
binlog_pressuretensor=None,
binlog_dipolemoment=None,
_enforce_thermostat=False,
_enforce_barostat=False,
**kwargs,
):
""" """
if isinstance(molecule, AMSJob):
molecule = molecule.results.get_main_molecule()
if isinstance(molecule, AMSResults):
molecule = molecule.get_main_molecule()
AMSJob.__init__(self, molecule=molecule, **kwargs)
self.settings.input.ams.Task = "MolecularDynamics"
mdsett = self.settings.input.ams.MolecularDynamics
mdsett.TimeStep = timestep or mdsett.TimeStep or self.default_timestep
mdsett.Trajectory.SamplingFreq = samplingfreq or mdsett.Trajectory.SamplingFreq or self.default_samplingfreq
mdsett.NSteps = nsteps or mdsett.NSteps or self.default_nsteps
mdsett.Trajectory.WriteVelocities = (
str(writevelocities)
if writevelocities is not None
else mdsett.Trajectory.WriteVelocities or self.default_writevelocities
)
mdsett.Trajectory.WriteBonds = (
str(writebonds) if writebonds is not None else mdsett.Trajectory.WriteBonds or self.default_writebonds
)
mdsett.Trajectory.WriteMolecules = (
str(writemolecules)
if writemolecules is not None
else mdsett.Trajectory.WriteMolecules or self.default_writemolecules
)
mdsett.Trajectory.WriteCharges = (
str(writecharges)
if writecharges is not None
else mdsett.Trajectory.WriteCharges or self.default_writecharges
)
mdsett.Trajectory.WriteEngineGradients = (
str(writeenginegradients)
if writeenginegradients is not None
else mdsett.Trajectory.WriteEngineGradients or self.default_writeenginegradients
)
mdsett.CalcPressure = (
str(calcpressure) if calcpressure is not None else mdsett.CalcPressure or self.default_calcpressure
)
mdsett.Checkpoint.Frequency = (
checkpointfrequency or mdsett.Checkpoint.Frequency or self.default_checkpointfrequency
)
mdsett.BinLog.Time = (
str(binlog_time) if binlog_time is not None else mdsett.BinLog.Time or self.default_binlog_time
)
mdsett.BinLog.PressureTensor = (
str(binlog_pressuretensor)
if binlog_pressuretensor is not None
else mdsett.BinLog.PressureTensor or self.default_binlog_pressuretensor
)
mdsett.BinLog.DipoleMoment = (
str(binlog_dipolemoment)
if binlog_dipolemoment is not None
else mdsett.BinLog.DipoleMoment or self.default_binlog_dipolemoment
)
if velocities is None and temperature is not None:
velocities = self._get_initial_temperature(temperature)
self.settings += self._velocities2settings(velocities)
if temperature or thermostat or _enforce_thermostat:
self.settings.update(
self._get_thermostat_settings(
thermostat=thermostat,
temperature=temperature,
tau=tau,
thermostat_region=thermostat_region,
nsteps=int(mdsett.NSteps),
)
)
if pressure or barostat or _enforce_barostat:
self.settings.update(
self._get_barostat_settings(
pressure=pressure,
barostat=barostat,
barostat_tau=barostat_tau,
scale=scale,
equal=equal,
constantvolume=constantvolume,
)
)
def remove_blocks(self, blocks=None):
if blocks:
for block in blocks:
if block in self.settings.input.ams.MolecularDynamics:
del self.settings.input.ams.MolecularDynamics[block]
@staticmethod
def _get_initial_temperature(temperature):
try:
return temperature[0]
except TypeError:
return temperature
@staticmethod
def _velocities2settings(velocities):
s = Settings()
if isinstance(velocities, int) or isinstance(velocities, float) or velocities is None or velocities is False:
s.input.ams.MolecularDynamics.InitialVelocities.Type = "Random"
s.input.ams.MolecularDynamics.InitialVelocities.Temperature = velocities or AMSMDJob.default_temperature
elif isinstance(velocities, tuple):
# file and frame number
f = velocities[0]
frame = velocities[1]
vels = KFFile(f).read("MDHistory", f"Velocities({frame})")
vels = np.array(vels).reshape(-1, 3) * Units.convert(1.0, "bohr", "angstrom") # angstrom/fs
s.input.ams.MolecularDynamics.InitialVelocities.Type = "Input"
values = ""
for x in vels:
values += 6 * " " + " ".join(str(y) for y in x) + "\n"
s.input.ams.MolecularDynamics.InitialVelocities.Values._h = " # From {} frame {}".format(f, frame)
s.input.ams.MolecularDynamics.InitialVelocities.Values._1 = values
else:
s.input.ams.MolecularDynamics.InitialVelocities.Type = "FromFile"
s.input.ams.MolecularDynamics.InitialVelocities.File = velocities
return s
[docs] def get_velocities_from(self, other_job, frame=None, update_molecule=True):
"""
Function to update the InitialVelocities block in self. It is normally not needed, instead use the e.g. AMSNVEJob.restart_from() function.
This function can be called in prerun() methods for MultiJobs
"""
_, velocities, molecule, _ = self._get_restart_job_velocities_molecule(other_job, frame=frame)
del self.settings.input.ams.MolecularDynamics.InitialVelocities
self.settings += self._velocities2settings(velocities)
if update_molecule:
self.molecule = molecule
@staticmethod
def _get_restart_job_velocities_molecule(other_job, frame=None, settings=None, get_velocities_molecule=True):
"""
other_job: str or some AMSMdJob
get_velocities_molecule : bool
Whether to get the velocities and molecule right now. Set to False to not access other_job.results (for use with MultiJob prerun() methods)
Returns: (other_job [AMSJob], velocities, molecule, extra_settings [Settings])
"""
if isinstance(other_job, str):
other_job = AMSJob.load_external(other_job)
if get_velocities_molecule:
if frame:
velocities = (other_job.results.rkfpath(), frame)
molecule = other_job.results.get_history_molecule(frame)
else:
velocities = other_job
molecule = other_job.results.get_main_molecule()
else:
velocities = None
molecule = None
extra_settings = other_job.settings.copy()
if settings:
extra_settings.update(settings)
if "InitialVelocities" in extra_settings.input.ams.MolecularDynamics:
del extra_settings.input.ams.MolecularDynamics.InitialVelocities
if "System" in extra_settings.input.ams:
del extra_settings.input.ams.System
return other_job, velocities, molecule, extra_settings
def _get_thermostat_settings(self, thermostat, temperature, tau, thermostat_region, nsteps: int):
s = Settings()
prev_thermostat_settings = self.settings.input.ams.MolecularDynamics.Thermostat
if isinstance(prev_thermostat_settings, list):
prev_thermostat_settings = prev_thermostat_settings[0]
s.input.ams.MolecularDynamics.Thermostat.Type = (
thermostat or prev_thermostat_settings.Type or self.default_thermostat
)
try:
n_temperatures = len(temperature)
my_temperature = " ".join(str(x) for x in temperature)
if n_temperatures > 1:
nsteps_per_temperature = nsteps // (len(temperature) - 1)
s.input.ams.MolecularDynamics.Thermostat.Duration = " ".join(
[str(nsteps_per_temperature)] * (n_temperatures - 1)
)
else:
s.input.ams.MolecularDynamics.Thermostat.Duration = None
except TypeError:
my_temperature = temperature
s.input.ams.MolecularDynamics.Thermostat.Duration = None
s.input.ams.MolecularDynamics.Thermostat.Region = (
thermostat_region or prev_thermostat_settings.get("Region", None) or self.default_thermostat_region
)
s.input.ams.MolecularDynamics.Thermostat.Temperature = (
my_temperature or prev_thermostat_settings.Temperature or self.default_temperature
)
s.input.ams.MolecularDynamics.Thermostat.Tau = (
tau
or prev_thermostat_settings.Tau
or float(self.settings.input.ams.MolecularDynamics.TimeStep) * AMSMDJob.default_tau_multiplier
)
return s
def _get_barostat_settings(self, pressure, barostat, barostat_tau, scale, equal, constantvolume):
s = Settings()
self.settings.input.ams.MolecularDynamics.Barostat.Type = (
barostat or self.settings.input.ams.MolecularDynamics.Barostat.Type or self.default_barostat
)
self.settings.input.ams.MolecularDynamics.Barostat.Pressure = (
pressure
if pressure is not None
else self.settings.input.ams.MolecularDynamics.Barostat.Pressure or self.default_pressure
)
self.settings.input.ams.MolecularDynamics.Barostat.Tau = (
barostat_tau
or self.settings.input.ams.MolecularDynamics.Barostat.Tau
or float(self.settings.input.ams.MolecularDynamics.TimeStep) * AMSMDJob.default_barostat_tau_multiplier
)
self.settings.input.ams.MolecularDynamics.Barostat.Scale = (
scale or self.settings.input.ams.MolecularDynamics.Barostat.Scale or self.default_scale
)
self.settings.input.ams.MolecularDynamics.Barostat.Equal = (
equal or self.settings.input.ams.MolecularDynamics.Barostat.Equal or self.default_equal
)
self.settings.input.ams.MolecularDynamics.Barostat.ConstantVolume = (
str(constantvolume)
if constantvolume is not None
else self.settings.input.ams.MolecularDynamics.Barostat.ConstantVolume or self.default_constantvolume
)
return s
[docs] @classmethod
def restart_from(cls, other_job, frame=None, settings=None, use_prerun=False, **kwargs):
"""
other_job: AMSJob
The job to restart from.
frame: int
Which frame to read the structure and velocities from. If None, the final structure and end velocities will be used (section Molecule and MDResults%EndVelocities)
settings: Settings
Settings that override any other settings. All settings from other_job (e.g. the engine settings) are inherited by default but they can be overridden here.
use_prerun: bool
If True, the molecule and velocities will only be read from other_job inside the prerun() method. Set this to True to prevent PLAMS from waiting for other_job to finish as soon as the new job is defined.
kwargs: many options
See the docstring for AMSMDJob.
"""
other_job, velocities, molecule, extra_settings = cls._get_restart_job_velocities_molecule(
other_job, frame, settings, get_velocities_molecule=not use_prerun
)
job = cls(settings=extra_settings, velocities=velocities, molecule=molecule, **kwargs)
if use_prerun:
@add_to_instance(job)
def prerun(self): # noqa F811
self.get_velocities_from(other_job, frame=frame, update_molecule=True)
return job
[docs]class AMSNVEJob(AMSMDJob):
"""
A class for running NVE MD simulations
"""
def __init__(self, **kwargs):
AMSMDJob.__init__(self, **kwargs)
self.remove_blocks(["thermostat", "barostat", "deformation", "nanoreactor"])
[docs]class AMSNVTJob(AMSMDJob):
"""
A class for running NVT MD simulations
"""
def __init__(self, **kwargs):
AMSMDJob.__init__(self, _enforce_thermostat=True, **kwargs)
self.remove_blocks(["barostat", "deformation", "nanoreactor"])
[docs]class AMSNPTResults(AMSResults):
[docs] def get_equilibrated_molecule(self, equilibration_fraction=0.667, return_index=False):
"""
Discards the first equilibration_fraction of the trajectory.
Calculates the average density of the rest. Returns the molecule
with the closest density to the average density among the remaining
trajectory.
"""
densities = self.job.results.get_history_property("Density", "MDHistory")
analyze_from = int(len(densities) * equilibration_fraction)
# take structure closest to the target density
avg_density = np.mean(densities[analyze_from:]) # amu/bohr^3
delta = np.array(densities[analyze_from:]) - avg_density
delta = np.abs(delta)
min_index = np.argmin(delta)
min_index += analyze_from
mol = self.job.results.get_history_molecule(min_index + 1)
if return_index:
return mol, min_index + 1
return mol
[docs]class AMSNPTJob(AMSMDJob):
"""
A class for running NPT MD simulations
"""
_result_type = AMSNPTResults
def __init__(self, **kwargs):
AMSMDJob.__init__(self, _enforce_thermostat=True, _enforce_barostat=True, **kwargs)
self.settings.input.ams.MolecularDynamics.CalcPressure = "True"
self.remove_blocks(["deformation", "nanoreactor"])