# nextflow.py — builds a Nextflow pipeline script + config from an execution
# description and runs it. (Header cleaned of pasted web-viewer residue.)
1
2
3
import collections
import collections.abc
import os
import os.path
import subprocess
import sys
from copy import deepcopy
from string import Template

9
10
11
12
13
14
15
16
17
18
19

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with ``sep``: {'a': {'b': 1}} -> {'a_b': 1}.

    :param d: mapping to flatten (values that are mappings are recursed into)
    :param parent_key: prefix carried through recursive calls
    :param sep: separator between joined key components
    :return: new flat dict
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10;
        # the ABC lives in collections.abc.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
20

21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
# Nextflow process template: rewrites FASTA headers to an enumeration via
# reduce_fasta_headers_to_enumeration.py, emitting the normalized FASTA plus
# a TSV mapping so the original headers can be restored later.
# ($$ escapes a literal $ for Nextflow; ${...} is filled in by Python.)
normalizing_fasta_template = Template('''
process normalizing_fasta {
    
    input:
    file fasta from for_normalization
    
    output:
    set file("$${fasta.baseName}_normalized.fasta"), file("$${fasta.baseName}_enum_headers.tsv") into for_analysis
    
    script:
    """
    ${helpers_path}/reduce_fasta_headers_to_enumeration.py -f $$fasta -e $${fasta.baseName}_enum_headers.tsv
    """
}
''')

37
38
# Nextflow process template: runs one analysis tool on the normalized FASTA.
# ${id} is the module id and ${cmdline} the concrete command line computed at
# script-generation time. (Reconstructed: pasted viewer line numbers removed
# from inside the string literal.)
analysis_template = Template('''
process ${id} {

    input:
    set file(fasta), file(headers) from for_${id}

    output:
    set file("$${fasta}.${id}.results"), file(headers) into ${id}_results

    script:
    """
    ${cmdline}
    """
}
''')
52
53
54
# Nextflow process template: splits a module's restored-header JSON into one
# file per document and publishes them to ${output}/live while the pipeline
# is still running. (Reconstructed: pasted viewer line numbers removed from
# inside the string literal.)
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Nextflow process template (live mode, no dbxref fetch): converts a module's
# raw result file to JSON with its converter script. (Reconstructed: pasted
# viewer line numbers removed from inside the string literal.)
convert_live_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100

# Nextflow process template (live mode): maps enumerated sequence ids in the
# JSON back to the original FASTA headers; feeds both the join step
# (${id}_json) and the live-results step (${id}_json_live).
restore_headers_json_live_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json, ${id}_json_live
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

101
102
# Nextflow process template (fetch-informations variants): converts a module's
# raw result file to JSON; downstream the headers are restored and dbxrefs
# resolved. (Reconstructed: pasted viewer line numbers removed from inside
# the string literal.)
convert_info_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132

# Nextflow process template (fetch-informations mode): restores original FASTA
# headers in the JSON; output goes to ${id}_json_info for dbxref resolution.
restore_headers_json_info_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json_info
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

133
134
# Nextflow process template (plain/complete mode): converts a module's raw
# result file to JSON. (Reconstructed: pasted viewer line numbers removed
# from inside the string literal.)
convert_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164

# Nextflow process template (plain mode): restores original FASTA headers in
# the JSON; output goes straight to ${id}_json for the final join.
restore_headers_json_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

165
166
# Nextflow process template (complete mode): resolves database cross
# references in the module's JSON via resolve_dbxrefs.py. (Reconstructed:
# pasted viewer line numbers removed from inside the string literal.)
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Nextflow process template (live mode with fetch): like
# retrieve_informations_template but also feeds ${id}_json_live so results
# can be published while the run continues. (Reconstructed: pasted viewer
# line numbers removed from inside the string literal.)
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
195
# One `input:` line per module for the join step; collects that module's JSON.
input_template = Template('''file ${id}_result from ${id}_json.collect()''')
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
# Nextflow process template: merges every module's JSON results into a single
# joined.json; ${inputs} is filled with one input_template line per module.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process template: splits the joined JSON back into one document
# per sequence and publishes them to the final ${output} directory.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')
228
# nextflow.config fragment: per-analysis-process settings (executor, cluster
# options, optional beforeScript and container directives). Empty placeholders
# simply produce blank lines. (Reconstructed: pasted viewer line numbers
# removed from inside the string literal.)
analysis_config_template = Template('''
    withName:${id}{
        executor = '${executor}'
        ${clusterOptions}
        ${beforeScript}
        ${container}
    }
    '''
    )
237
# nextflow.config fragment: attaches a beforeScript directive to one named
# helper process (converter / header-restore / info-retrieval).
# (Reconstructed: pasted viewer line numbers removed from inside the string.)
beforeScript_modul_config_template = Template('''
    withName:${process_name}{
        ${beforeScript}
    }
    '''
    )
243

244
245
246
247
248
249
250
251
# nextflow.config fragment: beforeScript directive for the fixed-name
# normalizing_fasta process (used when a virtualenv must be activated).
beforeScript_norm_config_template = Template('''
    withName:normalizing_fasta{
        ${beforeScript}
    }
    '''
    )


