Bulk geocoding refactor
This commit is contained in:
parent
e884b1d1f4
commit
18e2349713
0
server/__init__.py
Normal file
0
server/__init__.py
Normal file
@ -2367,53 +2367,37 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
|
||||
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
|
||||
if user_geocoder_config.google_geocoder:
|
||||
google_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
result = plpy.execute(google_plan, [username, orgname, searches])
|
||||
return result
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
elif user_geocoder_config.heremaps_geocoder:
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
else:
|
||||
raise Exception('Requested geocoder is not available')
|
||||
|
||||
result = plpy.execute(plan, [username, orgname, searches])
|
||||
return result
|
||||
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.google import GoogleMapsGeocoder
|
||||
|
||||
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
|
||||
logger_config = GD["logger_config"]
|
||||
|
||||
logger = Logger(logger_config)
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
try:
|
||||
service_manager.assert_within_limits(quota=False)
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
geocode_results = geocoder.bulk_geocode(searches=searches)
|
||||
if geocode_results:
|
||||
results = []
|
||||
for result in geocode_results:
|
||||
if result[1]:
|
||||
plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"])
|
||||
point = plpy.execute(plan, result[1], 1)[0]
|
||||
results.append([result[0], point['the_geom'], None])
|
||||
else:
|
||||
results.append([result[0], None, None])
|
||||
service_manager.quota_service.increment_success_service_use(len(results))
|
||||
return results
|
||||
else:
|
||||
service_manager.quota_service.increment_empty_service_use(len(searches))
|
||||
return []
|
||||
except QuotaExceededException as qe:
|
||||
service_manager.quota_service.increment_failed_service_use(len(searches))
|
||||
return []
|
||||
except BaseException as e:
|
||||
import sys
|
||||
service_manager.quota_service.increment_failed_service_use()
|
||||
service_manager.logger.error('Error trying to bulk geocode street point using google maps', sys.exc_info(), data={"username": username, "orgname": orgname})
|
||||
raise Exception('Error trying to bulk geocode street point using google maps')
|
||||
finally:
|
||||
service_manager.quota_service.increment_total_service_use()
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.here import HereMapsGeocoder
|
||||
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_geocode_admin0_polygon(username text, orgname text, country_name text)
|
||||
|
@ -24,52 +24,36 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
|
||||
with metrics('cdb_bulk_geocode_street_point', user_geocoder_config, logger, params):
|
||||
if user_geocoder_config.google_geocoder:
|
||||
google_plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
result = plpy.execute(google_plan, [username, orgname, searches])
|
||||
return result
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_google_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
elif user_geocoder_config.heremaps_geocoder:
|
||||
plan = plpy.prepare("SELECT * FROM cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point($1, $2, $3); ", ["text", "text", "jsonb"])
|
||||
else:
|
||||
raise Exception('Requested geocoder is not available')
|
||||
|
||||
result = plpy.execute(plan, [username, orgname, searches])
|
||||
return result
|
||||
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.google import GoogleMapsGeocoder
|
||||
|
||||
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
|
||||
logger_config = GD["logger_config"]
|
||||
|
||||
logger = Logger(logger_config)
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
|
||||
try:
|
||||
service_manager.assert_within_limits(quota=False)
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
geocode_results = geocoder.bulk_geocode(searches=searches)
|
||||
if geocode_results:
|
||||
results = []
|
||||
for result in geocode_results:
|
||||
if result[1]:
|
||||
plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"])
|
||||
point = plpy.execute(plan, result[1], 1)[0]
|
||||
results.append([result[0], point['the_geom'], None])
|
||||
else:
|
||||
results.append([result[0], None, None])
|
||||
service_manager.quota_service.increment_success_service_use(len(results))
|
||||
return results
|
||||
else:
|
||||
service_manager.quota_service.increment_empty_service_use(len(searches))
|
||||
return []
|
||||
except QuotaExceededException as qe:
|
||||
service_manager.quota_service.increment_failed_service_use(len(searches))
|
||||
return []
|
||||
except BaseException as e:
|
||||
import sys
|
||||
service_manager.quota_service.increment_failed_service_use()
|
||||
service_manager.logger.error('Error trying to bulk geocode street point using google maps', sys.exc_info(), data={"username": username, "orgname": orgname})
|
||||
raise Exception('Error trying to bulk geocode street point using google maps')
|
||||
finally:
|
||||
service_manager.quota_service.increment_total_service_use()
|
||||
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
|
||||
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
|
||||
from cartodb_services import run_street_point_geocoder
|
||||
from cartodb_services.tools import LegacyServiceManager
|
||||
from cartodb_services.here import HereMapsGeocoder
|
||||
|
||||
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
|
||||
geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
|
||||
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
|
||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||
|
||||
|
0
server/lib/__init__.py
Normal file
0
server/lib/__init__.py
Normal file
0
server/lib/python/__init__.py
Normal file
0
server/lib/python/__init__.py
Normal file
0
server/lib/python/cartodb_services/__init__.py
Normal file
0
server/lib/python/cartodb_services/__init__.py
Normal file
@ -33,3 +33,5 @@ def _reset():
|
||||
|
||||
plpy = None
|
||||
GD = None
|
||||
|
||||
from geocoder import run_street_point_geocoder, StreetPointBulkGeocoder
|
||||
|
@ -0,0 +1,76 @@
|
||||
#!/usr/local/bin/python
|
||||
# -*- coding: utf-8 -*-
|
||||
|
||||
from tools import QuotaExceededException, Logger
|
||||
from collections import namedtuple
|
||||
import json
|
||||
|
||||
|
||||
def run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches):
|
||||
plpy.execute("SELECT cdb_dataservices_server._get_logger_config()")
|
||||
logger_config = GD["logger_config"]
|
||||
|
||||
logger = Logger(logger_config)
|
||||
|
||||
try:
|
||||
service_manager.assert_within_limits(quota=False)
|
||||
geocode_results = geocoder.bulk_geocode(searches=searches)
|
||||
if geocode_results:
|
||||
results = []
|
||||
for result in geocode_results:
|
||||
if result[1]:
|
||||
plan = plpy.prepare("SELECT ST_SetSRID(ST_MakePoint($1, $2), 4326) as the_geom; ", ["double precision", "double precision"])
|
||||
point = plpy.execute(plan, result[1], 1)[0]
|
||||
results.append([result[0], point['the_geom'], None])
|
||||
else:
|
||||
results.append([result[0], None, None])
|
||||
service_manager.quota_service.increment_success_service_use(len(results))
|
||||
return results
|
||||
else:
|
||||
service_manager.quota_service.increment_empty_service_use(len(searches))
|
||||
return []
|
||||
except QuotaExceededException as qe:
|
||||
service_manager.quota_service.increment_failed_service_use(len(searches))
|
||||
return []
|
||||
except BaseException as e:
|
||||
import sys
|
||||
service_manager.quota_service.increment_failed_service_use()
|
||||
service_manager.logger.error('Error trying to bulk geocode street point', sys.exc_info(), data={"username": username, "orgname": orgname})
|
||||
raise Exception('Error trying to bulk geocode street')
|
||||
finally:
|
||||
service_manager.quota_service.increment_total_service_use()
|
||||
|
||||
|
||||
StreetGeocoderSearch = namedtuple('StreetGeocoderSearch', 'id address city state country')
|
||||
|
||||
|
||||
class StreetPointBulkGeocoder:
|
||||
"""
|
||||
Classes extending StreetPointBulkGeocoder should implement:
|
||||
* _bulk_geocode(decoded_searches)
|
||||
"""
|
||||
|
||||
SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country']
|
||||
|
||||
def bulk_geocode(self, searches):
|
||||
"""
|
||||
:param searches: array of StreetGeocoderSearch
|
||||
:return: array of tuples with three elements:
|
||||
* id
|
||||
* latitude and longitude (array of two elements)
|
||||
* empty array (future use: metadata)
|
||||
"""
|
||||
try:
|
||||
decoded_searches = json.loads(searches)
|
||||
except Exception as e:
|
||||
self._logger.error('General error', exception=e)
|
||||
raise e
|
||||
|
||||
street_geocoder_searches = []
|
||||
for search in decoded_searches:
|
||||
search_id, address, city, state, country = \
|
||||
[search.get(k, None) for k in self.SEARCH_KEYS]
|
||||
street_geocoder_searches.append(
|
||||
(search_id, address, city, state, country))
|
||||
|
||||
return self._bulk_geocode(street_geocoder_searches)
|
@ -5,13 +5,12 @@ import googlemaps
|
||||
from urlparse import parse_qs
|
||||
|
||||
from exceptions import MalformedResult
|
||||
from cartodb_services import StreetPointBulkGeocoder
|
||||
from cartodb_services.google.exceptions import InvalidGoogleCredentials
|
||||
from client_factory import GoogleMapsClientFactory
|
||||
|
||||
from multiprocessing import Pool, TimeoutError
|
||||
|
||||
import json
|
||||
|
||||
import time, random
|
||||
|
||||
def async_geocoder(geocoder, address, components):
|
||||
@ -22,10 +21,9 @@ def async_geocoder(geocoder, address, components):
|
||||
results = geocoder.geocode(address=address, components=components)
|
||||
return results if results else []
|
||||
|
||||
class GoogleMapsGeocoder:
|
||||
class GoogleMapsGeocoder(StreetPointBulkGeocoder):
|
||||
"""A Google Maps Geocoder wrapper for python"""
|
||||
PARALLEL_PROCESSES = 13
|
||||
SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country']
|
||||
|
||||
def __init__(self, client_id, client_secret, logger):
|
||||
if client_id is None:
|
||||
@ -48,18 +46,11 @@ class GoogleMapsGeocoder:
|
||||
except KeyError:
|
||||
raise MalformedResult()
|
||||
|
||||
def bulk_geocode(self, searches):
|
||||
try:
|
||||
decoded_searches = json.loads(searches)
|
||||
except Exception as e:
|
||||
self._logger.error('General error', exception=e)
|
||||
raise e
|
||||
|
||||
def _bulk_geocode(self, searches):
|
||||
bulk_results = {}
|
||||
pool = Pool(processes=self.PARALLEL_PROCESSES)
|
||||
for search in decoded_searches:
|
||||
search_id, address, city, state, country = \
|
||||
[search.get(k, None) for k in self.SEARCH_KEYS]
|
||||
for search in searches:
|
||||
(search_id, address, city, state, country) = search
|
||||
opt_params = self._build_optional_parameters(city, state, country)
|
||||
# Geocoding works better if components are also inside the address
|
||||
address = ', '.join(filter(None, [address, city, state, country]))
|
||||
@ -83,7 +74,7 @@ class GoogleMapsGeocoder:
|
||||
result = []
|
||||
|
||||
lng_lat = self._extract_lng_lat_from_result(result[0]) if result else []
|
||||
results.append([search_id, lng_lat, []])
|
||||
results.append((search_id, lng_lat, []))
|
||||
return results
|
||||
except KeyError as e:
|
||||
self._logger.error('KeyError error', exception=e)
|
||||
|
@ -6,10 +6,11 @@ import requests
|
||||
|
||||
from requests.adapters import HTTPAdapter
|
||||
from exceptions import *
|
||||
from cartodb_services import StreetPointBulkGeocoder
|
||||
from cartodb_services.metrics import Traceable
|
||||
|
||||
|
||||
class HereMapsGeocoder(Traceable):
|
||||
class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder):
|
||||
'A Here Maps Geocoder wrapper for python'
|
||||
|
||||
PRODUCTION_GEOCODE_JSON_URL = 'https://geocoder.api.here.com/6.2/geocode.json'
|
||||
@ -64,6 +65,13 @@ class HereMapsGeocoder(Traceable):
|
||||
self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT)
|
||||
self.max_retries = service_params.get('max_retries', self.MAX_RETRIES)
|
||||
|
||||
def _bulk_geocode(self, searches):
|
||||
results = []
|
||||
for search in searches:
|
||||
result = ()
|
||||
return results
|
||||
coordinates = geocoder.geocode(searchtext=searchtext, city=city, state=state_province, country=country)
|
||||
|
||||
def geocode(self, **kwargs):
|
||||
params = {}
|
||||
for key, value in kwargs.iteritems():
|
||||
|
Loading…
Reference in New Issue
Block a user