# nextflow.py (7.43 KB)
1
2
3
from string import Template
import os.path
import os
4
from copy import deepcopy
5
6
7
8
9
10
11
12
13
14
15
16
import collections

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict with joined keys.

    Nested keys are joined to their parent key with *sep*, e.g.
    ``{'analysis': {'script': 's'}}`` becomes ``{'analysis_script': 's'}``.
    generate_nextflow_script uses this to turn a module config into the
    placeholder names used by the process templates.

    Args:
        d: mapping to flatten; keys are concatenated as strings, so they
            must be ``str``.
        parent_key: prefix (joined with *sep*) for every key of *d*.
        sep: separator placed between parent and child keys.

    Returns:
        A new dict. *d* is not modified. If two paths flatten to the same
        key, the later one wins.
    """
    # collections.MutableMapping (the old alias) was removed in Python 3.10.
    from collections.abc import MutableMapping

    items = {}
    for key, value in d.items():
        new_key = parent_key + sep + key if parent_key else key
        if isinstance(value, MutableMapping):
            items.update(flatten(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

# Nextflow process that runs a module's analysis script on each FASTA item of
# the for_<id> channel and emits "<fasta>.<id>.results" into <id>_results.
# Placeholders: id, executor, clusterOptions, beforeScript, chunks,
# analysis_script, analysis_params. "$$" escapes a literal "$" for Nextflow.
analysis_template = Template('''
process ${id} {
    executor '${executor}'
    ${clusterOptions}
    ${beforeScript}
    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
# Nextflow process that splits a module's JSON output (from <id>_json_live)
# into separate files and publishes them to "<output>/live".
# Placeholders: id, output.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Nextflow process converting a module's raw results to JSON, feeding both
# the final <id>_json channel and the <id>_json_live channel.
# Placeholders: id, beforeScript, converter_script, converter_params.
convert_live_template = Template('''
process convert_${id}_to_json {
    ${beforeScript}
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Nextflow process converting a module's raw results to JSON for the
# information-retrieval step (emits into <id>_json_info).
# Placeholders: id, beforeScript, converter_script, converter_params.
convert_info_template = Template('''
process convert_${id}_to_json {
    ${beforeScript}
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Nextflow process converting a module's raw results to JSON, emitting only
# into the final <id>_json channel.
# Placeholders: id, beforeScript, converter_script, converter_params.
convert_template = Template('''
process convert_${id}_to_json {
    ${beforeScript}
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Nextflow process running resolve_dbxrefs.py on a module's JSON (from
# <id>_json_info), emitting "<baseName>_info.json" into <id>_json.
# Placeholders: id, beforeScript.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {
    ${beforeScript}

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Like retrieve_informations_template, but also emits into <id>_json_live so
# the live-results process can publish the enriched JSON.
# Placeholders: id, beforeScript.
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {
    ${beforeScript}

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# One input declaration for the join_documents process: gathers the module's
# <id>_json channel with .collect(). Placeholder: id.
input_template = Template('    file ${id}_result from ${id}_json.collect()')

# Nextflow process merging every module's JSON into a single joined.json.
# Placeholder ${inputs}: the process's input declarations, one per line.
join_jsons_template = Template(
    '\n'
    'process join_documents {\n'
    '\n'
    '    input:\n'
    '    ${inputs}\n'
    '\n'
    '    output:\n'
    '    file "joined.json" into joined_json\n'
    '\n'
    '    script:\n'
    '    """\n'
    '    join_json_files.py --output joined.json *.json\n'
    '    """\n'
    '}\n'
)
# Nextflow process splitting joined.json into separate files and publishing
# them to ${output}.
split_jsons_template = Template(
    '\n'
    'process split_documents {\n'
    '\n'
    '    publishDir "${output}", mode: \'copy\'\n'
    '\n'
    '    input:\n'
    '    file "input/json.json" from joined_json\n'
    '\n'
    '    output:\n'
    '    file "*.json" into result_documents\n'
    '\n'
    '    script:\n'
    '    """\n'
    "    split_json_into_separate_files.py --json 'input/json.json' --output .\n"
    '    """\n'
    '}\n'
)

# nextflow.config selector for the process named ${id}; ${container} is a
# complete directive line (generate_nextflow_config passes "container = '...'").
process_config_template = Template(
    '\n'
    '    withName:${id}{\n'
    '        ${container}\n'
    '    }\n'
    '    '
)


