Mapzen isochtrones integration

This commit is contained in:
Mario de Frutos 2016-11-29 12:28:19 +01:00
parent 4b714b3845
commit 77f4f3e7ff
10 changed files with 293 additions and 32 deletions

View File

@ -57,11 +57,9 @@ RETURNS SETOF cdb_dataservices_server.isoline AS $$
quota_service.increment_total_service_use() quota_service.increment_total_service_use()
$$ LANGUAGE plpythonu SECURITY DEFINER; $$ LANGUAGE plpythonu SECURITY DEFINER;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_mapzen_isodistance(
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_mapzen_isolines(
username TEXT, username TEXT,
orgname TEXT, orgname TEXT,
isotype TEXT,
source geometry(Geometry, 4326), source geometry(Geometry, 4326),
mode TEXT, mode TEXT,
data_range integer[], data_range integer[],
@ -78,7 +76,6 @@ RETURNS SETOF cdb_dataservices_server.isoline AS $$
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()") plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
logger_config = GD["logger_config"] logger_config = GD["logger_config"]
logger = Logger(logger_config) logger = Logger(logger_config)
# -- Check the quota
quota_service = QuotaService(user_isolines_routing_config, redis_conn) quota_service = QuotaService(user_isolines_routing_config, redis_conn)
if not quota_service.check_user_quota(): if not quota_service.check_user_quota():
raise Exception('You have reached the limit of your quota') raise Exception('You have reached the limit of your quota')
@ -96,14 +93,9 @@ RETURNS SETOF cdb_dataservices_server.isoline AS $$
# -- TODO Support options properly # -- TODO Support options properly
isolines = {} isolines = {}
if isotype == 'isodistance':
for r in data_range: for r in data_range:
isoline = mapzen_isolines.calculate_isodistance(origin, mode, r) isoline = mapzen_isolines.calculate_isodistance(origin, mode, r)
isolines[r] = isoline isolines[r] = isoline
elif isotype == 'isochrone':
for r in data_range:
isoline = mapzen_isolines.calculate_isochrone(origin, mode, r)
isolines[r] = isoline
result = [] result = []
for r in data_range: for r in data_range:
@ -130,3 +122,67 @@ RETURNS SETOF cdb_dataservices_server.isoline AS $$
finally: finally:
quota_service.increment_total_service_use() quota_service.increment_total_service_use()
$$ LANGUAGE plpythonu SECURITY DEFINER; $$ LANGUAGE plpythonu SECURITY DEFINER;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_mapzen_isochrones(
username TEXT,
orgname TEXT,
source geometry(Geometry, 4326),
mode TEXT,
data_range integer[],
options text[])
RETURNS SETOF cdb_dataservices_server.isoline AS $$
import json
from cartodb_services.mapzen import MatrixClient, MapzenIsochrones
from cartodb_services.metrics import QuotaService
from cartodb_services.tools import Logger,LoggerConfig
from cartodb_services.mapzen.types import coordinates_to_polygon
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
user_isolines_routing_config = GD["user_isolines_routing_config_{0}".format(username)]
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
logger_config = GD["logger_config"]
logger = Logger(logger_config)
# -- Check the quota
quota_service = QuotaService(user_isolines_routing_config, redis_conn)
if not quota_service.check_user_quota():
raise Exception('You have reached the limit of your quota')
try:
mapzen_isochrones = MapzenIsochrones(user_isolines_routing_config.mapzen_matrix_api_key,
logger)
if source:
lat = plpy.execute("SELECT ST_Y('%s') AS lat" % source)[0]['lat']
lon = plpy.execute("SELECT ST_X('%s') AS lon" % source)[0]['lon']
origin = {'lat': lat, 'lon': lon}
else:
raise Exception('source is NULL')
resp = mapzen_isochrones.isochrone(origin, mode, data_range)
if resp:
result = []
for isochrone in resp:
result_polygon = coordinates_to_polygon(isochrone.coordinates)
if result_polygon:
quota_service.increment_success_service_use()
result.append([source, isochrone.duration, result_polygon])
else:
quota_service.increment_empty_service_use()
result.append([source, isochrone.duration, None])
quota_service.increment_success_service_use()
quota_service.increment_isolines_service_use(len(result))
return result
else:
quota_service.increment_empty_service_use()
return []
except BaseException as e:
import sys
quota_service.increment_failed_service_use()
logger.error('Error trying to get mapzen isochrones', sys.exc_info(), data={"username": username, "orgname": orgname})
raise Exception('Error trying to get mapzen isochrones')
finally:
quota_service.increment_total_service_use()
$$ LANGUAGE plpythonu SECURITY DEFINER;

View File

@ -1,13 +1,20 @@
CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_isodistance(username TEXT, orgname TEXT, source geometry(Geometry, 4326), mode TEXT, range integer[], options text[] DEFAULT array[]::text[]) CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_isodistance(username TEXT, orgname TEXT, source geometry(Geometry, 4326), mode TEXT, range integer[], options text[] DEFAULT array[]::text[])
RETURNS SETOF cdb_dataservices_server.isoline AS $$ RETURNS SETOF cdb_dataservices_server.isoline AS $$
from cartodb_services.metrics import metrics
from cartodb_services.tools import Logger
plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username)) plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username))
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection'] redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
plpy.execute("SELECT cdb_dataservices_server._get_isolines_routing_config({0}, {1})".format(plpy.quote_nullable(username), plpy.quote_nullable(orgname))) plpy.execute("SELECT cdb_dataservices_server._get_isolines_routing_config({0}, {1})".format(plpy.quote_nullable(username), plpy.quote_nullable(orgname)))
user_isolines_config = GD["user_isolines_routing_config_{0}".format(username)] user_isolines_config = GD["user_isolines_routing_config_{0}".format(username)]
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
logger_config = GD["logger_config"]
logger = Logger(logger_config)
if user_isolines_config.google_services_user: if user_isolines_config.google_services_user:
raise Exception('This service is not available for google service users.') raise Exception('This service is not available for google service users.')
with metrics('cb_isodistance', user_isolines_config, logger):
if user_isolines_config.heremaps_provider: if user_isolines_config.heremaps_provider:
here_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.cdb_here_isodistance($1, $2, $3, $4, $5, $6) as isoline; ", ["text", "text", "geometry(geometry, 4326)", "text", "integer[]", "text[]"]) here_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.cdb_here_isodistance($1, $2, $3, $4, $5, $6) as isoline; ", ["text", "text", "geometry(geometry, 4326)", "text", "integer[]", "text[]"])
return plpy.execute(here_plan, [username, orgname, source, mode, range, options]) return plpy.execute(here_plan, [username, orgname, source, mode, range, options])
@ -40,10 +47,9 @@ RETURNS SETOF cdb_dataservices_server.isoline AS $$
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection'] redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
plpy.execute("SELECT cdb_dataservices_server._get_isolines_routing_config({0}, {1})".format(plpy.quote_nullable(username), plpy.quote_nullable(orgname))) plpy.execute("SELECT cdb_dataservices_server._get_isolines_routing_config({0}, {1})".format(plpy.quote_nullable(username), plpy.quote_nullable(orgname)))
user_isolines_config = GD["user_isolines_routing_config_{0}".format(username)] user_isolines_config = GD["user_isolines_routing_config_{0}".format(username)]
type = 'isodistance'
mapzen_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_mapzen_isolines($1, $2, $3, $4, $5, $6, $7) as isoline; ", ["text", "text", "text", "geometry(geometry, 4326)", "text", "integer[]", "text[]"]) mapzen_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_mapzen_isodistance($1, $2, $3, $4, $5, $6) as isoline; ", ["text", "text", "geometry(geometry, 4326)", "text", "integer[]", "text[]"])
result = plpy.execute(mapzen_plan, [username, orgname, type, source, mode, range, options]) result = plpy.execute(mapzen_plan, [username, orgname, source, mode, range, options])
return result return result
$$ LANGUAGE plpythonu; $$ LANGUAGE plpythonu;

