# nextflow.py — generates Nextflow main.nf scripts and nextflow.config files
# from an execution description (see setup_execution_directory below).
import collections
import collections.abc
import os
import os.path
from copy import deepcopy
from string import Template

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with *sep*, e.g. {'a': {'b': 1}} -> {'a_b': 1}.

    :param d: mapping to flatten (nested mappings are recursed into)
    :param parent_key: prefix carried through recursive calls
    :param sep: separator placed between joined key segments
    :return: flat dict with compound string keys
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10;
        # the ABC lives in collections.abc.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
# Nextflow process: rewrite fasta headers to an enumeration via a helper
# script; emits the normalized fasta plus the header-mapping TSV into the
# 'for_analysis' channel.
normalizing_fasta_template = Template('''
process normalizing_fasta {
    
    input:
    file fasta from for_normalization
    
    output:
    set file("$${fasta.baseName}_normalized.fasta"), file("$${fasta.baseName}_enum_headers.tsv") into for_analysis
    
    script:
    """
    ${helpers_path}/reduce_fasta_headers_to_enumeration.py -f $$fasta -e $${fasta.baseName}_enum_headers.tsv
    """
}
''')

# Nextflow process: run one analysis tool (${analysis_script}) on the
# normalized fasta; results plus the header mapping go into ${id}_results.
analysis_template = Template('''
process ${id} {

    input:
    set file(fasta), file(headers) from for_${id}

    output:
    set file("$${fasta}.${id}.results"), file(headers) into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
# Nextflow process: split a module's JSON result into one file per entry and
# publish them under ${output}/live as they become available.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Nextflow process (live mode, no info fetch): convert a tool's raw result
# into JSON with the module's converter script.
convert_live_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Nextflow process (live mode): map enumerated sequence ids in the JSON back
# to the original fasta headers; feeds both the join channel (${id}_json) and
# the live-results channel (${id}_json_live).
restore_headers_json_live_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json, ${id}_json_live
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

# Nextflow process (info-fetch modes): convert a tool's raw result into JSON.
# Same shape as convert_live_template/convert_template; kept separate so each
# mode owns its process definition.
convert_info_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Nextflow process (info-fetch modes): restore original fasta headers in the
# JSON; output goes to ${id}_json_info for the dbxref-resolution step.
restore_headers_json_info_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json_info
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

# Nextflow process (default/complete mode without info fetch): convert a
# tool's raw result into JSON with the module's converter script.
convert_template = Template('''
process convert_${id}_to_json {

    input:
    set file(result), file(headers) from ${id}_results

    output:
    set file("$${result}.json"), file(headers) into ${id}_restore_headers

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Nextflow process (default path): restore original fasta headers in the
# JSON; output goes straight to the ${id}_json join channel.
restore_headers_json_template = Template('''
process ${id}_restore_headers_json {
    
    input:
    set file(result), file(headers) from ${id}_restore_headers
    
    output:
    file "$${result.baseName}_restored_headers.json" into ${id}_json
    
    script:
    """
    ${helpers_path}/restore_seq_id_from_enumeration.py -j $$result -e $$headers
    """
}
''')

# Nextflow process (complete mode with info fetch): resolve dbxrefs in a
# module's JSON; enriched output goes to the ${id}_json join channel.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Nextflow process (live mode with info fetch): like
# retrieve_informations_template, but additionally feeds ${id}_json_live so
# live results can be published.
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
input_template = Template('''file ${id}_result from ${id}_json.collect()''')
# Nextflow process: merge all per-module JSON results into one joined.json.
# ${inputs} is filled with one input_template line per module.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process: split the joined document back into per-entry JSON files
# and publish them to the final ${output} directory.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')
# nextflow.config withName-block for one analysis process: executor plus
# optional clusterOptions / beforeScript / container lines (empty substitutions
# simply leave blank lines).
analysis_config_template = Template('''
    withName:${id}{
        executor = '${executor}'
        ${clusterOptions}
        ${beforeScript}
        ${container}
    }
    '''
    )
# nextflow.config withName-block that attaches a beforeScript (virtualenv
# activation) to one named helper process of a module.
beforeScript_modul_config_template = Template('''
    withName:${process_name}{
        ${beforeScript}
    }
    '''
    )
# nextflow.config withName-block that attaches the beforeScript to the
# fasta-normalization process.
beforeScript_norm_config_template = Template('''
    withName:normalizing_fasta{
        ${beforeScript}
    }
    '''
    )


def setup_execution_directory(execution):
    """Prepare the execution directory for a pipeline run.

    Creates the directory if needed, writes the generated main.nf and
    nextflow.config into it, and symlinks the project's helper scripts as
    its bin/ directory.

    :param execution: dict describing the run; this function reads
        'directory' and 'install_path' directly, and passes the whole dict
        to generate_nextflow_script/generate_nextflow_config.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # Path exists but is not a directory — nothing sensible to do.
        # NOTE(review): exits silently; consider exiting with a message.
        exit()

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(directory + '/nextflow.config', 'w') as config_file:
        config_file.write(nextflow_config)

    # Nextflow automatically adds the pipeline's bin/ to PATH; link the
    # installed helper scripts there.
    if not os.path.exists(directory + '/bin'):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), directory + '/bin')

def execute_analysis(execution):
    """Run the generated Nextflow pipeline for *execution*.

    Changes into the execution directory, invokes `nextflow run`, and always
    restores the previous working directory.

    :param execution: dict with 'directory', 'fasta' and 'output' entries.
    """
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    try:
        # NOTE(review): command is built by string concatenation and run
        # through the shell — paths containing spaces or shell metacharacters
        # will break; consider subprocess.run with an argument list.
        os.system('nextflow run ' + execution['directory'] + '/main.nf --fasta ' + execution['fasta'] + ' --output ' + execution['output'])
    finally:
        # Restore the caller's cwd even if os.chdir/os.system raised.
        os.chdir(old_cwd)

def generate_nextflow_script(execution):
    """Assemble the contents of main.nf for the given execution description.

    Emits, in order: the params.fasta line, the input channel (optionally
    split into 300-sequence chunks for cluster runs), the normalization
    process, one analysis/conversion/header-restore chain per module
    (variant chosen by execution['mode'] and execution['fetch_informations']),
    and the final join/split processes.

    :param execution: dict with 'fasta', 'use_cluster', 'output',
        'helpers_path', 'mode', 'fetch_informations' and 'modules' entries;
        each module dict must carry an 'id' and the keys its templates need.
    :return: the complete Nextflow script as a single string.
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "''' + execution['fasta'] + '''"''')

    if execution['use_cluster']:
        # Chunk the fasta so analyses parallelize across cluster jobs.
        fragments.append('''for_normalization = Channel.fromPath(params.fasta).splitFasta(by:300, file:'input')''')
    else:
        fragments.append('''for_normalization = Channel.fromPath(params.fasta)''')

    fragments.append(normalizing_fasta_template.substitute(execution))

    # Fan the normalized fasta out into one channel per module.
    target_channels = ["for_" + m['id'] for m in modules]
    fragments.append('for_analysis.into{' + ';'.join(target_channels) + ';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']
        config['helpers_path'] = execution['helpers_path']

        fragments.append(analysis_template.substitute(config))
        # Pick the convert/restore(/retrieve) chain for the current mode.
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(restore_headers_json_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(restore_headers_json_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))
            fragments.append(restore_headers_json_template.substitute(config))

    # One collected input per module for the join step.
    json_inputs = [input_template.substitute(m) for m in modules]

    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)

def generate_nextflow_config(execution):
    """Assemble the contents of nextflow.config for the given execution.

    Enables docker or singularity globally, then writes one withName block
    per module (executor, cluster options, optional virtualenv activation,
    optional container), plus beforeScript blocks for the per-module helper
    processes and the normalization process when a virtualenv is configured.

    :param execution: dict with 'docker', 'singularity', 'use_cluster',
        'fetch_informations', 'modules' and optionally 'venv' entries.
    :return: the complete config as a single string.
    """
    modules = execution['modules']

    fragments = []

    if execution['docker']:
        fragments.append('''docker.enabled = true''')
    elif execution['singularity']:
        fragments.append('''singularity.enabled = true''')

    fragments.append('''process { ''')

    # Seed config so the normalization check after the loop cannot raise a
    # NameError when 'modules' is empty (bug in the original).
    config = {'beforeScript': ''}
    for m in modules:
        config = {}
        config['id'] = m['id']

        # Per-process container matching the globally enabled engine.
        if execution['docker'] and m['analysis']['container']['docker']:
            config['container'] = "container = " + "'" + m['analysis']['container']['docker'] + "'"
        elif execution['singularity'] and m['analysis']['container']['singularity']:
            config['container'] = "container = " + "'" + m['analysis']['container']['singularity'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        if 'venv' in execution:
            # Activate the virtualenv before each process of this module.
            config['beforeScript'] = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"

            if execution['fetch_informations']:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json|retrieve_informations_for_${id}').substitute(config).split('|')
            else:
                process_names_list = Template('convert_${id}_to_json|${id}_restore_headers_json').substitute(config).split('|')

            fragments.append(analysis_config_template.substitute(config))

            for process in process_names_list:
                config['process_name'] = process
                fragments.append(beforeScript_modul_config_template.substitute(config))
        else:
            config['beforeScript'] = ''
            fragments.append(analysis_config_template.substitute(config))

    # NOTE(review): reuses the *last* module's beforeScript for the
    # normalization process — all modules get the same value, so this works,
    # but it silently depends on that invariant.
    if config['beforeScript']:
        fragments.append(beforeScript_norm_config_template.substitute(config))

    fragments.append('''}''')

    return '\n'.join(fragments)