nextflow.py 11.2 KB
Newer Older
1
2
3
from string import Template
import os.path
import os
4
import subprocess
5
from copy import deepcopy
6
7
8
9
10
11
12
13
14
15
16
17
import collections

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested dict into a single level, joining keys with `sep`.

    Example: {'a': {'b': 1}, 'c': 2} -> {'a_b': 1, 'c': 2}
    """
    # collections.MutableMapping was a deprecated alias removed in Python 3.10;
    # the canonical home of the ABC is collections.abc.
    from collections.abc import MutableMapping

    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
18

19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
# Nextflow process: replace FASTA headers by an enumeration (via the helper
# script) and emit the renamed FASTA plus the TSV mapping needed to restore
# the original headers later.  $$ escapes produce literal $ for Nextflow.
normalizing_fasta_template = Template('''
process normalizing_fasta {
    
    input:
    file fasta from for_normalization
    
    output:
    set file("$${fasta.baseName}_normalized.fasta"), file("$${fasta.baseName}_enum_headers.tsv") into for_analysis
    
    script:
    """
    ${helpers_path}/reduce_fasta_headers_to_enumeration.py -f $$fasta -e $${fasta.baseName}_enum_headers.tsv
    """
}
''')

35
36
analysis_template = Template ('''
process ${id} {
37

38
    input:
39
    set file(fasta), file(headers) from for_${id}
40
41

    output:
42
    set file("$${fasta}.${id}.results"), file(headers) into ${id}_results
43
44
45

    script:
    """
46
    ${cmdline}
47
48
49
    """
}
''')
50
51
52
live_results_template = Template('''
process generate_${id}_live_results {

53
    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'
54
55
56
57
58
59
60
61
62

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
63
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
64
65
66
67
68
    """
}
''')
convert_live_template = Template ('''
process convert_${id}_to_json {
69

70
    input:
71
    set file(result), file(headers) from ${id}_results
72
73

    output:
74
    set file("$${result}.json"), file(headers) into ${id}_restore_headers
75
76
77

    script:
    """
78
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
79
80
81
    """
}
''')
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98

# Nextflow process (live mode): map the enumerated sequence ids in the JSON
# back to the original FASTA headers; feeds both the join step (${id}_json)
# and the live-results step (${id}_json_live).
restore_headers_json_live_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json, ${id}_json_live
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

99
100
convert_info_template = Template ('''
process convert_${id}_to_json {
101

102
    input:
103
    set file(result), file(headers) from ${id}_results
104
105

    output:
106
    set file("$${result}.json"), file(headers) into ${id}_restore_headers
107
108
109
110
111
112
113

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130

# Nextflow process (info-fetching variants): restore original headers; the
# result goes to ${id}_json_info for the retrieve_informations step.
restore_headers_json_info_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json_info
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

131
132
convert_template = Template ('''
process convert_${id}_to_json {
133

134
    input:
135
    set file(result), file(headers) from ${id}_results
136
137

    output:
138
    set file("$${result}.json"), file(headers) into ${id}_restore_headers
139
140
141

    script:
    """
142
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
143
144
145
    """
}
''')
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162

# Nextflow process (default path): restore original headers; result feeds
# the join step via ${id}_json.
restore_headers_json_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

163
164
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {
165
166

    input:
167
    file result from ${id}_json_info
168
169

    output:
170
    file "$${result.baseName}_info.json" into ${id}_json
171
172
173

    script:
    """
174
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
175
176
177
178
179
180
181
182
183
184
    """
}
''')
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
185
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live
186
187
188

    script:
    """
189
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
190
191
192
    """
}
''')
193
input_template = Template('''file ${id}_result from ${id}_json.collect()''')
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
# Nextflow process: merge all per-module JSON results into a single document;
# ${inputs} is filled with one input_template line per module.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process: split the joined document into one JSON file per entry
# and publish them into the final output directory.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')
226
analysis_config_template = Template('''
227
    withName:${id}{
228
229
230
        executor = '${executor}'
        ${clusterOptions}
        ${beforeScript}
231
232
233
234
        ${container}
    }
    '''
    )
235
beforeScript_modul_config_template = Template('''
236
    withName:${process_name}{
237
238
239
240
        ${beforeScript}
    }
    '''
    )
241

242
243
244
245
246
247
248
249
# nextflow.config fragment: virtualenv-activation beforeScript for the
# shared normalizing_fasta process.
beforeScript_norm_config_template = Template('''
    withName:normalizing_fasta{
        ${beforeScript}
    }
    '''
    )