View File

@ -47,9 +47,8 @@ RETURNS SETOF cdb_dataservices_server.isoline AS $$
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection'] redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
plpy.execute("SELECT cdb_dataservices_server._get_isolines_routing_config({0}, {1})".format(plpy.quote_nullable(username), plpy.quote_nullable(orgname))) plpy.execute("SELECT cdb_dataservices_server._get_isolines_routing_config({0}, {1})".format(plpy.quote_nullable(username), plpy.quote_nullable(orgname)))
user_isolines_config = GD["user_isolines_routing_config_{0}".format(username)] user_isolines_config = GD["user_isolines_routing_config_{0}".format(username)]
type = 'isochrone'
mapzen_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_mapzen_isolines($1, $2, $3, $4, $5, $6, $7) as isoline; ", ["text", "text", "text", "geometry(geometry, 4326)", "text", "integer[]", "text[]"]) mapzen_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_mapzen_isochrones($1, $2, $3, $4, $5, $6) as isoline; ", ["text", "text", "geometry(geometry, 4326)", "text", "integer[]", "text[]"])
result = plpy.execute(mapzen_plan, [username, orgname, type, source, mode, range, options]) result = plpy.execute(mapzen_plan, [username, orgname, source, mode, range, options])
return result return result
$$ LANGUAGE plpythonu; $$ LANGUAGE plpythonu;

