Street level columns for country, city and state

This commit is contained in:
Juan Ignacio Sánchez Lara 2018-06-22 12:19:51 +02:00
parent 4d2abc7667
commit bbbf70f3ac
5 changed files with 36 additions and 25 deletions

View File

@@ -1986,7 +1986,8 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
CONNECT cdb_dataservices_client._server_conn_str(); CONNECT cdb_dataservices_client._server_conn_str();
TARGET cdb_dataservices_server._DST_DisconnectUserTable; TARGET cdb_dataservices_server._DST_DisconnectUserTable;
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE; $$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, searchtext text) CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
country_column text, state_column text, city_column text, street_column text)
RETURNS SETOF cdb_dataservices_client.geocoding AS $$ RETURNS SETOF cdb_dataservices_client.geocoding AS $$
DECLARE DECLARE
query_row_count integer; query_row_count integer;
@@ -1999,8 +2000,8 @@ DECLARE
BEGIN BEGIN
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count; EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; searchtext: %', RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
query_row_count, query, searchtext; query_row_count, query, country_column, state_column, city_column, street_column;
SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota; SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota;
IF enough_quota IS NOT NULL AND enough_quota THEN IF enough_quota IS NOT NULL AND enough_quota THEN
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count; RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
@@ -2018,12 +2019,14 @@ BEGIN
EXECUTE format( EXECUTE format(
'WITH geocoding_data as (' || 'WITH geocoding_data as (' ||
' SELECT json_build_object(''id'', cartodb_id, ''address'', %s) as data , floor((cartodb_id-1)::float/$1) as batch' || ' SELECT ' ||
' json_build_object(''id'', cartodb_id, ''address'', %s, ''city'', %s, ''state'', %s, ''country'', %s) as data , ' ||
' floor((cartodb_id-1)::float/$1) as batch' ||
' FROM (%s) _x' || ' FROM (%s) _x' ||
')' || ')' ||
'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' || 'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
'FROM geocoding_data ' || 'FROM geocoding_data ' ||
'WHERE batch = $2', searchtext, query) 'WHERE batch = $2', street_column, city_column, state_column, country_column, query)
USING BATCHES_SIZE, cartodb_id_batch; USING BATCHES_SIZE, cartodb_id_batch;
GET DIAGNOSTICS current_row_count = ROW_COUNT; GET DIAGNOSTICS current_row_count = ROW_COUNT;

View File

@@ -1,4 +1,5 @@
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, searchtext text) CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
country_column text, state_column text, city_column text, street_column text)
RETURNS SETOF cdb_dataservices_client.geocoding AS $$ RETURNS SETOF cdb_dataservices_client.geocoding AS $$
DECLARE DECLARE
query_row_count integer; query_row_count integer;
@@ -11,8 +12,8 @@ DECLARE
BEGIN BEGIN
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count; EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; searchtext: %', RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
query_row_count, query, searchtext; query_row_count, query, country_column, state_column, city_column, street_column;
SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota; SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota;
IF enough_quota IS NOT NULL AND enough_quota THEN IF enough_quota IS NOT NULL AND enough_quota THEN
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count; RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
@@ -30,12 +31,14 @@ BEGIN
EXECUTE format( EXECUTE format(
'WITH geocoding_data as (' || 'WITH geocoding_data as (' ||
' SELECT json_build_object(''id'', cartodb_id, ''address'', %s) as data , floor((cartodb_id-1)::float/$1) as batch' || ' SELECT ' ||
' json_build_object(''id'', cartodb_id, ''address'', %s, ''city'', %s, ''state'', %s, ''country'', %s) as data , ' ||
' floor((cartodb_id-1)::float/$1) as batch' ||
' FROM (%s) _x' || ' FROM (%s) _x' ||
')' || ')' ||
'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' || 'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
'FROM geocoding_data ' || 'FROM geocoding_data ' ||
'WHERE batch = $2', searchtext, query) 'WHERE batch = $2', street_column, city_column, state_column, country_column, query)
USING BATCHES_SIZE, cartodb_id_batch; USING BATCHES_SIZE, cartodb_id_batch;
GET DIAGNOSTICS current_row_count = ROW_COUNT; GET DIAGNOSTICS current_row_count = ROW_COUNT;

View File

