# Source code for snsxt.util.qsub

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
A collection of functions and objects for submitting jobs to the NYUMC SGE compute cluster with `qsub` from within Python, and monitoring them until completion

This submodule can also be run as a stand-alone demo script
"""
import log
import logging
logger = logging.getLogger("qsub")
logger.debug("loading qsub module")

import os
from collections import defaultdict
import subprocess as sp
import re
import time
import datetime
from time import sleep
import sys
import getpass
import json
try:
    from sh import qstat
except Exception:
    # NOTE(review): this was a bare `except:`, which also swallows SystemExit and
    # KeyboardInterrupt. `sh` may raise more than ImportError here (e.g. a
    # CommandNotFound-type error when the qstat binary is absent) - TODO confirm
    logger.error("qstat could not be loaded")
import tools

# ~~~~ GLOBALS ~~~~~~ #
job_state_key = defaultdict(lambda: None)
"""
dictionary containing possible qsub job states; default state is None

format `key: value`, where `key` is the character string representation of the job state provided by `qstat` output, and `value` is a description of the state.

Recognized Job States
---------------------
`Eqw`: `Error`; the job is in an error status and never started running

`r`: `Running`; the job is currently running

`qw`: `Waiting`; the job is currently in the scheduler queue waiting to run

`t`: `None`; presumably the job is being transferred to an execution host - TODO confirm

