_bulk_geocode logic extraction

This commit is contained in:
Juan Ignacio Sánchez Lara 2018-07-10 15:17:14 +02:00
parent a6c5c21131
commit 286a75fa8e
8 changed files with 115 additions and 123 deletions

View File

@ -1987,7 +1987,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
TARGET cdb_dataservices_server._DST_DisconnectUserTable; TARGET cdb_dataservices_server._DST_DisconnectUserTable;
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE; $$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT 100) street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT 50)
RETURNS SETOF cdb_dataservices_client.geocoding AS $$ RETURNS SETOF cdb_dataservices_client.geocoding AS $$
DECLARE DECLARE
query_row_count integer; query_row_count integer;

View File

@ -1,5 +1,5 @@
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT 100) street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT 50)
RETURNS SETOF cdb_dataservices_client.geocoding AS $$ RETURNS SETOF cdb_dataservices_client.geocoding AS $$
DECLARE DECLARE
query_row_count integer; query_row_count integer;

View File

@ -51,7 +51,12 @@ StreetGeocoderSearch = namedtuple('StreetGeocoderSearch', 'id address city state
class StreetPointBulkGeocoder: class StreetPointBulkGeocoder:
""" """
Classes extending StreetPointBulkGeocoder should implement: Classes extending StreetPointBulkGeocoder should implement:
* _bulk_geocode(street_geocoder_searches) * _batch_geocode(street_geocoder_searches)
* MAX_BATCH_SIZE
If they want to provide an alternative serial (for small batches):
* _should_use_batch(street_geocoder_searches)
* _serial_geocode(street_geocoder_searches)
""" """
SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country'] SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country']
@ -77,10 +82,22 @@ class StreetPointBulkGeocoder:
street_geocoder_searches.append( street_geocoder_searches.append(
StreetGeocoderSearch(search_id, address, city, state, country)) StreetGeocoderSearch(search_id, address, city, state, country))
return self._bulk_geocode(street_geocoder_searches) if len(street_geocoder_searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(street_geocoder_searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(street_geocoder_searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(street_geocoder_searches)
def _batch_geocode(self, street_geocoder_searches):
raise NotImplementedError('Subclasses must implement _batch_geocode')
def _serial_geocode(self, street_geocoder_searches):
raise NotImplementedError('Subclasses must implement _serial_geocode')
def _should_use_batch(self, street_geocoder_searches):
return True
def _bulk_geocode(self, street_geocoder_searches):
"""
Subclasses must implement _bulk_geocode
"""
raise NotImplementedError('Subclasses must implement _bulk_geocode')

View File

@ -19,16 +19,6 @@ class GoogleMapsBulkGeocoder(GoogleMapsGeocoder, StreetPointBulkGeocoder):
def __init__(self, client_id, client_secret, logger): def __init__(self, client_id, client_secret, logger):
GoogleMapsGeocoder.__init__(self, client_id, client_secret, logger) GoogleMapsGeocoder.__init__(self, client_id, client_secret, logger)
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches): def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH return len(searches) >= self.MIN_BATCHED_SEARCH

View File

@ -15,13 +15,13 @@ from cartodb_services.tools.exceptions import ServiceException
HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status') HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status')
class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder): class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs'
MAX_BATCH_SIZE = 1000000 # From the docs MAX_BATCH_SIZE = 1000000 # From the docs
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs'
# https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html # https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html
META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet'] META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet']
MAX_STALLED_RETRIES = 100 MAX_STALLED_RETRIES = 100
BATCH_RETRY_SLEEP_S = 5 BATCH_RETRY_SLEEP_S = 5
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed'] JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed']
def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS): def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS):
@ -34,16 +34,6 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
'app_code': self.app_code, 'app_code': self.app_code,
} }
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches): def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH return len(searches) >= self.MIN_BATCHED_SEARCH

View File

@ -21,16 +21,6 @@ class MapboxBulkGeocoder(MapboxGeocoder, StreetPointBulkGeocoder):
self.max_retries = service_params.get('max_retries', self.MAX_RETRIES) self.max_retries = service_params.get('max_retries', self.MAX_RETRIES)
self.session = requests.Session() self.session = requests.Session()
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches): def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH return len(searches) >= self.MIN_BATCHED_SEARCH

View File

@ -6,12 +6,12 @@ from cartodb_services.tools.exceptions import ServiceException
class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder): class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder):
MAX_BATCH_SIZE = 1000000 # From the docs
MIN_BATCHED_SEARCH = 10 # Batch API is really fast
BASE_URL = 'https://api.tomtom.com' BASE_URL = 'https://api.tomtom.com'
BATCH_URL = BASE_URL + '/search/2/batch.json' BATCH_URL = BASE_URL + '/search/2/batch.json'
MAX_BATCH_SIZE = 1000000 # From the docs
MAX_STALLED_RETRIES = 100 MAX_STALLED_RETRIES = 100
BATCH_RETRY_SLEEP_S = 5 BATCH_RETRY_SLEEP_S = 5
MIN_BATCHED_SEARCH = 10 # Batch API is really fast
READ_TIMEOUT = 60 READ_TIMEOUT = 60
CONNECT_TIMEOUT = 10 CONNECT_TIMEOUT = 10
MAX_RETRIES = 1 MAX_RETRIES = 1
@ -26,16 +26,6 @@ class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder):
self.session.mount(self.BATCH_URL, self.session.mount(self.BATCH_URL,
HTTPAdapter(max_retries=self.max_retries)) HTTPAdapter(max_retries=self.max_retries))
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches): def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH return len(searches) >= self.MIN_BATCHED_SEARCH

