nextflow.py 11.7 KB
Newer Older
1
2
3
import collections
import collections.abc
import os
import os.path
import subprocess
import sys
from copy import deepcopy
from string import Template

9
10
11
12
13
14
15
16
17
18
19

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*, e.g. {'a': {'b': 1}} -> {'a_b': 1}.
    Non-mapping values are kept as-is.
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10;
        # the ABC lives in collections.abc.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
20

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Nextflow process template: rewrites FASTA headers to an enumeration via the
# reduce_fasta_headers_to_enumeration.py helper and emits the normalized FASTA
# together with a TSV mapping (used later to restore the original ids).
# '$$' escapes produce literal '$' for Nextflow after Template.substitute().
normalizing_fasta_template = Template('''
process normalizing_fasta {
    
    input:
    file fasta from for_normalization
    
    output:
    set file("$${fasta.baseName}_normalized.fasta"), file("$${fasta.baseName}_enum_headers.tsv") into for_analysis
    
    script:
    """
    ${helpers_path}/reduce_fasta_headers_to_enumeration.py -f $$fasta -e $${fasta.baseName}_enum_headers.tsv
    """
}
''')

37
38
# Nextflow process template for one analysis module: consumes the normalized
# FASTA plus the header-mapping TSV and runs the pre-computed ${cmdline}.
# (Reconstructed: the scraped source had viewer line numbers interleaved
# inside the template string.)
analysis_template = Template('''
process ${id} {

    input:
    set file(fasta), file(headers) from for_${id}

    output:
    set file("$${fasta}.${id}.results"), file(headers) into ${id}_results

    script:
    """
    ${cmdline}
    """
}
''')
52
53
54
# Nextflow process template: splits a module's JSON result into per-sequence
# files and publishes them to ${output}/live for live result browsing.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Nextflow process template: converts a module's raw result file to JSON with
# its configured converter script; live-mode variant feeding ${id}_restore_headers.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
convert_live_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

# Nextflow process template: restores the original sequence ids (recorded in
# the enumeration TSV) inside a result JSON; live variant — the restored file
# feeds both ${id}_json and the ${id}_json_live channel.
restore_headers_json_live_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json, ${id}_json_live
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

101
102
# Nextflow process template: converts a module's raw result file to JSON;
# fetch-informations variant (same body as convert_live_template /
# convert_template — kept separate to mirror the original structure).
# (Reconstructed: viewer line numbers were interleaved in the template string.)
convert_info_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

# Nextflow process template: restores original sequence ids in a result JSON;
# fetch-informations variant — output goes to ${id}_json_info for the
# retrieve_informations_* step.
restore_headers_json_info_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json_info
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

133
134
# Nextflow process template: converts a module's raw result file to JSON;
# default (non-live, no info-fetch) variant.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
convert_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

# Nextflow process template: restores original sequence ids in a result JSON;
# default variant — restored file goes straight to ${id}_json for joining.
restore_headers_json_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

165
166
# Nextflow process template: enriches a module's JSON with resolved dbxref
# information (resolve_dbxrefs.py); complete-mode variant feeding ${id}_json.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Nextflow process template: enriches a module's JSON with resolved dbxref
# information; live variant — also feeds ${id}_json_live for live publishing.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
195
input_template = Template('''file ${id}_result from ${id}_json.collect()''')
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# Nextflow process template: merges all per-module JSON results into one
# joined.json document; ${inputs} is filled with one input_template line per module.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process template: splits the joined document back into separate
# JSON files and publishes them to the final ${output} directory.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')
228
# nextflow.config fragment for one analysis process: executor, optional
# clusterOptions/beforeScript lines, and optional container directive.
# Empty placeholders simply produce blank lines in the config.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
analysis_config_template = Template('''
    withName:${id}{
        executor = '${executor}'
        ${clusterOptions}
        ${beforeScript}
        ${container}
    }
    '''
    )
237
# nextflow.config fragment: attaches a beforeScript (venv activation) to one
# named helper process of a module.
# (Reconstructed: viewer line numbers were interleaved in the template string.)
beforeScript_modul_config_template = Template('''
    withName:${process_name}{
        ${beforeScript}
    }
    '''
    )
243

244
245
246
247
248
249
250
251
# nextflow.config fragment: attaches the beforeScript (venv activation) to the
# normalizing_fasta process.
beforeScript_norm_config_template = Template('''
    withName:normalizing_fasta{
        ${beforeScript}
    }
    '''
    )


