3.4. Job runners
Job runners have already been mentioned in the previous chapters about jobs and results.
Here we sum up all that information and introduce a basic JobRunner object together with its subclass GridRunner, which is meant for interacting with queueing systems that manage resources on computer clusters.
Job runners in PLAMS are very simple objects, both from the user’s perspective and in terms of internal architecture.
Apart from constructors, they have no methods that are meant to be called in your scripts.
Job runners are supposed to be created (with parameters adjusting their behavior) and passed to the run() method as a parameter, or stored as config.default_jobrunner.
3.4.1. Local job runner
class JobRunner(parallel=False, maxjobs=0)
Class defining the basic job runner interface. Instances of this class represent local job runners – job runners that execute computational jobs on the current machine.
The goal of the job runner is to take care of two important things – parallelization and runscript execution:
- When the run() method of any Job instance is executed, control is passed, after some initial preparations, to a JobRunner instance. This JobRunner instance decides whether a separate thread should be spawned for the job or whether execution should proceed in the current thread. This decision is based on the parallel attribute, which can be set on JobRunner creation. There are no separate classes for serial and parallel job runners; both cases are covered by JobRunner depending on the parallel parameter.
- If the executed job is an instance of SingleJob, it creates a runscript which contains most of the actual computational work (usually just the execution of some external binary). The runscript is then submitted to a JobRunner instance using its call() method. This method executes the runscript as a separate subprocess and takes care of handling the output and error streams, setting a proper working directory, etc.
For a job runner with parallel execution enabled, the number of simultaneously running jobs can be limited using the maxjobs parameter. If maxjobs is 0, no limit is enforced. If parallel is False, maxjobs is ignored. If parallel is True and maxjobs is a positive integer, a BoundedSemaphore of that size is used to limit the number of simultaneously running call() methods.

A JobRunner instance can be passed to run() with the keyword argument jobrunner. If this argument is omitted, the instance stored in config.default_jobrunner is used.
__init__(parallel=False, maxjobs=0)
Initialize self. See help(type(self)) for accurate signature.
call(runscript, workdir, out, err, runflags)
Execute the runscript in the folder workdir. Redirect output and error streams to out and err, respectively.
Arguments runscript, workdir, out and err should be strings with paths to corresponding files or folders.
runflags is a Settings instance containing the run branch of the running job’s settings. The basic job runner defined here ignores these flags, but they can be useful in JobRunner subclasses (see GridRunner.call()).

Returns an integer with the exit code returned by the runscript.
This method can be safely overridden in JobRunner subclasses. For example, in GridRunner it submits the runscript to a queueing system instead of executing it locally.

Note
This method is used automatically during run() and should never be explicitly called in your script.
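The duties of a local call() (run the runscript as a subprocess, set the working directory, redirect both streams, return the exit code) can be sketched with the standard library alone. This is a minimal sketch under assumptions, not PLAMS's own implementation: the function name run_runscript is hypothetical, the runscript is assumed to be a POSIX shell script executed through sh, and out and err are assumed to be paths the caller may open for writing.

```python
import os
import subprocess


def run_runscript(runscript, workdir, out, err):
    """Hypothetical stand-in for JobRunner.call(): run `runscript` in `workdir`.

    Standard output is written to the file `out`, standard error to `err`.
    Returns the exit code of the runscript.
    """
    # Make the script path absolute so it stays valid after changing to workdir.
    runscript = os.path.abspath(runscript)
    with open(out, "w") as fout, open(err, "w") as ferr:
        # Assumption: the runscript is a POSIX shell script, so run it through sh.
        completed = subprocess.run(
            ["sh", runscript], cwd=workdir, stdout=fout, stderr=ferr
        )
    return completed.returncode
```

The exit code is passed straight through, which matches the documented contract of returning the runscript's exit code; the runflags argument is left out because the basic runner ignores it anyway.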
_run_job(job, jobmanager)
This method aggregates the parts of the procedure described in Running a job that are supposed to be run in a separate thread in case of parallel job execution. It is wrapped with the _in_thread() decorator.

This method should not be overridden.
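The serial-or-parallel decision made by the job runner can be illustrated with a small decorator in the spirit of _in_thread(). This is a hedged sketch: in_thread and ToyRunner are hypothetical stand-ins, not PLAMS code, and returning the Thread object is a convenience added here so that the caller can join it.

```python
import threading
from functools import wraps


def in_thread(method):
    """Hypothetical analogue of _in_thread(): run `method` in a new thread
    when self.parallel is true, otherwise run it in the calling thread."""
    @wraps(method)
    def wrapper(self, *args, **kwargs):
        if self.parallel:
            thread = threading.Thread(target=method, args=(self, *args), kwargs=kwargs)
            thread.start()
            return thread  # returned only so the caller can join it
        method(self, *args, **kwargs)
        return None
    return wrapper


class ToyRunner:
    """Hypothetical runner used only to demonstrate the decorator."""

    def __init__(self, parallel=False):
        self.parallel = parallel
        self.log = []

    @in_thread
    def run_job(self, name):
        # Record which thread actually executed the job.
        self.log.append((name, threading.current_thread().name))
```

With parallel=False the job body runs in the caller's thread, so run_job() returns only after the job is done; with parallel=True the call returns immediately and the work continues in the background, which is why one class can serve as both a serial and a parallel runner.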
Technical
As with the Results class, the proper behavior of JobRunner and its subclasses (including those defined by the user) is ensured by a metaclass.
For the sake of completeness we present here a brief specification of all involved elements:
class _MetaRunner
Metaclass for JobRunner. During instance creation, it wraps the call() method with the _limit() decorator, which enforces a limit on the number of simultaneous call() calls.
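The combination of a limiting decorator, a BoundedSemaphore sized by maxjobs, and a metaclass that applies the decorator automatically can be sketched as follows. All names here (limit, MetaRunner, Runner, CountingRunner) are hypothetical, and wrapping call() inside the metaclass's __new__ (that is, when each class body is created) is an assumption of this sketch, not a statement about PLAMS internals.

```python
import threading
import time
from functools import wraps


def limit(call):
    """Hypothetical analogue of _limit(): hold self.semaphore (if any) during `call`."""
    @wraps(call)
    def wrapper(self, *args, **kwargs):
        if self.semaphore is not None:
            with self.semaphore:
                return call(self, *args, **kwargs)
        return call(self, *args, **kwargs)
    return wrapper


class MetaRunner(type):
    """Wrap every `call` defined in a class body, so subclasses need no decorator."""

    def __new__(meta, name, bases, namespace):
        if "call" in namespace:  # only wrap methods defined here, never twice
            namespace["call"] = limit(namespace["call"])
        return super().__new__(meta, name, bases, namespace)


class Runner(metaclass=MetaRunner):
    def __init__(self, parallel=False, maxjobs=0):
        self.parallel = parallel
        # A semaphore is only created for a parallel runner with a positive limit.
        self.semaphore = threading.BoundedSemaphore(maxjobs) if parallel and maxjobs > 0 else None

    def call(self, *args):
        return 0


class CountingRunner(Runner):
    """Hypothetical user subclass: its own `call` is limited automatically."""

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.lock = threading.Lock()
        self.active = 0
        self.peak = 0  # highest number of simultaneous call() executions seen

    def call(self, delay):
        with self.lock:
            self.active += 1
            self.peak = max(self.peak, self.active)
        time.sleep(delay)
        with self.lock:
            self.active -= 1
        return 0
```

Because the metaclass does the wrapping, a user who subclasses the runner and overrides call() cannot accidentally bypass the maxjobs limit.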
3.4.2. Remote job runner
class GridRunner(grid='auto', sleepstep=None, parallel=True, maxjobs=0)
Subclass of JobRunner that submits the runscript to a queueing system instead of executing it locally. Apart from two new keyword arguments (grid and sleepstep), it behaves and is meant to be used just like a regular JobRunner.

Note
The default value of the parallel argument is True, unlike in the regular JobRunner.

There are many different queueing systems in popular use today (for example TORQUE, SLURM, OGE). They usually use different commands for submitting jobs and checking the queue status.
The GridRunner class tries to build a common interface to these systems. The commands used to communicate with the queueing system are not hard-coded, but rather taken from a Settings instance. Thanks to that, the user has almost full control over the behavior of a GridRunner instance, and the behavior can be adjusted dynamically.

The behavior of a GridRunner instance is determined by the contents of a Settings instance stored in its settings attribute. That Settings instance can be manually supplied by the user or taken from a collection of predefined instances stored as branches of config.gridrunner. The choice is made with the grid parameter, which should be a string or a Settings instance. If it is a string, it has to be a key occurring in config.gridrunner, or 'auto' for autodetection. For example, if grid='slurm' is passed, config.gridrunner.slurm is used as settings. If grid='auto', the entries present in config.gridrunner are tested and the first one that works (its submit command is present on your system) is chosen. When a Settings instance is passed as grid, it is used directly as settings.

Currently two predefined schemes are available (see Defaults file):
slurm for SLURM and pbs for queueing systems following PBS syntax (PBS, TORQUE, Oracle Grid Engine, etc.).

The settings of GridRunner should have the following structure:
- output – flag for specifying the output file path.
- error – flag for specifying the error file path.
- workdir – flag for specifying the path to the working directory.
- commands.submit – submit command.
- commands.check – queue status check command.
- commands.getid – function extracting the submitted job’s ID from the output of the submit command.
- commands.running – function extracting a list of all running jobs from the output of the queue check command.
- special – branch storing definitions of special run() keyword arguments.
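The structure listed above can be pictured with plain Python dictionaries standing in for a Settings instance, with the special branch at the top level as in the SLURM example given for call(). This is a hypothetical sketch: slurm_getid and slurm_running are illustrative helpers, not the functions from the Defaults file, and they assume that sbatch reports "Submitted batch job <ID>" and that squeue prints one header row followed by one job per row with the job ID in the first column.

```python
import re


def slurm_getid(submit_output):
    """Hypothetical getid: extract the ID from sbatch output such as
    'Submitted batch job 4242' (assumed output format)."""
    match = re.search(r"Submitted batch job (\d+)", submit_output)
    if match is None:
        raise ValueError("could not find a job ID in: " + submit_output)
    return match.group(1)


def slurm_running(check_output):
    """Hypothetical running: return the IDs of all jobs listed by squeue.

    Assumes a single header row and the job ID in the first column; every
    listed job, pending or running, counts as still being in the queue.
    """
    lines = check_output.strip().splitlines()[1:]
    return [line.split()[0] for line in lines if line.strip()]


# Plain nested dicts stand in for a PLAMS Settings instance here.
slurm_settings = {
    "workdir": "-D",
    "output": "-o",
    "error": "-e",
    "special": {
        "nodes": "-N ",
        "walltime": "-t ",
        "memory": "--mem=",
        "queue": "-p ",
    },
    "commands": {
        "submit": "sbatch",
        "check": "squeue",
        "getid": slurm_getid,
        "running": slurm_running,
    },
}
```

Storing functions rather than strings under commands.getid and commands.running keeps all knowledge about a particular queueing system's output format inside its settings branch.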
See call() for more details and examples.

The sleepstep parameter defines how often the queue check is performed. It should be a number giving the interval, in seconds, between two consecutive checks. If None is used, the global default from config.sleepstep is taken.

Note
Queueing systems are usually configured so that the output of your calculation is captured somewhere else and copied to the location indicated by the output flag only when the job is finished. Because of that, it is not possible to peek at your output while the job is running (for example, to see whether your calculation is going well). This limitation can be circumvented with the myjob.settings.runscript.stdout_redirect flag. If it is set to True, output redirection is not handled by the queueing system but is instead placed in the runscript using the shell redirection >. That forces the output file to be created directly in workdir and updated live as the job proceeds.
__init__(grid='auto', sleepstep=None, parallel=True, maxjobs=0)
Initialize self. See help(type(self)) for accurate signature.
call(runscript, workdir, out, err, runflags)
Submit runscript to the queueing system with workdir as the working directory. Redirect output and error streams to out and err, respectively. runflags stores various submit command options.
The submit command has the following structure:
<commands.submit>_<workdir>_{workdir}_<error>_{err}[_<output>_{out}][FLAGS]_{runscript}
Underscores denote spaces, parts in angle brackets correspond to settings entries, parts in curly brackets correspond to call() arguments, and square brackets contain optional parts. The output part is added if out is not None. This is handled automatically based on the runscript.stdout_redirect value in the job’s settings. The FLAGS part is built from the runflags argument, which is a Settings instance storing run() keyword arguments. For every (key, value) pair in runflags, the string _-key_value is appended to FLAGS, unless the key occurs in the special branch of settings. In that case _<special.key>value is used (mind the lack of a space in between). For example, a Settings instance defining the interaction with SLURM has the following entries:

workdir = '-D'
output = '-o'
error = '-e'
special.nodes = '-N '
special.walltime = '-t '
special.memory = '--mem='
special.queue = '-p '
commands.submit = 'sbatch'
commands.check = 'squeue'
The submit command produced by:
gr = GridRunner(parallel=True, maxjobs=4, grid='slurm')
j.run(jobrunner=gr, queue='short', nodes=2, J='something', O='')
will be:
sbatch -D {workdir} -e {err} -o {out} -p short -N 2 -J something -O {runscript}

In certain queueing systems some flags do not have a short form with the semantics -key value. For example, in SLURM the flag --nodefile=value has the short form -F value, but the flag --export=value does not. Such a flag can still be used through the special keys logic:

gr = GridRunner(parallel=True, maxjobs=4, grid='slurm')
gr.settings.special.export = '--export='
j.run(jobrunner=gr, queue='short', export='value')
That results in the command:
sbatch -D {workdir} -e {err} -o {out} -p short --export=value {runscript}
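The submit command template and the special-key rule can be sketched as a short string-building function that reproduces both SLURM examples. This is a hedged illustration: build_submit_command is a hypothetical name, plain dictionaries stand in for the Settings instance and for runflags, and the final whitespace collapse is a shortcut of this sketch.

```python
def build_submit_command(settings, runscript, workdir, out, err, runflags):
    """Hypothetical sketch: assemble
    <submit> <workdir> {workdir} <error> {err} [<output> {out}] [FLAGS] {runscript}
    """
    parts = [settings["commands"]["submit"], settings["workdir"], workdir,
             settings["error"], err]
    if out is not None:  # omitted when the runscript redirects stdout itself
        parts += [settings["output"], out]
    special = settings.get("special", {})
    for key, value in runflags.items():
        if key in special:
            # Special prefix glued directly to the value, e.g. '--export=' + 'value'.
            parts.append(special[key] + str(value))
        else:
            # Generic flag in the form '-key value'.
            parts.append("-" + key + " " + str(value))
    parts.append(runscript)
    # Collapse repeated spaces, such as those produced by the empty value O=''.
    return " ".join(" ".join(parts).split())
```

Run with workdir='/w', err='job.err', out='job.out' and runscript='job.run', the runflags of the first example give "sbatch -D /w -e job.err -o job.out -p short -N 2 -J something -O job.run". The whitespace collapse also splits paths that contain spaces, so a more careful version would quote each argument (for example with shlex.quote) instead.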
The submit command is then executed, and its output is used to determine the submitted job’s ID. The value stored in commands.getid is used for that purpose. It should be a function taking a single string (the whole output of the submit command) and returning a string with the job’s ID.

The submitted job’s ID is then added to the _active_jobs dictionary, with the key being the job’s ID and the value being an instance of threading.Lock. This lock is used to signal that the job is finished and that the thread handling it can continue. Then the _check_queue() method starts the thread that queries the queue and unlocks finished jobs.

Since it is difficult to obtain a job’s exit code automatically, the returned value is 0 (or 1 if the submit command failed). From the run() perspective, this means that a job executed with GridRunner is considered crashed only if it never entered the queue (usually due to an improper submit command).

Note
This method is used automatically during run() and should never be explicitly called in your script.
_check_queue()
Query the queueing system to obtain a list of currently running jobs. Check for active jobs that are no longer in the queue and release their locks. Repeat this procedure every sleepstep seconds until there are no more active jobs. The _mainlock lock ensures that at most one thread executes the main loop of this method at any time.
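The interplay between the per-job locks in _active_jobs and the polling loop guarded by _mainlock can be sketched as a small class. Everything here is hypothetical: QueueWatcher is not a PLAMS class, list_queue is an assumed callable that returns the IDs currently in the queue (standing in for running commands.check and parsing it with commands.running), and taking a snapshot of the watched jobs before each query is a design choice of this sketch, so that a job registered while a query is in flight is never released too early.

```python
import threading
import time


class QueueWatcher:
    """Hypothetical stand-in for the _active_jobs / _check_queue() machinery."""

    def __init__(self, list_queue, sleepstep=1.0):
        self.list_queue = list_queue       # assumed: returns IDs still in the queue
        self.sleepstep = sleepstep
        self.active_jobs = {}              # job ID -> threading.Lock
        self._dictlock = threading.Lock()  # guards active_jobs
        self._mainlock = threading.Lock()  # at most one polling loop at a time

    def register(self, jobid):
        """Record a freshly submitted job and return its lock, already held.

        The caller waits for the job by calling lock.acquire().
        """
        lock = threading.Lock()
        lock.acquire()  # released by the polling loop once the job leaves the queue
        with self._dictlock:
            self.active_jobs[jobid] = lock
        threading.Thread(target=self._check_queue, daemon=True).start()
        return lock

    def _check_queue(self):
        if not self._mainlock.acquire(blocking=False):
            return  # another thread is already running the polling loop
        try:
            while True:
                with self._dictlock:
                    watched = list(self.active_jobs)  # submitted before this query
                in_queue = set(self.list_queue())
                with self._dictlock:
                    for jobid in watched:
                        if jobid not in in_queue:
                            self.active_jobs.pop(jobid).release()
                    if not self.active_jobs:
                        # Release _mainlock while still holding _dictlock, so a job
                        # registered right now starts a fresh polling thread.
                        self._mainlock.release()
                        return
                time.sleep(self.sleepstep)
        except BaseException:
            self._mainlock.release()
            raise
```

A plain threading.Lock is used as the completion signal because, unlike an RLock, it may be released by a thread other than the one that acquired it.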
_autodetect()
Try to autodetect the type of queueing system.
The autodetection mechanism is very simple. For each entry in config.gridrunner, the submit command followed by --version is executed (for example qsub --version). If the execution is successful (indicated by exit code 0), that queueing system is present and it is chosen. Thus, if multiple queueing systems are installed, only one of them is picked: the one whose “name” (the key in config.gridrunner) comes first in lexicographical order.

The returned value is one of the config.gridrunner branches. If autodetection is not successful, an exception is raised.
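The way the grid parameter is resolved, including the --version probe used for 'auto', can be sketched as two small functions. This is a hedged sketch: resolve_grid and autodetect are hypothetical names, a plain dictionary of presets stands in for config.gridrunner, sorted() over the keys provides the lexicographical order, and RuntimeError is an arbitrary choice here since the exact exception type raised by PLAMS is not specified above.

```python
import subprocess


def autodetect(presets):
    """Return the first preset, by sorted key, whose submit command
    exits with code 0 when called with --version."""
    for name in sorted(presets):
        submit = presets[name]["commands"]["submit"]
        try:
            result = subprocess.run([submit, "--version"],
                                    stdout=subprocess.DEVNULL,
                                    stderr=subprocess.DEVNULL)
        except OSError:  # the command is not installed on this machine
            continue
        if result.returncode == 0:
            return presets[name]
    raise RuntimeError("no queueing system detected among: " + ", ".join(sorted(presets)))


def resolve_grid(grid, presets):
    """Map a GridRunner-style grid argument to a settings object."""
    if isinstance(grid, str):
        if grid == "auto":
            return autodetect(presets)
        return presets[grid]  # raises KeyError for an unknown scheme name
    return grid  # already a settings object: use it directly
```

A user-supplied settings object bypasses both the lookup and the probe, which is what makes it possible to target a queueing system that has no predefined scheme.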