Skip to content
Snippets Groups Projects
Select Git revision
  • master
1 result

PKG-INFO

Blame
  • Forked from cfankep / dbman
    Source project has a limited visibility.
    nextflow.py 6.38 KiB
    import collections
    import collections.abc
    import os
    import os.path
    import subprocess
    from copy import deepcopy
    from string import Template
    
    # taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
    def flatten(d, parent_key='', sep='_'):
        """Flatten a nested mapping into a single-level dict.

        Nested keys are joined with *sep*, e.g. ``{'a': {'b': 1}}`` becomes
        ``{'a_b': 1}``.

        :param d: mapping to flatten (may contain nested mappings)
        :param parent_key: key prefix, used internally during recursion
        :param sep: separator inserted between joined key components
        :return: a new flat dict; *d* is left unmodified
        """
        items = []
        for key, value in d.items():
            compound_key = parent_key + sep + key if parent_key else key
            # The ABCs were removed from the `collections` top level in
            # Python 3.10; `collections.MutableMapping` raises AttributeError
            # there. `collections.abc.MutableMapping` works on all Python 3.
            if isinstance(value, collections.abc.MutableMapping):
                items.extend(flatten(value, compound_key, sep=sep).items())
            else:
                items.append((compound_key, value))
        return dict(items)
    
    # Nextflow process templates. These are string.Template objects: `${...}`
    # placeholders are filled from a (flattened) module/execution config dict,
    # while `$$` escapes yield a literal `$` for nextflow's own variables.

    # One analysis process per tool: feeds the fasta channel (optionally split
    # into chunks for cluster execution) into the tool's analysis script.
    analysis_template = Template ('''
    process ${id} {
        executor '${executor}'
        ${clusterOptions}
        input:
        file fasta from for_${id}${chunks}
    
        output:
        file "$${fasta}.${id}.results" into ${id}_results
    
        script:
        """
        ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
        """
    }
    ''')
    # Publishes per-sequence JSON files into <output>/live as soon as one
    # tool's results are converted ("live" mode).
    live_results_template = Template('''
    process generate_${id}_live_results {
    
        publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'
    
        input:
        file result from ${id}_json_live
    
        output:
        file "*.json" into ${id}_live_results
    
        script:
        """
        split_json_into_separate_files.py --json $$result --output . --tool ${id}
        """
    }
    ''')
    # Converts raw tool results to JSON, feeding both the join step
    # (${id}_json) and the live-results step (${id}_json_live).
    convert_live_template = Template ('''
    process convert_${id}_to_json {
        input:
        file result from ${id}_results
    
        output:
        file "$${result}.json" into ${id}_json, ${id}_json_live
    
        script:
        """
        ${converter_script} --result $$result --output $${result}.json ${converter_params}
        """
    }
    ''')
    # Converts raw tool results to JSON destined for the dbxref/info
    # retrieval step (${id}_json_info).
    convert_info_template = Template ('''
    process convert_${id}_to_json {
        input:
        file result from ${id}_results
    
        output:
        file "$${result}.json" into ${id}_json_info
    
        script:
        """
        ${converter_script} --result $$result --output $${result}.json ${converter_params}
        """
    }
    ''')
    # Plain conversion to JSON for the join step only (no live output,
    # no info retrieval).
    convert_template = Template ('''
    process convert_${id}_to_json {
        input:
        file result from ${id}_results
    
        output:
        file "$${result}.json" into ${id}_json
    
        script:
        """
        ${converter_script} --result $$result --output $${result}.json ${converter_params}
        """
    }
    ''')
    # Enriches converted JSON with resolved dbxref informations before the
    # join step ("complete" mode with fetch_informations).
    retrieve_informations_template = Template('''
    process retrieve_informations_for_${id} {
    
        input:
        file result from ${id}_json_info
    
        output:
        file "$${result.baseName}_info.json" into ${id}_json
    
        script:
        """
        resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
        """
    }
    ''')
    # Same enrichment, but also feeds the live-results channel
    # ("live" mode with fetch_informations).
    retrieve_informations_live_template = Template('''
    process retrieve_informations_for_${id} {
    
        input:
        file result from ${id}_json_info
    
        output:
        file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live
    
        script:
        """
        resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
        """
    }
    ''')
    # One input declaration per module for the join process below.
    input_template = Template('''    file ${id}_result from ${id}_json.collect()''')
    # Merges every module's JSON results into a single joined.json document.
    join_jsons_template = Template('''
    process join_documents {
    
        input:
        ${inputs}
    
        output:
        file "joined.json" into joined_json
    
        script:
        """
        join_json_files.py --output joined.json *.json
        """
    }
    ''')
    # Splits the joined document back into one JSON file per input sequence
    # and publishes them to the final output directory.
    split_jsons_template = Template('''
    process split_documents {
    
        publishDir "${output}", mode: 'copy'
    
        input:
        file "input/json.json" from joined_json
    
        output:
        file "*.json" into result_documents
    
        script:
        """
        split_json_into_separate_files.py --json 'input/json.json' --output .
        """
    }
    ''')
    
    def setup_execution_directory(execution):
        """Create and populate the nextflow execution directory.

        Writes the generated pipeline to ``<directory>/main.nf`` and links
        the helper scripts as ``<directory>/bin`` (presumably so nextflow
        can find them on its PATH — confirm against nextflow docs).

        :param execution: dict with at least 'directory' and 'install_path',
            plus everything :func:`generate_nextflow_script` needs
        :raises SystemExit: if the path exists but is not a directory
        """
        directory = execution['directory']
        if not os.path.exists(directory):
            os.mkdir(directory)
        if not os.path.isdir(directory):
            # Previously a bare exit() with no diagnostic; fail loudly so the
            # user knows why setup aborted.
            raise SystemExit("'%s' exists but is not a directory" % directory)

        nextflow_script = generate_nextflow_script(execution)
        with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
            script_file.write(nextflow_script)

        bin_link = os.path.join(directory, 'bin')
        if not os.path.exists(bin_link):
            os.symlink(os.path.join(execution['install_path'], 'helpers'), bin_link)
    
    def execute_analysis(execution):
        """Run the generated nextflow pipeline for *execution*.

        Invokes ``nextflow run <directory>/main.nf --fasta ... --output ...``
        with the execution directory as working directory (so the generated
        main.nf and the bin/ helper link are picked up).

        :param execution: dict with 'directory', 'fasta' and 'output' keys
        :return: the nextflow process exit code
        """
        # An argument list via subprocess is robust against spaces and shell
        # metacharacters in the paths, unlike the previous string-built
        # os.system call; cwd= replaces the chdir/restore dance, which was
        # also not exception-safe. The exit status is now surfaced instead
        # of being discarded.
        completed = subprocess.run(
            ['nextflow', 'run', os.path.join(execution['directory'], 'main.nf'),
             '--fasta', execution['fasta'],
             '--output', execution['output']],
            cwd=execution['directory'])
        return completed.returncode
    
    def generate_nextflow_script(execution):
        """Assemble the complete nextflow pipeline script as a string.

        Emits, in order: the fasta input channel, one analysis process per
        configured module, the conversion / info-retrieval / live-results
        processes required by the execution mode, and finally the processes
        that join all per-module JSON results and split them into
        per-sequence documents.

        :param execution: dict describing the run; uses the keys 'modules',
            'fasta', 'output', 'mode' ('live' or 'complete'),
            'fetch_informations' and 'use_cluster'
        :return: the nextflow script text
        """
        modules = execution['modules']

        fragments = []
        fragments.append('''params.fasta = "'''+execution['fasta']+'''"
    Channel.fromPath(params.fasta).set{fasta}''')
        target_channels = ["for_"+m['id'] for m in modules]
        fragments.append('fasta.into{'+';'.join(target_channels)+';}')

        for m in modules:
            # Flatten once per module instead of once per substitution.
            flat = flatten(m)

            # The analysis process additionally needs executor settings;
            # build them on a copy so `flat` stays pristine for the
            # converter templates below.
            config = dict(flat)
            if execution['use_cluster']:
                config['executor'] = 'sge'
                config['chunks'] = ".splitFasta(by:300, file:'input')"
                config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
            else:
                config['executor'] = 'local'
                config['chunks'] = ''
                config['clusterOptions'] = ''
            fragments.append(analysis_template.substitute(config))

            fetch = execution['fetch_informations']
            if execution['mode'] == 'live':
                if fetch:
                    fragments.append(convert_info_template.substitute(flat))
                    fragments.append(retrieve_informations_live_template.substitute(flat))
                else:
                    fragments.append(convert_live_template.substitute(flat))
                # The live-results process also needs the output directory.
                # A shallow copy suffices (nothing nested is modified);
                # deepcopy was overkill here.
                live_source = dict(m)
                live_source['output'] = execution['output']
                fragments.append(live_results_template.substitute(flatten(live_source)))
            elif execution['mode'] == 'complete' and fetch:
                fragments.append(convert_info_template.substitute(flat))
                fragments.append(retrieve_informations_template.substitute(flat))
            else:
                fragments.append(convert_template.substitute(flat))

        json_inputs = [input_template.substitute(m) for m in modules]

        fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
        fragments.append(split_jsons_template.substitute(execution))

        return '\n'.join(fragments)