New metrics logger system

- It is no longer mandatory to have log_path defined for every service
- Metrics are now added to a MetricsDataGatherer through its add method,
  so data can be gathered anywhere in the function scope instead of at a
  single predefined point
- A `metrics` context manager wraps the function block and records,
  among other things, the execution time of the block (see the usage
  sketch below)
Mario de Frutos 2016-10-28 10:45:04 +02:00
parent 93579532e3
commit cf3c6f2ce5
9 changed files with 243 additions and 81 deletions
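
For reference, a minimal usage sketch of the new API as it appears in the diffs below. The do_geocoding helper is hypothetical and user_geocoder_config is assumed to be an already-built config object (e.g. a GeocoderConfig); inside the PL/Python functions the config is built from the Redis and DB connections as before.

from cartodb_services.metrics import metrics, MetricsDataGatherer

def geocode(user_geocoder_config, country_name):
    # The context manager times the wrapped block; on exit it builds the
    # metrics logger for the service (if a log path is configured) and
    # flushes everything collected in MetricsDataGatherer.
    with metrics('cdb_geocode_admin0_polygon', user_geocoder_config):
        try:
            result = do_geocoding(country_name)  # hypothetical helper
            # Data can be added from anywhere inside the block:
            MetricsDataGatherer.add('success', True)
            MetricsDataGatherer.add('successful_rows', 1)
            return result
        except Exception:
            MetricsDataGatherer.add('success', False)
            MetricsDataGatherer.add('failed_rows', 1)
            raise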

View File

@ -2,6 +2,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(us
RETURNS Geometry AS $$
from cartodb_services.metrics import QuotaService
from cartodb_services.metrics import InternalGeocoderConfig
from cartodb_services.metrics import metrics
from cartodb_services.tools import Logger,LoggerConfig
plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username))
@ -13,23 +14,24 @@ RETURNS Geometry AS $$
logger_config = GD["logger_config"]
logger = Logger(logger_config)
quota_service = QuotaService(user_geocoder_config, redis_conn)
try:
plan = plpy.prepare("SELECT cdb_dataservices_server._cdb_geocode_admin0_polygon(trim($1)) AS mypolygon", ["text"])
rv = plpy.execute(plan, [country_name], 1)
result = rv[0]["mypolygon"]
if result:
quota_service.increment_success_service_use()
return result
else:
quota_service.increment_empty_service_use()
return None
except BaseException as e:
import sys
quota_service.increment_failed_service_use()
logger.error('Error trying to geocode admin0 polygon', sys.exc_info(), data={"username": username, "orgname": orgname})
raise Exception('Error trying to geocode admin0 polygon')
finally:
quota_service.increment_total_service_use()
with metrics('cdb_geocode_admin0_polygon', user_geocoder_config):
try:
plan = plpy.prepare("SELECT cdb_dataservices_server._cdb_geocode_admin0_polygon(trim($1)) AS mypolygon", ["text"])
rv = plpy.execute(plan, [country_name], 1)
result = rv[0]["mypolygon"]
if result:
quota_service.increment_success_service_use()
return result
else:
quota_service.increment_empty_service_use()
return None
except BaseException as e:
import sys
quota_service.increment_failed_service_use()
logger.error('Error trying to geocode admin0 polygon', sys.exc_info(), data={"username": username, "orgname": orgname})
raise Exception('Error trying to geocode admin0 polygon')
finally:
quota_service.increment_total_service_use()
$$ LANGUAGE plpythonu;

View File

@ -5,9 +5,10 @@ import re
from exceptions import WrongParams, MalformedResult, ServiceException
from qps import qps_retry
from cartodb_services.tools import Coordinate, PolyLine
from cartodb_services.metrics import Traceable
class MapzenGeocoder:
class MapzenGeocoder(Traceable):
'A Mapzen Geocoder wrapper for python'
BASE_URL = 'https://search.mapzen.com/v1/search'
@ -28,6 +29,7 @@ class MapzenGeocoder:
try:
response = requests.get(self._url, params=request_params,
timeout=(self.CONNECT_TIMEOUT, self.READ_TIMEOUT))
self.add_response_data(response, self._logger)
if response.status_code == requests.codes.ok:
return self.__parse_response(response.text)
elif response.status_code == requests.codes.bad_request:

View File

@ -2,9 +2,10 @@ import requests
import json
from qps import qps_retry
from exceptions import ServiceException
from cartodb_services.metrics import Traceable
class MatrixClient:
class MatrixClient(Traceable):
"""
A minimal client for Mapzen Time-Distance Matrix Service
@ -45,6 +46,7 @@ class MatrixClient:
}
response = requests.get(self.ONE_TO_MANY_URL, params=request_params,
timeout=(self.CONNECT_TIMEOUT, self.READ_TIMEOUT))
self.add_response_data(response)
if response.status_code != requests.codes.ok:
self._logger.error('Error trying to get matrix distance from mapzen',

View File

@ -5,9 +5,10 @@ import re
from exceptions import WrongParams, MalformedResult, ServiceException
from qps import qps_retry
from cartodb_services.tools import Coordinate, PolyLine
from cartodb_services.metrics import MetricsDataGatherer, Traceable
class MapzenRouting:
class MapzenRouting(Traceable):
'A Mapzen Routing wrapper for python'
PRODUCTION_ROUTING_BASE_URL = 'https://valhalla.mapzen.com/route'
@ -47,6 +48,7 @@ class MapzenRouting:
request_params = self.__parse_request_parameters(json_request_params)
response = requests.get(self._url, params=request_params,
timeout=(self.CONNECT_TIMEOUT, self.READ_TIMEOUT))
self.add_response_data(response)
if response.status_code == requests.codes.ok:
return self.__parse_routing_response(response.text)
elif response.status_code == requests.codes.bad_request:
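
The Mapzen clients above (geocoder, matrix, routing) now mix in Traceable, so every HTTP response contributes its elapsed time, status code and reason to the shared gatherer. A condensed sketch of the pattern with a hypothetical client (not the real Mapzen API):

import requests
from cartodb_services.metrics import Traceable

class ExampleClient(Traceable):
    """Hypothetical HTTP client that records per-request metrics."""

    def request(self, url, params):
        response = requests.get(url, params=params, timeout=(5, 60))
        # Records elapsed time, status code and reason for this response
        # in the 'response' list held by MetricsDataGatherer.
        self.add_response_data(response)
        response.raise_for_status()
        return response.json()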

View File

@ -1,3 +1,4 @@
from config import GeocoderConfig, IsolinesRoutingConfig, InternalGeocoderConfig, RoutingConfig, ConfigException, ObservatorySnapshotConfig, ObservatoryConfig
from quota import QuotaService
from user import UserMetricsService
from log import metrics, MetricsDataGatherer, Traceable
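
With these re-exports, callers can pull both the existing and the new primitives from one place, e.g. (sketch):

from cartodb_services.metrics import QuotaService, InternalGeocoderConfig
from cartodb_services.metrics import metrics, MetricsDataGatherer, Traceable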

View File

@ -15,6 +15,7 @@ class ServiceConfig(object):
self._username = username
self._orgname = orgname
self._db_config = ServicesDBConfig(db_conn, username, orgname)
self._metrics_log_path = self.__get_metrics_log_path()
self._environment = self._db_config._server_environment
if redis_connection:
self._redis_config = ServicesRedisConfig(redis_connection).build(
@ -38,9 +39,20 @@ class ServiceConfig(object):
def environment(self):
return self._environment
@property
def metrics_log_path(self):
return self._metrics_log_path
def __get_metrics_log_path(self):
if self.METRICS_LOG_KEY:
return self._db_config.logger_config.get(self.METRICS_LOG_KEY, None)
else:
return None
class DataObservatoryConfig(ServiceConfig):
METRICS_LOG_KEY = 'do_log_path'
def __init__(self, redis_connection, db_conn, username, orgname=None):
super(DataObservatoryConfig, self).__init__(redis_connection, db_conn,
username, orgname)
@ -61,6 +73,10 @@ class DataObservatoryConfig(ServiceConfig):
def connection_str(self):
return self._connection_str
@property
def provider(self):
return 'data observatory'
class ObservatorySnapshotConfig(DataObservatoryConfig):
@ -118,6 +134,7 @@ class RoutingConfig(ServiceConfig):
DEFAULT_PROVIDER = 'mapzen'
QUOTA_KEY = 'mapzen_routing_quota'
SOFT_LIMIT_KEY = 'soft_mapzen_routing_limit'
METRICS_LOG_KEY = 'routing_log_path'
def __init__(self, redis_connection, db_conn, username, orgname=None):
super(RoutingConfig, self).__init__(redis_connection, db_conn,
@ -135,6 +152,10 @@ class RoutingConfig(ServiceConfig):
if self._routing_provider == self.MAPZEN_PROVIDER:
return 'routing_mapzen'
@property
def provider(self):
return self._routing_provider
@property
def mapzen_api_key(self):
return self._mapzen_api_key
@ -151,7 +172,6 @@ class RoutingConfig(ServiceConfig):
def soft_limit(self):
return self._soft_limit
def _set_monthly_quota(self):
self._monthly_quota = self._get_effective_monthly_quota()
@ -169,7 +189,6 @@ class RoutingConfig(ServiceConfig):
self._soft_limit = False
class IsolinesRoutingConfig(ServiceConfig):
ISOLINES_CONFIG_KEYS = ['here_isolines_quota', 'soft_here_isolines_limit',
@ -184,6 +203,7 @@ class IsolinesRoutingConfig(ServiceConfig):
MAPZEN_PROVIDER = 'mapzen'
HEREMAPS_PROVIDER = 'heremaps'
DEFAULT_PROVIDER = 'heremaps'
METRICS_LOG_KEY = 'isolines_log_path'
def __init__(self, redis_connection, db_conn, username, orgname=None):
super(IsolinesRoutingConfig, self).__init__(redis_connection, db_conn,
@ -260,11 +280,12 @@ class IsolinesRoutingConfig(ServiceConfig):
class InternalGeocoderConfig(ServiceConfig):
METRICS_LOG_KEY = 'geocoder_log_path'
def __init__(self, redis_connection, db_conn, username, orgname=None):
# For now, internal geocoder doesn't use the redis config
super(InternalGeocoderConfig, self).__init__(None, db_conn,
username, orgname)
self._log_path = self._db_config.geocoder_log_path
@property
def service_type(self):
@ -283,8 +304,8 @@ class InternalGeocoderConfig(ServiceConfig):
return None
@property
def log_path(self):
return self._log_path
def provider(self):
return 'internal'
class GeocoderConfig(ServiceConfig):
@ -310,6 +331,7 @@ class GeocoderConfig(ServiceConfig):
ORGNAME_KEY = 'orgname'
PERIOD_END_DATE = 'period_end_date'
DEFAULT_PROVIDER = 'mapzen'
METRICS_LOG_KEY = 'geocoder_log_path'
def __init__(self, redis_connection, db_conn, username, orgname=None, forced_provider=None):
super(GeocoderConfig, self).__init__(redis_connection, db_conn,
@ -341,7 +363,6 @@ class GeocoderConfig(ServiceConfig):
self._geocoder_provider = self.DEFAULT_PROVIDER
self._geocoding_quota = float(filtered_config[self.QUOTA_KEY])
self._period_end_date = date_parse(filtered_config[self.PERIOD_END_DATE])
self._log_path = db_config.geocoder_log_path
if filtered_config[self.SOFT_LIMIT_KEY].lower() == 'true':
self._soft_geocoding_limit = True
else:
@ -424,8 +445,8 @@ class GeocoderConfig(ServiceConfig):
return self._cost_per_hit
@property
def log_path(self):
return self._log_path
def provider(self):
return self._geocoder_provider
class ServicesDBConfig:
@ -440,7 +461,6 @@ class ServicesDBConfig:
self._get_server_config()
self._get_here_config()
self._get_mapzen_config()
self._get_logger_config()
self._get_data_observatory_config()
def _get_server_config(self):
@ -493,13 +513,6 @@ class ServicesDBConfig:
else:
self._data_observatory_connection_str = do_conf['connection']['production']
def _get_logger_config(self):
logger_conf_json = self._get_conf('logger_conf')
if not logger_conf_json:
raise ConfigException('Logger configuration missing')
else:
logger_conf = json.loads(logger_conf_json)
self._geocoder_log_path = logger_conf['geocoder_log_path']
def _get_conf(self, key):
try:
@ -507,7 +520,7 @@ class ServicesDBConfig:
conf = self._db_conn.execute(sql, 1)
return conf[0]['conf']
except Exception as e:
raise ConfigException("Malformed config for {0}: {1}".format(key, e))
raise ConfigException("Error trying to get config for {0}: {1}".format(key, e))
@property
def server_environment(self):
@ -557,14 +570,18 @@ class ServicesDBConfig:
def mapzen_geocoder_monthly_quota(self):
return self._mapzen_geocoder_quota
@property
def geocoder_log_path(self):
return self._geocoder_log_path
@property
def data_observatory_connection_str(self):
return self._data_observatory_connection_str
@property
def logger_config(self):
logger_conf_json = self._get_conf('logger_conf')
if not logger_conf_json:
raise ConfigException('Logger configuration missing')
else:
return json.loads(logger_conf_json)
class ServicesRedisConfig:
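
Each service config now declares a METRICS_LOG_KEY and the base ServiceConfig resolves the path from the logger_conf entry stored in the server database. A small sketch of that lookup; the logger_conf content below is an assumed example:

import json

# Assumed example of a logger_conf value; only the geocoder path is set.
logger_conf = json.loads('{"geocoder_log_path": "/var/log/geocodings.log"}')

def metrics_log_path(metrics_log_key):
    # Same lookup as ServiceConfig.__get_metrics_log_path(): a missing key
    # yields None instead of raising, which is why log_path is no longer
    # mandatory for every service.
    if metrics_log_key:
        return logger_conf.get(metrics_log_key, None)
    return None

metrics_log_path('geocoder_log_path')  # '/var/log/geocodings.log'
metrics_log_path('routing_log_path')   # None -> routing metrics are not written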

View File

@ -2,14 +2,100 @@ from datetime import datetime
import abc
import json
import re
import time
from contextlib import contextmanager
class MetricsLoggerFactory:
@contextmanager
def metrics(function, service_config):
try:
start_time = time.time()
yield
finally:
end_time = time.time()
MetricsDataGatherer.add('function_name', function)
MetricsDataGatherer.add('function_execution_time', (end_time - start_time))
logger = MetricsServiceLoggerFactory.build(service_config)
if logger:
data = MetricsDataGatherer.get()
logger.log(data)
MetricsDataGatherer.clean()
class Traceable:
def add_response_data(self, response):
try:
response_data = {}
response_data['time'] = response.elapsed.total_seconds()
response_data['code'] = response.status_code
response_data['message'] = response.reason
stored_data = MetricsDataGatherer.get_element('response')
if stored_data:
stored_data.append(response_data)
else:
MetricsDataGatherer.add('response', [response_data])
except BaseException as e:
# We don't want to stop the job for some error here
plpy.warning(e)
class MetricsDataGatherer:
class __MetricsDataGatherer:
def __init__(self):
self.data = {}
def add(self, key, value):
self.data[key] = value
def get(self):
return self.data
def get_element(self, key):
return self.data.get(key, None)
def clean(self):
self.data = {}
__instance = None
@classmethod
def add(self, key, value):
MetricsDataGatherer.instance().add(key, value)
@classmethod
def get(self):
return MetricsDataGatherer.instance().get()
@classmethod
def get_element(self, key):
return MetricsDataGatherer.instance().get_element(key)
@classmethod
def clean(self):
MetricsDataGatherer.instance().clean()
@classmethod
def instance(self):
if not MetricsDataGatherer.__instance:
MetricsDataGatherer.__instance = MetricsDataGatherer.__MetricsDataGatherer()
return MetricsDataGatherer.__instance
class MetricsServiceLoggerFactory:
@classmethod
def build(self, service_config):
if re.match('geocoder_*', service_config.service_type):
if re.search('^geocoder_*', service_config.service_type):
return MetricsGeocoderLogger(service_config)
elif re.search('^routing_*', service_config.service_type):
return MetricsGenericLogger(service_config)
elif re.search('_isolines$', service_config.service_type):
return MetricsIsolinesLogger(service_config)
elif re.search('^obs_*', service_config.service_type):
return MetricsGenericLogger(service_config)
else:
return None
@ -17,58 +103,103 @@ class MetricsLoggerFactory:
class MetricsLogger(object):
__metaclass__ = abc.ABCMeta
def __init__(self, file_path):
self._file_path = file_path
def __init__(self, service_config):
self._service_config = service_config
def dump_to_file(self, data):
with open(self._file_path, 'a') as logfile:
json.dump(data, logfile)
logfile.write('\n')
log_path = self.service_config.metrics_log_path
if log_path:
with open(log_path, 'a') as logfile:
json.dump(data, logfile)
logfile.write('\n')
def collect_data(self, data):
return {
"function_name": data.get('function_name', None),
"function_execution_time": data.get('function_execution_time', None),
"service": self._service_config.service_type,
"processable_rows": 1,
"success": data.get('success', False),
"successful_rows": data.get('successful_rows', 0),
"failed_rows": data.get('failed_rows', 0),
"empty_rows": data.get('empty_rows', 0),
"created_at": datetime.now().isoformat(),
"provider": self._service_config.provider,
"username": self._service_config.username,
"organization": self._service_config.organization,
"response": data.get('response', None)
}
@property
def service_config(self):
return self._service_config
@abc.abstractproperty
def log(self, **data):
def log(self, data):
raise NotImplementedError('log method must be defined')
class MetricsGeocoderLogger(MetricsLogger):
def __init__(self, service_config):
super(MetricsGeocoderLogger, self).__init__(service_config.log_path)
self._service_config = service_config
super(MetricsGeocoderLogger, self).__init__(service_config)
def log(self, **data):
dump_data = self._dump_data(**data)
def log(self, data):
dump_data = self.collect_data(data)
self.dump_to_file(dump_data)
def _dump_data(self, **data):
if data['success']:
cost = self._service_config.cost_per_hit
failed_rows = 0
successful_rows = 1
def collect_data(self, data):
dump_data = super(MetricsGeocoderLogger, self).collect_data(data)
if data.get('success', False):
cost = self.service_config.cost_per_hit
else:
cost = 0
failed_rows = 1
successful_rows = 0
if self._service_config.is_high_resolution:
if self.service_config.is_high_resolution:
kind = 'high-resolution'
else:
kind = 'internal'
return {
dump_data.update({
"batched": False,
"cache_hits": 0, # Always 0 because no cache involved
# https://github.com/CartoDB/cartodb/blob/master/app/models/geocoding.rb#L208-L211
"cost": cost,
"created_at": datetime.now().isoformat(),
"failed_rows": failed_rows,
"geocoder_type": self._service_config.service_type,
"geocoder_type": self.service_config.service_type,
"kind": kind,
"processable_rows": 1,
"processed_rows": successful_rows,
"real_rows": successful_rows,
"success": data['success'],
"successful_rows": successful_rows,
"username": self._service_config.username,
"organization": self._service_config.organization
}
"processed_rows": data.get('successful_rows', 0),
"real_rows": data.get('successful_rows', 0),
})
return dump_data
class MetricsGenericLogger(MetricsLogger):
def __init__(self, service_config):
super(MetricsGenericLogger, self).__init__(service_config)
def log(self, data):
dump_data = self.collect_data(data)
self.dump_to_file(dump_data)
def collect_data(self, data):
return super(MetricsGenericLogger, self).collect_data(data)
class MetricsIsolinesLogger(MetricsLogger):
def __init__(self, service_config):
super(MetricsIsolinesLogger, self).__init__(service_config)
def log(self, data):
dump_data = self.collect_data(data)
self.dump_to_file(dump_data)
def collect_data(self, data):
dump_data = super(MetricsIsolinesLogger, self).collect_data(data)
dump_data.update({
"isolines_generated": data.get('isolines_generated', 0)
})
return dump_data
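
Putting the new log module together: on exit the metrics context manager asks MetricsServiceLoggerFactory for a logger and, if the service type matches one of the patterns above, appends a single JSON line to the configured path. A sketch with a stub config; every attribute value below is an assumption for illustration, and the factory is imported straight from the log module:

from cartodb_services.metrics import MetricsDataGatherer
from cartodb_services.metrics.log import MetricsServiceLoggerFactory

class StubConfig(object):
    # All values assumed for illustration only.
    service_type = 'geocoder_mapzen'      # matches '^geocoder_*'
    provider = 'mapzen'
    username = 'some_user'
    organization = None
    metrics_log_path = '/tmp/geocodings.log'
    cost_per_hit = 1
    is_high_resolution = True

MetricsDataGatherer.add('function_name', 'cdb_geocode_admin0_polygon')
MetricsDataGatherer.add('success', True)
MetricsDataGatherer.add('successful_rows', 1)

logger = MetricsServiceLoggerFactory.build(StubConfig())
if logger:
    # Appends one JSON line combining the gathered data with the
    # geocoder-specific fields (cost, kind, batched, cache_hits, ...).
    logger.log(MetricsDataGatherer.get())
MetricsDataGatherer.clean()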

View File

@ -1,5 +1,5 @@
from user import UserMetricsService
from log import MetricsLoggerFactory
from log import MetricsDataGatherer
from datetime import date
import re
@ -14,7 +14,6 @@ class QuotaService:
redis_connection)
self._user_service = UserMetricsService(self._user_service_config,
redis_connection)
self._metrics_logger = MetricsLoggerFactory.build(user_service_config)
def check_user_quota(self):
return self._quota_checker.check()
@ -46,13 +45,19 @@ class QuotaService:
self._user_service.increment_service_use(
self._user_service_config.service_type, "isolines_generated",
amount=amount)
MetricsDataGatherer.add('isolines_generated', amount)
def _log_service_process(self, event):
if self._metrics_logger:
if event is 'success' or event is 'empty':
self._metrics_logger.log(success=True)
elif event is 'empty':
self._metrics_logger.log(success=False)
if event is 'success':
MetricsDataGatherer.add('success', True)
MetricsDataGatherer.add('successful_rows', 1)
elif event is 'empty':
MetricsDataGatherer.add('success', True)
MetricsDataGatherer.add('successful_rows', 1)
MetricsDataGatherer.add('empty_rows', 1)
elif event is 'fail':
MetricsDataGatherer.add('success', False)
MetricsDataGatherer.add('failed_rows', 1)
class QuotaChecker:
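
With this change QuotaService no longer builds its own logger; it only feeds the shared gatherer and leaves the logging to the metrics context manager. A rough sketch of what one 'empty' result leaves in the gatherer:

from cartodb_services.metrics import MetricsDataGatherer

# Rough equivalent of _log_service_process('empty'):
MetricsDataGatherer.add('success', True)
MetricsDataGatherer.add('successful_rows', 1)
MetricsDataGatherer.add('empty_rows', 1)
print(MetricsDataGatherer.get())
# {'success': True, 'successful_rows': 1, 'empty_rows': 1}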

View File

@ -10,7 +10,7 @@ from setuptools import setup, find_packages
setup(
name='cartodb_services',
version='0.9.4',
version='0.10.0',
description='CartoDB Services API Python Library',