# nextflow.py - generates the Nextflow pipeline script for an execution and runs it.
import collections
import collections.abc
import os
import os.path
import subprocess
from copy import deepcopy
from string import Template

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Collapse a nested mapping into a single-level dict.

    Keys of nested mappings are joined to their parent's key with *sep*,
    e.g. ``{'a': {'b': 1}}`` becomes ``{'a_b': 1}``.

    Args:
        d: Mapping to flatten; values that are mutable mappings are
            flattened recursively.
        parent_key: Prefix prepended (with *sep*) to every key of *d*;
            empty at the top level.
        sep: Separator placed between a parent key and a child key.

    Returns:
        A new dict whose values are the non-mapping leaves of *d*.
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # The ABC aliases on the ``collections`` package itself were removed
        # in Python 3.10; ``collections.abc`` is their supported home.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
17
18
19
20
21
22
23
24
25
26
27

# Nextflow process that runs one analysis module on the input FASTA file.
# Placeholders: id, analysis_script, analysis_params.
# "$$" is string.Template's escape for a literal "$", so "$$fasta" and
# "$${fasta}" are emitted as "$fasta" and "${fasta}" in the generated script.
analysis_template = Template('''
process ${id} {
    input:
    file fasta from for_${id}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
# Nextflow process used in 'live' mode: reads a module's <id>_json_live
# channel, runs split_json_into_separate_files.py with --uuid on it, and
# copies the *.json outputs whose names match the 8-4-4-4-12 wildcard pattern
# (presumably UUID-named files) into "<output>/live".
# Placeholders: id, output. "$$result" is emitted as a literal "$result".
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*????????-????-????-????-????????????.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --uuid
    """
}
''')
# Nextflow process used in 'live' mode: converts a module's result file to
# JSON and sends it into two channels, <id>_json and <id>_json_live.
# Placeholders: id, converter_script, converter_params.
# "$$result" / "$${result}" are emitted as literal "$result" / "${result}".
convert_live_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
63
64
65
66
67
68
69
70
71
72
# Nextflow process used outside 'live' mode: converts a module's result file
# to JSON and sends it into the <id>_json channel only.
# Placeholders: id, converter_script, converter_params.
# "$$result" / "$${result}" are emitted as literal "$result" / "${result}".
convert_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# One input declaration consuming a module's <id>_json channel; the rendered
# lines are joined and substituted for ${inputs} in join_jsons_template.
# Placeholder: id.
input_template = Template('''    file ${id}_result from ${id}_json''')
# Nextflow process that takes the per-module JSON inputs and runs
# join_json_files.py over *.json, writing joined.json into the joined_json
# channel. Placeholder: inputs (the pre-rendered input declarations).
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process that stages joined_json as input/json.json, runs
# split_json_into_separate_files.py on it, and copies the resulting *.json
# files into the directory given by ${output}. Placeholder: output.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')

def setup_execution_directory(execution):
    """Prepare the working directory for a pipeline run.

    Creates ``execution['directory']`` if it does not exist, writes the
    generated Nextflow script to ``main.nf`` inside it, and symlinks
    ``execution['script_path']`` as ``bin`` and ``execution['psot_path']``
    as ``psot`` unless entries with those names already exist.

    Args:
        execution: Dict providing ``directory``, ``script_path`` and
            ``psot_path``, plus the keys ``generate_nextflow_script`` reads.

    Raises:
        SystemExit: If ``directory`` exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Exit with a message (and a non-zero status) instead of silently
        # terminating as if everything had succeeded.
        raise SystemExit(
            "Execution directory '{}' exists but is not a directory".format(directory))

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    links = (
        ('bin', execution['script_path']),
        ('psot', execution['psot_path']),
    )
    for link_name, target in links:
        link_path = os.path.join(directory, link_name)
        if not os.path.exists(link_path):
            os.symlink(target, link_path)
126
127

def execute_analysis(execution):
    """Run the generated Nextflow pipeline inside the execution directory.

    Invokes ``nextflow run <directory>/main.nf --fasta <fasta> --output
    <output>`` with ``execution['directory']`` as the child's working
    directory. The exit status of nextflow is not inspected.

    Args:
        execution: Dict providing ``directory``, ``fasta`` and ``output``.

    Raises:
        KeyError: If a required key is missing; raised before anything runs.
        OSError: If the directory does not exist or ``nextflow`` cannot be
            started.
    """
    directory = execution['directory']
    # Resolve the script path against the caller's working directory so that a
    # relative ``directory`` is not applied twice once the child runs inside it.
    script_path = os.path.join(os.path.abspath(directory), 'main.nf')
    command = [
        'nextflow', 'run', script_path,
        '--fasta', execution['fasta'],
        '--output', execution['output'],
    ]
    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact, and ``cwd`` confines the directory change to the
    # child process so this process's working directory is never altered.
    subprocess.call(command, cwd=directory)
132
133
134
135
136
137
138
139
140
141
142

def generate_nextflow_script(execution):
    """Assemble the complete Nextflow script for an execution.

    The script consists of, in order: the FASTA input channel, a fan-out of
    that channel into one ``for_<id>`` channel per module, the analysis and
    JSON-conversion processes of every module (plus a live-results process
    when ``execution['mode'] == 'live'``), and finally the processes that
    join all module JSON documents and split them into result files.

    Args:
        execution: Dict providing ``modules`` (a list of module dicts, each
            with an ``id`` and the values its templates need), ``mode`` and
            ``output``.

    Returns:
        The Nextflow script as a single string.
    """
    modules = execution['modules']

    fragments = [
        '''params.fasta = "example/proteins.fas"
Channel.fromPath(params.fasta).set{fasta}''',
    ]
    target_channels = ['for_' + m['id'] for m in modules]
    fragments.append('fasta.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        # Flatten once and reuse the result for every template of this module.
        flat = flatten(m)
        fragments.append(analysis_template.substitute(flat))
        if execution['mode'] == 'live':
            fragments.append(convert_live_template.substitute(flat))
            # The live-results process additionally needs the output
            # directory; a shallow copy suffices because flatten() does not
            # modify its argument.
            live_context = dict(m)
            live_context['output'] = execution['output']
            fragments.append(live_results_template.substitute(flatten(live_context)))
        else:
            fragments.append(convert_template.substitute(flat))

    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)