# resolver.py
1
import requests
2
3
from cachecontrol import CacheControl
from cachecontrol.caches.file_cache import FileCache
4
import logging
5
6
logger = logging.getLogger(__name__)

7
from dbxref import config
8

9
10
11
12
13
14
15
16
17
18
# File-backed HTTP cache rooted at ".web_cache"; constructed with forever=True
# (see cachecontrol's FileCache documentation for the exact semantics).
cache = FileCache(".web_cache", forever=True)
# Module-wide requests session wrapped by CacheControl so that requests made
# through `sess` use the cache above.
sess = CacheControl(requests.Session(), cache=cache)

# Status strings reported for a dbxref.

# The existence check got a response with a status code below 400.
STATUS_EXISTS = 'found'
# The existence check got a status code of 400 or above, or the request failed.
STATUS_NOT_EXISTS = 'not found'
# Not referenced by the functions visible in this module.
STATUS_UNKNOWN = 'status unknown'
# Existence checking was not requested.
STATUS_NOT_CHECKED = 'status not checked'
# The provider configuration has no 'check_existence' entry.
STATUS_CHECK_NOT_SUPPORTED = 'check of status not supported'
# The existence check request timed out.
STATUS_CHECK_TIMEOUT = 'status check timed out'
# No provider is configured for the dbxref's database.
STATUS_UNSUPPORTED_DB = 'database unsupported'
19

20
def resolve(dbxrefs, check_existence=True):
    """Resolve dbxrefs to the resource URLs configured for their database.

    Args:
        dbxrefs: Iterable of dicts, each with the string entries 'db' and 'id'.
        check_existence: When true, every dbxref of a supported database is
            checked with check_dbxref_exists(); otherwise its status is
            STATUS_NOT_CHECKED.

    Returns:
        A list with one dict per input dbxref, in input order. Each dict has
        'dbxref' ("<db>:<id>") and 'status'. For databases with a configured
        provider it also has 'locations', mapping each resource type of the
        provider to the list of URLs compiled from its templates. Databases
        without a provider get the status STATUS_UNSUPPORTED_DB.
    """
    results = []
    for dbxref in dbxrefs:
        db_name = dbxref['db']
        label = db_name + ':' + dbxref['id']

        # Decide on provider support first so that unsupported databases never
        # trigger an (inevitably fruitless) existence check.
        if not config.has_provider(db_name):
            results.append({'dbxref': label, 'status': STATUS_UNSUPPORTED_DB})
            continue

        if check_existence:
            status = check_dbxref_exists(dbxref)
        else:
            status = STATUS_NOT_CHECKED

        resources = config.get_provider(db_name)['resources']
        locations = {
            _type: [compile_url(url_template, dbxref)
                    for url_template in resources[_type]]
            for _type in resources
        }
        results.append({'dbxref': label, 'locations': locations, 'status': status})
    return results

39
40
41
42
43
def convert_to_dbxrefs(strings):
    """Build a list of dbxref dicts from an iterable of strings.

    Every string is passed to convert_string_to_dbxref(); the resulting dicts
    (with 'db' and 'id' entries) are returned in input order.
    """
    return [convert_string_to_dbxref(entry) for entry in strings]

def check_dbxref_exists(dbxref):
    """Check whether a dbxref exists, using its provider's check URL.

    Args:
        dbxref: Dict with the string entries 'db' and 'id'.

    Returns:
        STATUS_UNSUPPORTED_DB when no provider is configured for the database,
        STATUS_CHECK_NOT_SUPPORTED when the provider has no 'check_existence'
        entry, otherwise the status returned by check_url_exists() for the
        URL compiled from that entry.
    """
    db_name = dbxref['db']
    if not config.has_provider(db_name):
        return STATUS_UNSUPPORTED_DB

    provider = config.get_provider(db_name)
    if 'check_existence' not in provider:
        return STATUS_CHECK_NOT_SUPPORTED

    url = compile_url(provider['check_existence'], dbxref)
    logger.debug('Checking existence of dbxref at "%s"', url)
    return check_url_exists(url)
56
57
58
59
60

def compile_url(template, dbxref):
    """Fill the placeholders of a URL template with the values of a dbxref.

    Args:
        template: String in which '%i' stands for the dbxref id and '%d' for
            the dbxref database name.
        dbxref: Dict with the string entries 'db' and 'id'.

    Returns:
        The template with every '%i' replaced by dbxref['id'] and every '%d'
        replaced by dbxref['db'].
    """
    db_name = dbxref['db']
    identifier = dbxref['id']
    # Split on '%d' before inserting the id so that only placeholders present
    # in the template itself are substituted; an id that happens to contain
    # '%d' is inserted verbatim instead of being rewritten with the db name.
    return db_name.join(
        segment.replace('%i', identifier) for segment in template.split('%d')
    )

def check_url_exists(url, timeout=1):
    """Probe a URL with a HEAD request and translate the outcome to a status.

    Args:
        url: The URL to request; redirects are followed.
        timeout: Timeout passed to the request (defaults to 1, the value
            that was previously hard-coded).

    Returns:
        STATUS_EXISTS when the response status code is below 400,
        STATUS_CHECK_TIMEOUT when the request times out, and
        STATUS_NOT_EXISTS for any other response or failure.
    """
    try:
        response = sess.head(url, allow_redirects=True, timeout=timeout)
        response.close()
        if response.status_code < 400:
            return STATUS_EXISTS
        logger.debug('The server responded with status code: %s', response.status_code)
        return STATUS_NOT_EXISTS
    except requests.exceptions.Timeout:
        logger.info('Timeout for URL: "%s"', url)
        return STATUS_CHECK_TIMEOUT
    except Exception:
        # Best effort: any other failure is still reported as "not found", but
        # it is logged, and KeyboardInterrupt/SystemExit are no longer
        # swallowed as they were by the former bare `except:`.
        # NOTE(review): a failed request is not proof of absence; consider
        # whether STATUS_UNKNOWN would be the more accurate result here.
        logger.debug('Existence check failed for URL: "%s"', url, exc_info=True)
        return STATUS_NOT_EXISTS
74

75
76
77
78
79
80
def convert_string_to_dbxref(string):
    """
    Split a string at its first ':' into a dbxref.

    The text before the first ':' becomes the 'db' entry and everything after
    it becomes the 'id' entry of the returned dictionary. A string without
    any ':' raises an IndexError.
    """
    parts = string.split(':', 1)
    db_name = parts[0]
    identifier = parts[1]
    return {'db': db_name, 'id': identifier}