"""Generate and run a Nextflow pipeline for per-module sequence analyses.

Each configured module contributes an analysis process plus a converter
process and, depending on the execution mode ('live' / 'complete') and the
``fetch_informations`` flag, live-result and dbxref-resolution processes.
The functions below render ``main.nf`` and ``nextflow.config`` from the
string templates defined here and launch ``nextflow`` on the result.

NOTE(review): the exact indentation inside the Nextflow template strings was
reconstructed from a whitespace-mangled source; Nextflow is whitespace-
insensitive between statements, so the generated processes are equivalent.
"""

from string import Template
import collections.abc
import os
import os.path
import subprocess
import sys
from copy import deepcopy  # unused here, kept in case other code imports it


# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*, e.g. ``{'a': {'b': 1}}`` becomes
    ``{'a_b': 1}``.

    :param d: mapping to flatten (values may themselves be mappings)
    :param parent_key: prefix carried down through the recursion
    :param sep: separator inserted between joined key components
    :return: a new flat ``dict``
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10; the abc
        # module is the supported home of the ABC on all Python 3 versions.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)


# Per-module analysis process: consumes the module's fasta channel
# (optionally chunked for cluster runs) and emits raw tool results.
analysis_template = Template('''
process ${id} {
    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')

# Publishes per-sequence JSON files as soon as a module's results exist.
live_results_template = Template('''
process generate_${id}_live_results {
    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')

# Converter variants differ only in which downstream channels they feed:
# live mode additionally feeds ${id}_json_live, info mode feeds
# ${id}_json_info for the dbxref-resolution step.
convert_live_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

convert_info_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

convert_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Resolves database cross-references; the *_live variant also feeds the
# live-results channel.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {
    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')

retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {
    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')

# One input declaration per module for the join_documents process.
input_template = Template('''file ${id}_result from ${id}_json.collect()''')

join_jsons_template = Template('''
process join_documents {
    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')

split_jsons_template = Template('''
process split_documents {
    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')

# Per-process executor/container settings for nextflow.config.
analysis_config_template = Template('''
withName:${id}{
    executor = '${executor}'
    ${clusterOptions}
    ${beforeScript}
    ${container}
}
''')

# Activates the virtualenv before the (host-side) converter processes.
beforeScript_config_template = Template('''
withName:${process_names}{
    ${beforeScript}
}
''')


def setup_execution_directory(execution):
    """Prepare the execution directory: main.nf, nextflow.config, bin/ link.

    :param execution: dict with at least 'directory' and 'install_path',
        plus everything generate_nextflow_script/config need
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Path exists but is not a directory -- abort with a message and a
        # non-zero status (the original called the bare builtin exit()).
        sys.exit('execution directory path exists but is not a directory: ' + directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(os.path.join(directory, 'nextflow.config'), 'w') as config_file:
        config_file.write(nextflow_config)

    # Expose the helper scripts as the pipeline's bin/ directory.
    bin_link = os.path.join(directory, 'bin')
    if not os.path.exists(bin_link):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), bin_link)

    #if not os.path.exists(directory + '/psot'):
    #    os.symlink(execution['psot_path'], directory + '/psot')


def execute_analysis(execution):
    """Run the generated pipeline with nextflow in the execution directory.

    Uses an argument list instead of a shell string so paths containing
    spaces or shell metacharacters cannot break or inject into the command
    (the original used os.system with string concatenation).
    """
    subprocess.run(
        ['nextflow', 'run', os.path.join(execution['directory'], 'main.nf'),
         '--fasta', execution['fasta'],
         '--output', execution['output']],
        cwd=execution['directory'],
    )


def generate_nextflow_script(execution):
    """Render the main.nf script for the given execution description.

    :param execution: dict with 'fasta', 'output', 'modules', 'use_cluster',
        'mode' ('live' or 'complete') and 'fetch_informations'
    :return: the complete main.nf contents as a string
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "''' + execution['fasta'] + '''"
Channel.fromPath(params.fasta).set{fasta}''')

    # Fan the input fasta out into one channel per module.
    target_channels = ['for_' + m['id'] for m in modules]
    fragments.append('fasta.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']
        if execution['use_cluster']:
            # Chunk the fasta so each cluster job stays reasonably small.
            config['chunks'] = ".splitFasta(by:300, file:'input')"
        else:
            config['chunks'] = ''
        fragments.append(analysis_template.substitute(config))

        # Choose converter / enrichment processes by mode and info-fetching.
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))

    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)


def generate_nextflow_config(execution):
    """Render nextflow.config with per-process executor/container settings.

    :param execution: dict with 'modules', 'use_cluster',
        'fetch_informations' and optionally 'venv'
    :return: the complete nextflow.config contents as a string
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''docker.enabled = true''')
    fragments.append('''process {
''')
    for m in modules:
        config = {}
        config['id'] = m['id']
        if m['analysis']['container']:
            config['container'] = "container = " + "'" + m['analysis']['container'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        if 'venv' in execution:
            # Converter (and optionally dbxref) processes run on the host
            # and need the virtualenv activated first.
            config['beforeScript'] = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"
            if execution['fetch_informations']:
                config['process_names'] = "'" + Template('convert_${id}_to_json|retrieve_informations_for_${id}').substitute(config) + "'"
            else:
                config['process_names'] = Template('convert_${id}_to_json').substitute(config)
            fragments.append(analysis_config_template.substitute(config))
            fragments.append(beforeScript_config_template.substitute(config))
        else:
            config['beforeScript'] = ''
            fragments.append(analysis_config_template.substitute(config))
    fragments.append('''}''')

    return '\n'.join(fragments)