GoogleMapsBulkGeocoder extraction

This commit is contained in:
Juan Ignacio Sánchez Lara 2018-07-10 13:25:40 +02:00
parent 1ffe3658fe
commit f6b7c13dde
6 changed files with 63 additions and 58 deletions

View File

@ -2387,10 +2387,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_stre
RETURNS SETOF cdb_dataservices_server.geocoding AS $$ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import LegacyServiceManager from cartodb_services.tools import LegacyServiceManager
from cartodb_services.google import GoogleMapsGeocoder from cartodb_services.google import GoogleMapsBulkGeocoder
service_manager = LegacyServiceManager('geocoder', username, orgname, GD) service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) geocoder = GoogleMapsBulkGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;

View File

@ -44,10 +44,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_stre
RETURNS SETOF cdb_dataservices_server.geocoding AS $$ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import LegacyServiceManager from cartodb_services.tools import LegacyServiceManager
from cartodb_services.google import GoogleMapsGeocoder from cartodb_services.google import GoogleMapsBulkGeocoder
service_manager = LegacyServiceManager('geocoder', username, orgname, GD) service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) geocoder = GoogleMapsBulkGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches) return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;

View File

@ -1 +1,2 @@
from geocoder import GoogleMapsGeocoder from geocoder import GoogleMapsGeocoder
from bulk_geocoder import GoogleMapsBulkGeocoder

View File

@ -0,0 +1,56 @@
from multiprocessing import Pool
from exceptions import MalformedResult
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.geocoder import compose_address
from cartodb_services.google import GoogleMapsGeocoder
def async_geocoder(geocoder, address, components):
results = geocoder.geocode(address=address, components=components)
return results if results else []
class GoogleMapsBulkGeocoder(GoogleMapsGeocoder, StreetPointBulkGeocoder):
"""A Google Maps Geocoder wrapper for python"""
PARALLEL_PROCESSES = 13
def __init__(self, client_id, client_secret, logger):
GoogleMapsGeocoder.__init__(self, client_id, client_secret, logger)
def _bulk_geocode(self, searches):
bulk_results = {}
pool = Pool(processes=self.PARALLEL_PROCESSES)
for search in searches:
(search_id, street, city, state, country) = search
opt_params = self._build_optional_parameters(city, state, country)
# Geocoding works better if components are also inside the address
address = compose_address(street, city, state, country)
if address:
self._logger.debug('async geocoding --> {} {}'.format(address.encode('utf-8'), opt_params))
result = pool.apply_async(async_geocoder,
(self.geocoder, address, opt_params))
else:
result = []
bulk_results[search_id] = result
pool.close()
pool.join()
try:
results = []
for search_id, bulk_result in bulk_results.items():
try:
result = bulk_result.get()
except Exception as e:
self._logger.error('Error at Google async_geocoder', e)
result = []
lng_lat = self._extract_lng_lat_from_result(result[0]) if result else []
results.append((search_id, lng_lat, []))
return results
except KeyError as e:
self._logger.error('KeyError error', exception=e)
raise MalformedResult()
except Exception as e:
self._logger.error('General error', exception=e)
raise e

View File

@ -1,30 +1,15 @@
#!/usr/local/bin/python #!/usr/local/bin/python
# -*- coding: utf-8 -*- # -*- coding: utf-8 -*-
import googlemaps
from urlparse import parse_qs from urlparse import parse_qs
from exceptions import MalformedResult from exceptions import MalformedResult
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.geocoder import compose_address from cartodb_services.geocoder import compose_address
from cartodb_services.google.exceptions import InvalidGoogleCredentials from cartodb_services.google.exceptions import InvalidGoogleCredentials
from client_factory import GoogleMapsClientFactory from client_factory import GoogleMapsClientFactory
from multiprocessing import Pool, TimeoutError
import time, random class GoogleMapsGeocoder():
def async_geocoder(geocoder, address, components):
# TODO: clean this and previous import
# time.sleep(.3 + random.random())
# return [{ 'geometry': { 'location': { 'lng': 1, 'lat': 2 } } }]
results = geocoder.geocode(address=address, components=components)
return results if results else []
class GoogleMapsGeocoder(StreetPointBulkGeocoder):
"""A Google Maps Geocoder wrapper for python"""
PARALLEL_PROCESSES = 13
def __init__(self, client_id, client_secret, logger): def __init__(self, client_id, client_secret, logger):
if client_id is None: if client_id is None:
@ -48,43 +33,6 @@ class GoogleMapsGeocoder(StreetPointBulkGeocoder):
except KeyError: except KeyError:
raise MalformedResult() raise MalformedResult()
def _bulk_geocode(self, searches):
bulk_results = {}
pool = Pool(processes=self.PARALLEL_PROCESSES)
for search in searches:
(search_id, street, city, state, country) = search
opt_params = self._build_optional_parameters(city, state, country)
# Geocoding works better if components are also inside the address
address = compose_address(street, city, state, country)
if address:
self._logger.debug('async geocoding --> {} {}'.format(address.encode('utf-8'), opt_params))
result = pool.apply_async(async_geocoder,
(self.geocoder, address, opt_params))
else:
result = []
bulk_results[search_id] = result
pool.close()
pool.join()
try:
results = []
for search_id, bulk_result in bulk_results.items():
try:
result = bulk_result.get()
except Exception as e:
self._logger.error('Error at Google async_geocoder', e)
result = []
lng_lat = self._extract_lng_lat_from_result(result[0]) if result else []
results.append((search_id, lng_lat, []))
return results
except KeyError as e:
self._logger.error('KeyError error', exception=e)
raise MalformedResult()
except Exception as e:
self._logger.error('General error', exception=e)
raise e
def _extract_lng_lat_from_result(self, result): def _extract_lng_lat_from_result(self, result):
location = result['geometry']['location'] location = result['geometry']['location']
longitude = location['lng'] longitude = location['lng']

View File

@ -38,7 +38,7 @@ class TestStreetFunctions(TestStreetFunctionsSetUp):
def test_if_select_with_street_without_api_key_raise_error(self): def test_if_select_with_street_without_api_key_raise_error(self):
table = self.env_variables['table_name'] table = self.env_variables['table_name']
query = "SELECT cdb_geocode_street_point(street) " \ query = "SELECT cdb_dataservices_client.cdb_geocode_street_point(street) " \
"as geometry FROM {0} LIMIT 1".format(table) "as geometry FROM {0} LIMIT 1".format(table)
try: try:
IntegrationTestHelper.execute_query(self.sql_api_url, query) IntegrationTestHelper.execute_query(self.sql_api_url, query)