diff --git a/.gitignore b/.gitignore index 3cf5f35..ee5b680 100644 --- a/.gitignore +++ b/.gitignore @@ -5,3 +5,5 @@ cartodb_services.egg-info/ build/ dist/ .vscode/ +.idea/ +venv/ diff --git a/client/cdb_dataservices_client--0.25.0.sql b/client/cdb_dataservices_client--0.25.0.sql index 536bc10..088f949 100644 --- a/client/cdb_dataservices_client--0.25.0.sql +++ b/client/cdb_dataservices_client--0.25.0.sql @@ -4,7 +4,21 @@ -- Make sure we have a sane search path to create/update the extension SET search_path = "$user",cartodb,public,cdb_dataservices_client; --- +-- Taken from https://wiki.postgresql.org/wiki/Count_estimate +CREATE FUNCTION count_estimate(query text) RETURNS INTEGER AS +$func$ +DECLARE + rec record; + ROWS INTEGER; +BEGIN + FOR rec IN EXECUTE 'EXPLAIN ' || query LOOP + ROWS := SUBSTRING(rec."QUERY PLAN" FROM ' rows=([[:digit:]]+)'); + EXIT WHEN ROWS IS NOT NULL; + END LOOP; + + RETURN ROWS; +END +$func$ LANGUAGE plpgsql;-- -- Geocoder server connection config -- -- The purpose of this function is provide to the PL/Proxy functions @@ -1972,18 +1986,62 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable( CONNECT cdb_dataservices_client._server_conn_str(); TARGET cdb_dataservices_server._DST_DisconnectUserTable; $$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE; -CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (searchtext jsonb) +CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, searchtext text) RETURNS SETOF cdb_dataservices_client.geocoding AS $$ DECLARE + monthly_quota integer; + used_quota integer; + soft_limit boolean; + provider text; - username text; - orgname text; + remaining_quota integer; + estimated_row_count integer; + + cartodb_id_batch integer; + batches_n integer; + BATCHES_SIZE CONSTANT numeric := 100; + current_row_count integer ; BEGIN - -- TODO: check - -- TODO: bulk - RETURN QUERY SELECT * FROM cdb_dataservices_client._cdb_bulk_geocode_street_point(searchtext); + SELECT csqi.monthly_quota, csqi.used_quota, csqi.soft_limit, csqi.provider + into monthly_quota, used_quota, soft_limit, provider + FROM cdb_dataservices_client.cdb_service_quota_info() csqi WHERE service = 'hires_geocoder'; + + remaining_quota := monthly_quota - used_quota; + estimated_row_count := count_estimate(query); + RAISE DEBUG 'cdb_bulk_geocode_street_point --> estimated: %, remaining: %, monthly_quota: %, used_quota: %, soft_limit: %, provider: %', + estimated_row_count, remaining_quota, monthly_quota, used_quota, soft_limit, provider; + IF estimated_row_count > remaining_quota THEN + RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, estimated_row_count; + END IF; + + EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n; + + RAISE DEBUG 'batches_n: %', batches_n; + + CREATE TEMPORARY TABLE bulk_geocode_street_point + (cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb); + + FOR cartodb_id_batch in 0..(batches_n - 1) + LOOP + + EXECUTE format( + 'WITH geocoding_data as (' || + ' SELECT json_build_object(''id'', cartodb_id, ''address'', %s) as data , floor((cartodb_id-1)::float/$1) as batch' || + ' FROM (%s) _x' || + ')' || + 'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' || + 'FROM geocoding_data ' || + 'WHERE batch = $2', searchtext, query) + USING BATCHES_SIZE, cartodb_id_batch; + + GET DIAGNOSTICS current_row_count = ROW_COUNT; + RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count; + + END LOOP; + + RETURN QUERY SELECT * FROM bulk_geocode_street_point; END; -$$ LANGUAGE 'plpgsql' SECURITY DEFINER STABLE PARALLEL UNSAFE; +$$ LANGUAGE 'plpgsql' SECURITY DEFINER VOLATILE PARALLEL UNSAFE; -- -- Exception-safe private DataServices API function -- diff --git a/client/sql/05_utils.sql b/client/sql/05_utils.sql new file mode 100644 index 0000000..d161c6a --- /dev/null +++ b/client/sql/05_utils.sql @@ -0,0 +1,15 @@ +-- Taken from https://wiki.postgresql.org/wiki/Count_estimate +CREATE FUNCTION count_estimate(query text) RETURNS INTEGER AS +$func$ +DECLARE + rec record; + ROWS INTEGER; +BEGIN + FOR rec IN EXECUTE 'EXPLAIN ' || query LOOP + ROWS := SUBSTRING(rec."QUERY PLAN" FROM ' rows=([[:digit:]]+)'); + EXIT WHEN ROWS IS NOT NULL; + END LOOP; + + RETURN ROWS; +END +$func$ LANGUAGE plpgsql; \ No newline at end of file diff --git a/client/sql/21_bulk_geocoding_functions.sql b/client/sql/21_bulk_geocoding_functions.sql index 2a1f259..12766fc 100644 --- a/client/sql/21_bulk_geocoding_functions.sql +++ b/client/sql/21_bulk_geocoding_functions.sql @@ -1,12 +1,56 @@ -CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (searchtext jsonb) +CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text, searchtext text) RETURNS SETOF cdb_dataservices_client.geocoding AS $$ DECLARE + monthly_quota integer; + used_quota integer; + soft_limit boolean; + provider text; - username text; - orgname text; + remaining_quota integer; + estimated_row_count integer; + + cartodb_id_batch integer; + batches_n integer; + BATCHES_SIZE CONSTANT numeric := 100; + current_row_count integer ; BEGIN - -- TODO: check - -- TODO: bulk - RETURN QUERY SELECT * FROM cdb_dataservices_client._cdb_bulk_geocode_street_point(searchtext); + SELECT csqi.monthly_quota, csqi.used_quota, csqi.soft_limit, csqi.provider + into monthly_quota, used_quota, soft_limit, provider + FROM cdb_dataservices_client.cdb_service_quota_info() csqi WHERE service = 'hires_geocoder'; + + remaining_quota := monthly_quota - used_quota; + estimated_row_count := count_estimate(query); + RAISE DEBUG 'cdb_bulk_geocode_street_point --> estimated: %, remaining: %, monthly_quota: %, used_quota: %, soft_limit: %, provider: %', + estimated_row_count, remaining_quota, monthly_quota, used_quota, soft_limit, provider; + IF estimated_row_count > remaining_quota THEN + RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, estimated_row_count; + END IF; + + EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n; + + RAISE DEBUG 'batches_n: %', batches_n; + + CREATE TEMPORARY TABLE bulk_geocode_street_point + (cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb); + + FOR cartodb_id_batch in 0..(batches_n - 1) + LOOP + + EXECUTE format( + 'WITH geocoding_data as (' || + ' SELECT json_build_object(''id'', cartodb_id, ''address'', %s) as data , floor((cartodb_id-1)::float/$1) as batch' || + ' FROM (%s) _x' || + ')' || + 'INSERT INTO bulk_geocode_street_point SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' || + 'FROM geocoding_data ' || + 'WHERE batch = $2', searchtext, query) + USING BATCHES_SIZE, cartodb_id_batch; + + GET DIAGNOSTICS current_row_count = ROW_COUNT; + RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count; + + END LOOP; + + RETURN QUERY SELECT * FROM bulk_geocode_street_point; END; -$$ LANGUAGE 'plpgsql' SECURITY DEFINER STABLE PARALLEL UNSAFE; +$$ LANGUAGE 'plpgsql' SECURITY DEFINER VOLATILE PARALLEL UNSAFE; diff --git a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py index a741539..3dff728 100644 --- a/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py +++ b/server/lib/python/cartodb_services/cartodb_services/google/geocoder.py @@ -12,7 +12,12 @@ from multiprocessing import Pool, TimeoutError import json +import time, random + def async_geocoder(geocoder, address): + # time.sleep(.3 + random.random()) + # return [{ 'geometry': { 'location': { 'lng': 1, 'lat': 2 } } }] + results = geocoder.geocode(address=address) return results if results else []