# Source code for snsxt.job_management

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Functions for custom management of compute cluster qsub jobs
"""
# ~~~~~ LOGGING ~~~~~~ #
import os
from util import log
from util import qsub
import logging
import _exceptions as _e

# module-level logger, named after this module
logger = logging.getLogger(__name__)

# where this script lives on disk: its directory (with symlinks resolved)
# and its file name
scriptdir = os.path.split(os.path.realpath(__file__))[0]
scriptname = os.path.split(__file__)[1]
# value of the project's ``log.timestamp()`` helper, captured once when this
# module is executed
script_timestamp = log.timestamp()


# ~~~~~ GLOBALS ~~~~~~ #
# holds qsub jobs that were submitted by a task which did not monitor them itself
background_jobs = list()
"""
Qsub jobs created by an analysis task that did not wait for them to finish are collected in this list. They are monitored to completion once `run_tasks` has finished running all tasks, so that the program does not exit before every job it created has finished.
"""

# ~~~~ CUSTOM FUNCTIONS ~~~~~~ #
def kill_background_jobs():
    """
    Kill every qsub job held in the global ``background_jobs`` list.

    A warning listing the jobs is logged first, then the list is handed to ``qsub.kill_jobs``.
    """
    warning_text = "Killing background jobs: {0}".format(background_jobs)
    logger.warning(warning_text)
    qsub.kill_jobs(jobs=background_jobs)
def monitor_validate_background_jobs():
    """
    Wait for the global ``background_jobs`` to finish, then validate them.

    When the list is empty only a debug message is logged. Otherwise the list is passed to ``monitor_validate_jobs``, and any entries still left in ``background_jobs`` afterwards are treated as a failure.

    Raises
    ------
    _e.ComputeJobInvalid
        if ``background_jobs`` is still non-empty after monitoring
    """
    if not background_jobs:
        logger.debug('No background jobs present for monitoring')
        return

    logger.debug('Background jobs will be monitored for completion and validated')
    monitor_validate_jobs(jobs=background_jobs)

    # the list is expected to be empty by now; presumably the monitoring step
    # removes finished jobs from it in place -- verify against util.qsub
    if background_jobs:
        leftover_jobs = list(background_jobs)
        err_message = 'Jobs are still left in the background_jobs queue after monitoring: {0}'.format(leftover_jobs)
        raise _e.ComputeJobInvalid(message=err_message, errors='')
def monitor_validate_jobs(jobs):
    """
    Wait for a list of qsub jobs to finish, then check that each one completed successfully.

    Parameters
    ----------
    jobs: list
        a list of ``qsub.Job`` objects

    Returns
    -------
    tuple
        an empty tuple when no jobs were passed; otherwise nothing is returned explicitly

    Raises
    ------
    _e.ComputeJobInvalid
        if any completed job fails ``validate_completion()`` or if ``qsub.monitor_jobs`` reports jobs that ended in error

    Todo
    ----
    Need to add more error handling of invalid or error jobs here
    """
    def describe(job_list):
        # (id, name) pairs used in the log messages
        return [(job.id, job.name) for job in job_list]

    if not jobs:
        logger.debug('No jobs were passed for monitoring')
        # TODO: what to return here?
        return ()

    logger.debug('Waiting for qsub jobs to complete:\n{0}'.format(describe(jobs)))
    completed_jobs, err_jobs = qsub.monitor_jobs(jobs=jobs)
    logger.debug('All jobs completed')

    logger.debug('Validating completion status of completed jobs...')
    valid_jobs, invalid_jobs = [], []
    for job in completed_jobs:
        # sort each finished job by the outcome of its own validation
        bucket = valid_jobs if job.validate_completion() else invalid_jobs
        bucket.append(job)

    if invalid_jobs:
        logger.error('Some completed jobs appear invalid: {0}'.format(describe(invalid_jobs)))
    if err_jobs:
        logger.error('Some jobs did not complete due to errors: {0}'.format(describe(err_jobs)))

    # nothing failed: fall out without raising
    if not invalid_jobs and not err_jobs:
        return

    # failed validation first, then the jobs reported in error
    failed_jobs = list(invalid_jobs)
    if err_jobs:
        failed_jobs.extend(err_jobs)

    err_message = 'Jobs did not complete successfully:\n\n'
    # NOTE(review): join() requires every ``job.completions`` to be a string;
    # the error jobs never went through validate_completion() -- confirm the
    # attribute is populated for them
    jobs_message = '\n'.join(job.completions for job in failed_jobs)
    raise _e.ComputeJobInvalid(message=err_message + jobs_message, errors='')