Merge pull request #275 from CartoDB/fix_qps_retry

Fixed QPS retry decorator
This commit is contained in:
Rafa de la Torre 2016-09-28 13:47:45 +02:00 committed by GitHub
commit facda9e8be
9 changed files with 125 additions and 34 deletions

View File

@ -19,3 +19,15 @@ class MalformedResult(Exception):
class TimeoutException(Exception): class TimeoutException(Exception):
def __str__(self): def __str__(self):
return repr('Timeout requesting to mapzen server') return repr('Timeout requesting to mapzen server')
class ServiceException(Exception):
def __init__(self, message, response):
self.message = message
self.response = response
def response(self):
return self.response
def __str__(self):
return self.message

View File

@ -2,7 +2,7 @@ import requests
import json import json
import re import re
from exceptions import WrongParams, MalformedResult from exceptions import WrongParams, MalformedResult, ServiceException
from qps import qps_retry from qps import qps_retry
from cartodb_services.tools import Coordinate, PolyLine from cartodb_services.tools import Coordinate, PolyLine
@ -17,8 +17,9 @@ class MapzenGeocoder:
self._url = base_url self._url = base_url
self._logger = logger self._logger = logger
@qps_retry @qps_retry(qps=20)
def geocode(self, searchtext, city=None, state_province=None, country=None, search_type=None): def geocode(self, searchtext, city=None, state_province=None,
country=None, search_type=None):
request_params = self._build_requests_parameters(searchtext, city, request_params = self._build_requests_parameters(searchtext, city,
state_province, state_province,
country, search_type) country, search_type)
@ -38,14 +39,14 @@ class MapzenGeocoder:
"searchtext": searchtext, "searchtext": searchtext,
"city": city, "country": country, "city": city, "country": country,
"state_province": state_province}) "state_province": state_province})
raise Exception('Error trying to geocode {0} using mapzen'.format(searchtext)) raise ServiceException('Error trying to geocode {0} using mapzen'.format(searchtext),
response)
except requests.ConnectionError as e: except requests.ConnectionError as e:
# Don't raise the exception to continue with the geocoding job # Don't raise the exception to continue with the geocoding job
self._logger.error('Error connecting to Mapzen geocoding server', self._logger.error('Error connecting to Mapzen geocoding server',
exception=e) exception=e)
return [] return []
def _build_requests_parameters(self, searchtext, city=None, def _build_requests_parameters(self, searchtext, city=None,
state_province=None, country=None, state_province=None, country=None,
search_type=None): search_type=None):

View File

@ -1,6 +1,7 @@
import requests import requests
import json import json
from qps import qps_retry from qps import qps_retry
from exceptions import ServiceException
class MatrixClient: class MatrixClient:
@ -51,6 +52,6 @@ class MatrixClient:
"response_headers": response.headers, "response_headers": response.headers,
"locations": locations, "locations": locations,
"costing": costing}) "costing": costing})
raise Exception('Error trying to get matrix distance from mapzen') raise ServiceException("Error trying to get matrix distance from mapzen", response)
return response.json() return response.json()

View File

