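# nextflow.py: generate a Nextflow pipeline script (main.nf) for the
# configured analysis modules, set up its execution directory and run it.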
import collections
import collections.abc
import os
import os.path
from copy import deepcopy
from string import Template

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten nested mappings into a single-level dict.

    Keys of nested mutable mappings are joined to their parent key with
    *sep*, e.g. ``{'a': {'b': 1}}`` becomes ``{'a_b': 1}``. Keys are
    combined by string concatenation, so they must be strings.

    Args:
        d: the (possibly nested) mapping to flatten.
        parent_key: prefix prepended to every key of *d* (used by the
            recursion; '' means no prefix).
        sep: separator placed between a parent key and a child key.

    Returns:
        A new flat dict. An empty nested mapping contributes no keys.
    """
    flat = {}
    for key, value in d.items():
        new_key = parent_key + sep + key if parent_key else key
        # collections.abc: the bare collections.MutableMapping alias was
        # removed in Python 3.10.
        if isinstance(value, collections.abc.MutableMapping):
            flat.update(flatten(value, new_key, sep=sep))
        else:
            flat[new_key] = value
    return flat


# Nextflow process that runs one analysis module on the FASTA input.
# Placeholders: id, executor, clusterOptions, beforeScript, chunks,
# analysis_script, analysis_params ($$ is a literal '$' for Nextflow).
analysis_template = Template('''
process ${id} {
    executor '${executor}'
    ${clusterOptions}
    ${beforeScript}

    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')

# Nextflow process that splits a module's JSON results into separate files
# and publishes them to <output>/live. Placeholders: id, output.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*.*.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --tool ${id}
    """
}
''')
# Nextflow process converting a module's raw results to JSON, feeding both
# the final join (<id>_json) and the live publishing (<id>_json_live).
# Placeholders: id, beforeScript, converter_script, converter_params.
convert_live_template = Template('''
process convert_${id}_to_json {
    ${beforeScript}

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Nextflow process converting a module's raw results to JSON for the
# dbxref-resolution step (<id>_json_info).
# Placeholders: id, beforeScript, converter_script, converter_params.
convert_info_template = Template('''
process convert_${id}_to_json {
    ${beforeScript}

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Nextflow process converting a module's raw results to JSON for the final
# join (<id>_json).
# Placeholders: id, beforeScript, converter_script, converter_params.
convert_template = Template('''
process convert_${id}_to_json {
    ${beforeScript}

    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Nextflow process running resolve_dbxrefs.py on a module's JSON results
# and sending the enriched document to the final join (<id>_json).
# Placeholders: id, beforeScript.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {
    ${beforeScript}

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')
# Nextflow process running resolve_dbxrefs.py on a module's JSON results,
# feeding both the final join (<id>_json) and live publishing
# (<id>_json_live). Placeholders: id, beforeScript.
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {
    ${beforeScript}

    input:
    file result from ${id}_json_info

    output:
    file "$${result.baseName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.baseName}_info.json
    """
}
''')

# One input declaration for the join_documents process: collects every
# JSON document emitted on the module's <id>_json channel. Placeholder: id.
input_template = Template('    file ${id}_result from ${id}_json.collect()')

# Nextflow process merging all per-module JSON documents into joined.json.
# Placeholder: inputs -- newline-separated input declarations, one per
# module (rendered from input_template).
join_jsons_template = Template(template='''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process splitting joined.json into separate JSON files and
# copying them into the output directory. Placeholder: output.
split_jsons_template = Template(template='''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')

def setup_execution_directory(execution):
    """Prepare ``execution['directory']`` for a Nextflow run.

    Creates the directory (including missing parents) if it does not exist,
    writes the script produced by ``generate_nextflow_script(execution)`` to
    ``main.nf`` inside it, and symlinks ``<install_path>/helpers`` there as
    ``bin`` unless a ``bin`` entry already exists. NOTE(review): presumably
    ``bin`` is what makes the helper scripts used by the templates callable
    by bare name from Nextflow processes -- confirm.

    Args:
        execution: configuration dict; reads 'directory' and 'install_path'
            plus every key generate_nextflow_script reads.

    Raises:
        SystemExit: with a non-zero status if the path exists but is not a
            directory.
    """
    import sys

    directory = execution['directory']
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        # Fail loudly: a bare exit() would stop with status 0 and no message.
        sys.exit('execution path exists but is not a directory: ' + directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    bin_link = os.path.join(directory, 'bin')
    if not os.path.exists(bin_link):
        os.symlink(os.path.join(execution['install_path'], 'helpers'), bin_link)


def execute_analysis(execution):
    """Run the generated ``main.nf`` with Nextflow.

    The ``nextflow`` process runs with ``execution['directory']`` as its
    working directory and receives ``--fasta`` and ``--output`` from the
    execution config. Relative 'fasta'/'output' paths are passed through
    unchanged and are therefore resolved by Nextflow against that
    directory.

    Args:
        execution: configuration dict; reads 'directory', 'fasta' and
            'output'.

    Returns:
        The exit status of the ``nextflow`` process.

    Raises:
        FileNotFoundError: if ``nextflow`` is not on PATH or the directory
            does not exist.
    """
    import subprocess

    directory = execution['directory']
    # Absolute script path: with cwd=directory, a relative
    # "directory/main.nf" would be resolved inside directory a second time.
    script = os.path.join(os.path.abspath(directory), 'main.nf')
    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact. cwd= avoids changing this process's own cwd.
    command = ['nextflow', 'run', script,
               '--fasta', execution['fasta'],
               '--output', execution['output']]
    completed = subprocess.run(command, cwd=directory)
    return completed.returncode


def generate_nextflow_script(execution):
    """Return the text of the Nextflow pipeline script for *execution*.

    The script declares ``params.fasta`` (defaulting to
    ``execution['fasta']``) and fans the FASTA channel out to one
    ``for_<id>`` channel per module. Each module gets an analysis process
    plus the conversion/post-processing processes selected by
    ``execution['mode']`` and ``execution['fetch_informations']``. Finally,
    every module's ``<id>_json`` channel is joined into one document, which
    is split and published to ``execution['output']``.

    Args:
        execution: configuration dict; reads 'modules' (list of module
            dicts, each with at least an 'id'), 'fasta', 'output',
            'use_cluster', 'mode', optionally 'venv' and, depending on
            'mode', 'fetch_informations'.

    Returns:
        The complete script as a single string.
    """
    modules = execution['modules']

    fragments = [
        'params.fasta = "' + execution['fasta'] + '"\n'
        + 'Channel.fromPath(params.fasta).set{fasta}',
        'fasta.into{' + ';'.join('for_' + m['id'] for m in modules) + ';}',
    ]

    for module in modules:
        config = _module_config(module, execution)
        fragments.append(analysis_template.substitute(config))
        fragments.extend(template.substitute(config)
                         for template in _conversion_templates(execution))

    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)


def _module_config(module, execution):
    """Build the template substitution mapping for one analysis module.

    The module dict is flattened (nested keys joined with '_') and extended
    with the run-wide values the process templates reference: 'output',
    'beforeScript', 'executor', 'chunks' and 'clusterOptions'.
    """
    config = flatten(module)
    config['output'] = execution['output']
    if 'venv' in execution:
        # PS1 is emptied before sourcing the activate script. NOTE(review):
        # presumably because the activate script reads PS1 in a
        # non-interactive shell -- confirm.
        config['beforeScript'] = ("beforeScript 'export PS1=; source "
                                  + execution['venv'] + "/bin/activate'")
    else:
        config['beforeScript'] = ''

    if execution['use_cluster']:
        # On SGE the FASTA is split into chunks of 300 records, so each
        # chunk becomes its own analysis task.
        config['executor'] = 'sge'
        config['chunks'] = ".splitFasta(by:300, file:'input')"
        config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
    else:
        config['executor'] = 'local'
        config['chunks'] = ''
        config['clusterOptions'] = ''
    return config


def _conversion_templates(execution):
    """Select the templates that follow a module's analysis process.

    - 'live' mode: the results are converted to JSON and also published
      live; if 'fetch_informations' is set, dbxrefs are resolved first.
    - 'complete' mode with 'fetch_informations': the results are converted
      and their dbxrefs resolved.
    - Anything else: the results are only converted to JSON.
    """
    mode = execution['mode']
    if mode == 'live':
        if execution['fetch_informations']:
            return [convert_info_template,
                    retrieve_informations_live_template,
                    live_results_template]
        return [convert_live_template, live_results_template]
    if mode == 'complete' and execution['fetch_informations']:
        return [convert_info_template, retrieve_informations_template]
    return [convert_template]