TomTom bulk geocoding by bypassing to serial

This commit is contained in:
Juan Ignacio Sánchez Lara 2018-07-05 17:12:22 +02:00
parent 5be43e15c0
commit 31afc82b56
6 changed files with 97 additions and 6 deletions

View File

@ -83,3 +83,6 @@ deploy: release_remove_parallel_deploy
$(INSTALL_DATA) old_versions/*.sql *.sql '$(DESTDIR)$(datadir)/extension/' $(INSTALL_DATA) old_versions/*.sql *.sql '$(DESTDIR)$(datadir)/extension/'
install: deploy install: deploy
reinstall: install
psql -U postgres -d dataservices_db -c "drop extension if exists cdb_dataservices_server; create extension cdb_dataservices_server;"

View File

@ -2367,12 +2367,15 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params): with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
if user_geocoder_config.google_geocoder: if user_geocoder_config.google_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) provider_function = "_cdb_bulk_google_geocode_street_point";
elif user_geocoder_config.heremaps_geocoder: elif user_geocoder_config.heremaps_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) provider_function = "_cdb_bulk_heremaps_geocode_street_point";
elif user_geocoder_config.tomtom_geocoder:
provider_function = "_cdb_bulk_tomtom_geocode_street_point";
else: else:
raise Exception('Requested geocoder is not available') raise Exception('Requested geocoder is not available')
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.{}($1, $2, $3); ".format(provider_function), ["text", "text", "jsonb"])
result = plpy.execute(plan, [username, orgname, searches]) result = plpy.execute(plan, [username, orgname, searches])
return result return result
@ -2400,6 +2403,23 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_tomtom_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import ServiceManager
from cartodb_services.refactor.service.tomtom_geocoder_config import TomTomGeocoderConfigBuilder
from cartodb_services.tomtom import TomTomBulkGeocoder
from cartodb_services.tools import Logger
import cartodb_services
cartodb_services.init(plpy, GD)
logger_config = GD["logger_config"]
logger = Logger(logger_config)
service_manager = ServiceManager('geocoder', TomTomGeocoderConfigBuilder, username, orgname, GD)
geocoder = TomTomBulkGeocoder(service_manager.config.tomtom_api_key, service_manager.logger, service_manager.config.service_params)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text) CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text)
RETURNS Geometry AS $$ RETURNS Geometry AS $$
from cartodb_services.metrics import QuotaService from cartodb_services.metrics import QuotaService

View File

@ -24,12 +24,15 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params): with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
if user_geocoder_config.google_geocoder: if user_geocoder_config.google_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) provider_function = "_cdb_bulk_google_geocode_street_point";
elif user_geocoder_config.heremaps_geocoder: elif user_geocoder_config.heremaps_geocoder:
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"]) provider_function = "_cdb_bulk_heremaps_geocode_street_point";
elif user_geocoder_config.tomtom_geocoder:
provider_function = "_cdb_bulk_tomtom_geocode_street_point";
else: else:
raise Exception('Requested geocoder is not available') raise Exception('Requested geocoder is not available')
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server.{}($1, $2, $3); ".format(provider_function), ["text", "text", "jsonb"])
result = plpy.execute(plan, [username, orgname, searches]) result = plpy.execute(plan, [username, orgname, searches])
return result return result
@ -57,3 +60,20 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_tomtom_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import ServiceManager
from cartodb_services.refactor.service.tomtom_geocoder_config import TomTomGeocoderConfigBuilder
from cartodb_services.tomtom import TomTomBulkGeocoder
from cartodb_services.tools import Logger
import cartodb_services
cartodb_services.init(plpy, GD)
logger_config = GD["logger_config"]
logger = Logger(logger_config)
service_manager = ServiceManager('geocoder', TomTomGeocoderConfigBuilder, username, orgname, GD)
geocoder = TomTomBulkGeocoder(service_manager.config.tomtom_api_key, service_manager.logger, service_manager.config.service_params)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;

View File

@ -1,3 +1,4 @@
from geocoder import TomTomGeocoder from geocoder import TomTomGeocoder
from bulk_geocoder import TomTomBulkGeocoder
from routing import TomTomRouting, TomTomRoutingResponse from routing import TomTomRouting, TomTomRoutingResponse
from isolines import TomTomIsolines, TomTomIsochronesResponse from isolines import TomTomIsolines, TomTomIsochronesResponse

View File

@ -0,0 +1,36 @@
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.tomtom import TomTomGeocoder
class TomTomBulkGeocoder(TomTomGeocoder, StreetPointBulkGeocoder):
# TODO: ?
MAX_BATCH_SIZE = 1000000 # From the docs
# TODO: ?
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_BATCH_SIZE:
raise Exception("Batch size can't be larger than {}".format(self.MAX_BATCH_SIZE))
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH
def _serial_geocode(self, searches):
results = []
for search in searches:
(search_id, address, city, state, country) = search
self._logger.debug('--> Sending serial search: {}'.format(search))
coordinates = self.geocode(searchtext=address.encode('utf-8'),
city=city.encode('utf-8'),
state_province=state.encode('utf-8'),
country=country.encode('utf-8'))
self._logger.debug('--> result sent')
results.append((search_id, coordinates, []))
return results

View File

@ -71,9 +71,20 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
'Logroño, Argentina': [-61.69604, -29.50425] 'Logroño, Argentina': [-61.69604, -29.50425]
} }
TOMTOM_POINTS = HERE_POINTS.copy()
TOMTOM_POINTS.update({
'Plaza Mayor, Valladolid': [-4.72183, 41.5826],
'Paseo Zorrilla, Valladolid': [-4.74031, 41.63181],
'Valladolid': [-4.72838, 41.6542],
'Valladolid, Spain': [-4.72838, 41.6542],
'Madrid': [-3.70035, 40.42028],
'Logroño, Spain': [-2.44998, 42.46592],
})
FIXTURE_POINTS = { FIXTURE_POINTS = {
'google': GOOGLE_POINTS, 'google': GOOGLE_POINTS,
'heremaps': HERE_POINTS 'heremaps': HERE_POINTS,
'tomtom': TOMTOM_POINTS
} }
def setUp(self): def setUp(self):