checked, bulked cdb_bulk_geocode_street_point

This commit is contained in:
Juan Ignacio Sánchez Lara 2018-06-20 18:30:03 +02:00
parent e85f43f1d1
commit 58d70e252f
5 changed files with 139 additions and 15 deletions

2
.gitignore vendored
View File

@ -5,3 +5,5 @@ cartodb_services.egg-info/
build/ build/
dist/ dist/
.vscode/ .vscode/
.idea/
venv/

View File

@ -4,7 +4,21 @@
-- Make sure we have a sane search path to create/update the extension -- Make sure we have a sane search path to create/update the extension
SET search_path = "$user",cartodb,public,cdb_dataservices_client; SET search_path = "$user",cartodb,public,cdb_dataservices_client;
-- -- Taken from https://wiki.postgresql.org/wiki/Count_estimate
CREATE FUNCTION count_estimate(query text) RETURNS INTEGER AS
$func$
DECLARE
rec record;
ROWS INTEGER;
BEGIN
FOR rec IN EXECUTE 'EXPLAIN ' || query LOOP
ROWS := SUBSTRING(rec."QUERY PLAN" FROM ' rows=([[:digit:]]+)');
EXIT WHEN ROWS IS NOT NULL;
END LOOP;
RETURN ROWS;
END
$func$ LANGUAGE plpgsql;--
-- Geocoder server connection config -- Geocoder server connection config
-- --
-- The purpose of this function is provide to the PL/Proxy functions -- The purpose of this function is provide to the PL/Proxy functions
@ -1972,18 +1986,62 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
CONNECT cdb_dataservices_client._server_conn_str(); CONNECT cdb_dataservices_client._server_conn_str();
TARGET cdb_dataservices_server._DST_DisconnectUserTable; TARGET cdb_dataservices_server._DST_DisconnectUserTable;
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE; $$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (searchtext jsonb) CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, searchtext text)
RETURNS SETOF cdb_dataservices_client.geocoding AS $$ RETURNS SETOF cdb_dataservices_client.geocoding AS $$
DECLARE DECLARE
monthly_quota integer;
used_quota integer;
soft_limit boolean;
provider text;
username text; remaining_quota integer;
orgname text; estimated_row_count integer;
cartodb_id_batch integer;
batches_n integer;
BATCHES_SIZE CONSTANT numeric := 100;
current_row_count integer ;
BEGIN BEGIN
-- TODO: check SELECT csqi.monthly_quota, csqi.used_quota, csqi.soft_limit, csqi.provider
-- TODO: bulk into monthly_quota, used_quota, soft_limit, provider
RETURN QUERY SELECT * FROM cdb_dataservices_client._cdb_bulk_geocode_street_point(searchtext); FROM cdb_dataservices_client.cdb_service_quota_info() csqi WHERE service = 'hires_geocoder';
remaining_quota := monthly_quota - used_quota;
estimated_row_count := count_estimate(query);
RAISE DEBUG 'cdb_bulk_geocode_street_point --> estimated: %, remaining: %, monthly_quota: %, used_quota: %, soft_limit: %, provider: %',
estimated_row_count, remaining_quota, monthly_quota, used_quota, soft_limit, provider;
IF estimated_row_count > remaining_quota THEN
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, estimated_row_count;
END IF;
EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n;
RAISE DEBUG 'batches_n: %', batches_n;
CREATE TEMPORARY TABLE bulk_geocode_street_point
(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb);
FOR cartodb_id_batch in 0..(batches_n - 1)
LOOP
EXECUTE format(
'WITH geocoding_data as (' ||
' SELECT json_build_object(''id'', cartodb_id, ''address'', %s) as data , floor((cartodb_id-1)::float/$1) as batch' ||
' FROM (%s) _x' ||
')' ||
'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
'FROM geocoding_data ' ||
'WHERE batch = $2', searchtext, query)
USING BATCHES_SIZE, cartodb_id_batch;
GET DIAGNOSTICS current_row_count = ROW_COUNT;
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
END LOOP;
RETURN QUERY SELECT * FROM bulk_geocode_street_point;
END; END;
$$ LANGUAGE 'plpgsql' SECURITY DEFINER STABLE PARALLEL UNSAFE; $$ LANGUAGE 'plpgsql' SECURITY DEFINER VOLATILE PARALLEL UNSAFE;
-- --
-- Exception-safe private DataServices API function -- Exception-safe private DataServices API function
-- --

