# nextflow.py — generates and runs a nextflow pipeline
# (forked from cfankep/dbman; source project has limited visibility)
import collections
import collections.abc
import os
import os.path
from copy import deepcopy
from string import Template
# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*, e.g. ``{'a': {'b': 1}}`` becomes
    ``{'a_b': 1}``.

    :param d: mapping to flatten
    :param parent_key: prefix prepended to all generated keys (used by the
        recursive calls)
    :param sep: separator placed between joined key components
    :return: a new flat dict
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10;
        # the ABC lives in collections.abc.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
# Nextflow process running one analysis tool on the per-module fasta channel.
# ${...} placeholders are filled by Template.substitute(); $$ escapes produce
# literal '$' so nextflow sees its own variables (e.g. $fasta).
analysis_template = Template ('''
process ${id} {
executor '${executor}'
${clusterOptions}
input:
file fasta from for_${id}${chunks}
output:
file "$${fasta}.${id}.results" into ${id}_results
script:
"""
${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
"""
}
''')
# Process publishing per-tool JSON results into "<output>/live" as soon as
# they are available (consumes the ..._json_live channel).
live_results_template = Template('''
process generate_${id}_live_results {
publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'
input:
file result from ${id}_json_live
output:
file "*.json" into ${id}_live_results
script:
"""
split_json_into_separate_files.py --json $$result --output . --tool ${id}
"""
}
''')
# Converter process variant for 'live' mode without information fetching:
# feeds both the join step (${id}_json) and the live-results step
# (${id}_json_live).
convert_live_template = Template ('''
process convert_${id}_to_json {
input:
file result from ${id}_results
output:
file "$${result}.json" into ${id}_json, ${id}_json_live
script:
"""
${converter_script} --result $$result --output $${result}.json ${converter_params}
"""
}
''')
# Converter process variant used when dbxref information is fetched
# afterwards: output goes into ${id}_json_info only.
convert_info_template = Template ('''
process convert_${id}_to_json {
input:
file result from ${id}_results
output:
file "$${result}.json" into ${id}_json_info
script:
"""
${converter_script} --result $$result --output $${result}.json ${converter_params}
"""
}
''')
# Plain converter process variant: tool results -> JSON for the join step.
convert_template = Template ('''
process convert_${id}_to_json {
input:
file result from ${id}_results
output:
file "$${result}.json" into ${id}_json
script:
"""
${converter_script} --result $$result --output $${result}.json ${converter_params}
"""
}
''')
# Process resolving dbxrefs in the converted JSON ('complete' mode):
# enriched output feeds the join step only.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {
input:
file result from ${id}_json_info
output:
file "$${result.baseName}_info.json" into ${id}_json
script:
"""
resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
"""
}
''')
# Same dbxref-resolution process for 'live' mode: enriched output feeds both
# the join step and the live-results step.
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {
input:
file result from ${id}_json_info
output:
file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live
script:
"""
resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
"""
}
''')
# One 'file ... from ...collect()' input line per module, spliced into the
# join_documents process below.
input_template = Template(''' file ${id}_result from ${id}_json.collect()''')
# Process merging all per-tool JSON files into one document.
join_jsons_template = Template('''
process join_documents {
input:
${inputs}
output:
file "joined.json" into joined_json
script:
"""
join_json_files.py --output joined.json *.json
"""
}
''')
# Final process splitting the joined document into one JSON file per entry,
# published into the configured output directory.
split_jsons_template = Template('''
process split_documents {
publishDir "${output}", mode: 'copy'
input:
file "input/json.json" from joined_json
output:
file "*.json" into result_documents
script:
"""
split_json_into_separate_files.py --json 'input/json.json' --output .
"""
}
''')
def setup_execution_directory(execution):
    """Prepare the execution directory for a pipeline run.

    Creates the directory if missing, writes the generated ``main.nf``
    script into it, and symlinks the helper scripts as ``bin/`` so nextflow
    picks them up.

    :param execution: dict with at least the keys 'directory' and
        'install_path', plus everything generate_nextflow_script consumes
    :raises SystemExit: if the configured path exists but is not a directory
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Was a bare exit() (status 0, no message) — fail loudly instead.
        raise SystemExit("Execution directory '" + directory + "' is not a directory")
    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)
    bin_link = os.path.join(directory, 'bin')
    if not os.path.exists(bin_link):
        # nextflow automatically adds the run directory's bin/ to PATH
        os.symlink(os.path.join(execution['install_path'], 'helpers'), bin_link)
def execute_analysis(execution):
    """Run the generated nextflow pipeline from inside the execution directory.

    :param execution: dict with 'directory', 'fasta' and 'output' entries
    """
    old_cwd = os.getcwd()
    # nextflow must run inside the execution directory so its work/ dir and
    # the bin/ symlink are used
    os.chdir(execution['directory'])
    try:
        os.system('nextflow run ' + execution['directory'] + '/main.nf --fasta ' + execution['fasta'] + ' --output ' + execution['output'])
    finally:
        # always restore the caller's working directory, even on error
        os.chdir(old_cwd)
def generate_nextflow_script(execution):
    """Assemble the complete nextflow script for the configured modules.

    Builds the fasta input channel, one analysis + conversion process chain
    per module (shape depends on 'mode' and 'fetch_informations'), and the
    final join/split processes.

    :param execution: dict with the keys 'modules', 'fasta', 'output',
        'use_cluster', 'mode' and 'fetch_informations'
    :return: the full script as one string
    """
    modules = execution['modules']
    fragments = []
    # pipeline input: the fasta parameter, fanned out into one channel per module
    fragments.append('''params.fasta = "''' + execution['fasta'] + '''"
Channel.fromPath(params.fasta).set{fasta}''')
    target_channels = ["for_" + m['id'] for m in modules]
    fragments.append('fasta.into{' + ';'.join(target_channels) + ';}')
    for m in modules:
        # flatten once per module; Template.substitute() ignores unused keys,
        # so the same config dict serves every template below
        config = flatten(m)
        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['chunks'] = ".splitFasta(by:300, file:'input')"
            config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['chunks'] = ''
            config['clusterOptions'] = ''
        fragments.append(analysis_template.substitute(config))
        live = execution['mode'] == 'live'
        fetch = execution['fetch_informations']
        if live and not fetch:
            fragments.append(convert_live_template.substitute(config))
            live_config = dict(config)
            live_config['output'] = execution['output']
            fragments.append(live_results_template.substitute(live_config))
        elif live and fetch:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            live_config = dict(config)
            live_config['output'] = execution['output']
            fragments.append(live_results_template.substitute(live_config))
        elif execution['mode'] == 'complete' and fetch:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))
    # one collect() input line per module for the join process
    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))
    return '\n'.join(fragments)