@ -4,18 +4,38 @@ from datetime import datetime
from exceptions import TimeoutException from exceptions import TimeoutException
DEFAULT_RETRY_TIMEOUT = 60 DEFAULT_RETRY_TIMEOUT = 60
DEFAULT_QUERIES_PER_SECOND = 10
def qps_retry(original_function=None,**options):
def qps_retry(f): """ Query Per Second retry decorator
def wrapped_f(*args, **kw): The intention of this decorator is to retry requests against third
return QPSService().call(f, *args, **kw) party services that has QPS restriction.
return wrapped_f Parameters:
- timeout: Maximum number of seconds to retry
- qps: Allowed queries per second. This parameter is used to
calculate the next time to retry the request
"""
if original_function is not None:
def wrapped_function(*args, **kwargs):
if 'timeout' in options:
timeout = options['timeout']
else:
timeout = DEFAULT_RETRY_TIMEOUT
if 'qps' in options:
qps = options['qps']
else:
qps = DEFAULT_QUERIES_PER_SECOND
return QPSService(retry_timeout=timeout, queries_per_second=qps).call(original_function, *args, **kwargs)
return wrapped_function
else:
def partial_wrapper(func):
return qps_retry(func, **options)
return partial_wrapper
class QPSService: class QPSService:
def __init__(self, queries_per_second=10, def __init__(self, queries_per_second, retry_timeout):
retry_timeout=DEFAULT_RETRY_TIMEOUT):
self._queries_per_second = queries_per_second self._queries_per_second = queries_per_second
self._retry_timeout = retry_timeout self._retry_timeout = retry_timeout
@ -27,7 +47,7 @@ class QPSService:
return fn(*args, **kwargs) return fn(*args, **kwargs)
except Exception as e: except Exception as e:
response = getattr(e, 'response', None) response = getattr(e, 'response', None)
if response and (response.status_code == 429): if response is not None and (response.status_code == 429):
self.retry(start_time, attempt_number) self.retry(start_time, attempt_number)
else: else:
raise e raise e
@ -35,7 +55,7 @@ class QPSService:
def retry(self, first_request_time, retry_count): def retry(self, first_request_time, retry_count):
elapsed = datetime.now() - first_request_time elapsed = datetime.now() - first_request_time
if elapsed.seconds > self._retry_timeout: if elapsed.microseconds > (self._retry_timeout * 1000.0):
raise TimeoutException() raise TimeoutException()
# inverse qps * (1.5 ^ i) is an increased sleep time of 1.5x per # inverse qps * (1.5 ^ i) is an increased sleep time of 1.5x per

View File

@ -2,7 +2,7 @@ import requests
import json import json
import re import re
from exceptions import WrongParams, MalformedResult from exceptions import WrongParams, MalformedResult, ServiceException
from qps import qps_retry from qps import qps_retry
from cartodb_services.tools import Coordinate, PolyLine from cartodb_services.tools import Coordinate, PolyLine
@ -57,7 +57,7 @@ class MapzenRouting:
"response_headers": response.headers, "response_headers": response.headers,
"waypoints": waypoints, "mode": mode, "waypoints": waypoints, "mode": mode,
"options": options}) "options": options})
raise Exception('Error trying to calculate route using Mapzen') raise ServiceException('Error trying to calculate route using Mapzen', response)
def __parse_options(self, options): def __parse_options(self, options):
return dict(option.split('=') for option in options) return dict(option.split('=') for option in options)

View File