15
client/sql/05_utils.sql Normal file
View File

@ -0,0 +1,15 @@
-- Taken from https://wiki.postgresql.org/wiki/Count_estimate
CREATE FUNCTION count_estimate(query text) RETURNS INTEGER AS
$func$
DECLARE
rec record;
ROWS INTEGER;
BEGIN
FOR rec IN EXECUTE 'EXPLAIN ' || query LOOP
ROWS := SUBSTRING(rec."QUERY PLAN" FROM ' rows=([[:digit:]]+)');
EXIT WHEN ROWS IS NOT NULL;
END LOOP;
RETURN ROWS;
END
$func$ LANGUAGE plpgsql;

View File

@ -1,12 +1,56 @@
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (searchtext jsonb) CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, searchtext text)
RETURNS SETOF cdb_dataservices_client.geocoding AS $$ RETURNS SETOF cdb_dataservices_client.geocoding AS $$
DECLARE DECLARE
monthly_quota integer;
used_quota integer;
soft_limit boolean;
provider text;
username text; remaining_quota integer;
orgname text; estimated_row_count integer;
cartodb_id_batch integer;
batches_n integer;
BATCHES_SIZE CONSTANT numeric := 100;
current_row_count integer ;
BEGIN BEGIN
-- TODO: check SELECT csqi.monthly_quota, csqi.used_quota, csqi.soft_limit, csqi.provider
-- TODO: bulk into monthly_quota, used_quota, soft_limit, provider
RETURN QUERY SELECT * FROM cdb_dataservices_client._cdb_bulk_geocode_street_point(searchtext); FROM cdb_dataservices_client.cdb_service_quota_info() csqi WHERE service = 'hires_geocoder';
remaining_quota := monthly_quota - used_quota;
estimated_row_count := count_estimate(query);
RAISE DEBUG 'cdb_bulk_geocode_street_point --> estimated: %, remaining: %, monthly_quota: %, used_quota: %, soft_limit: %, provider: %',
estimated_row_count, remaining_quota, monthly_quota, used_quota, soft_limit, provider;
IF estimated_row_count > remaining_quota THEN
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, estimated_row_count;
END IF;
EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n;
RAISE DEBUG 'batches_n: %', batches_n;
CREATE TEMPORARY TABLE bulk_geocode_street_point
(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb);
FOR cartodb_id_batch in 0..(batches_n - 1)
LOOP
EXECUTE format(
'WITH geocoding_data as (' ||
' SELECT json_build_object(''id'', cartodb_id, ''address'', %s) as data , floor((cartodb_id-1)::float/$1) as batch' ||
' FROM (%s) _x' ||
')' ||
'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
'FROM geocoding_data ' ||
'WHERE batch = $2', searchtext, query)
USING BATCHES_SIZE, cartodb_id_batch;
GET DIAGNOSTICS current_row_count = ROW_COUNT;
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
END LOOP;
RETURN QUERY SELECT * FROM bulk_geocode_street_point;
END; END;
$$ LANGUAGE 'plpgsql' SECURITY DEFINER STABLE PARALLEL UNSAFE; $$ LANGUAGE 'plpgsql' SECURITY DEFINER VOLATILE PARALLEL UNSAFE;

View File

@ -12,7 +12,12 @@ from multiprocessing import Pool, TimeoutError
import json import json
import time, random
def async_geocoder(geocoder, address): def async_geocoder(geocoder, address):
# time.sleep(.3 + random.random())
# return [{ 'geometry': { 'location': { 'lng': 1, 'lat': 2 } } }]
results = geocoder.geocode(address=address) results = geocoder.geocode(address=address)
return results if results else [] return results if results else []