@@ -2375,7 +2375,7 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searchtext jsonb) CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
RETURNS SETOF cdb_dataservices_server.geocoding AS $$ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger
from cartodb_services.google import GoogleMapsGeocoder from cartodb_services.google import GoogleMapsGeocoder
@@ -2389,7 +2389,7 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
try: try:
service_manager.assert_within_limits(quota=False) service_manager.assert_within_limits(quota=False)
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
geocode_results = geocoder.bulk_geocode(searchtext=searchtext) geocode_results = geocoder.bulk_geocode(searches=searches)
if geocode_results: if geocode_results:
results = [] results = []
for result in geocode_results: for result in geocode_results:
@@ -2402,10 +2402,10 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
service_manager.quota_service.increment_success_service_use(len(results)) service_manager.quota_service.increment_success_service_use(len(results))
return results return results
else: else:
service_manager.quota_service.increment_empty_service_use(len(searchtext)) service_manager.quota_service.increment_empty_service_use(len(searches))
return [] return []
except QuotaExceededException as qe: except QuotaExceededException as qe:
service_manager.quota_service.increment_failed_service_use(len(searchtext)) service_manager.quota_service.increment_failed_service_use(len(searches))
return [] return []
except BaseException as e: except BaseException as e:
import sys import sys

View File

@@ -32,7 +32,7 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED; $$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searchtext jsonb) CREATE OR REPLACE FUNCTION cdb_dataservices_server._cdb_bulk_google_geocode_street_point(username TEXT, orgname TEXT, searches jsonb)
RETURNS SETOF cdb_dataservices_server.geocoding AS $$ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger from cartodb_services.tools import LegacyServiceManager,QuotaExceededException,Logger
from cartodb_services.google import GoogleMapsGeocoder from cartodb_services.google import GoogleMapsGeocoder
@@ -46,7 +46,7 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
try: try:
service_manager.assert_within_limits(quota=False) service_manager.assert_within_limits(quota=False)
geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger) geocoder = GoogleMapsGeocoder(service_manager.config.google_client_id, service_manager.config.google_api_key, service_manager.logger)
geocode_results = geocoder.bulk_geocode(searchtext=searchtext) geocode_results = geocoder.bulk_geocode(searches=searches)
if geocode_results: if geocode_results:
results = [] results = []
for result in geocode_results: for result in geocode_results:
@@ -59,10 +59,10 @@ RETURNS SETOF cdb_dataservices_server.geocoding AS $$
service_manager.quota_service.increment_success_service_use(len(results)) service_manager.quota_service.increment_success_service_use(len(results))
return results return results
else: else:
service_manager.quota_service.increment_empty_service_use(len(searchtext)) service_manager.quota_service.increment_empty_service_use(len(searches))
return [] return []
except QuotaExceededException as qe: except QuotaExceededException as qe:
service_manager.quota_service.increment_failed_service_use(len(searchtext)) service_manager.quota_service.increment_failed_service_use(len(searches))
return [] return []
except BaseException as e: except BaseException as e:
import sys import sys

View File

@@ -14,16 +14,18 @@ import json
import time, random import time, random
def async_geocoder(geocoder, address): def async_geocoder(geocoder, address, components):
# TODO: clean this and previous import
# time.sleep(.3 + random.random()) # time.sleep(.3 + random.random())
# return [{ 'geometry': { 'location': { 'lng': 1, 'lat': 2 } } }] # return [{ 'geometry': { 'location': { 'lng': 1, 'lat': 2 } } }]
results = geocoder.geocode(address=address) results = geocoder.geocode(address=address, components=components)
return results if results else [] return results if results else []
class GoogleMapsGeocoder: class GoogleMapsGeocoder:
"""A Google Maps Geocoder wrapper for python""" """A Google Maps Geocoder wrapper for python"""
PARALLEL_PROCESSES = 13 PARALLEL_PROCESSES = 13
SEARCH_KEYS = ['id', 'address', 'city', 'state', 'country']
def __init__(self, client_id, client_secret, logger): def __init__(self, client_id, client_secret, logger):
if client_id is None: if client_id is None:
@@ -46,20 +48,23 @@ class GoogleMapsGeocoder:
except KeyError: except KeyError:
raise MalformedResult() raise MalformedResult()
def bulk_geocode(self, searchtext): def bulk_geocode(self, searches):
try: try:
decoded_searchtext = json.loads(searchtext) decoded_searches = json.loads(searches)
except Exception as e: except Exception as e:
self._logger.error('General error', exception=e) self._logger.error('General error', exception=e)
raise e raise e
bulk_results = {} bulk_results = {}
pool = Pool(processes=self.PARALLEL_PROCESSES) pool = Pool(processes=self.PARALLEL_PROCESSES)
for search in decoded_searchtext: for search in decoded_searches:
search_id, address = [search[k] for k in ['id', 'address']] search_id, address, city, state, country = \
[search.get(k, None) for k in self.SEARCH_KEYS]
opt_params = self._build_optional_parameters(city, state, country)
if address: if address:
self._logger.debug('async geocoding --> {} {}'.format(address.encode('utf-8'), opt_params))
result = pool.apply_async(async_geocoder, result = pool.apply_async(async_geocoder,
(self.geocoder, address)) (self.geocoder, address, opt_params))
else: else:
result = [] result = []
bulk_results[search_id] = result bulk_results[search_id] = result