252
253
254
255
256
257
258
259
260
261
def setup_execution_directory(execution):
    """Prepare the Nextflow execution directory.

    Creates execution['directory'] if needed, writes the generated main.nf
    and nextflow.config into it, and symlinks the helpers directory as bin/.
    Exits the program if the path exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Previously a bare exit() (status 0, no message) — fail loudly instead.
        sys.exit('Execution directory is not a directory: ' + directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(directory + '/nextflow.config', 'w') as config_file:
        config_file.write(nextflow_config)

    # Nextflow puts bin/ on PATH for processes; point it at the helper scripts.
    if not os.path.exists(directory + '/bin'):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), directory + '/bin')

def execute_analysis(execution):
    """Run the generated Nextflow pipeline in execution['directory'].

    Returns the nextflow process return code, or 1 if it could not be
    launched. The caller's working directory is always restored.
    """
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    command = 'nextflow run ' + execution['directory'] + '/main.nf --fasta ' + execution['fasta'] + ' --output ' + execution['output']
    retcode = 1
    try:
        retcode = subprocess.call(command, shell=True)
    except OSError as e:
        print("Execution failed: ", e, file=sys.stderr)
    finally:
        # restore cwd even on unexpected exceptions (was unconditional before,
        # but not exception-safe)
        os.chdir(old_cwd)
    return retcode
283
284
285
286
287

def generate_nextflow_script(execution):
    """Assemble the complete main.nf script text for this execution.

    Emits the input channel setup, the normalization process, one analysis +
    conversion + header-restore (and optionally live/info) process per module,
    and the final join/split processes. Returns the script as one string.
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "''' + execution['fasta'] + '''"''')

    if execution['use_cluster']:
        # split the FASTA into 300-sequence chunks so analyses parallelize
        fragments.append('''for_normalization = Channel.fromPath(params.fasta).splitFasta(by:300, file:'input')''')
    else:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta)''')

    fragments.append(normalizing_fasta_template.substitute(execution))

    # fan the normalized FASTA out into one channel per module
    target_channels = ["for_" + m['id'] for m in modules]
    fragments.append('for_analysis.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']
        config['helpers_path'] = execution['helpers_path']

        # The module's analysis_script is invoked here, at generation time;
        # whatever it prints on stdout becomes the process command line.
        command = Template("""${analysis_script} --fasta '$$fasta' --output '$${fasta}.${id}.results' ${analysis_params}""").substitute(config)
        cmdline = subprocess.run(command, shell=True, stdout=subprocess.PIPE)
        config['cmdline'] = cmdline.stdout.decode('utf-8')

        fragments.append(analysis_template.substitute(config))
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(restore_headers_json_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))
            fragments.append(restore_headers_json_template.substitute(config))

    # one collected-input line per module for the join process
    json_inputs = [input_template.substitute(m) for m in modules]

    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)
337
338
339

def generate_nextflow_config(execution):
    """Assemble the nextflow.config text for this execution.

    Emits an optional docker/singularity section, then a process{} section
    with per-module executor/container settings and, when a virtualenv is
    configured, beforeScript entries for every helper process. Returns the
    config as one string.
    """
    modules = execution['modules']
    database_path = execution['database_path']

    fragments = []

    if execution['docker']:
        # NOTE(review): docker --volume normally takes host:container; the
        # single-path form creates an anonymous volume at that path — confirm
        # this matches the singularity '--bind {path}:/databases' intent.
        fragments.append('''docker {
            enabled = 'true'
            fixOwnership = 'true'
            runOptions = '--volume={path}/databases'
        }
        '''.format(path=database_path))
    elif execution['singularity']:
        fragments.append('''singularity {
            enabled = 'true'
            runOptions = '--bind {path}:/databases'
        }
        '''.format(path=database_path))

    fragments.append('''process { ''')

    # The beforeScript is identical for all processes; compute it once so the
    # normalizing_fasta section below no longer depends on loop-local state
    # (previously this raised NameError when 'modules' was empty).
    if 'venv' in execution:
        before_script = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"
    else:
        before_script = ''

    for m in modules:
        config = {'id': m['id'], 'beforeScript': before_script}

        if execution['docker'] and m['analysis']['container']['docker']:
            config['container'] = "container = '" + m['analysis']['container']['docker'] + "'"
        elif execution['singularity'] and m['analysis']['container']['singularity']:
            config['container'] = "container = '" + m['analysis']['container']['singularity'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        fragments.append(analysis_config_template.substitute(config))

        if before_script:
            # the venv must also be active in the module's helper processes
            if execution['fetch_informations']:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json|retrieve_informations_for_${id}').substitute(config).split('|')
            else:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json').substitute(config).split('|')

            for process in process_names_list:
                config['process_name'] = process
                fragments.append(beforeScript_modul_config_template.substitute(config))

    if before_script:
        fragments.append(beforeScript_norm_config_template.substitute({'beforeScript': before_script}))

    fragments.append('''}''')

    return '\n'.join(fragments)