View File

@ -8,6 +8,67 @@ from ..helpers.integration_test_helper import IntegrationTestHelper
from ..helpers.integration_test_helper import assert_close_enough from ..helpers.integration_test_helper import assert_close_enough
class TestStreetFunctionsSetUp(TestCase): class TestStreetFunctionsSetUp(TestCase):
provider = None
fixture_points = None
GOOGLE_POINTS = {
'Plaza Mayor, Valladolid': [-4.728252, 41.6517025],
'Paseo Zorrilla, Valladolid': [-4.7404453, 41.6314339],
'1900 amphitheatre parkway': [-122.0875324, 37.4227968],
'1901 amphitheatre parkway': [-122.0885504, 37.4238657],
'1902 amphitheatre parkway': [-122.0876674, 37.4235729],
'Valladolid': [-4.7245321, 41.652251],
'Valladolid, Spain': [-4.7245321, 41.652251],
'Valladolid, Mexico': [-88.2022488, 20.68964],
'Madrid': [-3.7037902, 40.4167754],
'Logroño, Spain': [-2.4449852, 42.4627195],
'Logroño, Argentina': [-61.6961807, -29.5031057],
'Plaza España 1, Barcelona': [2.1482563, 41.375485]
}
HERE_POINTS = {
'Plaza Mayor, Valladolid': [-4.72979, 41.65258],
'Paseo Zorrilla, Valladolid': [-4.73869, 41.63817],
'1900 amphitheatre parkway': [-122.0879468, 37.4234763],
'1901 amphitheatre parkway': [-122.0879253, 37.4238725],
'1902 amphitheatre parkway': [-122.0879531, 37.4234775],
'Valladolid': [-4.73214, 41.6542],
'Valladolid, Spain': [-4.73214, 41.6542],
'Valladolid, Mexico': [-88.20117, 20.69021],
'Madrid': [-3.70578, 40.42028],
'Logroño, Spain': [-2.45194, 42.46592],
'Logroño, Argentina': [-61.69604, -29.50425],
'Plaza España 1, Barcelona': [2.1735699, 41.3823] # TODO: not ideal
}
TOMTOM_POINTS = HERE_POINTS.copy()
TOMTOM_POINTS.update({
'Plaza Mayor, Valladolid': [-4.72183, 41.5826],
'Paseo Zorrilla, Valladolid': [-4.74031, 41.63181],
'Valladolid': [-4.72838, 41.6542],
'Valladolid, Spain': [-4.72838, 41.6542],
'Madrid': [-3.70035, 40.42028],
'Logroño, Spain': [-2.44998, 42.46592],
'Plaza España 1, Barcelona': [2.07479, 41.36818] # TODO: not ideal
})
MAPBOX_POINTS = GOOGLE_POINTS.copy()
MAPBOX_POINTS.update({
'Logroño, Spain': [-2.44556, 42.47],
'Logroño, Argentina': [-70.687195, -33.470901], # TODO: huge mismatch
'Valladolid': [-4.72856, 41.652251],
'Valladolid, Spain': [-4.72856, 41.652251],
'1902 amphitheatre parkway': [-118.03, 34.06], # TODO: huge mismatch
'Madrid': [-3.69194, 40.4167754],
'Plaza España 1, Barcelona': [2.245969, 41.452483] # TODO: not ideal
})
FIXTURE_POINTS = {
'google': GOOGLE_POINTS,
'heremaps': HERE_POINTS,
'tomtom': TOMTOM_POINTS,
'mapbox': MAPBOX_POINTS
}
def setUp(self): def setUp(self):
self.env_variables = IntegrationTestHelper.get_environment_variables() self.env_variables = IntegrationTestHelper.get_environment_variables()
@ -18,6 +79,15 @@ class TestStreetFunctionsSetUp(TestCase):
self.env_variables['api_key'] self.env_variables['api_key']
) )
if not self.fixture_points:
query = "select provider from " \
"cdb_dataservices_client.cdb_service_quota_info() " \
"where service = 'hires_geocoder'"
response = self._run_authenticated(query)
provider = response['rows'][0]['provider']
self.fixture_points = self.FIXTURE_POINTS[provider]
def _run_authenticated(self, query): def _run_authenticated(self, query):
authenticated_query = "{}&api_key={}".format(query, authenticated_query = "{}&api_key={}".format(query,
self.env_variables[ self.env_variables[
@ -54,77 +124,9 @@ class TestStreetFunctions(TestStreetFunctionsSetUp):
row = response['rows'][0] row = response['rows'][0]
x_y = [row['st_x'], row['st_y']] x_y = [row['st_x'], row['st_y']]
# Wrong coordinates (Plaza España, Madrid): [-3.7138975, 40.4256762] # Wrong coordinates (Plaza España, Madrid): [-3.7138975, 40.4256762]
assert_close_enough(x_y, [2.1482563, 41.375485]) assert_close_enough(x_y, self.fixture_points['Plaza España 1, Barcelona'])
class TestBulkStreetFunctions(TestStreetFunctionsSetUp): class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
provider = None
fixture_points = None
GOOGLE_POINTS = {
'Plaza Mayor, Valladolid': [-4.728252, 41.6517025],
'Paseo Zorrilla, Valladolid': [-4.7404453, 41.6314339],
'1900 amphitheatre parkway': [-122.0875324, 37.4227968],
'1901 amphitheatre parkway': [-122.0885504, 37.4238657],
'1902 amphitheatre parkway': [-122.0876674, 37.4235729],
'Valladolid': [-4.7245321, 41.652251],
'Valladolid, Spain': [-4.7245321, 41.652251],
'Valladolid, Mexico': [-88.2022488, 20.68964],
'Madrid': [-3.7037902, 40.4167754],
'Logroño, Spain': [-2.4449852, 42.4627195],
'Logroño, Argentina': [-61.6961807, -29.5031057]
}
HERE_POINTS = {
'Plaza Mayor, Valladolid': [-4.72979, 41.65258],
'Paseo Zorrilla, Valladolid': [-4.73869, 41.63817],
'1900 amphitheatre parkway': [-122.0879468, 37.4234763],
'1901 amphitheatre parkway': [-122.0879253, 37.4238725],
'1902 amphitheatre parkway': [-122.0879531, 37.4234775],
'Valladolid': [-4.73214, 41.6542],
'Valladolid, Spain': [-4.73214, 41.6542],
'Valladolid, Mexico': [-88.20117, 20.69021],
'Madrid': [-3.70578, 40.42028],
'Logroño, Spain': [-2.45194, 42.46592],
'Logroño, Argentina': [-61.69604, -29.50425]
}
TOMTOM_POINTS = HERE_POINTS.copy()
TOMTOM_POINTS.update({
'Plaza Mayor, Valladolid': [-4.72183, 41.5826],
'Paseo Zorrilla, Valladolid': [-4.74031, 41.63181],
'Valladolid': [-4.72838, 41.6542],
'Valladolid, Spain': [-4.72838, 41.6542],
'Madrid': [-3.70035, 40.42028],
'Logroño, Spain': [-2.44998, 42.46592],
})
MAPBOX_POINTS = GOOGLE_POINTS.copy()
MAPBOX_POINTS.update({
'Logroño, Spain': [-2.44556, 42.47],
'Logroño, Argentina': [-70.687195, -33.470901], # TODO: huge mismatch
'Valladolid': [-4.72856, 41.652251],
'Valladolid, Spain': [-4.72856, 41.652251],
'1902 amphitheatre parkway': [-118.03, 34.06], # TODO: huge mismatch
'Madrid': [-3.69194, 40.4167754],
})
FIXTURE_POINTS = {
'google': GOOGLE_POINTS,
'heremaps': HERE_POINTS,
'tomtom': TOMTOM_POINTS,
'mapbox': MAPBOX_POINTS
}
def setUp(self):
TestStreetFunctionsSetUp.setUp(self)
if not self.fixture_points:
query = "select provider from " \
"cdb_dataservices_client.cdb_service_quota_info() " \
"where service = 'hires_geocoder'"
response = self._run_authenticated(query)
provider = response['rows'][0]['provider']
self.fixture_points = self.FIXTURE_POINTS[provider]
def test_full_spec(self): def test_full_spec(self):
query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \ query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \
@ -257,7 +259,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
""" """
Useful just to test a good batch size Useful just to test a good batch size
""" """
n = 10 n = 50
streets = [] streets = []
for i in range(0, n): for i in range(0, n):
streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \ streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \
@ -292,6 +294,19 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
x_y_by_cartodb_id = self._x_y_by_cartodb_id(response) x_y_by_cartodb_id = self._x_y_by_cartodb_id(response)
assert_equal(x_y_by_cartodb_id[1], x_y_by_cartodb_id[2]) assert_equal(x_y_by_cartodb_id[1], x_y_by_cartodb_id[2])
# "'Plaza España 1', 'Barcelona', null, 'Spain') as the_geom) _x"
def test_component_aggregation(self):
query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point(" \
"'select 1 as cartodb_id, ''Spain'' as country, " \
"''Barcelona'' as city, " \
"''Plaza España 1'' as street' " \
", 'street', 'city', NULL, 'country')"
response = self._run_authenticated(query)
assert_close_enough(self._x_y_by_cartodb_id(response)[1],
self.fixture_points['Plaza España 1, Barcelona'])
def _run_authenticated(self, query): def _run_authenticated(self, query):
authenticated_query = "{}&api_key={}".format(query, authenticated_query = "{}&api_key={}".format(query,
self.env_variables[ self.env_variables[