`dr`: `None`; the job has been submitted for deletion and will be deleted
"""
# populate the recognized states; unknown states still fall back to None
# because defaultdict.update() preserves the default factory
job_state_key.update({
    'Eqw': 'Error',
    'r': 'Running',
    'qw': 'Waiting',
    't': None,
    'dr': None,
})


# ~~~~ CUSTOM CLASSES ~~~~~~ #
class Job(object):
    """
    Main object class for tracking and validating a compute job that has been
    submitted to the HPC cluster with the `qsub` command

    Notes
    -----
    The default action upon initialization is to query `qstat` to determine
    whether the job is currently running. After a job has completed, built-in
    methods can be used to query `qacct -j` to determine if the job finished
    with a successful exit status. Both `qstat` and `qacct` are queried by
    making system calls to the corresponding programs and parsing their
    stdout messages.

    Many of the methods included with this object class have stand-alone
    functions of the same name, with the same usage & functionality.

    Examples
    --------
    Example usage::

        x = qsub.Job('2379768')
        x.running()
        x.present()
    """
    def __init__(self, id, name = None, log_dir = None, debug = False):
        """
        Parameters
        ----------
        id: int
            numeric job ID, as returned by `qsub` at job submission
        name: str
            the name given to the compute job
        log_dir: str
            path to the directory used to hold log output by the compute job
        debug: bool
            initialize the job without immediately querying `qstat` to
            determine job status
        """
        global job_state_key
        # keep a reference to the module's state lookup table on the instance
        self.job_state_key = job_state_key
        self.id = id
        self.name = name
        self.log_dir = log_dir
        # dictionary containing the types and paths to the job's output logs
        self.log_paths = {}
        if log_dir:
            self.update_log_files()
        # hold a character string of completion validation information
        self.completions = self._completions()
        # add the rest of the attributes as per the update function
        if not debug:
            self._update()

    def __repr__(self):
        return('Job(id = {0}, name = {1}, log_dir = {2})'.format(self.id, self.name, self.log_dir))

    # ~~~~~ Methods for determining running job state from qstat ~~~~~ #
    def _completions(self):
        """
        Makes a default 'completions' string attribute

        Returns
        -------
        str
            character string describing the object and its qsub log paths
        """
        return('{0}\nlog_paths = {1}\n'.format(self.__repr__(), self.log_paths))

    def update_log_files(self, _type = 'stdout'):
        """
        Updates the paths to the log files in the `log_paths` attribute
        """
        log_path = self.get_log_file(_type = _type)
        self.log_paths.update({_type: log_path})

    def get_log_file(self, _type = 'stdout'):
        """
        Returns the expected path to the job's log file

        Parameters
        ----------
        _type: str
            either 'stdout' or 'stderr', representing the type of log path to
            generate

        Notes
        -----
        A stdout log file basename for a compute job with an ID of `4088513`
        and a name of `python` would look like: `python.o4088513`; the
        corresponding stderr log would be: `python.e4088513`
        """
        if not self.log_dir:
            logger.warning('log_dir attribute is not set for this qsub job: {0}'.format((self.id, self.name)))
            return(None)
        type_key = {'stdout': '.o', 'stderr': '.e'}
        type_char = type_key[_type]
        logfile = str(self.name) + type_char + str(self.id)
        log_path = os.path.join(str(self.log_dir), logfile)
        if not tools.item_exists(log_path):
            logger.warning('Log file does not appear to exist: {0}'.format(log_path))
        return(log_path)

    def get_job(self, id, qstat_stdout = None):
        """
        Retrieves the job's `qstat` entry

        Returns
        -------
        str
        """
        try:
            from sh import qstat
        except Exception:
            logger.error("qstat could not be loaded")
        # match a whole qstat line that starts with this job's ID
        job_id_pattern = r"^\s*{0}\s.*$".format(id)
        if not qstat_stdout:
            qstat_stdout = qstat()
        entry = re.findall(str(job_id_pattern), str(qstat_stdout), re.MULTILINE)
        return(entry)

    def get_status(self, id, entry = None, qstat_stdout = None):
        """
        Gets the status of the qsub job, e.g. "Eqw", "r", etc.

        Returns
        -------
        str
        """
        # capture the alphabetic state column that follows the job ID
        job_id_pattern = r"^.*\s*{0}.*\s([a-zA-Z]+)\s.*$".format(id)
        if not entry:
            entry = self.get_job(id = id, qstat_stdout = qstat_stdout)
        status = re.search(str(job_id_pattern), str(entry), re.MULTILINE)
        if status:
            return(status.group(1))
        else:
            return(status)

    def get_state(self, status, job_state_key):
        """
        Gets the interpretation of the job's status from the
        `job_state_key`, e.g. "Running", etc.

        Returns
        -------
        str
        """
        # defaultdict returns None if the key is not present
        state = job_state_key[str(status)]
        return(state)

    def get_is_running(self, state, job_state_key):
        """
        Checks if the job is considered to be running

        Returns
        -------
        bool
        """
        is_running = False
        if state in ['Running']:
            is_running = True
        return(is_running)

    def get_is_error(self, state, job_state_key):
        """
        Checks if the job is considered to be in an error state

        Returns
        -------
        bool
        """
        is_error = False
        if state in ['Error']:
            is_error = True
        return(is_error)

    def get_is_present(self, id, entry = None, qstat_stdout = None):
        """
        Finds out if a job is present in qsub

        Returns
        -------
        bool
        """
        if not entry:
            entry = self.get_job(id = id, qstat_stdout = qstat_stdout)
        if entry:
            return(True)
        else:
            return(False)

    def _update(self):
        """
        Update the object's status attributes based on `qstat` stdout messages
        """
        self.qstat_stdout = qstat()
        self.entry = self.get_job(id = self.id, qstat_stdout = self.qstat_stdout)
        self.status = self.get_status(id = self.id, entry = self.entry, qstat_stdout = self.qstat_stdout)
        self.state = self.get_state(status = self.status, job_state_key = self.job_state_key)
        self.is_running = self.get_is_running(state = self.state, job_state_key = self.job_state_key)
        self.is_error = self.get_is_error(state = self.state, job_state_key = self.job_state_key)
        self.is_present = self.get_is_present(id = self.id, entry = self.entry, qstat_stdout = self.qstat_stdout)

    def _debug_update(self, qstat_stdout):
        """
        Debug update mode which requires a qstat_stdout to be passed manually
        after object initialization
        """
        self.qstat_stdout = qstat_stdout
        self.entry = self.get_job(id = self.id, qstat_stdout = self.qstat_stdout)
        self.status = self.get_status(id = self.id, entry = self.entry, qstat_stdout = self.qstat_stdout)
        self.state = self.get_state(status = self.status, job_state_key = self.job_state_key)
        self.is_running = self.get_is_running(state = self.state, job_state_key = self.job_state_key)
        # previously is_error was not set here, leaving debug-mode objects
        # without the attribute; set it for consistency with _update()
        self.is_error = self.get_is_error(state = self.state, job_state_key = self.job_state_key)
        self.is_present = self.get_is_present(id = self.id, entry = self.entry, qstat_stdout = self.qstat_stdout)

    def running(self):
        """
        Returns `True` or `False` whether or not the job is currently
        considered to be running

        Returns
        -------
        bool
            `True` if running, otherwise `False`
        """
        self._update()
        return(self.is_running)

    def error(self):
        """
        Returns `True` or `False` whether or not the job is currently
        considered to be in an error state

        Returns
        -------
        bool
            `True` if in error, otherwise `False`
        """
        self._update()
        return(self.is_error)

    def present(self):
        """
        Returns `True` or `False` whether or not the job is currently in the
        `qstat` queue

        Returns
        -------
        bool
            `True` if present, otherwise `False`
        """
        self._update()
        return(self.is_present)

    # ~~~~~ Methods for querying qacct for job completion status ~~~~~ #
    def get_qacct(self, job_id = None):
        """
        Gets the `qacct` entry for a completed qsub job, used to determine if
        the job completed successfully

        Notes
        -----
        This operation is extremely slow, takes about 10 - 30+ seconds to
        complete

        Returns
        -------
        str
            the stdout from the `qacct -j` command for the job
        """
        if not job_id:
            job_id = self.id
        qacct_command = 'qacct -j {0}'.format(job_id)
        run_cmd = tools.SubprocessCmd(command = qacct_command).run()
        return(run_cmd.proc_stdout)

    def qacct2dict(self, proc_stdout = None, entry_delim = None):
        """
        Converts text output from `qacct` into a dictionary for parsing

        Parameters
        ----------
        entry_delim: str
            delimiter used to split entries in the `qacct` output; defaults
            to a long run of '=' characters

        Returns
        -------
        dict
            individual records containing metadata about the completion
            status of jobs with the matching `job_id`

        Notes
        -----
        `qacct` returns multiple entries per `job_id`, because job IDs wrap
        around; multiple historic jobs with the same ID number will also be
        returned, delimited by a long string of `===`
        """
        if not proc_stdout:
            proc_stdout = self.get_qacct()
        if not entry_delim:
            entry_delim = '=============================================================='
        entry_dict = {}
        # split the blob of stdout text into separate job entries; skip empty ones
        entries = [c for c in proc_stdout.split(entry_delim) if c != '']
        for i, entry in enumerate(entries):
            # one item per line, key and value separated by variable whitespace
            entry_stats = entry.split('\n')
            entry_dict[i] = {}
            for item in entry_stats:
                # split on the first whitespace; require at least key + value
                if len(item.split(None, 1)) >= 2:
                    key, value = item.split(None, 1)
                    entry_dict[i][key] = value.strip()
        return(entry_dict)

    def filter_qacct(self, qacct_dict = None, days_limit = 7, username = None):
        """
        Filters out 'bad' entries from the `qacct` output dictionary

        Parameters
        ----------
        qacct_dict: dict
            dictionary of job records representing `qacct` entries
        days_limit: int or None
            maximum allowed age of a job; defaults to 7 days, pass `None` to
            disable date filtering
        username: str
            the username which `qacct` records must match; defaults to the
            current user's name

        Returns
        -------
        dict
            hopefully a dictionary with only one `qacct` record left,
            matching the intended compute job

        Notes
        -----
        Filtering is required to remove historic job records with the same
        recycled job ID; exactly one record must remain for the job's
        completion status to be determined.
        """
        if not username:
            username = getpass.getuser()
        if not qacct_dict:
            qacct_dict = self.qacct2dict()
        # iterate over a snapshot of the items; popping from the dict while
        # iterating it directly raises RuntimeError on Python 3
        for key, subdict in list(qacct_dict.items()):
            # only keep the entries that match the current user's username
            job_owner = subdict['owner']
            if job_owner != username:
                qacct_dict.pop(key)
                continue
            job_end_time = subdict['end_time']
            # TODO: sometimes the timestamp is not in "%c" format, need a
            # try/except here
            job_end_time_obj = datetime.datetime.strptime(job_end_time, "%c")
            time_now = datetime.datetime.now()
            time_elapsed = time_now - job_end_time_obj
            if days_limit:
                if time_elapsed.days > days_limit:
                    qacct_dict.pop(key)
                    continue
            # more filter criteria here...
        return(qacct_dict)

    def get_qacct_job_failed_status(self, failed_entry):
        """
        Special parsing for the 'failed' entry in qacct output, because it is
        not a plain digit value; it can have a text description appended

        Returns
        -------
        int
            the first int value found after splitting on the first whitespace

        Examples
        --------
        Example of a 'failed' entry that needs to be parsed::

            {'failed': '100 : assumedly after job'}

        In this case, the value 100 would be returned
        """
        # take the first whitespace-separated token and coerce it to int
        value = failed_entry.split(None, 1)[0]
        value = int(value)
        return(value)

    def update_completion_validations(self, validation_dict):
        """
        Updates the `completion_validations` dict of validation stats, along
        with a pretty-printed `validations` JSON string and the `completions`
        summary text
        """
        self.completion_validations.update(validation_dict)
        self.validations = json.dumps(self.completion_validations, indent = 4)
        self.completions = self._completions() + self.validations

    def validate_completion(self, job_id = None, *args, **kwargs):
        """
        Checks if the qsub job completed successfully. Multiple validation
        criteria are evaluated one at a time and recorded in the
        `completion_validations` dictionary attribute along with a verbose
        description; the job is considered successfully completed only if all
        criteria pass.

        Returns
        -------
        bool
            `True` or `False`, whether or not all job completion validation
            criteria passed
        """
        if not job_id:
            job_id = self.id
        # create a dict of validations for the object
        self.completion_validations = {}
        # make sure the job is not currently running or in queues;
        # query qstat only once (present() makes a system call each time)
        is_present = self.present()
        validation = {'qtat_presence': {'status': is_present, 'note': None}}
        if is_present:
            validation['qtat_presence']['note'] = 'The job is still present in qstat and has not completed yet; job cannot be validated'
            self.update_completion_validations(validation)
            return(False)
        else:
            validation['qtat_presence']['note'] = 'The job is not present in qstat and has completed'
            self.update_completion_validations(validation)
        # get the results of the qacct query command
        if not hasattr(self, 'qacct_stdout'):
            # allow qacct_stdout to be pre-set externally for debugging & testing
            # also prevent querying qacct multiple times for the same job
            self.qacct_stdout = self.get_qacct(job_id = job_id)
        # convert it into a dict
        self.qacct_dict = self.qacct2dict(proc_stdout = self.qacct_stdout)
        # filter extraneous entries
        self.qacct_dict = self.filter_qacct(qacct_dict = self.qacct_dict, *args, **kwargs)
        # make sure there are entries left
        validation = {'has_qacct_entries': {'status': None, 'note': None}}
        if not self.qacct_dict:
            validation['has_qacct_entries']['status'] = False
            validation['has_qacct_entries']['note'] = 'No entries were left in qacct job record output after filtering; job cannot be validated'
            self.update_completion_validations(validation)
            return(False)
        else:
            validation['has_qacct_entries']['status'] = True
            validation['has_qacct_entries']['note'] = 'At least one entry was left in qacct job record output after filtering'
            self.update_completion_validations(validation)
        # make sure only one entry is left!
        validation = {'has_only_one_qacct_entry': {'status': None, 'note': None}}
        if len(self.qacct_dict) > 1:
            validation['has_only_one_qacct_entry']['status'] = False
            validation['has_only_one_qacct_entry']['note'] = 'More than one entry was left in qacct job record output after filtering; job cannot be validated'
            self.update_completion_validations(validation)
            return(False)
        else:
            validation['has_only_one_qacct_entry']['status'] = True
            validation['has_only_one_qacct_entry']['note'] = 'Only one entry was left in qacct job record output after filtering'
            self.update_completion_validations(validation)
        # example qacct output:
        # failed       0
        # exit_status  1
        # Python 2/3 compatible first key; dict.keys()[0] breaks on Python 3
        first_entry_key = next(iter(self.qacct_dict))
        # check the 'failed' status; >0 = failed !!
        validation = {'failed_status_0': {'status': False, 'note': None}}
        status_code = self.get_qacct_job_failed_status(failed_entry = self.qacct_dict[first_entry_key]['failed'])
        if status_code > 0:
            validation['failed_status_0']['status'] = False
            validation['failed_status_0']['note'] = 'The "failed" qacct value for the job was {0}; >0 means the job failed'.format(status_code)
            self.update_completion_validations(validation)
        else:
            validation['failed_status_0']['status'] = True
            validation['failed_status_0']['note'] = 'The "failed" qacct value for the job was {0}; >0 means the job failed'.format(status_code)
            self.update_completion_validations(validation)
        # check the 'exit_status'
        validation = {'exit_status_0': {'status': False, 'note': None}}
        exit_status = int(self.qacct_dict[first_entry_key]['exit_status'])
        if exit_status > 0:
            validation['exit_status_0']['status'] = False
            validation['exit_status_0']['note'] = 'The "exit_status" qacct value for the job was {0}; >0 means the job failed'.format(exit_status)
            self.update_completion_validations(validation)
        else:
            validation['exit_status_0']['status'] = True
            validation['exit_status_0']['note'] = 'The "exit_status" qacct value for the job was {0}; >0 means the job failed'.format(exit_status)
            self.update_completion_validations(validation)
        # add more criteria here...
        # aggregate the validations
        validations = [
            self.completion_validations['exit_status_0']['status'],
            self.completion_validations['failed_status_0']['status']
        ]
        # check if not all validations are True...
        if not all(validations):
            return(False)
        else:
            return(True)
# ~~~~~~ JOB FUNCTIONS ~~~~~ #
def submit(verbose = False, log_dir = None, monitor = False, validate = False, *args, **kwargs):
    """
    Submits a shell command to be run as a `qsub` compute job and returns a
    `Job` object. Extra args and kwargs are passed through to `submit_job`.

    The numeric job ID and job name echoed by `qsub` on stdout are captured
    and used to build the returned `Job`.

    Parameters
    ----------
    verbose: bool
        whether the generated `qsub` command should be printed in log output
    log_dir: str
        directory to use for qsub job log output files; defaults to the
        current working directory
    monitor: bool
        whether the job should be immediately monitored until completion
    validate: bool
        whether the job should immediately be validated upon completion

    Returns
    -------
    Job
        a `Job` object representing the submitted `qsub` compute job

    Examples
    --------
    Example usage::

        job = submit(command = 'echo foo')
        job = submit(command = 'echo foo', log_dir = "logs", print_verbose = True, monitor = True, validate = True)
    """
    if log_dir:
        # create the dir if it doesn't exist already
        tools.mkdirs(log_dir)
        # only use the log_dir if it actually exists now
        if not tools.item_exists(item = log_dir, item_type = 'dir'):
            logger.warning('log_dir does not exist and will not be used for qsub job submission; {0}'.format(log_dir))
        else:
            # resolve to the full, expanded, absolute, real path - bad
            # log_dir paths break job submissions easily
            log_dir = os.path.realpath(os.path.expanduser(log_dir))
            kwargs.update({
                'stdout_log_dir': log_dir,
                'stderr_log_dir': log_dir
            })
    proc_stdout = submit_job(return_stdout = True, verbose = verbose, *args, **kwargs)
    job_id, job_name = get_job_ID_name(proc_stdout)
    job = Job(id = job_id, name = job_name, log_dir = log_dir)
    # optionally, monitor the job to completion
    if monitor:
        monitor_jobs(jobs = [job], **kwargs)
    # optionally, validate the job completion
    if validate:
        job.validate_completion()
    return(job)
def subprocess_cmd(command, return_stdout = False):
    """
    Runs a terminal command with stdout piping enabled

    Notes
    -----
    `universal_newlines=True` required for Python 2 & 3 compatibility with
    stdout parsing
    """
    proc = sp.Popen(command, stdout = sp.PIPE, shell = True, universal_newlines = True)
    captured, _ = proc.communicate()
    captured = captured.strip()
    if return_stdout == True:
        return(captured)
    elif return_stdout == False:
        logger.debug(captured)
def get_job_ID_name(proc_stdout):
    """
    Parses stdout text to find lines that match the output message from a
    `qsub` job submission

    Returns
    -------
    tuple
        `(<job id number>, <job name>)`

    Examples
    --------
    Example usage::

        proc_stdout = submit_job(return_stdout = True) # 'Your job 1245023 ("python") has been submitted'
        job_id, job_name = get_job_ID_name(proc_stdout)
    """
    tokens = proc_stdout.split()
    # token 2 is the numeric ID, token 3 is the quoted job name
    job_id = tokens[2]
    raw_name = str(tokens[3])
    # strip the surrounding (" and ") from the name
    raw_name = re.sub(r'^\("', '', raw_name)
    job_name = re.sub(r'"\)$', '', raw_name)
    return((job_id, job_name))
def submit_job(command = 'echo foo', params = '-j y', name = "python", stdout_log_dir = None, stderr_log_dir = None, return_stdout = False, verbose = False, pre_commands = 'set -x', post_commands = 'set +x', sleeps = 0.5, print_verbose = False, **kwargs):
    """
    Internal function for submitting compute jobs to the HPC cluster running
    SGE via the `qsub` shell command; call this with `submit` instead. The
    job is submitted by wrapping the provided shell command in a bash heredoc.

    Parameters
    ----------
    command: str
        shell commands to be run inside the compute job
    params: str
        extra params to be passed to `qsub`
    name: str
        the name of the qsub compute job
    stdout_log_dir: str
        directory for `qsub` stdout logs; defaults to the current working directory
    stderr_log_dir: str
        directory for `qsub` stderr logs; defaults to the current working directory
    return_stdout: bool
        whether the function should return the stdout of the `qsub`
        submission call; recommended to always leave this `True`
    verbose: bool
        whether the generated `qsub` command should be logged
    pre_commands: str
        commands to run before `command` inside the qsub job; defaults to
        'set -x' for verbose qsub log output
    post_commands: str
        commands to run after `command` inside the qsub job; defaults to 'set +x'
    sleeps: int
        seconds to sleep after submitting; keep >0 to avoid overwhelming the
        job scheduler with requests
    print_verbose: bool
        print the generated `qsub` command to the console

    Returns
    -------
    str
        the stdout of the evaluated `qsub` shell command, when
        `return_stdout = True` was passed; otherwise nothing

    Notes
    -----
    `stdout_log_dir` and `stderr_log_dir` should have trailing slashes in
    their paths; malformed or nonexistent log dir paths are a common source
    of compute job failure.
    """
    if not stdout_log_dir:
        stdout_log_dir = os.path.join(os.getcwd(), '')
    if not stderr_log_dir:
        stderr_log_dir = os.path.join(os.getcwd(), '')
    # build the qsub command with a bash heredoc wrapper around the payload
    qsub_command = """
