from string import Template
import collections.abc
import os
import os.path
import subprocess
import sys


# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


normalizing_fasta_template = Template('''
process normalizing_fasta {
    input:
    file fasta from for_normalization

    output:
    set file("$${fasta.baseName}_normalized.fasta"), file("$${fasta.baseName}_enum_headers.tsv") into for_analysis

    script:
    """
    ${helpers_path}/reduce_fasta_headers_to_enumeration.py -f $$fasta -e $${fasta.baseName}_enum_headers.tsv
    """
}
''')

analysis_template = Template('''
process ${id} {
    input:
    set file(fasta), file(headers) from for_${id}

    output:
    set file("$${fasta}.${id}.results"), file(headers) into ${id}_results

    script:
    """
    ${cmdline}
    """
}
''')

live_results_template = Template('''
process generate_${id}_live_results {
    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')

# The conversion to JSON is identical in every mode; only the header
# restoration step differs in which channels it feeds.
convert_template = Template('''
process convert_${id}_to_json {
    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# live mode without information retrieval: feed the joined output and the live results
restore_headers_json_live_template = Template('''
process ${id}_restore_headers_json {
    input:
    set file(result), file(headers) from ${id}_restore_headers

    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

# with information retrieval: feed the retrieve_informations_* process
restore_headers_json_info_template = Template('''
process ${id}_restore_headers_json {
    input:
    set file(result), file(headers) from ${id}_restore_headers

    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json_info

    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

# complete mode without information retrieval: feed the joined output only
restore_headers_json_template = Template('''
process ${id}_restore_headers_json {
    input:
    set file(result), file(headers) from ${id}_restore_headers

    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json

    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {
    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {
    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')

input_template = Template('''file ${id}_result from ${id}_json.collect()''')

join_jsons_template = Template('''
process join_documents {
    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')

split_jsons_template = Template('''
process split_documents {
    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')

analysis_config_template = Template('''
withName:${id} {
    executor = '${executor}'
    ${clusterOptions}
    ${beforeScript}
    ${container}
}
''')

beforeScript_module_config_template = Template('''
withName:${process_name} {
    ${beforeScript}
}
''')

beforeScript_norm_config_template = Template('''
withName:normalizing_fasta {
    ${beforeScript}
}
''')


def setup_execution_directory(execution):
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        sys.exit('execution directory could not be created: ' + directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(directory + '/nextflow.config', 'w') as config_file:
        config_file.write(nextflow_config)

    if not os.path.exists(directory + '/bin'):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), directory + '/bin')
    # if not os.path.exists(directory + '/psot'):
    #     os.symlink(execution['psot_path'], directory + '/psot')


def execute_analysis(execution):
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    command = 'nextflow run ' + execution['directory'] + '/main.nf --fasta ' + execution['fasta'] + ' --output ' + execution['output']
    retcode = 1
    try:
        retcode = subprocess.call(command, shell=True)
    except OSError as e:
        print('Execution failed:', e, file=sys.stderr)
    os.chdir(old_cwd)
    return retcode


def generate_nextflow_script(execution):
    modules = execution['modules']
    fragments = []
    fragments.append('params.fasta = "' + execution['fasta'] + '"')

    if execution['use_cluster']:
        # split the FASTA into chunks of 300 sequences so the cluster can work in parallel
        fragments.append("for_normalization = Channel.fromPath(params.fasta).splitFasta(by:300, file:'input')")
    else:
        fragments.append('for_normalization = Channel.fromPath(params.fasta)')

    fragments.append(normalizing_fasta_template.substitute(execution))

    # fan the normalized FASTA out into one channel per module
    target_channels = ['for_' + m['id'] for m in modules]
    fragments.append('for_analysis.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']
        config['helpers_path'] = execution['helpers_path']
        # The module's analysis script is expected to print the command line that the
        # generated process should run; '$fasta' is quoted so that Nextflow, not
        # this shell, resolves the placeholder.
        command = Template("${analysis_script} --fasta '$$fasta' --output '$${fasta}.${id}.results' ${analysis_params}").substitute(config)
        completed = subprocess.run(command, shell=True, stdout=subprocess.PIPE)
        config['cmdline'] = completed.stdout.decode('utf-8')

        fragments.append(analysis_template.substitute(config))
        fragments.append(convert_template.substitute(config))
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(restore_headers_json_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(restore_headers_json_template.substitute(config))

    json_inputs = []
    for m in modules:
        json_inputs.append(input_template.substitute(m))
        # fragments.append(fetch_template.substitute(flatten(m)))
    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)


def generate_nextflow_config(execution):
    modules = execution['modules']
    fragments = []

    if execution['docker']:
        fragments.append('''docker {
    enabled = 'true'
    fixOwnership = 'true'
    runOptions = '--volume=/home/ubuntu/db:/databases'
}
''')
    elif execution['singularity']:
        fragments.append('''singularity {
    enabled = 'true'
    runOptions = '--bind /home/ubuntu/db:/databases'
}
''')

    # the beforeScript depends only on the execution-wide virtualenv, so build it once
    if 'venv' in execution:
        before_script = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"
    else:
        before_script = ''

    fragments.append('process {')
    for m in modules:
        config = {}
        config['id'] = m['id']
        config['beforeScript'] = before_script

        if execution['docker'] and m['analysis']['container']['docker']:
            config['container'] = "container = '" + m['analysis']['container']['docker'] + "'"
        elif execution['singularity'] and m['analysis']['container']['singularity']:
            config['container'] = "container = '" + m['analysis']['container']['singularity'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        fragments.append(analysis_config_template.substitute(config))

        # the helper processes of each module need the virtualenv as well
        if before_script:
            if execution['fetch_informations']:
                process_names = Template('convert_${id}_to_json|${id}_restore_headers_json|retrieve_informations_for_${id}').substitute(config).split('|')
            else:
                process_names = Template('convert_${id}_to_json|${id}_restore_headers_json').substitute(config).split('|')
            for process in process_names:
                config['process_name'] = process
                fragments.append(beforeScript_module_config_template.substitute(config))

    if before_script:
        fragments.append(beforeScript_norm_config_template.substitute({'beforeScript': before_script}))
    fragments.append('}')

    return '\n'.join(fragments)
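
# Example usage (a minimal sketch): the 'execution' dictionary below only
# illustrates the keys the functions above actually read. Every path and the
# module entry itself are hypothetical placeholders, not shipped defaults.
if __name__ == '__main__':
    example_execution = {
        'directory': '/tmp/psot_run',          # where main.nf and nextflow.config are written
        'install_path': '/opt/psot',           # must contain the 'helpers' directory (symlinked as bin/)
        'helpers_path': '/opt/psot/helpers',   # referenced inside the generated Nextflow processes
        'fasta': '/data/proteins.fasta',
        'output': '/data/results',
        'mode': 'complete',                    # 'live' or 'complete'
        'fetch_informations': True,            # add the retrieve_informations_* processes
        'use_cluster': False,                  # True selects SGE and splits the FASTA into chunks
        'docker': False,
        'singularity': False,
        'modules': [
            {
                'id': 'signalp',               # hypothetical module id
                'analysis': {
                    'script': 'signalp_analysis.py',   # expected to print the command line to run
                    'params': '',
                    'container': {'docker': '', 'singularity': ''},
                },
                'converter': {'script': 'convert_signalp.py', 'params': ''},
            },
        ],
    }
    setup_execution_directory(example_execution)
    execute_analysis(example_execution)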