#!/usr/bin/env python3
import dbxref.resolver
import requests
import xml.etree.ElementTree as ET
import lxml.html as HTML
import logging
import json
import argparse
# Debugging aid: uncomment the lines below to raise the log verbosity.
#logging.basicConfig(level=logging.DEBUG)
#logging.getLogger().setLevel(logging.WARNING)
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
#logger.setLevel(logging.DEBUG)

# Prefix-to-URI map handed to ElementTree find()/findall() so that the
# 'uniprot:<tag>' paths used throughout this module resolve to namespaced tags.
ns = {'uniprot': 'http://uniprot.org/uniprot'}

def main():
    """Command-line entry point: fetch UniProt entries and print them as JSON.

    Parses the section flags and the positional dbxrefs from the command
    line, converts the dbxref strings with
    ``dbxref.resolver.convert_to_dbxrefs``, retrieves the documents through
    ``retrieve`` and prints the resulting list as one JSON document on stdout.

    When none of the section flags is given, all sections are included.
    """
    parser = argparse.ArgumentParser(description='Retrieve uniprot xml documents for dbxrefs and convert them into json')
    parser.add_argument('--basic', '-b', action='store_true', help='Include id and description')
    parser.add_argument('--sequence', '-s', action='store_true', help='Include sequence')
    parser.add_argument('--organism', '-o', action='store_true', help='Include organism info')
    parser.add_argument('--annotation', '-a', action='store_true', help='Include annotation')
    parser.add_argument('--features', '-f', action='store_true', help='Include features')
    parser.add_argument('dbxrefs', nargs=argparse.REMAINDER)
    args = parser.parse_args()

    # No explicit selection means "give me everything".
    sections = ('basic', 'sequence', 'organism', 'annotation', 'features')
    if not any(getattr(args, section) for section in sections):
        for section in sections:
            setattr(args, section, True)

    dbxrefs = dbxref.resolver.convert_to_dbxrefs(args.dbxrefs)

    documents = retrieve(dbxrefs, basic=args.basic, sequence=args.sequence, organism=args.organism, annotation=args.annotation, features=args.features)
    print(json.dumps(documents))

def retrieve(dbxrefs, basic=True, sequence=True, organism=True, annotation=True, features=True):
    """Download the UniProt XML for each dbxref and convert it into a dict.

    Args:
        dbxrefs: dbxrefs in the form accepted by ``dbxref.resolver.resolve``.
        basic: add the ``description`` produced by ``read_basic``.
        sequence: add the ``sequence`` produced by ``read_sequence``.
        organism: add the ``organism`` produced by ``read_taxonomy``.
        annotation: add the fields produced by ``read_annotation``.
        features: add the ``features`` list produced by ``read_features``.

    Returns:
        A list with one dict per resolved entry. Every dict has an ``id``
        key holding the entry's dbxref. If the response body is not
        well-formed XML the dict carries a ``message`` key describing the
        problem instead of the requested sections.

    Raises:
        KeyError, AttributeError: re-raised (after a warning is logged) when
            the XML lacks an element or attribute the readers expect.
    """
    resolved = dbxref.resolver.resolve(dbxrefs, check_existence=False)
    documents = []
    for entry in resolved:
        xml_url = entry['locations']['xml'][0]
        logger.debug('URL: %s', xml_url)
        r = requests.get(xml_url)
        logger.debug('Content: %s', r.text)

        output = {'id': entry['dbxref']}
        try:
            root = ET.fromstring(r.text)
            for child in root.findall('uniprot:entry', ns):
                if basic:
                    output.update(read_basic(child))
                if sequence:
                    output.update(read_sequence(child))
                if organism:
                    output.update(read_taxonomy(child))
                if annotation:
                    output.update(read_annotation(child))
                if features:
                    output['features'] = read_features(child)
        except (KeyError, AttributeError):
            logger.warning('Error in retrieving %s', str(entry))
            raise
        except (ET.ParseError, RuntimeError):
            # ET.fromstring signals malformed input with ParseError; the
            # response is then most likely an HTML error page.
            output['message'] = 'an error occurred'
            try:
                html = HTML.document_fromstring(r.text.replace('\n', ' '))
                # Pass a default: without one a missing id raises KeyError
                # instead of returning None.
                if html.get_element_by_id('noResultsMessage', None) is not None:
                    output['message'] = 'no results found; probably invalid ID'
            except Exception:
                # Best effort only: keep the generic message if the body
                # cannot be parsed as HTML either.
                logger.debug('Could not inspect error page for %s', str(entry), exc_info=True)
        documents.append(output)
    return documents
74
75
76
77

def read_basic(entry):
    """Build the description of a UniProt entry from its protein name.

    The recommended name is used when present, otherwise the submitted name.

    Args:
        entry: ElementTree element of a ``uniprot:entry``.

    Returns:
        ``{'description': <full name>}``; when the chosen name has a short
        name it is appended in parentheses.

    Raises:
        AttributeError: if the entry has no protein element, neither kind of
            name, or the name has no ``fullName`` child.
    """
    protein = entry.find('uniprot:protein', ns)
    recname = protein.find('uniprot:recommendedName', ns)
    if recname is None:
        # use submittedName if recommendedName is not available
        recname = protein.find('uniprot:submittedName', ns)
    full_name = recname.find('uniprot:fullName', ns).text
    short_name = recname.find('uniprot:shortName', ns)

    if short_name is None:
        return {'description': full_name}
    return {'description': full_name + '(' + short_name.text + ')'}
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103

