From 3f08d37ef7de2745dd98bd337d6a4a5cfb5542fc Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Juan=20Ignacio=20S=C3=A1nchez=20Lara?=
Date: Thu, 7 Jun 2018 14:35:51 +0200
Subject: [PATCH] Google bulk_geocoder

---
Review revision of the added code: empty addresses are no longer passed
to AsyncResult.get() (which logged a bogus error for each of them), the
worker pool is terminated when the input is malformed instead of being
leaked, and errors are re-raised with a bare raise so the traceback is
kept. The blob id after ".." on the index line predates this revision.

 .../cartodb_services/google/geocoder.py       | 68 +++++++++++++++++++
 1 file changed, 68 insertions(+)

diff --git a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py
index 2fb0e01..a741539 100644
--- a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py
+++ b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py
@@ -8,9 +8,20 @@ from exceptions import MalformedResult
 from cartodb_services.google.exceptions import InvalidGoogleCredentials
 from client_factory import GoogleMapsClientFactory
+from multiprocessing import Pool, TimeoutError
+
+import json
+
+
+def async_geocoder(geocoder, address):
+    """Geocode a single address; return [] when nothing matches."""
+    results = geocoder.geocode(address=address)
+    return results if results else []
 
 
 class GoogleMapsGeocoder:
     """A Google Maps Geocoder wrapper for python"""
+    # Number of worker processes bulk_geocode spreads its lookups over.
+    PARALLEL_PROCESSES = 13
 
     def __init__(self, client_id, client_secret, logger):
         if client_id is None:
@@ -33,6 +44,63 @@ class GoogleMapsGeocoder:
         except KeyError:
             raise MalformedResult()
 
+    def bulk_geocode(self, searchtext):
+        """Geocode a batch of addresses in parallel.
+
+        searchtext is a JSON array of objects with an 'id' and an
+        'address' key. Returns one [id, lng_lat, []] entry per distinct
+        id; lng_lat is [] when the address is empty, nothing matched or
+        the lookup failed.
+
+        Raises MalformedResult when a result lacks the expected keys.
+        """
+        try:
+            decoded_searchtext = json.loads(searchtext)
+        except Exception as e:
+            self._logger.error('General error', exception=e)
+            raise
+
+        bulk_results = {}
+        pool = Pool(processes=self.PARALLEL_PROCESSES)
+        try:
+            for search in decoded_searchtext:
+                search_id, address = [search[k] for k in ['id', 'address']]
+                if address:
+                    result = pool.apply_async(async_geocoder,
+                                              (self.geocoder, address))
+                else:
+                    # No lookup is queued for an empty address.
+                    result = None
+                bulk_results[search_id] = result
+            pool.close()
+            pool.join()
+        except Exception:
+            # Stop the workers instead of leaking them on bad input.
+            pool.terminate()
+            pool.join()
+            raise
+
+        try:
+            results = []
+            for search_id, bulk_result in bulk_results.items():
+                result = []
+                if bulk_result is not None:
+                    try:
+                        result = bulk_result.get()
+                    except Exception as e:
+                        self._logger.error('Error at Google async_geocoder',
+                                           exception=e)
+
+                lng_lat = self._extract_lng_lat_from_result(result[0]) if result else []
+                results.append([search_id, lng_lat, []])
+            return results
+        except KeyError as e:
+            self._logger.error('KeyError error', exception=e)
+            raise MalformedResult()
+        except Exception as e:
+            self._logger.error('General error', exception=e)
+            raise
+
     def _extract_lng_lat_from_result(self, result):
         location = result['geometry']['location']
         longitude = location['lng']