import collections
import collections.abc
import os
import os.path
import subprocess

from copy import deepcopy
from string import Template

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*, e.g. {'a': {'b': 1}} -> {'a_b': 1}.

    :param d: mapping to flatten (possibly nested)
    :param parent_key: prefix carried down from enclosing levels
    :param sep: separator inserted between joined key parts
    :return: a new flat dict; *d* is not modified
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10;
        # the ABC lives in collections.abc.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
# Nextflow process that replaces FASTA headers with an enumeration and
# emits the normalized FASTA plus a TSV mapping for later restoration.
# '$$' renders as a literal '$' so Nextflow sees its own variables.
normalizing_fasta_template = Template('''
process normalizing_fasta {
    
    input:
    file fasta from for_normalization
    
    output:
    set file("$${fasta.baseName}_normalized.fasta"), file("$${fasta.baseName}_enum_headers.tsv") into for_analysis
    
    script:
    """
    ${helpers_path}/reduce_fasta_headers_to_enumeration.py -f $$fasta -e $${fasta.baseName}_enum_headers.tsv
    """
}
''')

# Nextflow process running one analysis tool (${id}) on the normalized
# FASTA; the enumerated-headers TSV is passed through with the results.
# (Garbled viewer line numbers removed from the template text.)
analysis_template = Template('''
process ${id} {

    input:
    set file(fasta), file(headers) from for_${id}

    output:
    set file("$${fasta}.${id}.results"), file(headers) into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
# Nextflow process publishing per-sequence JSON result files for tool
# ${id} into <output>/live as soon as they are produced.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Converts raw ${id} results to JSON; feeds both the final join
# (${id}_json) and the live-results channel (${id}_json_live).
convert_live_template = Template('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Converts raw ${id} results to JSON and routes them to the dbxref
# information-retrieval step via ${id}_json_info.
convert_info_template = Template('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Plain conversion to JSON, keeping the enumerated-headers TSV paired
# with the result for the subsequent header-restoration step.
convert_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Restores the original sequence IDs (replaced during normalization) in
# the JSON results, using the enumeration TSV produced earlier.
restore_headers_json_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

128
129
# Enriches the JSON results for ${id} by resolving dbxrefs; output goes
# straight to the final join channel ${id}_json.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Same dbxref enrichment as retrieve_informations_template, but also
# feeds the live-results channel ${id}_json_live.
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# One 'input:' line per module for the join_documents process below.
input_template = Template('''file ${id}_result from ${id}_json.collect()''')
# Merges all per-tool JSON documents into a single joined.json.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Splits the joined document into one JSON file per sequence and
# publishes them to the output directory.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')
# Per-process configuration stanza: executor, optional cluster options,
# beforeScript hook and container for one analysis process.
analysis_config_template = Template('''
    withName:${id}{
        executor = '${executor}'
        ${clusterOptions}
        ${beforeScript}
        ${container}
    }
    '''
    )
# Applies the venv-activating beforeScript to a module's helper
# processes (convert / restore-headers / retrieve-informations).
beforeScript_modul_config_template = Template('''
    withName:${process_names}{
        ${beforeScript}
    }
    '''
    )
# Applies the venv-activating beforeScript to the normalizing_fasta process.
beforeScript_norm_config_template = Template('''
    withName:normalizing_fasta{
        ${beforeScript}
    }
    '''
    )


215
216
217
218
219
220
221
222
223
224
def setup_execution_directory(execution):
    """Create the execution directory and populate it with main.nf,
    nextflow.config and a bin/ symlink to the helper scripts.

    :param execution: dict with at least 'directory' and 'install_path',
        plus everything generate_nextflow_script/config need
    :raises SystemExit: if 'directory' exists but is not a directory
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Path exists but is not a directory - fail loudly instead of
        # the original silent exit().
        raise SystemExit(directory + ' exists and is not a directory')

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(os.path.join(directory, 'nextflow.config'), 'w') as config_file:
        config_file.write(nextflow_config)

    # Expose the helper scripts to Nextflow via the conventional bin/ dir.
    if not os.path.exists(os.path.join(directory, 'bin')):
        os.symlink(os.path.join(execution['install_path'], 'helpers'),
                   os.path.join(directory, 'bin'))
def execute_analysis(execution):
    """Run the generated Nextflow pipeline from the execution directory.

    The previous working directory is restored afterwards even if the
    run fails.

    :param execution: dict with 'directory', 'fasta' and 'output' keys
    """
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    try:
        # Argument list instead of a shell string (was os.system): paths
        # containing spaces or shell metacharacters cannot break the command.
        subprocess.run(['nextflow', 'run',
                        os.path.join(execution['directory'], 'main.nf'),
                        '--fasta', execution['fasta'],
                        '--output', execution['output']])
    finally:
        os.chdir(old_cwd)
def generate_nextflow_script(execution):
    """Assemble the main.nf script for this execution.

    Pipeline shape: input channel -> header normalization -> per-module
    analysis + conversion branch (depending on 'mode' and
    'fetch_informations') -> header restoration -> join -> split into
    per-sequence result documents.

    :param execution: run configuration dict ('modules', 'fasta',
        'output', 'use_cluster', 'mode', 'fetch_informations',
        'helpers_path', ...)
    :return: the complete Nextflow script as a string
    """
    modules = execution['modules']

    fragments = []

    fragments.append('''params.fasta = "''' + execution['fasta'] + '''"''')

    # On a cluster the FASTA is split into chunks of 300 sequences so
    # the analyses parallelize over chunks.
    if execution['use_cluster']:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta).splitFasta(by:300, file:'input')''')
    else:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta)''')

    fragments.append(normalizing_fasta_template.substitute(execution))

    # Fan the normalized FASTA out into one channel per module.
    target_channels = ["for_" + m['id'] for m in modules]
    fragments.append('for_analysis.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']

        fragments.append(analysis_template.substitute(config))
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))

        config['helpers_path'] = execution['helpers_path']
        fragments.append(restore_headers_json_template.substitute(config))

    # One join input line per module, then join and split processes.
    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)
def generate_nextflow_config(execution):
    """Assemble nextflow.config: container engine switch plus a
    per-process stanza (executor, cluster options, container, venv
    beforeScript) for every module.

    :param execution: run configuration dict
    :return: the configuration file contents as a string
    """
    modules = execution['modules']

    fragments = []

    if execution['docker']:
        fragments.append('''docker.enabled = true''')
    elif execution['singularity']:
        fragments.append('''singularity.enabled = true''')

    fragments.append('''process { ''')

    # A configured venv implies a beforeScript that activates it; hoist
    # it out of the loop since it is the same for every module.
    venv_before_script = ''
    if 'venv' in execution:
        venv_before_script = ("beforeScript = 'export PS1=; source "
                              + execution['venv'] + "/bin/activate'")

    for m in modules:
        config = {}
        config['id'] = m['id']

        # Pick the container matching the enabled engine, if any.
        if execution['docker'] and m['analysis']['container']['docker']:
            config['container'] = "container = '" + m['analysis']['container']['docker'] + "'"
        elif execution['singularity'] and m['analysis']['container']['singularity']:
            config['container'] = "container = '" + m['analysis']['container']['singularity'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        config['beforeScript'] = venv_before_script
        fragments.append(analysis_config_template.substitute(config))

        if venv_before_script:
            # The module's helper processes need the venv as well.
            if execution['fetch_informations']:
                config['process_names'] = "'" + Template('convert_${id}_to_json|${id}_restore_headers_json|retrieve_informations_for_${id}').substitute(config) + "'"
            else:
                config['process_names'] = "'" + Template('convert_${id}_to_json|${id}_restore_headers_json').substitute(config) + "'"
            fragments.append(beforeScript_modul_config_template.substitute(config))

    # Was: config['beforeScript'] checked after the loop, which raised a
    # NameError for an empty module list; the venv flag is what it meant.
    if venv_before_script:
        fragments.append(beforeScript_norm_config_template.substitute(
            {'beforeScript': venv_before_script}))

    fragments.append('''}''')

    return '\n'.join(fragments)