HERE batch support

This commit is contained in:
Juan Ignacio Sánchez Lara 2018-07-02 18:35:36 +02:00
parent fc610313bf
commit e416a8a641
9 changed files with 181 additions and 17 deletions

View File

@ -1997,7 +1997,7 @@ DECLARE
cartodb_id_batch integer;
batches_n integer;
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
MAX_BATCH_SIZE CONSTANT numeric := 1000;
MAX_BATCH_SIZE CONSTANT numeric := 10000;
current_row_count integer ;
temp_table_name text;

View File

@ -9,7 +9,7 @@ DECLARE
cartodb_id_batch integer;
batches_n integer;
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
MAX_BATCH_SIZE CONSTANT numeric := 1000;
MAX_BATCH_SIZE CONSTANT numeric := 10000;
current_row_count integer ;
temp_table_name text;

View File

@ -2393,10 +2393,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_st
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import LegacyServiceManager
from cartodb_services.here import HereMapsGeocoder
from cartodb_services.here import HereMapsBulkGeocoder
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;

View File

@ -50,10 +50,10 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_heremaps_geocode_st
RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services import run_street_point_geocoder
from cartodb_services.tools import LegacyServiceManager
from cartodb_services.here import HereMapsGeocoder
from cartodb_services.here import HereMapsBulkGeocoder
service_manager = LegacyServiceManager('geocoder', username, orgname, GD)
geocoder = HereMapsGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
geocoder = HereMapsBulkGeocoder(service_manager.config.heremaps_app_id, service_manager.config.heremaps_app_code, service_manager.logger, service_manager.config.heremaps_service_params)
return run_street_point_geocoder(plpy, GD, geocoder, service_manager, username, orgname, searches)
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;

View File

@ -71,6 +71,6 @@ class StreetPointBulkGeocoder:
search_id, address, city, state, country = \
[search.get(k, None) for k in self.SEARCH_KEYS]
street_geocoder_searches.append(
(search_id, address, city, state, country))
StreetGeocoderSearch(search_id, address, city, state, country))
return self._bulk_geocode(street_geocoder_searches)

View File

@ -1,2 +1,3 @@
from geocoder import HereMapsGeocoder
from bulk_geocoder import HereMapsBulkGeocoder
from routing import HereMapsRoutingIsoline

View File