def setup_execution_directory(execution):
    """Create and populate the Nextflow working directory for *execution*.

    Writes ``main.nf`` (generate_nextflow_script) and ``nextflow.config``
    (generate_nextflow_config) into ``execution['directory']``. If no ``bin``
    entry exists there, it also symlinks ``<install_path>/helpers`` as ``bin``.

    Args:
        execution: dict. This function reads 'directory' and 'install_path';
            the whole dict is also passed to the two generators.

    Raises:
        SystemExit: if 'directory' exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        # A SystemExit carrying a message exits with status 1, so the failure
        # is visible to the calling shell.
        raise SystemExit("execution directory '%s' exists but is not a directory" % directory)

    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(generate_nextflow_script(execution))

    with open(os.path.join(directory, 'nextflow.config'), 'w') as config_file:
        config_file.write(generate_nextflow_config(execution))

    bin_link = os.path.join(directory, 'bin')
    # A dangling symlink fails os.path.exists() but still blocks os.symlink()
    # with FileExistsError, so replace it instead.
    if os.path.islink(bin_link) and not os.path.exists(bin_link):
        os.remove(bin_link)
    if not os.path.lexists(bin_link):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), bin_link)

def execute_analysis(execution):
    """Run the generated Nextflow pipeline for *execution*.

    Equivalent to running, inside ``execution['directory']``::

        nextflow run <directory>/main.nf --fasta <fasta> --output <output>

    The 'fasta' and 'output' values are passed unchanged, and the process's
    working directory is 'directory'. Relative paths are therefore seen by
    nextflow relative to that directory.

    Args:
        execution: dict; reads 'directory', 'fasta' and 'output'.

    Returns:
        The exit status of the nextflow process.

    Raises:
        FileNotFoundError: if the ``nextflow`` executable cannot be found.
    """
    import subprocess

    directory = execution['directory']
    # Resolve against the caller's cwd now. Once nextflow starts inside
    # *directory*, a relative path would be looked up under itself.
    main_nf = os.path.abspath(os.path.join(directory, 'main.nf'))
    # Use an argument list and no shell, so paths containing spaces or shell
    # metacharacters reach nextflow verbatim. cwd= leaves this process's own
    # working directory untouched.
    completed = subprocess.run(
        ['nextflow', 'run', main_nf,
         '--fasta', execution['fasta'],
         '--output', execution['output']],
        cwd=directory,
    )
    return completed.returncode

def _post_analysis_templates(mode, fetch_informations):
    """Return, in order, the templates rendered after a module's analysis process."""
    if mode == 'live':
        if fetch_informations:
            return (convert_info_template,
                    retrieve_informations_live_template,
                    live_results_template)
        return (convert_live_template, live_results_template)
    if mode == 'complete' and fetch_informations:
        return (convert_info_template, retrieve_informations_template)
    return (convert_template,)


def generate_nextflow_script(execution):
    """Render the complete ``main.nf`` Nextflow script for *execution*.

    The script loads ``params.fasta`` into a channel and fans it out to one
    ``for_<id>`` channel per module. For each module it renders
    analysis_template, followed by the processes chosen by
    ``execution['mode']`` and ``execution['fetch_informations']``:

    - live, no fetch:  convert_live, live_results
    - live, fetch:     convert_info, retrieve_informations_live, live_results
    - complete, fetch: convert_info, retrieve_informations
    - anything else:   convert

    Finally the ``<id>_json`` channels of all modules are joined
    (join_documents) and split again into ``execution['output']``
    (split_documents).

    Args:
        execution: dict. Reads 'modules', 'fasta', 'output', 'use_cluster'
            and 'mode', plus optionally 'venv' and 'fetch_informations'
            (treated as false when absent). Each module needs an 'id'. It
            must also provide, once flattened with flatten(), every
            placeholder its templates use (e.g. 'analysis': {'script': ...}
            -> ${analysis_script}).

    Returns:
        The Nextflow script as a single string.

    Raises:
        KeyError: if a required key or template placeholder is missing.
    """
    modules = execution['modules']

    fragments = [
        'params.fasta = "' + execution['fasta'] + '"\n'
        + 'Channel.fromPath(params.fasta).set{fasta}',
        # Fan the FASTA channel out to one for_<id> channel per module.
        'fasta.into{' + ';'.join('for_' + m['id'] for m in modules) + ';}',
    ]

    for module in modules:
        config = flatten(module)
        config['output'] = execution['output']
        if 'venv' in execution:
            config['beforeScript'] = ("beforeScript 'export PS1=; source "
                                      + execution['venv'] + "/bin/activate'")
        else:
            config['beforeScript'] = ''

        if execution['use_cluster']:
            # SGE execution; the input is split with splitFasta(by:300).
            config['executor'] = 'sge'
            config['chunks'] = ".splitFasta(by:300, file:'input')"
            config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['chunks'] = ''
            config['clusterOptions'] = ''

        fragments.append(analysis_template.substitute(config))
        templates = _post_analysis_templates(
            execution['mode'], execution.get('fetch_informations', False))
        fragments.extend(template.substitute(config) for template in templates)

    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)

def generate_nextflow_config(execution):
    """Render ``nextflow.config`` for *execution*.

    Enables Docker. For every module whose ``analysis.container`` is set to a
    truthy value, it adds a ``withName:<id>`` block assigning that container.
    The selector matches the process named exactly ``<id>``, i.e. the one
    analysis_template renders. Modules whose container is missing, None or
    empty get no block.

    Args:
        execution: dict; reads 'modules'. Each module needs 'id' and an
            'analysis' mapping.

    Returns:
        The config file contents as a single string.
    """
    fragments = ['docker.enabled = true', 'process { ']

    for module in execution['modules']:
        # .get: modules without a 'container' entry are skipped rather than
        # raising KeyError.
        container = module['analysis'].get('container')
        if container:
            fragments.append(process_config_template.substitute(
                id=module['id'],
                container="container = '" + container + "'",
            ))

    fragments.append('}')
    return '\n'.join(fragments)