From 9381d5644bae546c94432413a6cc508694792455 Mon Sep 17 00:00:00 2001 From: Mario de Frutos Date: Fri, 16 Sep 2016 13:24:14 +0200 Subject: [PATCH 1/2] Fixed QPS retry decorator --- .../cartodb_services/mapzen/exceptions.py | 12 ++++++ .../cartodb_services/mapzen/geocoder.py | 25 ++++++------ .../cartodb_services/mapzen/matrix_client.py | 3 +- .../cartodb_services/mapzen/qps.py | 36 +++++++++++++---- .../cartodb_services/mapzen/routing.py | 4 +- .../cartodb_services/tools/log.py | 40 +++++++++++++++---- server/lib/python/cartodb_services/setup.py | 2 +- .../test/test_mapzengeocoder.py | 2 +- .../python/cartodb_services/test/test_qps.py | 33 +++++++++++++++ 9 files changed, 124 insertions(+), 33 deletions(-) create mode 100644 server/lib/python/cartodb_services/test/test_qps.py diff --git a/server/lib/python/cartodb_services/cartodb_services/mapzen/exceptions.py b/server/lib/python/cartodb_services/cartodb_services/mapzen/exceptions.py index dd3a3b8..4877f6a 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapzen/exceptions.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapzen/exceptions.py @@ -19,3 +19,15 @@ class MalformedResult(Exception): class TimeoutException(Exception): def __str__(self): return repr('Timeout requesting to mapzen server') + + +class ServiceException(Exception): + def __init__(self, message, response): + self.message = message + self.response = response + + def response(self): + return self.response + + def __str__(self): + return self.message diff --git a/server/lib/python/cartodb_services/cartodb_services/mapzen/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/mapzen/geocoder.py index 6765363..ac869f5 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapzen/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapzen/geocoder.py @@ -2,7 +2,7 @@ import requests import json import re -from exceptions import WrongParams, MalformedResult +from exceptions import WrongParams, MalformedResult, ServiceException from qps import qps_retry from cartodb_services.tools import Coordinate, PolyLine @@ -17,8 +17,9 @@ class MapzenGeocoder: self._url = base_url self._logger = logger - @qps_retry - def geocode(self, searchtext, city=None, state_province=None, country=None, search_type=None): + @qps_retry(qps=20) + def geocode(self, searchtext, city=None, state_province=None, + country=None, search_type=None): request_params = self._build_requests_parameters(searchtext, city, state_province, country, search_type) @@ -31,21 +32,21 @@ class MapzenGeocoder: else: self._logger.error('Error trying to geocode using mapzen', data={"response_status": response.status_code, - "response_reason": response.reason, - "response_content": response.text, - "reponse_url": response.url, - "response_headers": response.headers, - "searchtext": searchtext, - "city": city, "country": country, - "state_province": state_province }) - raise Exception('Error trying to geocode {0} using mapzen'.format(searchtext)) + "response_reason": response.reason, + "response_content": response.text, + "reponse_url": response.url, + "response_headers": response.headers, + "searchtext": searchtext, + "city": city, "country": country, + "state_province": state_province}) + raise ServiceException('Error trying to geocode {0} using mapzen'.format(searchtext), + response) except requests.ConnectionError as e: # Don't raise the exception to continue with the geocoding job self._logger.error('Error connecting to Mapzen geocoding server', exception=e) return [] - def _build_requests_parameters(self, searchtext, city=None, state_province=None, country=None, search_type=None): diff --git a/server/lib/python/cartodb_services/cartodb_services/mapzen/matrix_client.py b/server/lib/python/cartodb_services/cartodb_services/mapzen/matrix_client.py index 375c7ea..2b38155 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapzen/matrix_client.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapzen/matrix_client.py @@ -1,6 +1,7 @@ import requests import json from qps import qps_retry +from exceptions import ServiceException class MatrixClient: @@ -51,6 +52,6 @@ class MatrixClient: "response_headers": response.headers, "locations": locations, "costing": costing}) - raise Exception('Error trying to get matrix distance from mapzen') + raise ServiceException("Error trying to get matrix distance from mapzen", response) return response.json() diff --git a/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py b/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py index f999804..e44153b 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py @@ -4,18 +4,38 @@ from datetime import datetime from exceptions import TimeoutException DEFAULT_RETRY_TIMEOUT = 60 +DEFAULT_QUERIES_PER_SECOND = 10 - -def qps_retry(f): - def wrapped_f(*args, **kw): - return QPSService().call(f, *args, **kw) - return wrapped_f +def qps_retry(original_function=None,**options): + """ Query Per Second retry decorator + The intention of this decorator is to retry requests against third + party services that has QPS restriction. + Parameters: + - timeout: Maximum number of seconds to retry + - qps: Allowed queries per second. This parameter is used to + calculate the next time to retry the request + """ + if original_function is not None: + def wrapped_function(*args, **kwargs): + if 'timeout' in options: + timeout = options['timeout'] + else: + timeout = DEFAULT_RETRY_TIMEOUT + if 'qps' in options: + qps = options['qps'] + else: + qps = DEFAULT_QUERIES_PER_SECOND + return QPSService(retry_timeout=timeout, queries_per_second=qps).call(original_function, *args, **kwargs) + return wrapped_function + else: + def partial_wrapper(func): + return qps_retry(func, **options) + return partial_wrapper class QPSService: - def __init__(self, queries_per_second=10, - retry_timeout=DEFAULT_RETRY_TIMEOUT): + def __init__(self, queries_per_second, retry_timeout): self._queries_per_second = queries_per_second self._retry_timeout = retry_timeout @@ -27,7 +47,7 @@ class QPSService: return fn(*args, **kwargs) except Exception as e: response = getattr(e, 'response', None) - if response and (response.status_code == 429): + if response is not None and (response.status_code == 429): self.retry(start_time, attempt_number) else: raise e diff --git a/server/lib/python/cartodb_services/cartodb_services/mapzen/routing.py b/server/lib/python/cartodb_services/cartodb_services/mapzen/routing.py index dab21b5..0067514 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapzen/routing.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapzen/routing.py @@ -2,7 +2,7 @@ import requests import json import re -from exceptions import WrongParams, MalformedResult +from exceptions import WrongParams, MalformedResult, ServiceException from qps import qps_retry from cartodb_services.tools import Coordinate, PolyLine @@ -57,7 +57,7 @@ class MapzenRouting: "response_headers": response.headers, "waypoints": waypoints, "mode": mode, "options": options}) - raise Exception('Error trying to calculate route using Mapzen') + raise ServiceException('Error trying to calculate route using Mapzen', response) def __parse_options(self, options): return dict(option.split('=') for option in options) diff --git a/server/lib/python/cartodb_services/cartodb_services/tools/log.py b/server/lib/python/cartodb_services/cartodb_services/tools/log.py index f1df969..8136f18 100644 --- a/server/lib/python/cartodb_services/cartodb_services/tools/log.py +++ b/server/lib/python/cartodb_services/cartodb_services/tools/log.py @@ -1,4 +1,3 @@ -import plpy import rollbar import logging import json @@ -6,7 +5,14 @@ import traceback import sys # Monkey patch because plpython sys module doesn't have argv and rollbar # package use it -sys.__dict__['argv'] = [] +if 'argv' not in sys.__dict__: + sys.__dict__['argv'] = [] + +# Only can be imported when is called from PLPython +try: + import plpy +except ImportError: + pass class Logger: @@ -30,30 +36,28 @@ class Logger: return self._send_to_rollbar('debug', text, exception, data) self._send_to_log_file('debug', text, exception, data) - plpy.debug(text) + self._send_to_plpy('debug', text) def info(self, text, exception=None, data={}): if not self._check_min_level('info'): return self._send_to_rollbar('info', text, exception, data) self._send_to_log_file('info', text, exception, data) - plpy.info(text) + self._send_to_plpy('info', text) def warning(self, text, exception=None, data={}): if not self._check_min_level('warning'): return self._send_to_rollbar('warning', text, exception, data) self._send_to_log_file('warning', text, exception, data) - plpy.warning(text) + self._send_to_plpy('warning', text) def error(self, text, exception=None, data={}): if not self._check_min_level('error'): return self._send_to_rollbar('error', text, exception, data) self._send_to_log_file('error', text, exception, data) - # Plpy.error and fatal raises exceptions and we only want to log an - # error, exceptions should be raise explicitly - plpy.warning(text) + self._send_to_plpy('error', text) def _check_min_level(self, level): return True if self.LEVELS[level] >= self._min_level else False @@ -82,6 +86,19 @@ class Logger: elif level == 'error': self._file_logger.error(text, extra=extra_data) + def _send_to_plpy(self, level, text): + if self._check_plpy(): + if level == 'debug': + plpy.debug(text) + elif level == 'info': + plpy.info(text) + elif level == 'warning': + plpy.warning(text) + elif level == 'error': + # Plpy.error and fatal raises exceptions and we only want to + # log an error, exceptions should be raise explicitly + plpy.warning(text) + def _parse_log_extra_data(self, exception, data): extra_data = {} if exception: @@ -118,6 +135,13 @@ class Logger: def _log_file_activated(self): return True if self._config.log_file_path else False + def _check_plpy(self): + try: + module = sys.modules['plpy'] + return True + except KeyError: + return False + class ConfigException(Exception): pass diff --git a/server/lib/python/cartodb_services/setup.py b/server/lib/python/cartodb_services/setup.py index d01c585..9d4b55e 100644 --- a/server/lib/python/cartodb_services/setup.py +++ b/server/lib/python/cartodb_services/setup.py @@ -10,7 +10,7 @@ from setuptools import setup, find_packages setup( name='cartodb_services', - version='0.8', + version='0.9.0', description='CartoDB Services API Python Library', diff --git a/server/lib/python/cartodb_services/test/test_mapzengeocoder.py b/server/lib/python/cartodb_services/test/test_mapzengeocoder.py index 3a4aff5..19d8f42 100644 --- a/server/lib/python/cartodb_services/test/test_mapzengeocoder.py +++ b/server/lib/python/cartodb_services/test/test_mapzengeocoder.py @@ -6,7 +6,7 @@ import requests_mock from mock import Mock from cartodb_services.mapzen import MapzenGeocoder -from cartodb_services.mapzen.exceptions import MalformedResult +from cartodb_services.mapzen.exceptions import MalformedResult, TimeoutException requests_mock.Mocker.TEST_PREFIX = 'test_' diff --git a/server/lib/python/cartodb_services/test/test_qps.py b/server/lib/python/cartodb_services/test/test_qps.py new file mode 100644 index 0000000..b7c576a --- /dev/null +++ b/server/lib/python/cartodb_services/test/test_qps.py @@ -0,0 +1,33 @@ +import test_helper +import requests +from unittest import TestCase +from nose.tools import assert_raises +from datetime import datetime, date +from cartodb_services.mapzen.qps import qps_retry +from cartodb_services.mapzen.exceptions import ServiceException, TimeoutException +import requests_mock +import mock + +requests_mock.Mocker.TEST_PREFIX = 'test_' + +@requests_mock.Mocker() +class TestQPS(TestCase): + QPS_ERROR_MESSAGE = "Queries per second exceeded: Queries exceeded (10 allowed)" + + def test_qps_timeout(self, req_mock): + class TestClass: + @qps_retry(timeout=1) + def test(self): + response = requests.get('http://localhost/test_qps') + if response.status_code == 429: + raise ServiceException('Error 429', response) + + def _text_cb(request, context): + context.status_code = 429 + return self.QPS_ERROR_MESSAGE + + req_mock.register_uri('GET', 'http://localhost/test_qps', + text=_text_cb) + with self.assertRaises(TimeoutException): + c = TestClass() + c.test() From 64fc18b9e03fec7cf69f7b17e285981dd196420a Mon Sep 17 00:00:00 2001 From: Rafa de la Torre Date: Wed, 28 Sep 2016 13:43:50 +0200 Subject: [PATCH 2/2] Fix and improve test speed --- .../lib/python/cartodb_services/cartodb_services/mapzen/qps.py | 2 +- server/lib/python/cartodb_services/test/test_qps.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py b/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py index e44153b..2c7101c 100644 --- a/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py +++ b/server/lib/python/cartodb_services/cartodb_services/mapzen/qps.py @@ -55,7 +55,7 @@ class QPSService: def retry(self, first_request_time, retry_count): elapsed = datetime.now() - first_request_time - if elapsed.seconds > self._retry_timeout: + if elapsed.microseconds > (self._retry_timeout * 1000.0): raise TimeoutException() # inverse qps * (1.5 ^ i) is an increased sleep time of 1.5x per diff --git a/server/lib/python/cartodb_services/test/test_qps.py b/server/lib/python/cartodb_services/test/test_qps.py index b7c576a..014c510 100644 --- a/server/lib/python/cartodb_services/test/test_qps.py +++ b/server/lib/python/cartodb_services/test/test_qps.py @@ -16,7 +16,7 @@ class TestQPS(TestCase): def test_qps_timeout(self, req_mock): class TestClass: - @qps_retry(timeout=1) + @qps_retry(timeout=0.001, qps=100) def test(self): response = requests.get('http://localhost/test_qps') if response.status_code == 429: