from __future__ import unicode_literals
import os
import threading
try:
import dill as pickle
except ImportError:
import pickle
from os.path import join as opj
from .common import log
from .errors import PlamsError
from .basejob import MultiJob
__all__ = ['JobManager']
class JobManager(object):
    """Class responsible for jobs and files management.

    Every instance has the following attributes:

    * ``folder`` -- the working folder name.
    * ``path`` -- the absolute path to the directory with the working folder.
    * ``workdir`` -- the absolute path to the working folder (``path/folder``).
    * ``settings`` -- a |Settings| instance for this job manager (see below).
    * ``jobs`` -- a list of all jobs managed with this instance (in order of |run| calls).
    * ``names`` -- a dictionary with names of jobs. For each name an integer value is stored indicating how many jobs with that name have already been run.
    * ``hashes`` -- a dictionary working as a hash-table for jobs.

    ``path`` and ``folder`` can be adjusted with constructor arguments *path* and *folder*. If not supplied, Python current working directory and string ``plams.`` appended with PID of the current process are used.

    ``settings`` attribute is directly set to the value of *settings* argument (unlike in other classes where they are copied) and it should be a |Settings| instance with the following keys:

    * ``hashing`` -- chosen hashing method (see |RPM|).
    * ``counter_len`` -- length of number appended to the job name in case of name conflict.
    * ``remove_empty_directories`` -- if ``True``, all empty subdirectories of the working folder are removed on |finish|.
    """

    def __init__(self, settings, path=None, folder=None):
        """Initialize the job manager and create its working folder.

        *settings* is stored by reference (not copied). *path* must be an
        existing directory (defaults to the current working directory);
        *folder* defaults to ``plams.<PID>``. Raises |PlamsError| if *path*
        is not a directory or if the working folder already exists.
        """
        if path is None:
            self.path = os.getcwd()
        elif os.path.isdir(path):
            self.path = os.path.abspath(path)
        else:
            raise PlamsError('Invalid path: %s'%path)
        self.folder = folder or ('plams.' + str(os.getpid()))
        self.settings = settings
        self.jobs = []
        self.names = {}
        self.hashes = {}

        self.workdir = opj(self.path, self.folder)
        # Bookkeeping files placed inside the working folder, all sharing its name.
        self.logfile = opj(self.workdir, self.folder+'.log')
        self.input = opj(self.workdir, self.folder+'.inp')
        self.restart = opj(self.workdir, self.folder+'.res')

        # Refuse to reuse an existing folder: it could hold results of a previous run.
        if not os.path.exists(self.workdir):
            os.mkdir(self.workdir)
        else:
            raise PlamsError('Directory %s already exists' % self.workdir)

    def _register_name(self, job):
        """Register the name of the *job*.

        If a job with the same name was already registered, *job* is renamed
        by appending consecutive integers. Number of digits in the appended
        number is defined by ``counter_len`` value in job manager's
        ``settings``.
        """
        if job.name in self.names:
            self.names[job.name] += 1
            newname = job.name +'.'+ str(self.names[job.name]).zfill(self.settings.counter_len)
            log('Renaming job %s to %s' % (job.name,newname), 3)
            job.name = newname
        else:
            self.names[job.name] = 1

    def _register(self, job):
        """Register the *job*: register its name (renaming it if needed) and
        create the job folder (inside the parent job's folder for children of
        a |MultiJob|, otherwise directly in the working folder)."""
        log('Registering job %s' % job.name, 7)
        job.jobmanager = self

        self._register_name(job)

        if job.path is None:
            if job.parent:
                job.path = opj(job.parent.path, job.name)
            else:
                job.path = opj(self.workdir, job.name)
        os.mkdir(job.path)

        self.jobs.append(job)
        job.status = 'registered'
        log('Job %s registered' % job.name, 7)

    def _check_hash(self, job):
        """Calculate the hash of *job* and, if it is not ``None``, search
        previously run jobs for the same hash. If such a job is found, return
        it. Otherwise, store *job* under its hash and return ``None``."""
        h = job.hash()
        if h is not None:
            if h in self.hashes:
                prev = self.hashes[h]
                log('Job %s previously run as %s, using old results' % (job.name, prev.name), 1)
                return prev
            else:
                self.hashes[h] = job
        return None

    def load_job(self, filename):
        """Load previously saved job from *filename*.

        *Filename* should be a path to ``.dill`` file in some job folder. A
        |Job| instance stored there is loaded and returned. All attributes of
        this instance removed before pickling are restored. This includes
        ``jobmanager``, ``path`` (absolute path to *filename* is used),
        ``default_settings`` (list containing only ``config.job``) and also
        ``parent`` in case of children of some |MultiJob|.

        See :ref:`pickling` for details.
        """
        def setstate(job, path, parent=None):
            # Recursively restore the attributes stripped before pickling,
            # walking down the MultiJob tree so children point at their parent.
            job.parent = parent
            job.jobmanager = self
            # NOTE(review): ``config`` is not imported in this module --
            # presumably injected as a global (e.g. into builtins) by PLAMS
            # initialization; confirm before running standalone.
            job.default_settings = [config.job]
            job.path = path
            if isinstance(job, MultiJob):
                job._lock = threading.Lock()
                for child in job:
                    setstate(child, opj(path, child.name), job)
            job.results.refresh()
            h = job.hash()
            if h is not None:
                self.hashes[h] = job
            for key in job._dont_pickle:
                job.__dict__[key] = None

        filename = os.path.abspath(filename)
        path = os.path.dirname(filename)
        with open(filename, 'rb') as f:
            job = pickle.load(f)
        setstate(job, path)
        return job

    def remove_job(self, job):
        """Remove *job* from job manager. Forget its hash."""
        if job in self.jobs:
            self.jobs.remove(job)
            job.jobmanager = None
        h = job.hash()
        # Explicit None guard: jobs with no hash are never stored in the table.
        if h is not None and h in self.hashes and self.hashes[h] == job:
            del self.hashes[h]

    def _clean(self):
        """Clean all registered jobs according to the ``save`` parameter in
        their ``settings``. If ``remove_empty_directories`` is ``True``,
        traverse the working directory (bottom-up, so emptied parents are
        caught too) and delete all empty subdirectories.
        """
        log('Cleaning job manager', 7)

        for job in self.jobs:
            job.results._clean(job.settings.save)

        if self.settings.remove_empty_directories:
            for root, dirs, _ in os.walk(self.workdir, topdown=False):
                for dirname in dirs:
                    fullname = opj(root, dirname)
                    if not os.listdir(fullname):
                        os.rmdir(fullname)

        log('Job manager cleaned', 7)