#!/usr/bin/env python
import os
import struct
import array
import numpy
from ..mol.molecule import Molecule
from ..core.errors import PlamsError
from .trajectoryfile import TrajectoryFile
__all__ = ['DCDTrajectoryFile']
[docs]class DCDTrajectoryFile (TrajectoryFile) :
"""
Class representing a DCD file containing a molecular trajectory
An instance of this class has the following attributes:
* ``file_object`` -- A Python :py:class:`file` object, referring to the actual DCD file
* ``position`` -- The frame to which the cursor is currently pointing in the DCD file
* ``mode`` -- Designates whether the file is in read or write mode ('rb' or 'wb')
* ``ntap`` -- The number of atoms in the molecular system (needs to be constant throughout)
An |DCDTrajectoryFile| object behaves very similar to a regular file object.
It has read and write methods (:meth:`read_next` and :meth:`write_next`)
that read and write from/to the position of the cursor in the ``file_object`` attribute.
If the file is in read mode, an additional method :meth:`read_frame` can be used that moves
the cursor to any frame in the file and reads from there.
The amount of information stored in memory is kept to a minimum, as only information from the current frame
is ever stored.
Reading and writing to and from the files can be done as follows::
>>> from scm.plams import DCDTrajectoryFile
>>> dcd = DCDTrajectoryFile('old.dcd')
>>> mol = dcd.get_plamsmol()
>>> dcdout = DCDTrajectoryFile('new.dcd',mode='wb',ntap=dcd.ntap)
>>> for i in range(dcd.get_length()) :
>>> crd,cell = dcd.read_frame(i,molecule=mol)
>>> dcdout.write_next(molecule=mol)
The above script reads information from the DCD file old.dcd into the |Molecule| object ``mol``
in a step-by-step manner.
The |Molecule| object is then passed to the :meth:`write_next` method of the new |DCDTrajectoryFile|
object corresponding to the new dcd file ``new.dcd``.
The exact same result can also be achieved by iterating over the instance as a callable
>>> dcd = DCDTrajectoryFile('old.dcd')
>>> mol = dcd.get_plamsmol()
>>> dcdout = DCDTrajectoryFile('new.dcd',mode='wb',ntap=dcd.ntap)
>>> for crd,cell in dcd(mol) :
>>> dcdout.write_next(molecule=mol)
This procedure requires all coordinate information to be passed to and from the |Molecule| object
for each frame, which can be time-consuming.
It is therefore also possible to bypass the |Molecule| object when reading through the frames::
>>> dcd = DCDTrajectoryFile('old.dcd')
>>> dcdout = DCDTrajectoryFile('new.dcd',mode='wb',ntap=dcd.ntap)
>>> for crd,cell in dcd :
>>> dcdout.write_next(coords=crd,cell=cell)
"""
[docs] def __init__ (self, filename=None, mode='rb', fileobject=None, ntap=None) :
"""
Initiates a DCDTrajectoryFile object
* ``filename`` -- The path to the DCD file
* ``mode`` -- The mode in which to open the DCD file ('rb' or 'wb')
* ``fileobject`` -- Optionally, a file object can be passed instead (filename needs to be set to None)
* ``ntap`` -- If the file is in write mode, the number of atoms needs to be passed here
"""
TrajectoryFile.__init__(self, filename, mode, fileobject, ntap)
# DCD specific attributes
self.celldata = False # Whether the cell data is in the file (only set in read-mode)
self.byte_order = '@' # native endian
self.timesteps = (0,0,0) # (nsteps,startstep,steps_between_saves)
self.delta = 2.0 # The timestep (if stored)
self.namnf = 0 # The number of free atoms (if stored)
self.stepsize = 0 # The number of bytes used to store a single conformation
self.intsize = struct.calcsize('i') # The number of bytes in an integer
self.floatsize = struct.calcsize('f') # The number of bytes in a float
# Skip to the trajectory part of the file
if self.mode == 'rb' :
self._read_header()
#elif self.mode == 'wb' :
# self._write_header()
[docs] def set_byteorder (self, byteorder) :
"""
Specifies wether the file to be read from or written to is little endian or big endian
* ``byteorder`` -- Can be either '@', '<', or '>', denoting native endian, little endian, or big endian
"""
self.byte_order = byteorder
def _read_variable (self,size) :
"""
Reads a variable from a binary file
:note::
Assumes that the variable has been written on a machine
with the same bitsizes for the data
"""
if size == 'i' :
varsize = self.intsize
if size == 'f' :
varsize = self.floatsize
else :
varsize = struct.calcsize(size)
var_char = self.file_object.read(varsize)
fmt = self.byte_order+size
var = struct.unpack(fmt,var_char)[0]
return var
def _write_variable (self,var,size) :
"""
Writes a variable to a binary file
"""
var_char = struct.pack(size,var)
self.file_object.write(var_char)
def _read_header (self) :
"""
Reads in the header information in the binary DCD file
"""
# This should be an int32
input_integer = self._read_variable('i')
if input_integer != 84 :
print(input_integer)
print('BAD DCD FORMAT')
raise PlamsError
# Read CORD string
car4 = self.file_object.read(4).decode()
if car4 != 'CORD' :
print('BAD DCD FORMAT')
raise PlamsError
# Here we read the data of the timesteps
# NSET: The number of sets of coordinates
# ISTART: The starting timestep
# NSAVC: The number of timesteps between dcd saves
timesteps = array.array('i')
timesteps.fromfile(self.file_object,3)
self.timesteps = tuple(timesteps)
#print 'timesteps: ',self.timesteps
# 5 blanc integers
dummies = array.array('i')
dummies.fromfile(self.file_object,5)
# NAMNF: The number of free atoms
self.namnf = self._read_variable('i')
# DELTA: The timestep
delta = self._read_variable('f')
# 10 blanc integers
dummies = array.array('i')
dummies.fromfile(self.file_object,10)
# The end size of the first block
input_integer = self._read_variable('i')
if input_integer != 84 :
print('BAD DCD FORMAT')
raise PlamsError
############################################################
# The size of the next block
input_integer = self._read_variable('i')
if (input_integer-4)%80 == 0 :
# NTITLE: The number of 80 char title strings
ntitle = self._read_variable('i')
for i in range(ntitle) :
car = self.file_object.read(80).decode()
#print car
# The end size of this block
input_integer = self._read_variable('i')
else :
print('BAD DCD FORMAT')
raise PlamsError
############################################################
# This should be an int32
input_integer = self._read_variable('i')
if input_integer != 4 :
print('BAD DCD FORMAT')
raise PlamsError
# Read in the number of atoms
self.ntap = self._read_variable('i')
#print 'ntap: ',self.ntap
if self.coords.shape == (0,3) :
self.coords = numpy.zeros((self.ntap,3))
# This should be an int32
input_integer = self._read_variable('i')
if input_integer != 4 :
print('BAD DCD FORMAT')
raise PlamsError
if self.namnf != 0 :
# This should be an int32
input_integer = self._read_variable('i')
if input_integer != (self.ntap-self.namnf)*4 :
print('BAD DCD FORMAT')
raise PlamsError
# This should be an int32
dummies = array.array('i')
dummies.fromfile(self.file_object,(self.ntap-self.namnf))
#print 'list',list(dummies)
# This should be an int32
input_integer = self._read_variable('i')
if input_integer != (self.ntap-self.namnf)*4 :
print('BAD DCD FORMAT')
raise PlamsError
self.stepsize += 3 * self.ntap * self.floatsize
self.stepsize += 5 * self.intsize
if len(self.elements) != self.ntap :
self.elements = ['H']*self.ntap
[docs] def read_next (self,molecule=None,read=True) :
"""
Reads coordinates and lattice info from the current position of the cursor and returns it
* ``molecule`` -- |Molecule| object in which the new data needs to be stored
* ``read`` -- If set to False the cursor will move to the next frame without reading
"""
# Check for end of file
# This should be an int32
input_integer_char = self.file_object.read(self.intsize)
if len(input_integer_char) > 0 :
if self.firsttime :
self.celldata = False
input_integer = struct.unpack(self.byte_order+'i',input_integer_char)[0]
if input_integer == 48 :
self.celldata = True
else :
return None, None
if not read and not self.firsttime :
return self._move_cursor_without_reading()
# Read cell data
cell = numpy.zeros((3,3))
if self.celldata :
cell_array = array.array('d')
try :
cell_array.fromfile(self.file_object,6)
if self.byte_order != '@' :
cell_array.byteswap()
except EOFError :
return None, None
cell[numpy.tril_indices(3)] = cell_array
self.file_object.read(self.floatsize)
# Coords
if self.firsttime :
input_integer = self._read_variable('i')
else :
self.file_object.read(self.intsize)
if self.firsttime :
if input_integer != 4*self.ntap :
print('BAD DCD FORMAT')
raise PlamsError
xcoords = array.array('f')
try :
xcoords.fromfile(self.file_object,self.ntap)
#xcoords = numpy.fromfile(self.file,dtype='f',count=self.ntap)
if self.byte_order != '@' :
xcoords.byteswap()
except EOFError :
return None, None
for i in range(2) :
if self.firsttime :
input_integer = self._read_variable('i')
if input_integer != 4*self.ntap :
print('BAD DCD FORMAT')
raise PlamsError
else :
self.file_object.read(self.intsize)
ycoords = array.array('f')
try:
ycoords.fromfile(self.file_object,self.ntap)
if self.byte_order != '@' :
ycoords.byteswap()
except EOFError :
return None, None
for i in range(2) :
if self.firsttime :
input_integer = self._read_variable('i')
if input_integer != 4*self.ntap :
print('BAD DCD FORMAT')
raise PlamsError
else :
self.file_object.read(self.intsize)
zcoords = array.array('f')
try :
zcoords.fromfile(self.file_object,self.ntap)
if self.byte_order != '@' :
zcoords.byteswap()
except EOFError :
return None, None
if self.firsttime :
input_integer = self._read_variable('i')
if input_integer != 4*self.ntap :
print('BAD DCD FORMAT')
raise PlamsError
else :
self.file_object.read(self.intsize)
# Check that self.coords has not been changed
if not self.coords.shape == (self.ntap,3) :
raise PlamsError('The coordinate array has been changed outside the class')
self.coords[:,0] = xcoords
self.coords[:,1] = ycoords
self.coords[:,2] = zcoords
if self.firsttime :
if self.celldata :
self.stepsize += 6 * struct.calcsize('d')
self.stepsize += self.floatsize
self.stepsize += self.intsize
self.firsttime = False
if isinstance(molecule,Molecule) :
self._set_plamsmol(self.coords, cell, molecule)
self.position += 1
return self.coords, cell
def _is_endoffile (self) :
"""
If the end of file is reached, return coords and cell as None
"""
test = self.file_object.read(self.stepsize)
return len(test) < self.stepsize
def _write_header (self, ntap=0) :
"""
Write the header of the DCD file
"""
self.ntap = ntap
self.coords = numpy.zeros((self.ntap,3))
# This should be an int32
self._write_variable(84,'i')
# Read CORD string
self.file_object.write('CORD'.encode())
# Here we read the data of the timesteps
# NSET: The number of sets of coordinates
# ISTART: The starting timestep
# NSAVC: The number of timesteps between dcd saves
timesteps = array.array('i',[0,1,1])
self.file_object.write(timesteps)
# 5 blanc integers
dummies = array.array('i',[0,0,0,0,0])
self.file_object.write(dummies)
# NAMNF: The number of free atoms
self._write_variable(0,'i')
# DELTA: The timestep
delta = self._write_variable(self.delta,'f')
# 10 blanc integers
dummies = array.array('i')
dummies.append(1)
for i in range(8) :
dummies.append(0)
dummies.append(24)
self.file_object.write(dummies)
# The end size of the first block
self._write_variable(84,'i')
############################################################
# The size of the next block
self._write_variable(84,'i')
ntitle = 1
self._write_variable(ntitle,'i')
title = 'REMARK Created with py_md'
nlet = len(title)
for i in range(80-nlet) :
title += ' '
self.file_object.write(title.encode())
self._write_variable(84,'i')
############################################################
# This should be an int32
self._write_variable(4,'i')
# Read in the number of atoms
self._write_variable(self.ntap,'i')
# This should be an int32
self._write_variable(4,'i')
[docs] def write_next (self, coords=None, molecule=None, cell=[0.,0.,0.], conect=None) :
"""
Write frame to next position in trajectory file
* ``coords`` -- A list or numpy array of (``ntap``,3) containing the system coordinates
* ``molecule`` -- A molecule object to read the molecular data from
* ``cell`` -- A set of lattice vectors or cell diameters
* ``conect`` -- A dictionary containing connectivity info (not used)
.. note::
Either ``coords`` or ``molecule`` are mandatory arguments
"""
if isinstance(molecule,Molecule) :
coords, cell = self._read_plamsmol(molecule)[:2]
cell = self._convert_cell(cell)
# If this is the first step, write the header
if self.position == 0 :
self._write_header(len(coords))
if self.ntap != len(coords) :
print('BAD coordinate list')
return
# Invert the coords list [xcoords,ycoords,zcoords]
#coords = numpy.array(coords).transpose()
# This should be an int32
self._write_variable(48,'i')
cell = array.array('d',cell)
self.file_object.write(cell)
self._write_variable(0,'f')
# Coords
self._write_variable(4*self.ntap,'i')
xcoords = array.array('f',coords[:,0])
self.file_object.write(xcoords)
for i in range(2) :
self._write_variable(4*self.ntap,'i')
ycoords = array.array('f',coords[:,1])
self.file_object.write(ycoords)
for i in range(2) :
self._write_variable(4*self.ntap,'i')
zcoords = array.array('f',coords[:,2])
self.file_object.write(zcoords)
self._write_variable(4*self.ntap,'i')
self.position += 1
def _convert_cell (self, cell) :
"""
Check format of cell, and convert to required lower triangle
"""
if cell is None :
cell = numpy.zeros((3,3))[numpy.tril_indices(3)]
elif len(cell)==3 and isinstance(cell[0],float) or isinstance(cell[0],numpy.float64) :
cell = [cell[0],0.,cell[1],0.,0.,cell[2]]
elif len(cell) < 3 :
newcell = numpy.zeros((3,3))
newcell[:len(cell)] = cell
cell = newcell[numpy.tril_indices(3)]
else :
cell = numpy.array(cell)
cell = cell[numpy.tril_indices(3)]
return cell
def _rewind_to_first_frame(self) :
"""
Rewind the file to the first frame
"""
self.file_object.seek(0)
self.firsttime = True
self.position = 0
self.stepsize = 0
self._read_header()
def _rewind_n_frames(self,nframes) :
"""
Rewind the file by nframes frames
"""
whence = os.SEEK_CUR
stepsize = self.stepsize + self.intsize
self.file_object.seek(-nframes*stepsize,whence)
self.position = self.position - nframes