@ -0,0 +1,153 @@
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
import requests, time, zipfile, io, csv, cStringIO
import xml.etree.ElementTree as ET
from collections import namedtuple
from requests.adapters import HTTPAdapter
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.here import HereMapsGeocoder
from cartodb_services.metrics import Traceable
from cartodb_services.tools.exceptions import ServiceException
HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status')
class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs'
MAX_BATCH_SIZE = 1000000 # From the docs
# https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html
META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet']
MAX_STALLED_RETRIES = 100
BATCH_RETRY_SLEEP_S = 5
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
JOB_FINAL_STATES = ['completed', 'cancelled', 'deleted', 'failed']
def __init__(self, app_id, app_code, logger, service_params=None, maxresults=HereMapsGeocoder.DEFAULT_MAXRESULTS):
HereMapsGeocoder.__init__(self, app_id, app_code, logger, service_params, maxresults)
self.session = requests.Session()
self.session.mount(self.BATCH_URL,
HTTPAdapter(max_retries=self.max_retries))
self.credentials_params = {
'app_id': self.app_id,
'app_code': self.app_code,
}
def _bulk_geocode(self, searches):
if len(searches) > self.MAX_STALLED_RETRIES:
raise "Batch size can't be larger than {}".format(self.MAX_STALLED_RETRIES)
if self._should_use_batch(searches):
self._logger.debug('--> Batch geocode')
return self._batch_geocode(searches)
else:
self._logger.debug('--> Serial geocode')
return self._serial_geocode(searches)
def _should_use_batch(self, searches):
return len(searches) >= self.MIN_BATCHED_SEARCH
def _serial_geocode(self, searches):
results = []
for search in searches:
(search_id, address, city, state, country) = search
coordinates = self.geocode(searchtext=address, city=city, state=state, country=country)
results.append((search_id, coordinates, []))
return results
def _batch_geocode(self, searches):
request_id = self._send_batch(self._searches_to_csv(searches))
self._logger.debug('--> Sent batch {}'.format(request_id))
last_processed = 0
stalled_retries = 0
# https://developer.here.com/documentation/batch-geocoder/topics/job-status.html
while True:
job_info = self._job_status(request_id)
if job_info.processed_count == last_processed:
stalled_retries += 1
if stalled_retries > self.MAX_STALLED_RETRIES:
raise Exception('Too many retries for job {}'.format(request_id))
else:
stalled_retries = 0
last_processed = job_info.processed_count
self._logger.debug('--> Job poll check: {}'.format(job_info))
if job_info.status in self.JOB_FINAL_STATES:
break
else:
time.sleep(self.BATCH_RETRY_SLEEP_S)
self._logger.debug('--> Job complete: {}'.format(job_info))
results = self._download_results(request_id)
self._logger.debug('--> Results: {} rows; {}'.format(len(results), results))
return results
def _searches_to_csv(self, searches):
queue = cStringIO.StringIO()
writer = csv.writer(queue, delimiter='|')
writer.writerow(['recId', 'searchText', 'country'])
for search in searches:
fields = [search.address, search.city, search.state]
search_text = ', '.join(filter(None, fields))
row = [s.encode("utf-8")
for s in [str(search.id), search_text, search.country]]
writer.writerow(row)
return queue.getvalue()
def _send_batch(self, data):
cols = 'displayLatitude,displayLongitude,' + ','.join(self.META_COLS)
request_params = self.credentials_params.copy()
request_params.update({
'gen': 8,
'action': 'run',
#'mailto': 'juanignaciosl@carto.com',
'header': 'true',
'inDelim': '|',
'outDelim': '|',
'outCols': cols,
'outputcombined': 'true'
})
response = self.session.post(self.BATCH_URL, data=data,
params=request_params,
timeout=(self.connect_timeout, self.read_timeout))
if response.status_code == 200:
root = ET.fromstring(response.text)
return root.find('./Response/MetaInfo/RequestId').text
else:
raise ServiceException("Error sending HERE batch", response)
def _job_status(self, request_id):
polling_params = self.credentials_params.copy()
polling_params.update({'action': 'status'})
polling_r = self.session.get("{}/{}".format(self.BATCH_URL, request_id),
params=polling_params,
timeout=(self.connect_timeout, self.read_timeout))
polling_root = ET.fromstring(polling_r.text)
return HereJobStatus(
total_count=polling_root.find('./Response/TotalCount').text,
processed_count=polling_root.find('./Response/ProcessedCount').text,
status=polling_root.find('./Response/Status').text)
def _download_results(self, job_id):
result_r = self.session.get("{}/{}/result".format(self.BATCH_URL, job_id),
params=self.credentials_params,
timeout=(self.connect_timeout, self.read_timeout))
root_zip = zipfile.ZipFile(io.BytesIO(result_r.content))
results = []
for name in root_zip.namelist():
if name.endswith('_out.txt'):
reader = csv.DictReader(root_zip.open(name), delimiter='|')
for row in reader:
if row['SeqNumber'] == '1': # First per requested data
results.append((row['recId'], [row['displayLongitude'], row['displayLatitude']]))
return results

View File

@ -6,11 +6,10 @@ import requests
from requests.adapters import HTTPAdapter
from exceptions import *
from cartodb_services import StreetPointBulkGeocoder
from cartodb_services.metrics import Traceable
class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder):
class HereMapsGeocoder(Traceable):
'A Here Maps Geocoder wrapper for python'
PRODUCTION_GEOCODE_JSON_URL = 'https://geocoder.api.here.com/6.2/geocode.json'
@ -65,14 +64,6 @@ class HereMapsGeocoder(Traceable, StreetPointBulkGeocoder):
self.read_timeout = service_params.get('read_timeout', self.READ_TIMEOUT)
self.max_retries = service_params.get('max_retries', self.MAX_RETRIES)
def _bulk_geocode(self, searches):
results = []
for search in searches:
(search_id, address, city, state, country) = search
coordinates = self.geocode(searchtext=address, city=city, state=state, country=country)
results.append((search_id, coordinates, []))
return results
def geocode(self, **kwargs):
params = {}
for key, value in kwargs.iteritems():

View File

@ -214,6 +214,25 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
}
assert_equal(self._x_y_by_cartodb_id(response), points_by_cartodb_id)
def test_large_batches(self):
"""
Useful just to test a good batch size
"""
n = 10
streets = []
for i in range(0, n):
streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \
'Toronto, Canada"}}'.format(i, i))
query = "select *, st_x(the_geom), st_y(the_geom) " \
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
"'select * from jsonb_to_recordset(''[" \
"{}" \
"]''::jsonb) as (cartodb_id integer, address text)', " \
"'address', null, null, null, {})".format(','.join(streets), n)
response = self._run_authenticated(query)
assert_equal(n - 1, len(response['rows']))
def _run_authenticated(self, query):
authenticated_query = "{}&api_key={}".format(query,
self.env_variables[