From f6b7c13dde9010e54fc40959105b8e539edd46bc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?= Date: Tue, 10 Jul 2018 13:25:40 +0200 Subject: [PATCH] GoogleMapsBulkGeocoder extraction --- .../cdb_dataservices_server--0.32.0.sql | 4 +- .../extension/sql/21_bulk_geocode_street.sql | 4 +- .../cartodb_services/google/__init__.py | 1 + .../cartodb_services/google/bulk_geocoder.py | 56 +++++++++++++++++++ .../cartodb_services/google/geocoder.py | 54 +----------------- test/integration/test_street_functions.py | 2 +- 6 files changed, 63 insertions(+), 58 deletions(-) create mode 100644 server/lib/python/cartodb_services/cartodb_services/google/bulk_geocoder.py diff --git a/server/extension/cdb_dataservices_server--0.32.0.sql b/server/extension/cdb_dataservices_server--0.32.0.sql index 8543e7f..a013f3c 100644 --- a/server/extension/cdb_dataservices_server--0.32.0.sql +++ b/server/extension/cdb_dataservices_server--0.32.0.sql @@ -2387,10 +2387,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_stre RETURNS SETOF cdb_dataservices_server.geocoding AS $$ from cartodb_services import run_street_point_geocoder from cartodb_services.tools import LegacyServiceManager - from cartodb_services.google import GoogleMapsGeocoder + from cartodb_services.google import GoogleMapsBulkGeocoder service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) + geocoder = GoogleMapsBulkGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/extension/sql/21_bulk_geocode_street.sql b/server/extension/sql/21_bulk_geocode_street.sql index adee498..309d323 100644 --- a/server/extension/sql/21_bulk_geocode_street.sql +++ b/server/extension/sql/21_bulk_geocode_street.sql @@ -44,10 +44,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_stre RETURNS SETOF cdb_dataservices_server.geocoding AS $$ from cartodb_services import run_street_point_geocoder from cartodb_services.tools import LegacyServiceManager - from cartodb_services.google import GoogleMapsGeocoder + from cartodb_services.google import GoogleMapsBulkGeocoder service_manager = LegacyServiceManager('geocoder', username, orgname, GD) - geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) + geocoder = GoogleMapsBulkGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; diff --git a/server/lib/python/cartodb_services/cartodb_services/google/__init__.py b/server/lib/python/cartodb_services/cartodb_services/google/__init__.py index 4e8cd6b..7570e31 100644 --- a/server/lib/python/cartodb_services/cartodb_services/google/__init__.py +++ b/server/lib/python/cartodb_services/cartodb_services/google/__init__.py @@ -1 +1,2 @@ from geocoder import GoogleMapsGeocoder +from bulk_geocoder import GoogleMapsBulkGeocoder diff --git a/server/lib/python/cartodb_services/cartodb_services/google/bulk_geocoder.py b/server/lib/python/cartodb_services/cartodb_services/google/bulk_geocoder.py new file mode 100644 index 0000000..aa174b8 --- /dev/null +++ b/server/lib/python/cartodb_services/cartodb_services/google/bulk_geocoder.py @@ -0,0 +1,56 @@ +from multiprocessing import Pool +from exceptions import MalformedResult +from cartodb_services import StreetPointBulkGeocoder +from cartodb_services.geocoder import compose_address +from cartodb_services.google import GoogleMapsGeocoder + + +def async_geocoder(geocoder, address, components): + results = geocoder.geocode(address=address, components=components) + return results if results else [] + + +class GoogleMapsBulkGeocoder(GoogleMapsGeocoder, StreetPointBulkGeocoder): + """A Google Maps Geocoder wrapper for python""" + PARALLEL_PROCESSES = 13 + + def __init__(self, client_id, client_secret, logger): + GoogleMapsGeocoder.__init__(self, client_id, client_secret, logger) + + def _bulk_geocode(self, searches): + bulk_results = {} + pool = Pool(processes=self.PARALLEL_PROCESSES) + for search in searches: + (search_id, street, city, state, country) = search + opt_params = self._build_optional_parameters(city, state, country) + # Geocoding works better if components are also inside the address + address = compose_address(street, city, state, country) + if address: + self._logger.debug('async geocoding --> {} {}'.format(address.encode('utf-8'), opt_params)) + result = pool.apply_async(async_geocoder, + (self.geocoder, address, opt_params)) + else: + result = [] + bulk_results[search_id] = result + pool.close() + pool.join() + + try: + results = [] + for search_id, bulk_result in bulk_results.items(): + try: + result = bulk_result.get() + except Exception as e: + self._logger.error('Error at Google async_geocoder', e) + result = [] + + lng_lat = self._extract_lng_lat_from_result(result[0]) if result else [] + results.append((search_id, lng_lat, [])) + return results + except KeyError as e: + self._logger.error('KeyError error', exception=e) + raise MalformedResult() + except Exception as e: + self._logger.error('General error', exception=e) + raise e + diff --git a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py index 956d09e..aa8877c 100644 --- a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py @@ -1,30 +1,15 @@ #!/usr/local/bin/python # -*- coding: utf-8 -*- -import googlemaps from urlparse import parse_qs from exceptions import MalformedResult -from cartodb_services import StreetPointBulkGeocoder from cartodb_services.geocoder import compose_address from cartodb_services.google.exceptions import InvalidGoogleCredentials from client_factory import GoogleMapsClientFactory -from multiprocessing import Pool, TimeoutError -import time, random - -def async_geocoder(geocoder, address, components): - # TODO: clean this and previous import - # time.sleep(.3 + random.random()) - # return [{ 'geometry': { 'location': { 'lng': 1, 'lat': 2 } } }] - - results = geocoder.geocode(address=address, components=components) - return results if results else [] - -class GoogleMapsGeocoder(StreetPointBulkGeocoder): - """A Google Maps Geocoder wrapper for python""" - PARALLEL_PROCESSES = 13 +class GoogleMapsGeocoder(): def __init__(self, client_id, client_secret, logger): if client_id is None: @@ -48,43 +33,6 @@ class GoogleMapsGeocoder(StreetPointBulkGeocoder): except KeyError: raise MalformedResult() - def _bulk_geocode(self, searches): - bulk_results = {} - pool = Pool(processes=self.PARALLEL_PROCESSES) - for search in searches: - (search_id, street, city, state, country) = search - opt_params = self._build_optional_parameters(city, state, country) - # Geocoding works better if components are also inside the address - address = compose_address(street, city, state, country) - if address: - self._logger.debug('async geocoding --> {} {}'.format(address.encode('utf-8'), opt_params)) - result = pool.apply_async(async_geocoder, - (self.geocoder, address, opt_params)) - else: - result = [] - bulk_results[search_id] = result - pool.close() - pool.join() - - try: - results = [] - for search_id, bulk_result in bulk_results.items(): - try: - result = bulk_result.get() - except Exception as e: - self._logger.error('Error at Google async_geocoder', e) - result = [] - - lng_lat = self._extract_lng_lat_from_result(result[0]) if result else [] - results.append((search_id, lng_lat, [])) - return results - except KeyError as e: - self._logger.error('KeyError error', exception=e) - raise MalformedResult() - except Exception as e: - self._logger.error('General error', exception=e) - raise e - def _extract_lng_lat_from_result(self, result): location = result['geometry']['location'] longitude = location['lng'] diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py index d939de1..c35e1d5 100644 --- a/test/integration/test_street_functions.py +++ b/test/integration/test_street_functions.py @@ -38,7 +38,7 @@ class TestStreetFunctions(TestStreetFunctionsSetUp): def test_if_select_with_street_without_api_key_raise_error(self): table = self.env_variables['table_name'] - query = "SELECT cdb_geocode_street_point(street) " \ + query = "SELECT cdb_dataservices_client.cdb_geocode_street_point(street) " \ "as geometry FROM {0} LIMIT 1".format(table) try: IntegrationTestHelper.execute_query(self.sql_api_url, query)