# nextflow.py — generates the Nextflow pipeline script (main.nf) and its
# configuration (nextflow.config) from an execution description, then runs it.
import collections
import collections.abc
import os
import os.path
import subprocess

from copy import deepcopy
from string import Template

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Nested keys are joined with `sep`, e.g. {'a': {'b': 1}} -> {'a_b': 1}.
    """
    items = []
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        # collections.MutableMapping was removed in Python 3.10; the abc
        # submodule is the supported location since Python 3.3.
        if isinstance(v, collections.abc.MutableMapping):
            items.extend(flatten(v, new_key, sep=sep).items())
        else:
            items.append((new_key, v))
    return dict(items)
# Nextflow process running one analysis tool on the (optionally chunked)
# input fasta; emits "<fasta>.<id>.results" into the <id>_results channel.
analysis_template = Template ('''
process ${id} {

    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
# Nextflow process publishing per-sequence JSON results of one tool into
# <output>/live while the pipeline is still running.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Converter process for live mode without info fetching: the JSON goes both
# to the join step (<id>_json) and to the live-results step (<id>_json_live).
convert_live_template = Template ('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Converter process used when dbxref information is fetched afterwards:
# output goes to <id>_json_info (consumed by the retrieve_informations step).
convert_info_template = Template ('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Plain converter process (complete mode, no info fetching): output goes
# straight to the join step via <id>_json.
convert_template = Template ('''
process convert_${id}_to_json {

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Resolves dbxrefs in a tool's JSON output; result feeds the join step only.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Live-mode variant of the dbxref resolver: result feeds both the join step
# (<id>_json) and the live-results step (<id>_json_live).
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# One input line of the join_documents process per module: collects all JSON
# files emitted into <id>_json.
input_template = Template('''file ${id}_result from ${id}_json.collect()''')
# Merges all per-tool JSON result files into a single joined.json document;
# ${inputs} is filled with one input_template line per module.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Splits the joined JSON document back into separate .json files and copies
# them into the configured ${output} directory.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')
# Per-process nextflow.config fragment: executor plus optional clusterOptions,
# beforeScript and container lines (empty placeholders collapse to blanks).
analysis_config_template = Template('''
    withName:${id}{
        executor = '${executor}'
        ${clusterOptions}
        ${beforeScript}
        ${container}
    }
    '''
    )
# Config fragment attaching a beforeScript to the converter / info-retrieval
# processes (process_names is a withName pattern like 'a|b').
beforeScript_config_template = Template('''
    withName:${process_names}{
        ${beforeScript}
    }
    '''
    )
def setup_execution_directory(execution):
    """Prepare the execution directory.

    Creates the directory if needed, writes the generated main.nf and
    nextflow.config into it, and symlinks the helper scripts as bin/.
    Raises SystemExit if the path exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.mkdir(directory)
    if not os.path.isdir(directory):
        # exit() without an argument would terminate with status 0 (success);
        # raise with a message so the failure is visible and non-zero.
        raise SystemExit('Execution directory ' + directory + ' is not a directory')

    nextflow_script = generate_nextflow_script(execution)
    with open(directory + '/main.nf', 'w') as script_file:
        script_file.write(nextflow_script)

    nextflow_config = generate_nextflow_config(execution)
    with open(directory + '/nextflow.config', 'w') as config_file:
        config_file.write(nextflow_config)

    # the helper scripts become the pipeline's bin/ directory
    if not os.path.exists(directory + '/bin'):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), directory + '/bin')

    #if not os.path.exists(directory + '/psot'):
    #    os.symlink(execution['psot_path'], directory + '/psot')

def execute_analysis(execution):
    """Run the generated Nextflow pipeline from inside the execution directory."""
    old_cwd = os.getcwd()
    os.chdir(execution['directory'])
    try:
        # Argument list instead of a shell string (os.system): paths containing
        # spaces or shell metacharacters are passed through safely.
        subprocess.run(['nextflow', 'run', execution['directory'] + '/main.nf',
                        '--fasta', execution['fasta'],
                        '--output', execution['output']])
    finally:
        # restore the caller's working directory even if the run fails
        os.chdir(old_cwd)

def generate_nextflow_script(execution):
    """Assemble the main.nf script text for the given execution description.

    Builds the fasta input channel, one analysis/convert (and optionally
    live-results / info-retrieval) process per module, and the final
    join/split processes. Returns the script as a single string.
    """
    modules = execution['modules']

    fragments = []
    fragments.append('''params.fasta = "'''+execution['fasta']+'''"
Channel.fromPath(params.fasta).set{fasta}''')
    # fan the fasta channel out into one channel per module
    target_channels = ["for_"+m['id'] for m in modules]
    fragments.append('fasta.into{'+';'.join(target_channels)+';}')

    for m in modules:
        config = flatten(m)
        config['output'] = execution['output']

        # on the cluster the fasta is split into 300-sequence chunks
        if execution['use_cluster']:
            config['chunks'] = ".splitFasta(by:300, file:'input')"
        else:
            config['chunks'] = ''

        fragments.append(analysis_template.substitute(config))
        if execution['mode'] == 'live' and not execution['fetch_informations']:
            fragments.append(convert_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'live' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_live_template.substitute(config))
            fragments.append(live_results_template.substitute(config))
        elif execution['mode'] == 'complete' and execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(config))
            fragments.append(retrieve_informations_template.substitute(config))
        else:
            fragments.append(convert_template.substitute(config))

    json_inputs = []
    for m in modules:
        json_inputs.append(input_template.substitute(m))

    fragments.append(join_jsons_template.substitute({'inputs': '\n    '.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    nextflow_script = '\n'.join(fragments)
    return nextflow_script

def generate_nextflow_config(execution):
    """Assemble the nextflow.config text for the given execution description.

    Enables docker or singularity when requested and emits one per-process
    config section per module (executor, cluster options, container, and an
    optional virtualenv beforeScript). Returns the config as a single string.
    """
    modules = execution['modules']

    fragments = []

    if execution['docker']:
        fragments.append('''docker.enabled = true''')
    elif execution['singularity']:
        fragments.append('''singularity.enabled = true''')

    fragments.append('''process { ''')

    for m in modules:
        config = {}
        config['id'] = m['id']

        # Parentheses are required: without them `and` binds tighter than
        # `or`, so docker alone selected the container branch even for
        # modules that declare no container (KeyError on 'container').
        if (execution['docker'] or execution['singularity']) and 'container' in m['analysis']:
            config['container'] = "container = " + "'" + m['analysis']['container'] + "'"
        else:
            config['container'] = ''

        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['clusterOptions'] = "clusterOptions = '-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['clusterOptions'] = ''

        if 'venv' in execution:
            config['beforeScript'] = "beforeScript = 'export PS1=; source " + execution['venv'] + "/bin/activate'"

            # the helper processes also need the virtualenv activated
            if execution['fetch_informations']:
                config['process_names'] = "'" + Template('convert_${id}_to_json|retrieve_informations_for_${id}').substitute(config) + "'"
            else:
                config['process_names'] = Template('convert_${id}_to_json').substitute(config)

            fragments.append(analysis_config_template.substitute(config))
            fragments.append(beforeScript_config_template.substitute(config))
        else:
            config['beforeScript'] = ''
            fragments.append(analysis_config_template.substitute(config))

    fragments.append('''}''')

    nextflow_config = '\n'.join(fragments)
    return nextflow_config