"""Generate and execute a Nextflow pipeline that runs per-module protein
sequence analyses, converts the results to JSON, and joins/splits the
output documents.

Each "module" dict describes one analysis tool; this file renders one
nextflow ``process`` block per module from the ``string.Template``
snippets below and concatenates them into ``main.nf``.
"""

from copy import deepcopy
from string import Template
import collections.abc
import os
import os.path
import sys


# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*: ``{'a': {'b': 1}}`` -> ``{'a_b': 1}``.
    Used to turn a module's nested config into the flat keyword set the
    templates expect (e.g. ``analysis_script``, ``converter_params``).
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # Bug fix: collections.MutableMapping was deprecated in 3.3 and
        # removed in Python 3.10 -- the ABC lives in collections.abc.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


# NOTE on the templates below: ``$$`` is string.Template's escape for a
# literal ``$``, so ``$${fasta}`` emits ``${fasta}`` -- a *nextflow*
# variable resolved at pipeline runtime, not here.

# One analysis process per module: reads the module's fasta channel,
# writes "<fasta>.<id>.results" into the "<id>_results" channel.
analysis_template = Template('''
process ${id} {
    executor '${executor}'
    ${clusterOptions}

    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')

# Publishes per-document JSON files to <output>/live as they are produced
# (pattern matches the trailing UUID in the generated filenames).
live_results_template = Template('''
process generate_${id}_live_results {
    publishDir "${output}/live", mode: 'copy', pattern: '*????????-????-????-????-????????????.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --uuid
    """
}
''')

# Converter variants -- identical command, different output channels:
#   convert_live_template: feeds both the join step and the live publisher.
convert_live_template = Template('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

#   convert_info_template: feeds the dbxref-resolution step instead.
convert_info_template = Template('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

#   convert_template: plain conversion, join step only.
convert_template = Template('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Resolves dbxrefs in the converted JSON; "live" variant additionally
# forwards the enriched document to the live publisher channel.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.simpleName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.simpleName}_info.json
    """
}
''')

retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.simpleName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.simpleName}_info.json
    """
}
''')

# One input line per module for the join process (filled into ${inputs}).
input_template = Template(''' file ${id}_result from ${id}_json.collect()''')

join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')

split_jsons_template = Template('''
process split_documents {
    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')


def setup_execution_directory(execution):
    """Create the execution directory, write ``main.nf`` and link helpers.

    ``execution`` must provide 'directory', 'script_path' and 'psot_path'
    (plus everything generate_nextflow_script() needs).
    Exits the interpreter if 'directory' exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Bug fix: was a bare exit() -- a site builtin that is not always
        # available and exits with status 0; fail loudly and non-zero.
        sys.exit(directory + ' exists but is not a directory')

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    # Symlink helper-script locations into the execution directory so the
    # generated pipeline can call them; skip if already present.
    if not os.path.exists(directory + '/bin'):
        os.symlink(execution['script_path'], directory + '/bin')
    if not os.path.exists(directory + '/psot'):
        os.symlink(execution['psot_path'], directory + '/psot')


def execute_analysis(execution):
    """Run the generated Nextflow pipeline from inside its directory."""
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    try:
        # NOTE(review): os.system with string concatenation breaks on paths
        # containing spaces and is shell-injection prone; consider
        # subprocess.run([...]) with an argument list.
        os.system('nextflow run ' + execution['directory'] + '/main.nf --fasta '
                  + execution['fasta'] + ' --output ' + execution['output'])
    finally:
        # Robustness fix: restore the caller's cwd even if the run raises.
        os.chdir(old_cwd)


def generate_nextflow_script(execution):
    """Render the complete ``main.nf`` text for *execution*.

    ``execution`` keys used: 'modules' (list of module dicts with at least
    an 'id'), 'use_cluster' (bool), 'mode' ('live' or 'complete'),
    'fetch_informations' (bool), 'output' (publish directory).
    Returns the script as a single string.
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "example/proteins.fas"
Channel.fromPath(params.fasta).set{fasta}''')

    # Fan the input fasta channel out into one channel per module.
    target_channels = ["for_" + m['id'] for m in modules]
    fragments.append('fasta.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        if execution['use_cluster']:
            config['executor'] = 'sge'
            # Chunk the fasta so each cluster job handles 300 sequences.
            config['chunks'] = ".splitFasta(by:300, file:'input')"
            config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['chunks'] = ''
            config['clusterOptions'] = ''
        fragments.append(analysis_template.substitute(config))

        # Pick the converter chain for this mode/fetch combination; the
        # 'live' variants additionally publish per-document results as
        # they arrive.
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(flatten(m)))
            copy = deepcopy(m)
            copy['output'] = execution['output']
            fragments.append(live_results_template.substitute(flatten(copy)))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(flatten(m)))
            fragments.append(retrieve_informations_live_template.substitute(flatten(m)))
            copy = deepcopy(m)
            copy['output'] = execution['output']
            fragments.append(live_results_template.substitute(flatten(copy)))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(flatten(m)))
            fragments.append(retrieve_informations_template.substitute(flatten(m)))
        else:
            fragments.append(convert_template.substitute(flatten(m)))

    # Collect every module's JSON channel into the join process, then
    # split the joined document into the final per-entry files.
    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)