qsub {0} -N "{1}" -o :"{2}" -e :"{3}" <<E0F
{4}
{5}
{6}
E0F
""".format(params, name, stdout_log_dir, stderr_log_dir, pre_commands, command, post_commands)
    if verbose == True:
        logger.debug('qsub command is:\n{0}'.format(qsub_command))
    if print_verbose:
        print('qsub command is:\n{0}'.format(qsub_command))
    # submit the job
    proc_stdout = subprocess_cmd(command = qsub_command, return_stdout = True)
    # sleep after submitting the job
    if sleeps:
        sleep(sleeps)
    if return_stdout == True:
        return(proc_stdout)
    elif return_stdout == False:
        logger.debug(proc_stdout)
def monitor_jobs(jobs = None, kill_err = True, print_verbose = False, **kwargs):
    """
    Monitors a list of qsub `Job` objects for completion.

    Job monitoring is accomplished by calling each job's `present()` and
    `error()` methods, then waiting several seconds. Jobs no longer present
    in `qstat`, or in an error state, are removed from the monitoring queue;
    the loop repeats until no jobs remain. Optionally, jobs left in an error
    state are killed with `qdel` (otherwise they remain in `qstat`
    indefinitely).

    Parameters
    ----------
    jobs: list
        a list of `Job` objects
    kill_err: bool
        whether jobs left in error state should be automatically killed;
        recommended to leave this `True`
    print_verbose: bool
        whether step descriptions should also be printed to the console

    Returns
    -------
    tuple
        `(completed_jobs, err_jobs)`, both lists of `Job` objects

    Notes
    -----
    Only presence/absence and error state are checked, not 'Running' state;
    a present, non-error job is assumed to eventually finish and leave the
    `qstat` queue. The ``jobs`` list is passed by reference and will be
    depleted by the time this function returns.
    """
    # make sure jobs were passed
    if not jobs or len(jobs) < 1:
        logger.error('No jobs to monitor')
        return()
    # make sure jobs is a list
    if not isinstance(jobs, list):
        logger.error('"jobs" passed is not a list')
        return()
    completed_jobs = []
    # jobs in error state; won't finish
    err_jobs = []
    num_jobs = len(jobs)
    logger.debug('Monitoring jobs for completion. Number of jobs in queue: {0}'.format(num_jobs))
    if print_verbose:
        print('Monitoring jobs for completion. Number of jobs in queue: {0}'.format(num_jobs))
    while num_jobs > 0:
        # check number of jobs in the list
        if num_jobs != len(jobs):
            num_jobs = len(jobs)
            logger.debug("Number of jobs in queue: {0}".format(num_jobs))
            if print_verbose:
                print("Number of jobs in queue: {0}".format(num_jobs))
        # iterate over a copy of the list; the original code popped from
        # `jobs` while iterating it with enumerate(), which skips the entry
        # following each removal and can pop the wrong index entirely
        for job in list(jobs):
            if job.error():
                jobs.remove(job)
                err_jobs.append(job)
            elif not job.present():
                jobs.remove(job)
                completed_jobs.append(job)
        # don't sleep one final time once the queue has been emptied
        if not jobs:
            num_jobs = 0
            break
        sleep(5)
    logger.debug('No jobs remaining in the job queue')
    if print_verbose:
        print('No jobs remaining in the job queue')
    # check if there were any jobs left in error state
    if err_jobs:
        logger.error('{0} jobs left were left in error state. Jobs: {1}'.format(len(err_jobs), [job.id for job in err_jobs]))
        if print_verbose:
            print('{0} jobs left were left in error state. Jobs: {1}'.format(len(err_jobs), [job.id for job in err_jobs]))
        # kill the error jobs with the 'qdel' command
        if kill_err:
            logger.debug('Killing jobs left in error state')
            if print_verbose:
                print('Killing jobs left in error state')
            qdel_command = 'qdel {0}'.format(' '.join([job.id for job in err_jobs]))
            cmd = tools.SubprocessCmd(command = qdel_command).run()
            logger.debug(cmd.proc_stdout)
            if print_verbose:
                print(cmd.proc_stdout)
    return((completed_jobs, err_jobs))
def kill_jobs(jobs):
    """
    Kills qsub jobs by issuing the ``qdel`` command

    Parameters
    ----------
    jobs: list
        a list of ``Job`` objects
    """
    if not jobs:
        logger.debug("No jobs passed")
        return
    logger.debug('Killing jobs: {0}'.format(jobs))
    qdel_command = 'qdel {0}'.format(' '.join([job.id for job in jobs]))
    cmd = tools.SubprocessCmd(command = qdel_command).run()
    logger.debug(cmd.proc_stdout)
    logger.debug(cmd.proc_stderr)
def kill_job_ids(job_ids):
    """
    Kills qsub jobs by issuing the ``qdel`` command

    Parameters
    ----------
    job_ids: list
        a list of job ID numbers

    Examples
    --------
    Example usage::

        import qsub
        job_ids = ['4104004', '4104006', '4104009']
        qsub.kill_job_ids(job_ids = job_ids)
    """
    if not job_ids:
        logger.debug("No jobs passed")
        return
    logger.debug('Killing jobs: {0}'.format(job_ids))
    qdel_command = 'qdel {0}'.format(' '.join([job_id for job_id in job_ids]))
    cmd = tools.SubprocessCmd(command = qdel_command).run()
    logger.debug(cmd.proc_stdout)
    logger.debug(cmd.proc_stderr)
def find_all_job_id_names(text):
    """
    Searches a multi-line character string for all `qsub` job submission
    messages, where `text` represents the stdout from shell commands assumed
    to have submitted a number of `qsub` jobs (e.g. by an external program)

    Parameters
    ----------
    text: str
        a single character string, e.g. representing line(s) of stdout text
        from a shell command that submitted `qsub` jobs

    Yields
    ------
    tuple
        `(<job id>, <job name>)` for each submission message found

    Notes
    -----
    This function works by parsing the provided text for lines that look
    like this::

        Your job 3947957 ("sns.wes.SeraCare-1to1-Positive") has been submitted
    """
    for line in text.split('\n'):
        # e.g. ['Your', 'job', '3947957', '("sns.wes...")', 'has', 'been', 'submitted']
        parts = line.split()
        if len(parts) < 7:
            continue
        leading_ok = parts[0] == 'Your' and parts[1] == 'job'
        trailing_ok = parts[4] == 'has' and parts[5] == 'been' and parts[6] == 'submitted'
        if leading_ok and trailing_ok:
            job_id, job_name = get_job_ID_name(proc_stdout = line)
            yield(job_id, job_name)
# ~~~~~~ COMPLETED JOB VALIDATION ~~~~~ #
def get_qacct(job_id):
    """
    Gets the ``qacct`` accounting entry for a completed qsub job by running
    ``qacct -j <job_id>`` and returning its stdout.

    Parameters
    ----------
    job_id: str
        the ID number of the completed job to query

    Returns
    -------
    str
        the stdout text produced by the ``qacct -j`` command
    """
    command = 'qacct -j {0}'.format(job_id)
    result = tools.SubprocessCmd(command = command).run()
    return(result.proc_stdout)
def qacct2dict(proc_stdout):
    """
    Converts text output from ``qacct`` into a dictionary for parsing.

    Parameters
    ----------
    proc_stdout: str
        the stdout text from a ``qacct -j`` query; records are separated by a
        line of ``=`` characters, and each record is a series of
        ``key   value`` lines

    Returns
    -------
    dict
        a dict of the form ``{record_index: {key: value, ...}, ...}`` where
        ``record_index`` counts the non-empty records in order of appearance
    """
    record_delim = '=============================================================='
    parsed = {}
    # drop the empty leading chunk produced by splitting on the delimiter
    records = [chunk for chunk in proc_stdout.split(record_delim) if chunk != '']
    for index, record in enumerate(records):
        fields = {}
        for line in record.split('\n'):
            pieces = line.split(None, 1)
            # keep only lines that have both a key and a value
            if len(pieces) >= 2:
                fields[pieces[0]] = pieces[1].strip()
        parsed[index] = fields
    return(parsed)
def filter_qacct(qacct_dict, days_limit = 7):
    """
    Filters out 'bad' entries from a ``qacct`` entries dict: entries owned by
    a different user than the current user, and entries whose job ended more
    than ``days_limit`` days ago.

    Parameters
    ----------
    qacct_dict: dict
        a dict of ``qacct`` records as produced by ``qacct2dict``; each
        sub-dict must contain 'owner' and 'end_time' keys
    days_limit: int
        maximum allowed age, in days, of a job's end time

    Returns
    -------
    dict
        the same dict with non-matching entries removed; a falsy input is
        returned unchanged
    """
    username = getpass.getuser()
    if qacct_dict:
        # iterate over a snapshot of the items; popping from the dict while
        # iterating its live view raises RuntimeError on Python 3
        for key, subdict in list(qacct_dict.items()):
            # only keep the entries that match the current user's username
            if subdict['owner'] != username:
                qacct_dict.pop(key)
                continue
            # make sure the job was completed within the last `days_limit` days;
            # NOTE(review): '%c' parsing is locale-dependent — assumes qacct
            # emits 'end_time' in the current locale's format
            job_end_time_obj = datetime.datetime.strptime(subdict['end_time'], "%c")
            time_elapsed = datetime.datetime.now() - job_end_time_obj
            if time_elapsed.days > days_limit:
                qacct_dict.pop(key)
                continue
            # more filter criteria here...
    return(qacct_dict)
def get_qacct_job_failed_status(failed_entry):
    """
    Special parsing for the 'failed' entry in ``qacct`` output, because it is
    not always a plain digit value; sometimes it has a text description
    appended after the numeric code.

    Parameters
    ----------
    failed_entry: str
        the raw value of the 'failed' field from a ``qacct`` record

    Returns
    -------
    int
        the numeric failure code (the first whitespace-separated field)

    Examples
    --------
    Example text that needs parsing::

        {'failed': '100 : assumedly after job'}
    """
    # the numeric code is always the first whitespace-delimited token
    first_field = failed_entry.split(None, 1)[0]
    return(int(first_field))
def validate_job_completion(job_id):
    """
    Checks if a qsub job completed successfully by querying ``qacct``,
    filtering the entries down to a single matching record, and validating
    its 'failed' and 'exit_status' fields.

    Parameters
    ----------
    job_id: str
        the ID number of the completed job to validate

    Returns
    -------
    bool or None
        ``True`` if the job passed all validations, ``False`` if it failed
        any, or ``None`` (falsy) if no single valid ``qacct`` entry could be
        found for the job
    """
    # get the results of the qacct query command
    proc_stdout = get_qacct(job_id = job_id)
    # convert it into a dict
    qacct_dict = qacct2dict(proc_stdout = proc_stdout)
    # filter extraneous entries
    qacct_dict = filter_qacct(qacct_dict = qacct_dict)
    # make sure there are entries left
    if not qacct_dict:
        print('ERROR: no valid job entries found for job_id {0}'.format(job_id))
        return None
    # make sure only one entry is left!
    if len(qacct_dict) > 1:
        print('ERROR: multiple entries found for job_id {0};\n{1}'.format(job_id, qacct_dict))
        return None
    # exactly one entry remains; next(iter(...)) works on both Python 2 and 3
    # (the original `dict.keys()[0]` breaks on Python 3, where keys() is a view)
    first_entry_key = next(iter(qacct_dict))
    entry = qacct_dict[first_entry_key]
    # check the 'failed' status; >0 = failed !!
    status_code = get_qacct_job_failed_status(failed_entry = entry['failed'])
    validate_failed_status = not status_code > 0
    # check the 'exit_status'; >0 = non-zero exit
    validate_exit_status = not int(entry['exit_status']) > 0
    # add more criteria here...
    # aggregate the validations
    validations = [
        validate_failed_status,
        validate_exit_status
    ]
    # check if not all validations are True...
    if not all(validations):
        print('ERROR: The job {0} is not valid'.format(job_id))
        print({'validate_failed_status': validate_failed_status, 'validate_exit_status': validate_exit_status})
        return(False)
    else:
        print('The job {0} is valid'.format(job_id))
        return(True)
# ~~~~~~ DEMO FUNCTIONS ~~~~~ #
def demo_qsub():
    """
    Demo of the qsub code functions: submits a single short job, waits for it
    to finish, then validates its completion.

    Examples
    --------
    Example usages::

        import qsub; job = qsub.submit(log_dir = "logs", print_verbose = True); qsub.monitor_jobs([job], print_verbose = True); job.validate_completion(); print(job.completions)

        import qsub; job = qsub.submit(log_dir = "logs", print_verbose = True, monitor = True); job.validate_completion()

        import qsub; job = qsub.submit(log_dir = "logs", print_verbose = True, monitor = True, validate = True)
    """
    print('running single-job demo')
    command = """
    set -x
    cat /etc/hosts
    sleep 10
    """
    demo_job = submit(command = command, print_verbose = True)
    print('job id: {0}'.format(demo_job.id))
    print('job name: {0}'.format(demo_job.name))
    print('waiting on job to finish')
    monitor_jobs(jobs = [demo_job])
    print('job has finished\n\n')
    print('validating job completeion...')
    # report the validation outcome
    outcome = 'Job completed successfully' if demo_job.validate_completion() else 'Job did not pass completion validation!!!'
    print(outcome)
    return()
def demo_multi_qsub(job_num = 3):
    """
    Demo of the qsub code functions. Submits multiple short jobs and monitors
    them to completion, then validates each one.

    Parameters
    ----------
    job_num: int
        the number of demo jobs to submit
    """
    job_num = int(job_num)
    print('running multi-job demo')
    command = """
    set -x
    cat /etc/hosts
    sleep 10
    sleep $((1 + RANDOM % 10))
    """
    # submit the requested number of jobs, collecting them as they are created
    jobs = []
    for _ in range(job_num):
        new_job = submit(command = command, print_verbose = True)
        print('submitted job: {0} "{1}"'.format(new_job.id, new_job.name))
        jobs.append(new_job)
    # wait for all jobs to finish
    print('waiting on jobs to finish')
    monitor_jobs(jobs = jobs)
    print('jobs have finished\n\n')
    print('validating job completions...')
    results = [job.validate_completion() for job in jobs]
    if all(results):
        print('All jobs completed successfully')
    else:
        # re-validate each job individually to report the failures
        for job in jobs:
            if not job.validate_completion():
                print('Job {0} did not complete successfully!'.format(job.id))
# run both demo functions when this module is executed as a stand-alone script
if __name__ == "__main__":
    demo_qsub()
    demo_multi_qsub()