Source code for snsxt.run

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Runs a series of analysis tasks

Originally designed as an extension to the sns pipeline output, with the flexibility to add ad hoc extra analysis tasks for downstream processing
"""
# ~~~~~ LOGGING ~~~~~~ #
import os
from util import log

import logging

# path to the script's dir
scriptdir = os.path.dirname(os.path.realpath(__file__))
scriptname = os.path.basename(__file__)
script_timestamp = log.timestamp()
# path to parent dir 2 levels above this script
snsxt_parent_dir = os.path.realpath(os.path.dirname(os.path.dirname(__file__)))
# /ifs/data/molecpathlab/scripts/snsxt/

# set a timestamped log file for debug log
log_file = os.path.join(scriptdir, 'logs', '{0}.{1}.log'.format(scriptname, script_timestamp))
email_log_file = os.path.join(scriptdir, 'logs', '{0}.{1}.email.log'.format(scriptname, script_timestamp))

def logpath():
    """
    Returns the path to the main log file; needed by the logging.yml config file.
    This generates dynamic output log file paths & names

    Returns
    -------
    logging.FileHandler
        a Python logging FileHandler object configured with a log file path set dynamically at program run time
    """
    global log_file
    return(log.logpath(logfile = log_file))
def email_logpath():
    """
    Returns the path to the email log file; needed by the logging.yml config file.
    This generates dynamic output log file paths & names

    Returns
    -------
    logging.FileHandler
        a Python logging FileHandler object configured with a log file path set dynamically at program run time
    """
    return(log.logpath(logfile = email_log_file))
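# NOTE: a hypothetical sketch (not the actual logging.yml from this repo) of how a
# dictConfig-style YAML file could call the two functions above at load time, using
# the standard logging.config '()' factory key; the dotted path 'run.logpath'
# assumes this module is importable as 'run', and the handler names are placeholders:
#
#   handlers:
#       main:
#           (): run.logpath
#           level: DEBUG
#       email:
#           (): run.email_logpath
#           level: INFO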
# load the logging config
config_yaml = os.path.join(scriptdir, 'logging.yml')
basic_yaml = os.path.join(scriptdir, "basic_logging.yml")
if __name__ == "__main__":
    logger = log.log_setup(config_yaml = config_yaml, logger_name = "run")
else:
    logger = log.log_setup(config_yaml = basic_yaml, logger_name = "run")

extra_handlers = [h for h in log.get_all_handlers(logger)]
"""
Python logging FileHandlers to be passed throughout the program, in order to keep all submodules logging to the same file(s) set by `logpath()` and `email_logpath()`
"""

logger.debug("snsxt program is starting at location: {0}".format(os.path.realpath(os.path.expanduser(__file__))))

# print the paths to the log files to the log
log.print_filehandler_filepaths_to_log(logger)

# ~~~~~ LOAD CONFIGS ~~~~~ #
import config
# update program-wide config with extra items from this script
config.config['snsxt_parent_dir'] = snsxt_parent_dir # snsxt/
config.config['snsxt_dir'] = scriptdir # snsxt/snsxt/
config.config['extra_handlers'] = extra_handlers
config.config['sns_repo_dir'] = os.path.join(config.config['snsxt_dir'], config.config['sns_repo_dir']) # snsxt/snsxt/sns
configs = config.config
"""
The main configurations dictionary to use for settings throughout the program. The `sns_repo_dir` value is modified at program run time by prepending the `snsxt_dir` path (path to this script's directory). Other dict keys are set at program run time as well, including `snsxt_parent_dir`, `snsxt_dir`, and `extra_handlers`
"""

default_targets = os.path.join(snsxt_parent_dir, 'targets.bed')
"""
A .bed formatted file to use by default as the target regions for variant calling
"""

default_probes = os.path.join(snsxt_parent_dir, 'probes.bed')
"""
A .bed formatted file to use by default for CNV analysis. Must have only 3 tab-delimited columns.
"""

default_task_list = os.path.join(snsxt_parent_dir, "task_lists", "default.yml")
"""
The YAML formatted task list containing analysis tasks to be run by default
"""

# ~~~~ LOAD MORE PACKAGES ~~~~~~ #
# system modules
import sys
import argparse
import yaml
import collections
import json

# this program's modules
from util import tools
from util import find
from util import qsub
from util import mutt
from sns_classes.classes import SnsWESAnalysisOutput
import run_tasks
import job_management
import validation
import cleanup
import setup_report
import sns_tasks
import mail
import _exceptions as _e

# add log file to email output
mail.email_files.append(log_file)

# add handlers to run_tasks
run_tasks.extra_handlers = [h for h in extra_handlers]

# ~~~~~ FUNCTIONS ~~~~~~ #
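# NOTE: a minimal sketch of how a submodule that receives `extra_handlers` (as
# run_tasks does above) might attach them so it writes to the same log files;
# logging.getLogger and addHandler are standard library calls, but the logger
# name 'my_task' is a hypothetical placeholder:
#
#   import logging
#   task_logger = logging.getLogger('my_task')   # hypothetical submodule logger
#   for handler in extra_handlers:               # handlers passed in from this script
#       task_logger.addHandler(handler)          # now logs to the same file(s)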
def startup():
    """
    Configures global attributes of other modules, and performs other actions, when the program starts up

    Todo
    ----
    Integrate this with the rest of the program
    """
    # add log file to email output
    mail.email_files.append(log_file)

    # add handlers to run_tasks
    run_tasks.extra_handlers = [h for h in extra_handlers]
def get_task_list(task_list_file):
    """
    Reads the task_list from a YAML formatted file

    Parameters
    ----------
    task_list_file: str
        the path to a YAML formatted file from which to read analysis tasks

    Returns
    -------
    dict
        a dictionary containing the contents of the YAML `task_list_file`
    """
    logger.debug('Loading tasks from task list file: {0}'.format(os.path.abspath(task_list_file)))

    # read the YAML as an OrderedDict
    _mapping_tag = yaml.resolver.BaseResolver.DEFAULT_MAPPING_TAG

    def dict_representer(dumper, data):
        return(dumper.represent_dict(data.items()))

    def dict_constructor(loader, node):
        return(collections.OrderedDict(loader.construct_pairs(node)))

    yaml.add_representer(collections.OrderedDict, dict_representer)
    yaml.add_constructor(_mapping_tag, dict_constructor)

    # get the list of tasks to run
    if task_list_file:
        with open(task_list_file, "r") as f:
            task_list = yaml.load(f)
    else:
        task_list = {}

    logger.debug('task_list config loaded: {0}'.format(task_list))
    return(task_list)
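# NOTE: a hypothetical example of a task list YAML file that get_task_list() can
# parse; the top-level 'sns', 'tasks', and 'setup_report' keys are the ones
# checked in main() below, but the individual task entries are illustrative
# placeholders, not names from this repo:
#
#   sns:
#       some_sns_step: {}
#   tasks:
#       SomeSnsxtTask: {}
#   setup_report: True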
def main(**kwargs):
    """
    Main control function for the program

    Parameters
    ----------
    kwargs: dict
        dictionary containing args to run the program, expected to be passed from `parse()` and passed on to `run_sns_tasks()` and `run_snsxt_tasks()`

    Keyword Arguments
    -----------------
    analysis_id: str
        an identifier for the analysis (e.g. the NextSeq run ID)
    results_id: str
        a sub-identifier for the analysis (e.g. a timestamp)
    task_list_file: str
        the path to a YAML formatted file containing analysis tasks to be run
    debug_mode: bool
        prevents the program from halting if errors are found in qsub log output files; defaults to `False`. `True` = do not stop for qsub log errors, `False` = stop if errors are found
    fastq_dirs: list
        a list of paths to directories to use as input data locations for a new `sns` analysis. These directories should contain .fastq.gz files within two levels of the top level of the dir (e.g. at most 2 subdirs deep). The .fastq.gz files contained in these directories should keep the exact filenames output by the NextSeq; sample parsing will take place automatically.
    targets_bed: str
        path to a .bed formatted file to use as the target regions for variant calling
    probes_bed: str
        path to a .bed formatted file to use as the probes for CNV analysis
    pairs_sheet: str
        path to a .csv samplesheet to use for matching tumor and normal samples in the paired variant calling analysis steps. See GitHub for an example.
    """
    # get the args that were passed
    analysis_id = kwargs.pop('analysis_id', None)
    task_list_file = kwargs.pop('task_list_file', default_task_list)
    results_id = kwargs.pop('results_id', None)
    debug_mode = kwargs.pop('debug_mode', False)
    fastq_dirs = kwargs.pop('fastq_dirs', [])
    targets_bed = kwargs.pop('targets_bed', default_targets)
    probes_bed = kwargs.pop('probes_bed', default_probes)
    pairs_sheet = kwargs.pop('pairs_sheet', None)
    analysis_dir = kwargs.pop('analysis_dir', None)

    # make sure that analysis_dir was passed
    logger.debug('analysis_dir passed to script: {0}'.format(analysis_dir))
    if not analysis_dir:
        raise _e.ArgumentError(message = 'No analysis_dir passed', errors = '')

    # make sure the analysis_dir exists
    if not tools.item_exists(analysis_dir):
        raise _e.AnalysisFileMissing(message = 'analysis_dir does not exist!', errors = '')

    # get the full path to the analysis_dir
    analysis_dir = os.path.realpath(os.path.expanduser(analysis_dir))
    logger.info('Analysis directory will be: {0}'.format(analysis_dir))

    # rebuild the kwargs with only the items chosen to pass on
    kwargs = {
        'analysis_id': analysis_id,
        'task_list_file': task_list_file,
        'results_id': results_id,
        'debug_mode': debug_mode,
        'fastq_dirs': fastq_dirs,
        'targets_bed': targets_bed,
        'probes_bed': probes_bed,
        'pairs_sheet': pairs_sheet
    }

    # get the task list contents
    task_list = get_task_list(task_list_file)

    # try to run all the tasks for the analysis
    try:
        # check if 'sns' is in the task list and has items
        if task_list.get('sns', None):
            logger.debug('sns tasks:\n{0}'.format(task_list['sns'].items()))
            run_tasks.run_sns_tasks(task_list, analysis_dir, **kwargs)

        # check if there are downstream snsxt tasks
        if task_list.get('tasks', None):
            logger.debug('downstream snsxt tasks:\n{0}'.format(task_list['tasks'].items()))
            run_tasks.run_snsxt_tasks(task_list, analysis_dir, **kwargs)

        # check if the report should be set up
        # TODO: move report out of this function and into main as part of cleanup
        if task_list.get('setup_report', None):
            logger.debug('Starting report setup')
            setup_report.setup_report(output_dir = analysis_dir, analysis_id = analysis_id, results_id = results_id)
    except:
        # run this if an exception is caught
        logger.exception('Encountered an exception while running tasks')
        job_management.kill_background_jobs()
        mail.email_error_output(message_file = email_log_file)
    else:
        # run this if no exception is caught
        mail.email_output(message_file = email_log_file)
    finally:
        # run this no matter what; run cleanup
        cleanup.save_configs(analysis_dir = analysis_dir)
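# NOTE: a minimal sketch of calling main() directly with the keyword arguments
# documented above, mirroring the CLI example in parse() below; the paths and
# identifiers are placeholders taken from that example:
#
#   main(analysis_dir = 'mini_analysis-controls/',
#        fastq_dirs = ['mini_analysis-controls/fastq/'],
#        analysis_id = 'mini_analysis',
#        results_id = 'results1',
#        task_list_file = 'task_lists/dev.yml')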
def parse():
    """
    Parses CLI arguments and runs `main()` with them; arg parsing happens here if the program was run as a script

    Examples
    --------
    Example script usage::

        snsxt$ snsxt/run.py -d mini_analysis-controls/ -f mini_analysis-controls/fastq/ -a mini_analysis -r results1 -t task_lists/dev.yml --pairs_sheet mini_analysis-controls/samples.pairs.csv_usethis
    """
    # ~~~~ GET SCRIPT ARGS ~~~~~~ #
    # create the top-level parser
    parser = argparse.ArgumentParser(description='snsxt: sns bioinformatics pipeline extension program')

    # optional flags
    parser.add_argument("-a", "--analysis_id", default = None, type = str, dest = 'analysis_id', metavar = 'analysis_id', help="Identifier for the analysis")
    parser.add_argument("-t", "--task-list", default = default_task_list, dest = 'task_list_file', help="YAML formatted tasks list file to control which downstream analysis tasks get run")
    parser.add_argument("-r", "--results_id", default = None, type = str, dest = 'results_id', metavar = 'results_id', help="Identifier for the analysis results, e.g. a timestamp used to differentiate multiple sns pipeline outputs for the same sequencing run raw analysis input files")
    parser.add_argument("--debug_mode", default = False, action = "store_true", dest = 'debug_mode', help="Skip analysis output validation and error checking before running downstream snsxt pipeline steps")
    parser.add_argument('-f', '--fastq_dir', dest = "fastq_dirs", nargs='*', help = "Directories containing .fastq files to use in a new sns analysis")
    parser.add_argument('--targets', dest = 'targets_bed', help = 'Targets .bed file with regions for analysis', default = default_targets)
    parser.add_argument('--probes', dest = 'probes_bed', help = 'Probes .bed file with regions for CNV analysis', default = default_probes)
    parser.add_argument('--pairs_sheet', dest = 'pairs_sheet', help = '"samples.pairs.csv" samplesheet to use for paired analysis', default = None)

    # required flags
    parser.add_argument('-d', '--analysis_dir', dest = "analysis_dir", help = "Path to the directory to use for the analysis. For a new sns analysis, this will become the output directory. For an existing sns analysis output, this will become the input directory", required = True)

    # parse the args and run the main function with them
    args = parser.parse_args()
    main(**vars(args))
# ~~~~ RUN ~~~~~~ #
if __name__ == "__main__":
    parse()