View File

@ -2,3 +2,4 @@ from routing import MapzenRouting, MapzenRoutingResponse
from isolines import MapzenIsolines from isolines import MapzenIsolines
from geocoder import MapzenGeocoder from geocoder import MapzenGeocoder
from matrix_client import MatrixClient from matrix_client import MatrixClient
from isochrones import MapzenIsochrones

View File

@ -49,7 +49,7 @@ class MapzenGeocoder(Traceable):
except requests.Timeout as te: except requests.Timeout as te:
# In case of timeout we want to stop the job because the server # In case of timeout we want to stop the job because the server
# could be down # could be down
self._logger.error('Timeout connecting to Mapzen geocoding server') self._logger.error('Timeout connecting to Mapzen geocoding server', te)
raise ServiceException('Error trying to geocode {0} using mapzen'.format(searchtext), raise ServiceException('Error trying to geocode {0} using mapzen'.format(searchtext),
None) None)
except requests.ConnectionError as e: except requests.ConnectionError as e:

View File

@ -0,0 +1,128 @@
import requests
import json
import re
from exceptions import WrongParams, MalformedResult, ServiceException
from qps import qps_retry
class MapzenIsochrones:
'A Mapzen Isochrones wrapper for python'
BASE_URL = 'https://matrix.mapzen.com/isochrone'
READ_TIMEOUT = 60
CONNECT_TIMEOUT = 10
ACCEPTED_MODES = {
"walk": "pedestrian",
"car": "car"
}
def __init__(self, app_key, logger, base_url=BASE_URL):
self._app_key = app_key
self._url = base_url
self._logger = logger
@qps_retry
def isochrone(self, locations, costing, ranges):
request_params = self._parse_request_params(locations, costing,
ranges)
try:
response = requests.get(self._url, params=request_params,
timeout=(self.CONNECT_TIMEOUT,
self.READ_TIMEOUT))
if response.status_code is requests.codes.ok:
return self._parse_response(response)
elif response.status_code == requests.codes.bad_request:
return []
else:
self._logger.error('Error trying to get isochrones from mapzen',
data={"response_status": response.status_code,
"response_reason": response.reason,
"response_content": response.text,
"reponse_url": response.url,
"response_headers": response.headers,
"locations": locations,
"costing": costing})
raise ServiceException('Error trying to get isochrones from mapzen',
response)
except requests.Timeout as te:
# In case of timeout we want to stop the job because the server
# could be down
self._logger.error('Timeout connecting to Mapzen isochrones server', exception=te)
raise ServiceException('Error trying to calculate isochrones using mapzen',
None)
except requests.ConnectionError as e:
# Don't raise the exception to continue with the geocoding job
self._logger.error('Error connecting to Mapzen isochrones server',
exception=e)
return []
def _parse_request_params(self, locations, costing, ranges):
if costing in self.ACCEPTED_MODES:
mode_source = self.ACCEPTED_MODES[costing]
else:
raise WrongParams("{0} is not an accepted mode".format(costing))
contours = []
for r in ranges:
# range is in seconds but mapzen uses minutes
range_minutes = r / 60
contours.append({"time": range_minutes, "color": 'tbd'})
request_params = {
'json': json.dumps({'locations': [locations],
'costing': mode_source,
'contours': contours}),
'api_key': self._app_key
}
return request_params
def _parse_response(self, response):
try:
json_response = response.json()
isochrones = []
for feature in json_response['features']:
# Coordinates could have more than one isochrone. For the
# moment we're getting the first polygon only
coordinates = feature['geometry']['coordinates']
duration = feature['properties']['contour']
mapzen_response = MapzenIsochronesResponse(coordinates,
duration)
isochrones.append(mapzen_response)
return isochrones
except IndexError:
return []
except KeyError:
self._logger.error('Non existing key for mapzen isochrones response',
data={"response_status": response.status_code,
"response_reason": response.reason,
"response_content": response.text,
"reponse_url": response.url,
"response_headers": response.headers})
raise MalformedResult()
except ValueError:
# JSON decode error
self._logger.error('JSON decode error for Mapzen isochrones',
data={"response_status": response.status_code,
"response_reason": response.reason,
"response_content": response.text,
"reponse_url": response.url,
"response_headers": response.headers})
return []
class MapzenIsochronesResponse:
def __init__(self, coordinates, duration):
self._coordinates = coordinates
self._duration = duration
@property
def coordinates(self):
return self._coordinates
@property
def duration(self):
return self._duration