def read_sequence(entry):
    """Return ``{'sequence': ...}`` with all whitespace removed from the text
    of the entry's ``uniprot:sequence`` element."""
    raw_text = entry.find('uniprot:sequence', ns).text
    # split() without arguments cuts on every whitespace run, so re-joining
    # the pieces drops spaces and line breaks alike.
    chunks = raw_text.split()
    return {'sequence': ''.join(chunks)}

def read_taxonomy(entry):
    """Return ``{'organism': 'Taxon:<id>'}``, where ``<id>`` is the ``id``
    attribute of the first ``dbReference`` under the entry's organism."""
    db_reference = entry.find('uniprot:organism', ns).find('uniprot:dbReference', ns)
    attributes = db_reference.attrib
    return {'organism': 'Taxon:' + attributes['id']}

def read_annotation(entry):
    """Collect the annotation fields of a UniProt entry.

    Args:
        entry: ElementTree element of a ``uniprot:entry``.

    Returns:
        A dict with the keys ``accessions``, ``dbxrefs`` and ``keywords``
        plus every key returned by ``read_names``.
    """
    annotation = {
        'accessions': read_accessions(entry),
        'dbxrefs': read_dbrefs(entry),
        'keywords': read_keywords(entry),
    }
    annotation.update(read_names(entry))
    return annotation

def read_dbrefs(entry):
    """Return the entry's direct ``dbReference`` children as a list of
    ``<type>:<id>`` strings."""
    refs = []
    for element in entry.findall('uniprot:dbReference', ns):
        database = element.attrib['type']
        identifier = element.attrib['id']
        if database == 'GO':
            # Keep only the part after the first colon; presumably GO ids
            # already carry a "GO:" prefix that would otherwise be doubled.
            identifier = identifier.split(':')[1]
        refs.append(database + ':' + identifier)
    return refs

def read_names(entry):
    """Collect the protein names of a UniProt entry.

    Args:
        entry: ElementTree element of a ``uniprot:entry``.

    Returns:
        A dict that always contains ``alternative_names`` (a possibly empty
        list) and, when the matching element exists, ``recommended_name``
        and/or ``submitted_name``. Each name is a dict with a ``full`` key
        and, if the element has a short name, a ``short`` key.

    Raises:
        AttributeError: if the entry has no protein element or a name
            element has no ``fullName`` child.
    """
    output = {}
    protein = entry.find('uniprot:protein', ns)

    recname = protein.find('uniprot:recommendedName', ns)
    if recname is not None:
        output['recommended_name'] = _read_name(recname)

    subname = protein.find('uniprot:submittedName', ns)
    if subname is not None:
        output['submitted_name'] = _read_name(subname)

    output['alternative_names'] = [
        _read_name(altname)
        for altname in protein.findall('uniprot:alternativeName', ns)
    ]

    return output


def _read_name(name_element):
    """Turn one name element into ``{'full': ...}`` plus ``'short'`` if present."""
    name = {'full': name_element.find('uniprot:fullName', ns).text}
    short = name_element.find('uniprot:shortName', ns)
    if short is not None:
        name['short'] = short.text
    return name
148
149
150
151
152
153
154
155
156
157
158
159
160
161

def read_accessions(entry):
    """Return the text of every ``uniprot:accession`` child of the entry,
    in document order."""
    return [element.text for element in entry.findall('uniprot:accession', ns)]

def read_keywords(entry):
    """Return the text of every ``uniprot:keyword`` child of the entry,
    in document order."""
    return [element.text for element in entry.findall('uniprot:keyword', ns)]

def read_features(entry):
    """Convert the ``uniprot:feature`` children of an entry into dicts.

    Args:
        entry: ElementTree element of a ``uniprot:entry``.

    Returns:
        A list with one dict per feature. Each dict has a ``type`` key, a
        ``description`` key when the feature carries that attribute, and
        either ``position`` (location given as a single position) or
        ``begin`` and ``end``. ``begin``/``end`` hold the ``position``
        attribute when present, otherwise the ``status`` attribute.

    Raises:
        KeyError: if a required attribute is missing.
        AttributeError: if a feature has no location, or a location has
            neither a position nor both begin and end.
    """
    features = []
    for f in entry.findall('uniprot:feature', ns):
        feature = {}
        if 'description' in f.attrib:
            feature['description'] = f.attrib['description']
        feature['type'] = f.attrib['type']

        # Look the location up once instead of once per sub-element.
        location = f.find('uniprot:location', ns)
        position = location.find('uniprot:position', ns)
        if position is not None:
            feature['position'] = position.attrib['position']
        else:
            feature['begin'] = _position_or_status(location.find('uniprot:begin', ns))
            feature['end'] = _position_or_status(location.find('uniprot:end', ns))
        features.append(feature)
    return features


def _position_or_status(endpoint):
    """Return the endpoint's ``position`` attribute, or ``status`` if it has none."""
    attributes = endpoint.attrib
    if 'position' in attributes:
        return attributes['position']
    return attributes['status']
184

185
186
# Run the command-line interface only when executed as a script.
if __name__ == "__main__":
    main()