# nextflow.py: generate, set up and run Nextflow analysis pipelines.
import collections
import collections.abc
import os
import os.path
import subprocess
from copy import deepcopy
from string import Template

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Keys of nested mappings are joined to their parent key with *sep*,
    e.g. ``{'a': {'b': 1}}`` becomes ``{'a_b': 1}``. In this file it turns
    module configurations into flat mappings for ``Template.substitute``.

    :param d: mapping to flatten; values that are mutable mappings are
        recursed into, all other values are copied as-is.
    :param parent_key: prefix for every key of *d* (used by the recursion).
    :param sep: separator placed between a parent key and its child key.
    :return: a new flat dict; *d* itself is not modified.
    """
    items = {}
    for key, value in d.items():
        new_key = parent_key + sep + key if parent_key else key
        # collections.MutableMapping was removed in Python 3.10; the ABC
        # lives in collections.abc.
        if isinstance(value, collections.abc.MutableMapping):
            items.update(flatten(value, new_key, sep=sep))
        else:
            items[new_key] = value
    return items

# Nextflow process that runs one analysis module on the FASTA input.
# Placeholders: id, executor, clusterOptions, chunks, analysis_script,
# analysis_params. `$$` yields a literal `$` for Nextflow/bash variables.
analysis_template = Template('''
process ${id} {
    executor '${executor}'
    ${clusterOptions}

    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
# Live mode: split a module's JSON result into separate files and publish
# those whose names match a UUID-shaped pattern to <output>/live.
# Placeholders: id, output.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*????????-????-????-????-????????????.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --uuid
    """
}
''')

# Live mode without information retrieval: convert raw analysis results to
# JSON and send them both to the join step (<id>_json) and to the
# live-results step (<id>_json_live).
# Placeholders: id, converter_script, converter_params.
convert_live_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Information-retrieval mode: convert raw analysis results to JSON for the
# retrieve_informations_for_<id> process (channel <id>_json_info).
# Placeholders: id, converter_script, converter_params.
convert_info_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json_info

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')

# Default conversion: raw analysis results to JSON consumed directly by the
# join step (channel <id>_json).
# Placeholders: id, converter_script, converter_params.
convert_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
# Resolve database cross-references in a module's JSON result and pass the
# enriched document on to the join step (channel <id>_json).
# Placeholder: id. `$${result.simpleName}` is a literal Nextflow expression.
retrieve_informations_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.simpleName}_info.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.simpleName}_info.json
    """
}
''')

# Live-mode variant: the enriched document additionally feeds the
# live-results step (channel <id>_json_live). Placeholder: id.
retrieve_informations_live_template = Template('''
process retrieve_informations_for_${id} {

    input:
    file result from ${id}_json_info

    output:
    file "$${result.simpleName}_info.json" into ${id}_json, ${id}_json_live

    script:
    """
    resolve_dbxrefs.py --input $$result --output $${result.simpleName}_info.json
    """
}
''')
# One line of the join process's `input:` section per module, collecting
# every JSON document on the module's <id>_json channel. Placeholder: id.
input_template = Template('    file ${id}_result from ${id}_json.collect()')

# Runs join_json_files.py over all staged *.json inputs to build joined.json.
# Placeholder: inputs (newline-separated input_template lines).
join_jsons_template = Template(
'''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
'''
)

# Splits joined.json into separate files and publishes them to ${output}.
# Placeholder: output.
split_jsons_template = Template(
'''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
'''
)

