Skip to content
Snippets Groups Projects
Select Git revision
  • c1aafe512bf9bbae85369d5a5aeb5722a76b0ef8
  • master default protected
2 results

nextflow.py

Blame
  • main.py 4.79 KiB
    #!/usr/bin/env python3
    import argparse
    import os
    from psot.config import load_config
    import copy
    import shutil
    from psot.nextflow import setup_execution_directory, execute_analysis
    import tempfile
    import sys
    import json
    
    def main():
        """Parse the command line and dispatch to the selected handler.

        The top-level parser accepts a repeatable ``--repository`` option and
        two sub-commands, ``info`` and ``analyze``. Each parser stores its
        handler as the ``func`` default; without a sub-command the handler is
        ``help``. The configuration is loaded with the given repositories and
        passed, together with the parsed arguments, to the handler.
        """
        parser = argparse.ArgumentParser(description='Make bioinformatic observations on aminoacid sequences')
        parser.set_defaults(func=help)
        parser.add_argument('--repository', '-r', action='append', help='Include the given repository')

        commands = parser.add_subparsers()

        # "info" sub-command
        info_cmd = commands.add_parser('info')
        info_cmd.set_defaults(func=info)
        info_cmd.add_argument('--listanalyses', '-l', action='store_true', help='Show available analysis steps')

        # "analyze" sub-command; options are registered in table order so the
        # generated help text lists them in this sequence.
        analyze_cmd = commands.add_parser('analyze')
        analyze_options = [
            (('--fasta', '-f'), dict(required=True, help='A fasta file with aminoacid sequences')),
            (('--output', '-o'), dict(required=True, help='The output directory for the json documents')),
            (('--profile', '-p'), dict(default='fast', help='The profile to use')),
            (('--live', '-l'), dict(action='store_true', help='Report results as they are computed, not only at the end of the computation. The live results will be available in the $output/live.')),
            (('--config', '-c'), dict(help='The config to use')),
            (('--fetch_informations', '-i'), dict(action='store_true', help='Fetch informations')),
            (('--debug', '-d'), dict(action='store_true', help='Debug mode, computation directory will not be removed after computation')),
            (('--execution_dir', '-e'), dict(help='Use the specified execution directory and do not delete it after the computation')),
            (('--use_cluster', '-C'), dict(action='store_true', help='Use compute cluster for execution')),
        ]
        for flags, settings in analyze_options:
            analyze_cmd.add_argument(*flags, **settings)
        analyze_cmd.set_defaults(func=analyze)

        parsed = parser.parse_args()
        config = load_config(repositories=parsed.repository)
        # The help handler needs access to the parser to print its usage text.
        parsed.parser = parser
        parsed.func(parsed, config)
    
    def help(args, config):
        """Print the help text of the parser attached to ``args``.

        ``config`` is not used; it is accepted so this handler can be called
        the same way as the other command handlers.
        """
        parser = args.parser
        parser.print_help()
    
    def info(args, config):
        """Handle the ``info`` sub-command by listing the configured analyses.

        ``args`` is not consulted; the listing is printed regardless of the
        parsed flags.
        """
        show_analyses(config=config)
    
    def analyze(args, config):
        """Handle the ``analyze`` sub-command.

        Exits with status 1 if ``args.fasta`` is not an existing file.
        Otherwise builds the execution description, prints it as JSON when
        ``args.debug`` is set, prepares the execution directory, runs the
        analysis and finally cleans up.

        The execution directory is only cleaned up when it was created by this
        program. A directory passed via ``--execution_dir`` is kept, as that
        option's help text promises.
        """
        if not os.path.isfile(args.fasta):
            print("Given fasta file does not exist")
            sys.exit(1)
        execution = generate_execution(config, args)
        if args.debug:
            print(json.dumps(execution, indent=2))
        setup_execution_directory(execution)
        execute_analysis(execution)
        # A user-supplied execution directory must survive the run; only
        # directories created by generate_execution() are handed to cleanup(),
        # which additionally keeps them in debug mode.
        if not args.execution_dir:
            cleanup(execution)
    
    def cleanup(execution):
        """Remove the execution directory unless debug mode is enabled.

        Reads ``execution['debug']`` and, when it is falsy, recursively
        deletes ``execution['directory']``.
        """
        if execution['debug']:
            return
        shutil.rmtree(execution['directory'])
    
    def generate_execution(config, args):
        """Build the execution description dict from config and parsed args.

        The dict carries the debug/cluster/fetch flags, the mode (``'live'``
        or ``'complete'``), absolute fasta and output paths, the install path
        from the config, the optional ``venv`` entry, the execution directory
        and the modules of the selected profile.

        The execution directory is the absolute ``args.execution_dir`` when
        given; otherwise a fresh temporary directory is created (below
        ``/vol/sge-tmp`` when ``args.use_cluster`` is set).
        """
        # Keys are inserted in a fixed order; the debug JSON dump reflects it.
        execution = {
            'debug': args.debug,
            'use_cluster': args.use_cluster,
            'fetch_informations': args.fetch_informations,
            'mode': 'live' if args.live else 'complete',
            'fasta': os.path.abspath(args.fasta),
            'output': os.path.abspath(args.output),
            'install_path': config['install_path'],
        }
        if 'venv' in config:
            execution['venv'] = config['venv']

        if args.execution_dir:
            workdir = os.path.abspath(args.execution_dir)
        elif args.use_cluster:
            workdir = tempfile.mkdtemp(dir='/vol/sge-tmp')
        else:
            workdir = tempfile.mkdtemp()
        execution['directory'] = workdir

        execution['modules'] = generate_execution_modules_for_profile(config, args.profile)
        return execution
        
    def generate_execution_modules_for_profile(config, profile):
        """Return the prepared module list for the profile named ``profile``.

        The modules of the first profile in ``config['profiles']`` whose
        ``name`` equals ``profile`` are deep-copied, so the config itself is
        not modified. Each copied module gets an ``id`` (its name) and a
        ``params`` string for both its ``analysis`` and ``converter`` entries,
        built from their ``parameters`` mappings.

        Raises:
            IndexError: if no profile with the given name exists. The message
                names the requested profile and lists the available ones.
        """
        # find profile by name
        selected = next((p for p in config['profiles'] if p['name'] == profile), None)
        if selected is None:
            known = ', '.join(str(p['name']) for p in config['profiles'])
            # IndexError is kept as the exception type for compatibility with
            # the previous "[...][0]" lookup, but now carries a usable message.
            raise IndexError("Unknown profile '{0}'; available profiles: {1}".format(profile, known))
        modules = copy.deepcopy(selected['modules'])
        for module in modules:
            # the module name is used as its id
            module['id'] = module['name']
            module['analysis']['params'] = generate_params_string(module['analysis']['parameters'])
            module['converter']['params'] = generate_params_string(module['converter']['parameters'])
        return modules
    
    def generate_params_string(options):
        """Render an options mapping as a command-line parameter string.

        Every key becomes ``--key``. A truthy value is appended in single
        quotes (``--key 'value'``); a falsy value (``None``, ``''``, ``0``,
        ``False``) yields the bare flag. Non-string values such as numbers are
        converted with ``str()`` instead of raising ``TypeError``.

        Returns an empty string when ``options`` is empty or ``None``.
        """
        if not options:
            return ''
        parts = []
        for key, value in options.items():
            if value:
                # str.format stringifies non-string values; string values
                # produce exactly the same text as plain concatenation.
                parts.append("--{0} '{1}'".format(key, value))
            else:
                parts.append('--{0}'.format(key))
        return ' '.join(parts)
    
    
    def show_analyses(config):
        """Print the configured profiles and the available modules.

        For every entry of ``config['profiles']`` the name and info are
        printed, followed by the names of its modules when the profile has a
        ``modules`` entry. After a blank line, name and info of every value in
        ``config['modules']`` are printed.
        """
        entry_line = '   {0:<20} - {1}'
        nested_line = '      {0:<20}'

        print('Profiles:')
        for profile in config['profiles']:
            print(entry_line.format(profile['name'], profile['info']))
            for member in profile.get('modules', ()):
                print(nested_line.format(member['name']))

        print()
        print('Available modules for custom profile:')
        for available in config['modules'].values():
            print(entry_line.format(available['name'], available['info']))
    
    # Run the command-line interface only when this file is executed as a
    # script, not when it is imported as a module.
    if __name__ == "__main__":
        main()