nextflow.py 5.07 KB
Newer Older
1
2
3
from string import Template
import os.path
import os
4
from copy import deepcopy
5
6
7
8
9
10
11
12
13
14
15
16
import collections

# taken from https://stackoverflow.com/questions/6027558/flatten-nested-python-dictionaries-compressing-keys
def flatten(d, parent_key='', sep='_'):
    """Flatten a nested mapping into a single-level dict.

    Keys of nested mappings are joined to their parent key with ``sep``,
    e.g. ``{'a': {'b': 1}}`` becomes ``{'a_b': 1}``.

    Args:
        d: Mapping to flatten. Values that are mutable mappings are
            flattened recursively; all other values are copied as-is.
        parent_key: Prefix for every produced key (empty at the top level).
        sep: Separator placed between a parent key and a child key.

    Returns:
        A new dict holding the flattened keys; ``d`` is not modified.
    """
    # The ABC aliases in ``collections`` were removed in Python 3.10;
    # ``collections.abc`` is the supported location.
    from collections.abc import MutableMapping

    flat = {}
    for k, v in d.items():
        new_key = parent_key + sep + k if parent_key else k
        if isinstance(v, MutableMapping):
            flat.update(flatten(v, new_key, sep=sep))
        else:
            flat[new_key] = v
    return flat
17
18
19

# Nextflow process that runs one analysis module on the input FASTA.
# Placeholders: ${id}, ${executor}, ${clusterOptions}, ${chunks},
# ${analysis_script}, ${analysis_params}.  ``$$`` renders as a literal ``$``
# so that ``$fasta`` / ``${fasta}`` are left for Nextflow to expand.
analysis_template = Template('''
process ${id} {
    executor '${executor}'
    ${clusterOptions}
    input:
    file fasta from for_${id}${chunks}

    output:
    file "$${fasta}.${id}.results" into ${id}_results

    script:
    """
    ${analysis_script} --fasta $$fasta --output $${fasta}.${id}.results ${analysis_params}
    """
}
''')
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
# Nextflow process used in 'live' mode: reads from the ``${id}_json_live``
# channel, runs split_json_into_separate_files.py with --uuid, and publishes
# the matching JSON files (copy mode) to "${output}/live".
# Placeholders: ${id}, ${output}.  ``$$result`` renders as ``$result`` for
# Nextflow to expand.
live_results_template = Template('''
process generate_${id}_live_results {

    publishDir "${output}/live", mode: 'copy', pattern: '*????????-????-????-????-????????????.json'

    input:
    file result from ${id}_json_live

    output:
    file "*.json" into ${id}_live_results

    script:
    """
    split_json_into_separate_files.py --json $$result --output . --uuid
    """
}
''')
# Nextflow process that converts a module's raw results to JSON in 'live'
# mode.  Unlike the non-live converter it feeds two channels:
# ``${id}_json`` and ``${id}_json_live``.
# Placeholders: ${id}, ${converter_script}, ${converter_params}.  ``$$``
# renders as a literal ``$`` for Nextflow to expand.
convert_live_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json, ${id}_json_live

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
65
66
67
68
69
70
71
72
73
74
# Nextflow process that converts a module's raw results to JSON and emits
# them into the ``${id}_json`` channel.
# Placeholders: ${id}, ${converter_script}, ${converter_params}.  ``$$``
# renders as a literal ``$`` for Nextflow to expand.
convert_template = Template('''
process convert_${id}_to_json {
    input:
    file result from ${id}_results

    output:
    file "$${result}.json" into ${id}_json

    script:
    """
    ${converter_script} --result $$result --output $${result}.json ${converter_params}
    """
}
''')
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
# Nextflow process that runs resolve_dbxrefs.py on files from the
# ``${id}_fetch_json`` channel and emits the result into ``${id}_json``.
# Placeholder: ${id}.  ``$$`` renders as a literal ``$`` for Nextflow.
# NOTE(review): the process name ``fetch_dbxrefs`` does not include ${id}, so
# rendering this template for more than one module would presumably produce
# duplicate process names — confirm before enabling.
# NOTE(review): the option is spelled ``--retreiver``; verify that this
# matches the flag resolve_dbxrefs.py actually accepts.
fetch_template = Template('''
process fetch_dbxrefs {

    input:
    file result from ${id}_fetch_json

    output:
    file "$${result}_fetched.json" into ${id}_json

    script:
    """
    resolve_dbxrefs.py --input $$result --retreiver ../dbxref/scripts --output $${result}_fetched.json
    """
}
''')
Lukas Jelonek's avatar
Lukas Jelonek committed
94
# One Nextflow input declaration per module: collects everything from the
# module's ``${id}_json`` channel.  Placeholder: ${id}.  The literal starts
# with four spaces, which are part of the rendered text.
input_template = Template('''    file ${id}_result from ${id}_json.collect()''')
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
# Nextflow process that merges all staged ``*.json`` files into
# ``joined.json`` via join_json_files.py and emits it into ``joined_json``.
# Placeholder: ${inputs} — the text inserted into the ``input:`` section.
join_jsons_template = Template('''
process join_documents {

    input:
    ${inputs}

    output:
    file "joined.json" into joined_json

    script:
    """
    join_json_files.py --output joined.json *.json
    """
}
''')
# Nextflow process that takes the file from the ``joined_json`` channel
# (staged as ``input/json.json``), splits it with
# split_json_into_separate_files.py and publishes the resulting ``*.json``
# files (copy mode) to "${output}".  Placeholder: ${output}.
split_jsons_template = Template('''
process split_documents {

    publishDir "${output}", mode: 'copy'

    input:
    file "input/json.json" from joined_json

    output:
    file "*.json" into result_documents

    script:
    """
    split_json_into_separate_files.py --json 'input/json.json' --output .
    """
}
''')