252
253
254
255
256
257
258
259
260
261
def setup_execution_directory(execution):
    """Prepare the Nextflow working directory for an execution.

    Creates ``execution['directory']`` if needed, writes the generated
    main.nf and nextflow.config into it, and symlinks the helper scripts
    from the install path to ./bin so Nextflow finds them on PATH.

    :param execution: dict with at least 'directory' and 'install_path'
        plus everything generate_nextflow_script/_config need.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Path exists but is not a directory — abort with a message and a
        # non-zero status instead of the former silent exit().
        sys.exit(directory + ' exists but is not a directory')

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(directory + '/nextflow.config', 'w') as config_file:
        config_file.write(nextflow_config)

    if not os.path.exists(directory + '/bin'):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), directory + '/bin')
271
272

def execute_analysis(execution):
    """Run the generated Nextflow pipeline and return its exit code.

    Changes into the execution directory, launches ``nextflow run`` with the
    fasta/output parameters, and restores the previous working directory in
    a ``finally`` block so an unexpected exception cannot leave the process
    in the wrong cwd (the original only restored it on the success path).

    :param execution: dict with 'directory', 'fasta' and 'output'
    :return: the nextflow exit code, or 1 if the launch failed with OSError
    """
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    command = 'nextflow run ' + execution['directory'] + '/main.nf --fasta ' + execution['fasta'] + ' --output ' + execution['output']
    retcode = 1  # pessimistic default, reported when the launch itself fails
    try:
        retcode = subprocess.call(command, shell=True)
    except OSError as e:
        print("Execution failed: ", e, file=sys.stderr)
    finally:
        os.chdir(old_cwd)
    return retcode
283
284
285
286
287

def generate_nextflow_script(execution):
    """Assemble the main.nf pipeline script for this execution.

    Emits: the fasta parameter, the input channel (chunked with splitFasta
    when running on a cluster), the header-normalization process, one
    analysis + conversion + header-restore (+ optional live/info) process
    group per module, and the final join/split steps.

    :param execution: flattened execution description (modules, fasta,
        output, mode, fetch_informations, use_cluster, helpers_path, ...)
    :return: the complete Nextflow script as a string
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "''' + execution['fasta'] + '''"''')

    if execution['use_cluster']:
        # Chunk the input so cluster jobs work on 300-sequence pieces.
        fragments.append('''for_normalization = Channel.fromPath(params.fasta).splitFasta(by:300, file:'input')''')
    else:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta)''')

    fragments.append(normalizing_fasta_template.substitute(execution))

    # Fan the normalized fasta out into one channel per analysis module.
    target_channels = ["for_" + m['id'] for m in modules]
    fragments.append('for_analysis.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']
        config['helpers_path'] = execution['helpers_path']

        # The analysis script is executed here, at generation time; its
        # stdout is the concrete command line embedded into the process.
        command = Template("""${analysis_script} --fasta '$$fasta' --output '$${fasta}.${id}.results' ${analysis_params}""").substitute(config)
        cmdline = subprocess.run(command, shell=True, stdout=subprocess.PIPE)
        config['cmdline'] = cmdline.stdout.decode('utf-8')

        fragments.append(analysis_template.substitute(config))
        # Pick the converter/restore/info process chain for the run mode.
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(restore_headers_json_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))
            fragments.append(restore_headers_json_template.substitute(config))

    # Join every module's JSON output and split into per-sequence documents.
    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)
337
338
339
340
341

def generate_nextflow_config(execution):
    """Assemble the nextflow.config contents for this execution.

    Adds a docker or singularity section when requested, then a process
    section with per-module executor/cluster/container settings; when a
    virtualenv is configured, beforeScript directives are attached to each
    module's helper processes and to normalizing_fasta.

    :param execution: dict with 'modules', 'docker', 'singularity',
        'use_cluster', 'fetch_informations' and optionally 'venv'
    :return: the complete config file as a string
    """
    modules = execution['modules']

    fragments = []

    if execution['docker']:
        fragments.append('''docker {
            enabled = 'true'
            fixOwnership = 'true'
            runOptions = '--volume=/home/ubuntu/db:/databases'
        }
        ''')
    elif execution['singularity']:
        fragments.append('''singularity {
            enabled = 'true'
            runOptions = '--bind /home/ubuntu/db:/databases'
        }
        ''')

    fragments.append('''process { ''')

    # Default so the post-loop beforeScript check is safe when there are no
    # modules (the original raised NameError on an empty module list).
    config = {'beforeScript': ''}
    for m in modules:
        config = {}
        config['id'] = m['id']

        if execution['docker'] and m['analysis']['container']['docker']:
            config['container'] = "container = " + "'" + m['analysis']['container']['docker'] + "'"
        elif execution['singularity'] and m['analysis']['container']['singularity']:
            config['container'] = "container = " + "'" + m['analysis']['container']['singularity'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        if 'venv' in execution:
            # Activate the virtualenv before each helper process of this module.
            config['beforeScript'] = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"

            if execution['fetch_informations']:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json|retrieve_informations_for_${id}').substitute(config).split('|')
            else:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json').substitute(config).split('|')

            fragments.append(analysis_config_template.substitute(config))
            for process in process_names_list:
                config['process_name'] = process
                fragments.append(beforeScript_modul_config_template.substitute(config))
        else:
            config['beforeScript'] = ''
            fragments.append(analysis_config_template.substitute(config))

    if config['beforeScript']:
        # venv in use: normalizing_fasta needs the activation hook as well.
        fragments.append(beforeScript_norm_config_template.substitute(config))

    fragments.append('''}''')

    return '\n'.join(fragments)