@ -1,4 +1,3 @@
import plpy
import rollbar import rollbar
import logging import logging
import json import json
@ -6,8 +5,15 @@ import traceback
import sys import sys
# Monkey patch because plpython sys module doesn't have argv and rollbar # Monkey patch because plpython sys module doesn't have argv and rollbar
# package use it # package use it
if 'argv' not in sys.__dict__:
sys.__dict__['argv'] = [] sys.__dict__['argv'] = []
# Only can be imported when is called from PLPython
try:
import plpy
except ImportError:
pass
class Logger: class Logger:
@ -30,30 +36,28 @@ class Logger:
return return
self._send_to_rollbar('debug', text, exception, data) self._send_to_rollbar('debug', text, exception, data)
self._send_to_log_file('debug', text, exception, data) self._send_to_log_file('debug', text, exception, data)
plpy.debug(text) self._send_to_plpy('debug', text)
def info(self, text, exception=None, data={}): def info(self, text, exception=None, data={}):
if not self._check_min_level('info'): if not self._check_min_level('info'):
return return
self._send_to_rollbar('info', text, exception, data) self._send_to_rollbar('info', text, exception, data)
self._send_to_log_file('info', text, exception, data) self._send_to_log_file('info', text, exception, data)
plpy.info(text) self._send_to_plpy('info', text)
def warning(self, text, exception=None, data={}): def warning(self, text, exception=None, data={}):
if not self._check_min_level('warning'): if not self._check_min_level('warning'):
return return
self._send_to_rollbar('warning', text, exception, data) self._send_to_rollbar('warning', text, exception, data)
self._send_to_log_file('warning', text, exception, data) self._send_to_log_file('warning', text, exception, data)
plpy.warning(text) self._send_to_plpy('warning', text)
def error(self, text, exception=None, data={}): def error(self, text, exception=None, data={}):
if not self._check_min_level('error'): if not self._check_min_level('error'):
return return
self._send_to_rollbar('error', text, exception, data) self._send_to_rollbar('error', text, exception, data)
self._send_to_log_file('error', text, exception, data) self._send_to_log_file('error', text, exception, data)
# Plpy.error and fatal raises exceptions and we only want to log an self._send_to_plpy('error', text)
# error, exceptions should be raise explicitly
plpy.warning(text)
def _check_min_level(self, level): def _check_min_level(self, level):
return True if self.LEVELS[level] >= self._min_level else False return True if self.LEVELS[level] >= self._min_level else False
@ -82,6 +86,19 @@ class Logger:
elif level == 'error': elif level == 'error':
self._file_logger.error(text, extra=extra_data) self._file_logger.error(text, extra=extra_data)
def _send_to_plpy(self, level, text):
if self._check_plpy():
if level == 'debug':
plpy.debug(text)
elif level == 'info':
plpy.info(text)
elif level == 'warning':
plpy.warning(text)
elif level == 'error':
# Plpy.error and fatal raises exceptions and we only want to
# log an error, exceptions should be raise explicitly
plpy.warning(text)
def _parse_log_extra_data(self, exception, data): def _parse_log_extra_data(self, exception, data):
extra_data = {} extra_data = {}
if exception: if exception:
@ -118,6 +135,13 @@ class Logger:
def _log_file_activated(self): def _log_file_activated(self):
return True if self._config.log_file_path else False return True if self._config.log_file_path else False
def _check_plpy(self):
try:
module = sys.modules['plpy']
return True
except KeyError:
return False
class ConfigException(Exception): class ConfigException(Exception):
pass pass

View File

@ -10,7 +10,7 @@ from setuptools import setup, find_packages
setup( setup(
name='cartodb_services', name='cartodb_services',
version='0.8', version='0.9.0',
description='CartoDB Services API Python Library', description='CartoDB Services API Python Library',

View File

@ -6,7 +6,7 @@ import requests_mock
from mock import Mock from mock import Mock
from cartodb_services.mapzen import MapzenGeocoder from cartodb_services.mapzen import MapzenGeocoder
from cartodb_services.mapzen.exceptions import MalformedResult from cartodb_services.mapzen.exceptions import MalformedResult, TimeoutException
requests_mock.Mocker.TEST_PREFIX = 'test_' requests_mock.Mocker.TEST_PREFIX = 'test_'

View File

@ -0,0 +1,33 @@
import test_helper
import requests
from unittest import TestCase
from nose.tools import assert_raises
from datetime import datetime, date
from cartodb_services.mapzen.qps import qps_retry
from cartodb_services.mapzen.exceptions import ServiceException, TimeoutException
import requests_mock
import mock
requests_mock.Mocker.TEST_PREFIX = 'test_'
@requests_mock.Mocker()
class TestQPS(TestCase):
QPS_ERROR_MESSAGE = "Queries per second exceeded: Queries exceeded (10 allowed)"
def test_qps_timeout(self, req_mock):
class TestClass:
@qps_retry(timeout=0.001, qps=100)
def test(self):
response = requests.get('http://localhost/test_qps')
if response.status_code == 429:
raise ServiceException('Error 429', response)
def _text_cb(request, context):
context.status_code = 429
return self.QPS_ERROR_MESSAGE
req_mock.register_uri('GET', 'http://localhost/test_qps',
text=_text_cb)
with self.assertRaises(TimeoutException):
c = TestClass()
c.test()