def setup_execution_directory(execution):
    """Prepare the working directory for a Nextflow run.

    Creates ``execution['directory']`` if it does not exist and writes the
    generated Nextflow script to ``main.nf`` inside it. It then links ``bin``
    to ``execution['script_path']`` and ``psot`` to
    ``execution['psot_path']``, unless those entries already exist.

    :param execution: execution configuration dict. This function reads
        ``directory``, ``script_path`` and ``psot_path``; everything else is
        read by ``generate_nextflow_script``.
    :raises SystemExit: if ``directory`` exists but is not a directory.
    """
    directory = execution['directory']
    if not os.path.exists(directory):
        os.makedirs(directory)
    if not os.path.isdir(directory):
        # Same exception type as the former bare exit(), but with a message
        # and a non-zero exit status so the failure is not silent.
        raise SystemExit("Execution directory '%s' exists but is not a directory" % directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    # lexists() also detects dangling symlinks left by earlier runs; exists()
    # reports them as missing, and os.symlink() would then fail.
    for link_name, target_key in (('bin', 'script_path'), ('psot', 'psot_path')):
        link_path = os.path.join(directory, link_name)
        if not os.path.lexists(link_path):
            os.symlink(execution[target_key], link_path)

def execute_analysis(execution):
    """Run the generated Nextflow pipeline for *execution*.

    Executes ``nextflow run <directory>/main.nf --fasta <fasta> --output
    <output>`` with ``execution['directory']`` as the working directory.
    Relative ``fasta``/``output`` paths therefore resolve against that
    directory, as before. The caller's working directory is never changed.

    :param execution: dict; reads ``directory``, ``fasta`` and ``output``.
    :return: exit status of the ``nextflow`` process.
    :raises FileNotFoundError: if the ``nextflow`` executable cannot be found.
    """
    directory = execution['directory']
    # Absolute path: the command runs inside `directory`, so a relative
    # `directory/main.nf` would otherwise be resolved against it a second time.
    script = os.path.join(os.path.abspath(directory), 'main.nf')
    # An argument list instead of a shell string passes paths containing
    # spaces or shell metacharacters through verbatim. `cwd=` replaces the
    # chdir/restore pair, which leaked the changed cwd on errors.
    completed = subprocess.run(
        ['nextflow', 'run', script,
         '--fasta', execution['fasta'],
         '--output', execution['output']],
        cwd=directory,
    )
    return completed.returncode

def _analysis_config(module, use_cluster):
    """Return flatten(module) plus the executor keys used by analysis_template."""
    config = flatten(module)
    if use_cluster:
        # SGE executor; the FASTA channel is chunked with splitFasta(by:300).
        config['executor'] = 'sge'
        config['chunks'] = ".splitFasta(by:300, file:'input')"
        config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
    else:
        config['executor'] = 'local'
        config['chunks'] = ''
        config['clusterOptions'] = ''
    return config


def _module_fragments(module, execution):
    """Return the Nextflow process definitions for a single analysis module.

    The analysis process always comes first. It is followed by the conversion
    process and, when ``fetch_informations`` is set, by the dbxref-resolution
    process, both chosen by ``execution['mode']``. In ``live`` mode a
    live-results process publishing to ``execution['output']`` is appended.
    """
    flat = flatten(module)
    fragments = [analysis_template.substitute(_analysis_config(module, execution['use_cluster']))]

    mode = execution['mode']
    # fetch_informations is only read where the original logic needed it.
    if mode == 'live':
        if execution['fetch_informations']:
            fragments.append(convert_info_template.substitute(flat))
            fragments.append(retrieve_informations_live_template.substitute(flat))
        else:
            fragments.append(convert_live_template.substitute(flat))
        # Keyword arguments take precedence over the mapping, so `output`
        # is the execution's output directory even if the module has one.
        fragments.append(live_results_template.substitute(flat, output=execution['output']))
    elif mode == 'complete' and execution['fetch_informations']:
        fragments.append(convert_info_template.substitute(flat))
        fragments.append(retrieve_informations_template.substitute(flat))
    else:
        fragments.append(convert_template.substitute(flat))
    return fragments


def generate_nextflow_script(execution):
    """Generate the complete Nextflow script for an analysis execution.

    The script is built from these parts, in order:

    1. A header that reads ``execution['fasta']`` into a ``fasta`` channel
       and fans it out into one ``for_<id>`` channel per module.
    2. The per-module analysis/conversion processes.
    3. A process that joins all JSON documents.
    4. A process that splits and publishes them to ``execution['output']``.

    :param execution: dict; reads ``modules``, ``fasta``, ``use_cluster``,
        ``mode``, ``output`` and, depending on ``mode``,
        ``fetch_informations``.
        Each module dict must provide ``id`` and, after ``flatten()``, every
        placeholder of the templates it is rendered with (e.g.
        ``analysis_script``, which may come from ``{'analysis': {'script': ...}}``).
    :return: the Nextflow script as a single string.
    """
    modules = execution['modules']

    header = ('params.fasta = "' + execution['fasta'] + '"\n'
              + 'Channel.fromPath(params.fasta).set{fasta}')
    fan_out = 'fasta.into{' + ';'.join('for_' + m['id'] for m in modules) + ';}'
    fragments = [header, fan_out]

    for module in modules:
        fragments.extend(_module_fragments(module, execution))

    json_inputs = '\n'.join(input_template.substitute(m) for m in modules)
    fragments.append(join_jsons_template.substitute({'inputs': json_inputs}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)