250
251
252
253
254
255
256
257
258
259
def setup_execution_directory(execution):
    """Create and populate the Nextflow execution directory.

    Writes the generated main.nf and nextflow.config into
    execution['directory'] and symlinks the helper scripts as ./bin
    (Nextflow adds the work dir's bin/ to the PATH of every process).

    Raises SystemExit if the target path exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        # makedirs also creates missing parent directories (os.mkdir did not)
        os.makedirs(directory)
    if not os.path.isdir(directory):
        # previously a bare exit() -- fail with a diagnostic instead
        raise SystemExit('Execution path exists but is not a directory: ' + directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(os.path.join(directory, 'nextflow.config'), 'w') as config_file:
        config_file.write(nextflow_config)

    bin_path = os.path.join(directory, 'bin')
    if not os.path.exists(bin_path):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), bin_path)
269
270

def execute_analysis(execution):
    """Run the generated Nextflow pipeline from inside its execution directory.

    The previous working directory is restored even if the run fails.
    """
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    try:
        # Argument list instead of an os.system() shell string: paths that
        # contain spaces or shell metacharacters are passed through safely.
        subprocess.run([
            'nextflow', 'run', os.path.join(execution['directory'], 'main.nf'),
            '--fasta', execution['fasta'],
            '--output', execution['output'],
        ])
    finally:
        os.chdir(old_cwd)

def generate_nextflow_script(execution):
    """Assemble the complete main.nf pipeline script for an execution.

    Builds the script from text fragments: input channel setup, FASTA
    header normalization, then per module an analysis process plus the
    conversion / header-restoration / (info retrieval) / (live results)
    processes selected by execution['mode'] and
    execution['fetch_informations'], and finally the join/split steps.
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "'''+execution['fasta']+'''"''')

    # On the cluster the input FASTA is split into 300-sequence chunks so
    # the analyses can run in parallel on the pieces.
    if execution['use_cluster']:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta).splitFasta(by:300, file:'input')''')
    else:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta)''')
    
    fragments.append(normalizing_fasta_template.substitute(execution))
    
    # Fan the normalized FASTA out into one channel per module.
    target_channels = ["for_"+m['id'] for m in modules]
    fragments.append('for_analysis.into{'+';'.join(target_channels)+';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']
        config['helpers_path'] = execution['helpers_path']
        
        # The module's analysis wrapper is executed once here, at generation
        # time; its stdout is the concrete command line that gets embedded
        # into the generated analysis process.
        command = Template("""${analysis_script} --fasta '$$fasta' --output '$${fasta}.${id}.results' ${analysis_params}""").substitute(config)
        cmdline = subprocess.run(command, shell=True, stdout=subprocess.PIPE)
        config['cmdline'] = cmdline.stdout.decode('utf-8')

        fragments.append(analysis_template.substitute(config))
        # Select the post-processing chain for this module.
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(restore_headers_json_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
          fragments.append(convert_info_template.substitute(config))
          fragments.append(restore_headers_json_info_template.substitute(config))
          fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))
            fragments.append(restore_headers_json_template.substitute(config))
        
    # One collect() input per module for the join step.
    json_inputs = []
    for m in modules:
        json_inputs.append(input_template.substitute(m))

#   fragments.append(fetch_template.substitute(flatten(m)))
    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    nextflow_script = '\n'.join(fragments)
    return nextflow_script
329
330
331
332
333

def generate_nextflow_config(execution):
    """Generate the nextflow.config contents for an execution.

    Emits the container engine switch (docker/singularity), then one
    process-scope section per analysis module (executor, cluster options,
    optional container image, optional virtualenv beforeScript) and, when a
    virtualenv is configured, matching beforeScript sections for the helper
    processes and the shared normalizing_fasta process.
    """
    modules = execution['modules']

    fragments = []

    if execution['docker']:
        fragments.append('''docker.enabled = true''')
    elif execution['singularity']:
        fragments.append('''singularity.enabled = true''')

    fragments.append('''process { ''')

    # Default so the beforeScript check after the loop cannot raise a
    # NameError when the module list is empty.
    config = {'beforeScript': ''}
    for m in modules:
        config = {}
        config['id'] = m['id']

        # Pick the container image matching the enabled engine, if any.
        if execution['docker'] and m['analysis']['container']['docker']:
            config['container'] = "container = " + "'" + m['analysis']['container']['docker'] + "'"
        elif execution['singularity'] and m['analysis']['container']['singularity']:
            config['container'] = "container = " + "'" + m['analysis']['container']['singularity'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        if 'venv' in execution:
            # Activate the virtualenv before each process script runs.
            config['beforeScript'] = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"

            # Helper processes that also need the virtualenv active.
            if execution['fetch_informations']:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json|retrieve_informations_for_${id}').substitute(config).split('|')
            else:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json').substitute(config).split('|')

            fragments.append(analysis_config_template.substitute(config))
            for process in process_names_list:
                config['process_name'] = process
                fragments.append(beforeScript_modul_config_template.substitute(config))
        else:
            config['beforeScript'] = ''
            fragments.append(analysis_config_template.substitute(config))

    # NOTE(review): this reuses the last module's beforeScript setting for
    # the shared normalizing_fasta process, mirroring the original logic.
    if config['beforeScript']:
        fragments.append(beforeScript_norm_config_template.substitute(config))

    fragments.append('''}''')

    nextflow_config = '\n'.join(fragments)
    return nextflow_config