def setup_execution_directory(execution):
    """Prepare the working directory for a Nextflow run.

    Creates ``execution['directory']`` (including missing parents) if it
    does not exist, writes the generated pipeline to ``main.nf`` inside it,
    and symlinks ``execution['script_path']`` as ``bin`` and
    ``execution['psot_path']`` as ``psot`` unless those entries already exist.

    Args:
        execution: dict providing 'directory', 'script_path' and
            'psot_path', plus the keys read by generate_nextflow_script.

    Raises:
        SystemExit: if the directory path exists but is not a directory.
    """
    directory = execution['directory']
    if os.path.exists(directory):
        if not os.path.isdir(directory):
            # Exit with a message and a non-zero status instead of
            # terminating silently with a success code.
            raise SystemExit(
                'Execution directory path exists but is not a directory: '
                + str(directory))
    else:
        os.makedirs(directory)

    nextflow_script = generate_nextflow_script(execution)
    with open(os.path.join(directory, 'main.nf'), 'w') as script_file:
        script_file.write(nextflow_script)

    # (link name inside the directory, key in ``execution`` with the target)
    for link_name, target_key in (('bin', 'script_path'),
                                  ('psot', 'psot_path')):
        link_path = os.path.join(directory, link_name)
        if not os.path.exists(link_path):
            os.symlink(execution[target_key], link_path)
143
144

def execute_analysis(execution):
    """Run the generated Nextflow pipeline from the execution directory.

    Invokes ``nextflow run <directory>/main.nf --fasta <fasta> --output
    <output>`` with ``execution['directory']`` as the working directory of
    the child process. The exit status of nextflow is ignored.

    Args:
        execution: dict providing 'directory', 'fasta' and 'output'.

    Raises:
        OSError: if the ``nextflow`` executable or the execution directory
            cannot be found.
    """
    import subprocess

    directory = execution['directory']
    command = [
        'nextflow', 'run', directory + '/main.nf',
        '--fasta', execution['fasta'],
        '--output', execution['output'],
    ]
    # Passing an argument list avoids shell parsing of the paths, and
    # ``cwd=`` confines the directory change to the child process, so the
    # caller's working directory is never left modified after an error.
    subprocess.call(command, cwd=directory)
149
150
151
152
153
154
155
156
157
158
159

def generate_nextflow_script(execution):
    """Render the complete Nextflow script for an execution.

    The script consists of the FASTA input channel fanned out to one
    ``for_<id>`` channel per module, an analysis process and a JSON
    conversion process per module (plus a live-results process in 'live'
    mode), and finally the join and split processes.

    Args:
        execution: dict providing 'modules' (a list of module dicts, each
            with an 'id' and the values the templates need once flattened),
            'use_cluster', 'mode' and 'output'.

    Returns:
        The Nextflow script as a single string.
    """
    modules = execution['modules']

    target_channels = ['for_' + m['id'] for m in modules]
    fragments = [
        '''params.fasta = "example/proteins.fas"
Channel.fromPath(params.fasta).set{fasta}''',
        'fasta.into{' + ';'.join(target_channels) + ';}',
    ]

    for m in modules:
        # Flatten once; the result is reused for every template of this module.
        flat = flatten(m)

        config = dict(flat)
        if execution['use_cluster']:
            config['executor'] = 'sge'
            config['chunks'] = ".splitFasta(by:300, file:'input')"
            config['clusterOptions'] = "clusterOptions='-S /bin/bash'"
        else:
            config['executor'] = 'local'
            config['chunks'] = ''
            config['clusterOptions'] = ''
        fragments.append(analysis_template.substitute(config))

        if execution['mode'] == 'live':
            fragments.append(convert_live_template.substitute(flat))
            # A shallow copy is sufficient: flatten() does not modify its input.
            with_output = dict(m)
            with_output['output'] = execution['output']
            fragments.append(
                live_results_template.substitute(flatten(with_output)))
        else:
            fragments.append(convert_template.substitute(flat))

    # NOTE: the dbxref fetch step (fetch_template) is currently not part of
    # the generated pipeline.
    json_inputs = [input_template.substitute(m) for m in modules]
    fragments.append(
        join_jsons_template.substitute({'inputs': '\n'.join(json_inputs)}))
    fragments.append(split_jsons_template.substitute(execution))

    return '\n'.join(fragments)