from string import Template
import os.path
import os
from copy import deepcopy
import collections.abc


# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    # Flattens nested dicts by joining keys with `sep`,
    # e.g. {'analysis': {'script': 'x'}} -> {'analysis_script': 'x'}.
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


# One Nextflow process per analysis module: runs the module's analysis
# script on the input fasta and writes a raw results file.
analysis_template = Template('''
process ${id} {
    input:
    file fasta from for_${id}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')

# In live mode, each converted JSON document is split into per-sequence
# files and published to <output>/live as soon as it becomes available.
live_results_template = Template('''
process generate_${id}_live_results {
    publishDir "${output}/live", mode: 'copy', pattern: '*????????-????-????-????-????????????.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --uuid
    """
}
''')

# Converts a module's raw results to JSON; the live variant additionally
# feeds a second channel consumed by the live-results process.
convert_live_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

convert_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Input declaration injected into the join_documents process, one per module.
input_template = Template('''    file ${id}_result from ${id}_json''')

join_jsons_template = Template('''
process join_documents {
    input:
${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')

split_jsons_template = Template('''
process split_documents {
    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')


def setup_execution_directory(execution):
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        exit('Execution directory could not be created: ' + directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    # Make the analysis/converter scripts and the psot package available
    # inside the execution directory.
    if not os.path.exists(directory + '/bin'):
        os.symlink(execution['script_path'], directory + '/bin')
    if not os.path.exists(directory + '/psot'):
        os.symlink(execution['psot_path'], directory + '/psot')


def execute_analysis(execution):
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    os.system('nextflow run ' + execution['directory'] + '/main.nf'
              + ' --fasta ' + execution['fasta']
              + ' --output ' + execution['output'])
    os.chdir(old_cwd)


def generate_nextflow_script(execution):
    modules = execution['modules']
    fragments = []

    # Input channel: one fasta file, duplicated into one channel per module.
    fragments.append('''params.fasta = "example/proteins.fas"
Channel.fromPath(params.fasta).set{fasta}''')

    target_channels = ['for_' + m['id'] for m in modules]
    fragments.append('fasta.into{' + ';'.join(target_channels) + ';}')

    # One analysis process and one conversion process per module; in live
    # mode the converted JSON is additionally published as it arrives.
    for m in modules:
        fragments.append(analysis_template.substitute(flatten(m)))
        if execution['mode'] == 'live':
            fragments.append(convert_live_template.substitute(flatten(m)))
            copy = deepcopy(m)
            copy['output'] = execution['output']
            fragments.append(live_results_template.substitute(flatten(copy)))
        else:
            fragments.append(convert_template.substitute(flatten(m)))

    # Join all per-module JSON documents, then split them into per-sequence files.
    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    nextflow_script = '\n'.join(fragments)
    return nextflow_script