View File

@ -19,6 +19,24 @@ def polyline_to_linestring(polyline):
return geometry return geometry
def coordinates_to_polygon(coordinates):
"""Convert a Mapzen coordinates to a PostGIS polygon"""
result_coordinates = []
for coordinate in coordinates:
result_coordinates.append("%s %s" % (coordinate[0], coordinate[1]))
wkt_coordinates = ','.join(result_coordinates)
try:
sql = "SELECT ST_MakePolygon(ST_GeomFromText('LINESTRING({0})', 4326)) as geom".format(wkt_coordinates)
geometry = plpy.execute(sql, 1)[0]['geom']
except BaseException as e:
plpy.warning("Can't generate POLYGON from coordinates: {0}".format(e))
geometry = None
return geometry
def country_to_iso3(country): def country_to_iso3(country):
""" Convert country to its iso3 code """ """ Convert country to its iso3 code """
try: try:

View File

@ -14,7 +14,6 @@ try:
except ImportError: except ImportError:
pass pass
class Logger: class Logger:
LEVELS = {'debug': 1, 'info': 2, 'warning': 3, 'error': 4} LEVELS = {'debug': 1, 'info': 2, 'warning': 3, 'error': 4}
@ -66,7 +65,7 @@ class Logger:
if self._rollbar_activated(): if self._rollbar_activated():
try: try:
if exception: if exception:
rollbar.report_exc_info(exception, extra_data=data, rollbar.report_exc_info(sys.exc_info(), extra_data=data,
level=level) level=level)
else: else:
rollbar.report_message(text, level, extra_data=data) rollbar.report_message(text, level, extra_data=data)
@ -102,7 +101,7 @@ class Logger:
def _parse_log_extra_data(self, exception, data): def _parse_log_extra_data(self, exception, data):
extra_data = {} extra_data = {}
if exception: if exception:
type_, value_, traceback_ = exception type_, value_, traceback_ = sys.exc_info()
exception_traceback = traceback.format_tb(traceback_) exception_traceback = traceback.format_tb(traceback_)
extra_data = {"exception_type": type_, "exception_message": value_, extra_data = {"exception_type": type_, "exception_message": value_,
"exception_traceback": exception_traceback, "exception_traceback": exception_traceback,

View File

@ -128,8 +128,8 @@ class HereMapsRoutingIsolineTestCase(unittest.TestCase):
MALFORMED_RESPONSE = """{"manolo": "escobar"}""" MALFORMED_RESPONSE = """{"manolo": "escobar"}"""
def setUp(self): def setUp(self):
logger = Mock() self.logger = Mock()
self.routing = HereMapsRoutingIsoline(None, None, logger) self.routing = HereMapsRoutingIsoline(None, None, self.logger)
self.isoline_url = "{0}{1}".format(HereMapsRoutingIsoline.PRODUCTION_ROUTING_BASE_URL, self.isoline_url = "{0}{1}".format(HereMapsRoutingIsoline.PRODUCTION_ROUTING_BASE_URL,
HereMapsRoutingIsoline.ISOLINE_PATH) HereMapsRoutingIsoline.ISOLINE_PATH)

View File

@ -0,0 +1,54 @@
import mock
import unittest
import requests_mock
from mock import Mock
from cartodb_services.mapzen import MapzenIsochrones
from cartodb_services.mapzen.exceptions import ServiceException
requests_mock.Mocker.TEST_PREFIX = 'test_'
@requests_mock.Mocker()
class MapzenIsochronesTestCase(unittest.TestCase):
MAPZEN_ISOCHRONES_URL = 'https://matrix.mapzen.com/isochrone'
ERROR_RESPONSE = """{
"error_code": 171,
"error": "No suitable edges near location",
"status_code": 400,
"status": "Bad Request"
}"""
GOOD_RESPONSE = """{"features":[{"properties":{"opacity":0.33,"contour":15,"color":"tbd"},"type":"Feature","geometry":{"coordinates":[[-3.702579,40.430893],[-3.702193,40.430122],[-3.702579,40.430893]],"type":"LineString"}},{"properties":{"opacity":0.33,"contour":5,"color":"tbd"},"type":"Feature","geometry":{"coordinates":[[-3.703050,40.424995],[-3.702546,40.424694],[-3.703050,40.424995]],"type":"LineString"}}],"type":"FeatureCollection"}"""
def setUp(self):
self.logger = Mock()
self.mapzen_isochrones = MapzenIsochrones('matrix-xxxxx', self.logger)
def test_calculate_isochrone(self, req_mock):
req_mock.register_uri('GET', self.MAPZEN_ISOCHRONES_URL,
text=self.GOOD_RESPONSE)
response = self.mapzen_isochrones.isochrone([-41.484375, 28.993727],
'walk', [300, 900])
self.assertEqual(len(response), 2)
self.assertEqual(response[0].coordinates, [[-3.702579,40.430893],[-3.702193,40.430122],[-3.702579,40.430893]])
self.assertEqual(response[0].duration, 15)
self.assertEqual(response[1].coordinates, [[-3.703050,40.424995],[-3.702546,40.424694],[-3.703050,40.424995]])
self.assertEqual(response[1].duration, 5)
def test_calculate_isochrone_error_400_returns_empty(self, req_mock):
req_mock.register_uri('GET', self.MAPZEN_ISOCHRONES_URL,
text=self.ERROR_RESPONSE, status_code=400)
response = self.mapzen_isochrones.isochrone([-41.484375, 28.993727],
'walk', [300, 900])
self.assertEqual(response, [])
def test_calculate_isochrone_error_500_returns_exception(self, req_mock):
req_mock.register_uri('GET', self.MAPZEN_ISOCHRONES_URL,
text=self.ERROR_RESPONSE, status_code=500)
with self.assertRaises(ServiceException):
self.mapzen_isochrones.isochrone([-41.484375, 28.993727],
'walk', [300, 900])