Send optimal batch size
This commit is contained in:
parent
286a75fa8e
commit
531ad28158
@ -112,7 +112,8 @@ CREATE TYPE cdb_dataservices_client.service_quota_info AS (
|
|||||||
monthly_quota NUMERIC,
|
monthly_quota NUMERIC,
|
||||||
used_quota NUMERIC,
|
used_quota NUMERIC,
|
||||||
soft_limit BOOLEAN,
|
soft_limit BOOLEAN,
|
||||||
provider TEXT
|
provider TEXT,
|
||||||
|
max_batch_size NUMERIC
|
||||||
);
|
);
|
||||||
--
|
--
|
||||||
-- Public dataservices API function
|
-- Public dataservices API function
|
||||||
@ -1987,25 +1988,36 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
|
|||||||
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
||||||
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
||||||
street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT 50)
|
street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT NULL)
|
||||||
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
query_row_count integer;
|
query_row_count integer;
|
||||||
enough_quota boolean;
|
enough_quota boolean;
|
||||||
remaining_quota integer;
|
remaining_quota integer;
|
||||||
|
max_batch_size integer;
|
||||||
|
|
||||||
cartodb_id_batch integer;
|
cartodb_id_batch integer;
|
||||||
batches_n integer;
|
batches_n integer;
|
||||||
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
||||||
MAX_BATCH_SIZE CONSTANT numeric := 10000;
|
MAX_SAFE_BATCH_SIZE CONSTANT numeric := 5000;
|
||||||
current_row_count integer ;
|
current_row_count integer ;
|
||||||
|
|
||||||
temp_table_name text;
|
temp_table_name text;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
SELECT csqi.monthly_quota - csqi.used_quota AS remaining_quota, csqi.max_batch_size
|
||||||
|
INTO remaining_quota, max_batch_size
|
||||||
|
FROM cdb_dataservices_client.cdb_service_quota_info() csqi
|
||||||
|
WHERE service = 'hires_geocoder';
|
||||||
|
RAISE DEBUG 'remaining_quota: %; max_batch_size: %', remaining_quota, max_batch_size;
|
||||||
|
|
||||||
IF batch_size IS NULL THEN
|
IF batch_size IS NULL THEN
|
||||||
RAISE EXCEPTION 'batch_size can''t be null';
|
batch_size := max_batch_size;
|
||||||
ELSIF batch_size > MAX_BATCH_SIZE THEN
|
ELSIF batch_size > max_batch_size THEN
|
||||||
RAISE EXCEPTION 'batch_size must be lower than %', MAX_BATCH_SIZE + 1;
|
RAISE EXCEPTION 'batch_size must be lower than %', max_batch_size + 1;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
IF batch_size > MAX_SAFE_BATCH_SIZE THEN
|
||||||
|
batch_size := MAX_SAFE_BATCH_SIZE;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
||||||
@ -2013,11 +2025,7 @@ BEGIN
|
|||||||
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
||||||
query_row_count, query, country_column, state_column, city_column, street_column;
|
query_row_count, query, country_column, state_column, city_column, street_column;
|
||||||
SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota;
|
SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota;
|
||||||
IF enough_quota IS NOT NULL AND NOT enough_quota THEN
|
IF remaining_quota < query_row_count THEN
|
||||||
SELECT csqi.monthly_quota - csqi.used_quota AS remaining_quota
|
|
||||||
INTO remaining_quota
|
|
||||||
FROM cdb_dataservices_client.cdb_service_quota_info() csqi
|
|
||||||
WHERE service = 'hires_geocoder';
|
|
||||||
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
@ -2036,25 +2044,27 @@ BEGIN
|
|||||||
coalesce(state_column, ''''''), coalesce(country_column, '''''')
|
coalesce(state_column, ''''''), coalesce(country_column, '''''')
|
||||||
into street_column, city_column, state_column, country_column;
|
into street_column, city_column, state_column, country_column;
|
||||||
|
|
||||||
FOR cartodb_id_batch in 0..(batches_n - 1)
|
IF batches_n > 0 THEN
|
||||||
LOOP
|
FOR cartodb_id_batch in 0..(batches_n - 1)
|
||||||
|
LOOP
|
||||||
|
|
||||||
EXECUTE format(
|
EXECUTE format(
|
||||||
'WITH geocoding_data as (' ||
|
'WITH geocoding_data as (' ||
|
||||||
' SELECT ' ||
|
' SELECT ' ||
|
||||||
' json_build_object(''id'', cartodb_id, ''address'', %s, ''city'', %s, ''state'', %s, ''country'', %s) as data , ' ||
|
' json_build_object(''id'', cartodb_id, ''address'', %s, ''city'', %s, ''state'', %s, ''country'', %s) as data , ' ||
|
||||||
' floor((cartodb_id-1)::float/$1) as batch' ||
|
' floor((cartodb_id-1)::float/$1) as batch' ||
|
||||||
' FROM (%s) _x' ||
|
' FROM (%s) _x' ||
|
||||||
') ' ||
|
') ' ||
|
||||||
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
||||||
'FROM geocoding_data ' ||
|
'FROM geocoding_data ' ||
|
||||||
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
||||||
USING batch_size, cartodb_id_batch;
|
USING batch_size, cartodb_id_batch;
|
||||||
|
|
||||||
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
||||||
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
||||||
|
|
||||||
END LOOP;
|
END LOOP;
|
||||||
|
END IF;
|
||||||
|
|
||||||
RETURN QUERY EXECUTE 'SELECT * FROM ' || quote_ident(temp_table_name);
|
RETURN QUERY EXECUTE 'SELECT * FROM ' || quote_ident(temp_table_name);
|
||||||
END;
|
END;
|
||||||
|
@ -39,5 +39,6 @@ CREATE TYPE cdb_dataservices_client.service_quota_info AS (
|
|||||||
monthly_quota NUMERIC,
|
monthly_quota NUMERIC,
|
||||||
used_quota NUMERIC,
|
used_quota NUMERIC,
|
||||||
soft_limit BOOLEAN,
|
soft_limit BOOLEAN,
|
||||||
provider TEXT
|
provider TEXT,
|
||||||
|
max_batch_size NUMERIC
|
||||||
);
|
);
|
||||||
|
@ -1,23 +1,34 @@
|
|||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
||||||
street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT 50)
|
street_column text, city_column text default null, state_column text default null, country_column text default null, batch_size integer DEFAULT NULL)
|
||||||
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
query_row_count integer;
|
query_row_count integer;
|
||||||
enough_quota boolean;
|
enough_quota boolean;
|
||||||
remaining_quota integer;
|
remaining_quota integer;
|
||||||
|
max_batch_size integer;
|
||||||
|
|
||||||
cartodb_id_batch integer;
|
cartodb_id_batch integer;
|
||||||
batches_n integer;
|
batches_n integer;
|
||||||
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
||||||
MAX_BATCH_SIZE CONSTANT numeric := 10000;
|
MAX_SAFE_BATCH_SIZE CONSTANT numeric := 5000;
|
||||||
current_row_count integer ;
|
current_row_count integer ;
|
||||||
|
|
||||||
temp_table_name text;
|
temp_table_name text;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
SELECT csqi.monthly_quota - csqi.used_quota AS remaining_quota, csqi.max_batch_size
|
||||||
|
INTO remaining_quota, max_batch_size
|
||||||
|
FROM cdb_dataservices_client.cdb_service_quota_info() csqi
|
||||||
|
WHERE service = 'hires_geocoder';
|
||||||
|
RAISE DEBUG 'remaining_quota: %; max_batch_size: %', remaining_quota, max_batch_size;
|
||||||
|
|
||||||
IF batch_size IS NULL THEN
|
IF batch_size IS NULL THEN
|
||||||
RAISE EXCEPTION 'batch_size can''t be null';
|
batch_size := max_batch_size;
|
||||||
ELSIF batch_size > MAX_BATCH_SIZE THEN
|
ELSIF batch_size > max_batch_size THEN
|
||||||
RAISE EXCEPTION 'batch_size must be lower than %', MAX_BATCH_SIZE + 1;
|
RAISE EXCEPTION 'batch_size must be lower than %', max_batch_size + 1;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
IF batch_size > MAX_SAFE_BATCH_SIZE THEN
|
||||||
|
batch_size := MAX_SAFE_BATCH_SIZE;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
||||||
@ -25,11 +36,7 @@ BEGIN
|
|||||||
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
||||||
query_row_count, query, country_column, state_column, city_column, street_column;
|
query_row_count, query, country_column, state_column, city_column, street_column;
|
||||||
SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota;
|
SELECT cdb_dataservices_client.cdb_enough_quota('hires_geocoder', query_row_count) INTO enough_quota;
|
||||||
IF enough_quota IS NOT NULL AND NOT enough_quota THEN
|
IF remaining_quota < query_row_count THEN
|
||||||
SELECT csqi.monthly_quota - csqi.used_quota AS remaining_quota
|
|
||||||
INTO remaining_quota
|
|
||||||
FROM cdb_dataservices_client.cdb_service_quota_info() csqi
|
|
||||||
WHERE service = 'hires_geocoder';
|
|
||||||
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
@ -48,25 +55,27 @@ BEGIN
|
|||||||
coalesce(state_column, ''''''), coalesce(country_column, '''''')
|
coalesce(state_column, ''''''), coalesce(country_column, '''''')
|
||||||
into street_column, city_column, state_column, country_column;
|
into street_column, city_column, state_column, country_column;
|
||||||
|
|
||||||
FOR cartodb_id_batch in 0..(batches_n - 1)
|
IF batches_n > 0 THEN
|
||||||
LOOP
|
FOR cartodb_id_batch in 0..(batches_n - 1)
|
||||||
|
LOOP
|
||||||
|
|
||||||
EXECUTE format(
|
EXECUTE format(
|
||||||
'WITH geocoding_data as (' ||
|
'WITH geocoding_data as (' ||
|
||||||
' SELECT ' ||
|
' SELECT ' ||
|
||||||
' json_build_object(''id'', cartodb_id, ''address'', %s, ''city'', %s, ''state'', %s, ''country'', %s) as data , ' ||
|
' json_build_object(''id'', cartodb_id, ''address'', %s, ''city'', %s, ''state'', %s, ''country'', %s) as data , ' ||
|
||||||
' floor((cartodb_id-1)::float/$1) as batch' ||
|
' floor((cartodb_id-1)::float/$1) as batch' ||
|
||||||
' FROM (%s) _x' ||
|
' FROM (%s) _x' ||
|
||||||
') ' ||
|
') ' ||
|
||||||
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
||||||
'FROM geocoding_data ' ||
|
'FROM geocoding_data ' ||
|
||||||
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
||||||
USING batch_size, cartodb_id_batch;
|
USING batch_size, cartodb_id_batch;
|
||||||
|
|
||||||
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
||||||
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
||||||
|
|
||||||
END LOOP;
|
END LOOP;
|
||||||
|
END IF;
|
||||||
|
|
||||||
RETURN QUERY EXECUTE 'SELECT * FROM ' || quote_ident(temp_table_name);
|
RETURN QUERY EXECUTE 'SELECT * FROM ' || quote_ident(temp_table_name);
|
||||||
END;
|
END;
|
||||||
|
@ -1,21 +1,21 @@
|
|||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
-- Test bulk size mandatory
|
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info() RENAME TO cdb_service_quota_info_mocked;
|
||||||
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''', null, null, null, null);
|
CREATE FUNCTION cdb_dataservices_client.cdb_service_quota_info ()
|
||||||
ERROR: batch_size can't be null
|
RETURNS SETOF cdb_dataservices_client.service_quota_info AS $$
|
||||||
-- Test quota check by mocking quota 0
|
SELECT 'hires_geocoder'::cdb_dataservices_client.service_type AS service, 0::NUMERIC AS monthly_quota, 0::NUMERIC AS used_quota, FALSE AS soft_limit, 'google' AS provider, 1::NUMERIC AS max_batch_size;
|
||||||
|
$$ LANGUAGE SQL;
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota_mocked;
|
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota_mocked;
|
||||||
CREATE FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC)
|
CREATE FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC)
|
||||||
RETURNS BOOLEAN as $$
|
RETURNS BOOLEAN as $$
|
||||||
SELECT FALSE;
|
SELECT FALSE;
|
||||||
$$ LANGUAGE SQL;
|
$$ LANGUAGE SQL;
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info() RENAME TO cdb_service_quota_info_mocked;
|
-- Test bulk size not mandatory (it will get the optimal)
|
||||||
CREATE FUNCTION cdb_dataservices_client.cdb_service_quota_info ()
|
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''', null, null, null, null);
|
||||||
RETURNS SETOF cdb_dataservices_client.service_quota_info AS $$
|
ERROR: Remaining quota: 0. Estimated cost: 1
|
||||||
SELECT 'hires_geocoder'::cdb_dataservices_client.service_type AS service, 0::NUMERIC AS monthly_quota, 0::NUMERIC AS used_quota, FALSE AS soft_limit, 'google' AS provider;
|
-- Test quota check by mocking quota 0
|
||||||
$$ LANGUAGE SQL;
|
|
||||||
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''');
|
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''');
|
||||||
ERROR: Remaining quota: 0. Estimated cost: 1
|
ERROR: Remaining quota: 0. Estimated cost: 1
|
||||||
DROP FUNCTION cdb_dataservices_client.cdb_service_quota_info;
|
DROP FUNCTION cdb_dataservices_client.cdb_service_quota_info;
|
||||||
DROP FUNCTION cdb_dataservices_client.cdb_enough_quota;
|
DROP FUNCTION cdb_dataservices_client.cdb_enough_quota;
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info_mocked() RENAME TO cdb_service_quota_info;
|
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota_mocked (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota;
|
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota_mocked (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota;
|
||||||
|
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info_mocked() RENAME TO cdb_service_quota_info;
|
||||||
|
@ -1,26 +1,26 @@
|
|||||||
\set VERBOSITY terse
|
\set VERBOSITY terse
|
||||||
|
|
||||||
-- Test bulk size mandatory
|
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info() RENAME TO cdb_service_quota_info_mocked;
|
||||||
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''', null, null, null, null);
|
CREATE FUNCTION cdb_dataservices_client.cdb_service_quota_info ()
|
||||||
|
RETURNS SETOF cdb_dataservices_client.service_quota_info AS $$
|
||||||
|
SELECT 'hires_geocoder'::cdb_dataservices_client.service_type AS service, 0::NUMERIC AS monthly_quota, 0::NUMERIC AS used_quota, FALSE AS soft_limit, 'google' AS provider, 1::NUMERIC AS max_batch_size;
|
||||||
|
$$ LANGUAGE SQL;
|
||||||
|
|
||||||
-- Test quota check by mocking quota 0
|
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota_mocked;
|
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota_mocked;
|
||||||
CREATE FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC)
|
CREATE FUNCTION cdb_dataservices_client.cdb_enough_quota (service TEXT ,input_size NUMERIC)
|
||||||
RETURNS BOOLEAN as $$
|
RETURNS BOOLEAN as $$
|
||||||
SELECT FALSE;
|
SELECT FALSE;
|
||||||
$$ LANGUAGE SQL;
|
$$ LANGUAGE SQL;
|
||||||
|
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info() RENAME TO cdb_service_quota_info_mocked;
|
-- Test bulk size not mandatory (it will get the optimal)
|
||||||
CREATE FUNCTION cdb_dataservices_client.cdb_service_quota_info ()
|
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''', null, null, null, null);
|
||||||
RETURNS SETOF cdb_dataservices_client.service_quota_info AS $$
|
|
||||||
SELECT 'hires_geocoder'::cdb_dataservices_client.service_type AS service, 0::NUMERIC AS monthly_quota, 0::NUMERIC AS used_quota, FALSE AS soft_limit, 'google' AS provider;
|
|
||||||
$$ LANGUAGE SQL;
|
|
||||||
|
|
||||||
|
-- Test quota check by mocking quota 0
|
||||||
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''');
|
SELECT cdb_dataservices_client.cdb_bulk_geocode_street_point('select 1 as cartodb_id', '''Valladolid, Spain''');
|
||||||
|
|
||||||
DROP FUNCTION cdb_dataservices_client.cdb_service_quota_info;
|
DROP FUNCTION cdb_dataservices_client.cdb_service_quota_info;
|
||||||
DROP FUNCTION cdb_dataservices_client.cdb_enough_quota;
|
DROP FUNCTION cdb_dataservices_client.cdb_enough_quota;
|
||||||
|
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info_mocked() RENAME TO cdb_service_quota_info;
|
|
||||||
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota_mocked (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota;
|
ALTER FUNCTION cdb_dataservices_client.cdb_enough_quota_mocked (service TEXT ,input_size NUMERIC) RENAME TO cdb_enough_quota;
|
||||||
|
ALTER FUNCTION cdb_dataservices_client.cdb_service_quota_info_mocked() RENAME TO cdb_service_quota_info;
|
||||||
|
|
||||||
|
@ -1861,7 +1861,8 @@ BEGIN
|
|||||||
monthly_quota NUMERIC,
|
monthly_quota NUMERIC,
|
||||||
used_quota NUMERIC,
|
used_quota NUMERIC,
|
||||||
soft_limit BOOLEAN,
|
soft_limit BOOLEAN,
|
||||||
provider TEXT
|
provider TEXT,
|
||||||
|
max_batch_size NUMERIC
|
||||||
);
|
);
|
||||||
END IF;
|
END IF;
|
||||||
END $$;
|
END $$;
|
||||||
@ -1872,6 +1873,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_service_quota_info(
|
|||||||
RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
||||||
from cartodb_services.metrics.user import UserMetricsService
|
from cartodb_services.metrics.user import UserMetricsService
|
||||||
from datetime import date
|
from datetime import date
|
||||||
|
from cartodb_services.bulk_geocoders import BATCH_GEOCODER_CLASS_BY_PROVIDER
|
||||||
|
|
||||||
plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username))
|
plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username))
|
||||||
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
|
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
|
||||||
@ -1889,7 +1891,7 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_isolines_config.service_type, today)
|
used_quota = user_service.used_quota(user_isolines_config.service_type, today)
|
||||||
soft_limit = user_isolines_config.soft_isolines_limit
|
soft_limit = user_isolines_config.soft_isolines_limit
|
||||||
provider = user_isolines_config.provider
|
provider = user_isolines_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, 1]]
|
||||||
|
|
||||||
#-- Hires Geocoder
|
#-- Hires Geocoder
|
||||||
service = 'hires_geocoder'
|
service = 'hires_geocoder'
|
||||||
@ -1901,7 +1903,12 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_geocoder_config.service_type, today)
|
used_quota = user_service.used_quota(user_geocoder_config.service_type, today)
|
||||||
soft_limit = user_geocoder_config.soft_geocoding_limit
|
soft_limit = user_geocoder_config.soft_geocoding_limit
|
||||||
provider = user_geocoder_config.provider
|
provider = user_geocoder_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
batch_geocoder_class = BATCH_GEOCODER_CLASS_BY_PROVIDER.get(provider, None)
|
||||||
|
if batch_geocoder_class and hasattr(batch_geocoder_class, 'MAX_BATCH_SIZE'):
|
||||||
|
max_batch_size = batch_geocoder_class.MAX_BATCH_SIZE
|
||||||
|
else:
|
||||||
|
max_batch_size = 1
|
||||||
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, max_batch_size]]
|
||||||
|
|
||||||
#-- Routing
|
#-- Routing
|
||||||
service = 'routing'
|
service = 'routing'
|
||||||
@ -1913,7 +1920,7 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_routing_config.service_type, today)
|
used_quota = user_service.used_quota(user_routing_config.service_type, today)
|
||||||
soft_limit = user_routing_config.soft_limit
|
soft_limit = user_routing_config.soft_limit
|
||||||
provider = user_routing_config.provider
|
provider = user_routing_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, 1]]
|
||||||
|
|
||||||
#-- Observatory
|
#-- Observatory
|
||||||
service = 'observatory'
|
service = 'observatory'
|
||||||
@ -1925,7 +1932,7 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_obs_config.service_type, today)
|
used_quota = user_service.used_quota(user_obs_config.service_type, today)
|
||||||
soft_limit = user_obs_config.soft_limit
|
soft_limit = user_obs_config.soft_limit
|
||||||
provider = user_obs_config.provider
|
provider = user_obs_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, 1]]
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||||
|
@ -22,7 +22,8 @@ BEGIN
|
|||||||
monthly_quota NUMERIC,
|
monthly_quota NUMERIC,
|
||||||
used_quota NUMERIC,
|
used_quota NUMERIC,
|
||||||
soft_limit BOOLEAN,
|
soft_limit BOOLEAN,
|
||||||
provider TEXT
|
provider TEXT,
|
||||||
|
max_batch_size NUMERIC
|
||||||
);
|
);
|
||||||
END IF;
|
END IF;
|
||||||
END $$;
|
END $$;
|
||||||
@ -33,6 +34,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_server.cdb_service_quota_info(
|
|||||||
RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
||||||
from cartodb_services.metrics.user import UserMetricsService
|
from cartodb_services.metrics.user import UserMetricsService
|
||||||
from datetime import date
|
from datetime import date
|
||||||
|
from cartodb_services.bulk_geocoders import BATCH_GEOCODER_CLASS_BY_PROVIDER
|
||||||
|
|
||||||
plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username))
|
plpy.execute("SELECT cdb_dataservices_server._connect_to_redis('{0}')".format(username))
|
||||||
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
|
redis_conn = GD["redis_connection_{0}".format(username)]['redis_metrics_connection']
|
||||||
@ -50,7 +52,7 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_isolines_config.service_type, today)
|
used_quota = user_service.used_quota(user_isolines_config.service_type, today)
|
||||||
soft_limit = user_isolines_config.soft_isolines_limit
|
soft_limit = user_isolines_config.soft_isolines_limit
|
||||||
provider = user_isolines_config.provider
|
provider = user_isolines_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, 1]]
|
||||||
|
|
||||||
#-- Hires Geocoder
|
#-- Hires Geocoder
|
||||||
service = 'hires_geocoder'
|
service = 'hires_geocoder'
|
||||||
@ -62,7 +64,12 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_geocoder_config.service_type, today)
|
used_quota = user_service.used_quota(user_geocoder_config.service_type, today)
|
||||||
soft_limit = user_geocoder_config.soft_geocoding_limit
|
soft_limit = user_geocoder_config.soft_geocoding_limit
|
||||||
provider = user_geocoder_config.provider
|
provider = user_geocoder_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
batch_geocoder_class = BATCH_GEOCODER_CLASS_BY_PROVIDER.get(provider, None)
|
||||||
|
if batch_geocoder_class and hasattr(batch_geocoder_class, 'MAX_BATCH_SIZE'):
|
||||||
|
max_batch_size = batch_geocoder_class.MAX_BATCH_SIZE
|
||||||
|
else:
|
||||||
|
max_batch_size = 1
|
||||||
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, max_batch_size]]
|
||||||
|
|
||||||
#-- Routing
|
#-- Routing
|
||||||
service = 'routing'
|
service = 'routing'
|
||||||
@ -74,7 +81,7 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_routing_config.service_type, today)
|
used_quota = user_service.used_quota(user_routing_config.service_type, today)
|
||||||
soft_limit = user_routing_config.soft_limit
|
soft_limit = user_routing_config.soft_limit
|
||||||
provider = user_routing_config.provider
|
provider = user_routing_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, 1]]
|
||||||
|
|
||||||
#-- Observatory
|
#-- Observatory
|
||||||
service = 'observatory'
|
service = 'observatory'
|
||||||
@ -86,7 +93,7 @@ RETURNS SETOF cdb_dataservices_server.service_quota_info AS $$
|
|||||||
used_quota = user_service.used_quota(user_obs_config.service_type, today)
|
used_quota = user_service.used_quota(user_obs_config.service_type, today)
|
||||||
soft_limit = user_obs_config.soft_limit
|
soft_limit = user_obs_config.soft_limit
|
||||||
provider = user_obs_config.provider
|
provider = user_obs_config.provider
|
||||||
ret += [[service, monthly_quota, used_quota, soft_limit, provider]]
|
ret += [[service, monthly_quota, used_quota, soft_limit, provider, 1]]
|
||||||
|
|
||||||
return ret
|
return ret
|
||||||
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
$$ LANGUAGE plpythonu STABLE PARALLEL RESTRICTED;
|
||||||
|
@ -0,0 +1,11 @@
|
|||||||
|
from google import GoogleMapsBulkGeocoder
|
||||||
|
from here import HereMapsBulkGeocoder
|
||||||
|
from tomtom import TomTomBulkGeocoder
|
||||||
|
from mapbox import MapboxBulkGeocoder
|
||||||
|
|
||||||
|
BATCH_GEOCODER_CLASS_BY_PROVIDER = {
|
||||||
|
'google': GoogleMapsBulkGeocoder,
|
||||||
|
'heremaps': HereMapsBulkGeocoder,
|
||||||
|
'tomtom': TomTomBulkGeocoder,
|
||||||
|
'mapbox': MapboxBulkGeocoder
|
||||||
|
}
|
@ -16,7 +16,7 @@ HereJobStatus = namedtuple('HereJobStatus', 'total_count processed_count status'
|
|||||||
|
|
||||||
class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
|
class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
|
||||||
MAX_BATCH_SIZE = 1000000 # From the docs
|
MAX_BATCH_SIZE = 1000000 # From the docs
|
||||||
MIN_BATCHED_SEARCH = 100 # Under this, serial will be used
|
MIN_BATCHED_SEARCH = 1000 # Under this, serial will be used
|
||||||
BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs'
|
BATCH_URL = 'https://batch.geocoder.cit.api.here.com/6.2/jobs'
|
||||||
# https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html
|
# https://developer.here.com/documentation/batch-geocoder/topics/read-batch-request-output.html
|
||||||
META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet']
|
META_COLS = ['relevance', 'matchType', 'matchCode', 'matchLevel', 'matchQualityStreet']
|
||||||
@ -55,14 +55,17 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
|
|||||||
while True:
|
while True:
|
||||||
job_info = self._job_status(request_id)
|
job_info = self._job_status(request_id)
|
||||||
if job_info.processed_count == last_processed:
|
if job_info.processed_count == last_processed:
|
||||||
|
self._logger.debug('--> no progress ({})'.format(last_processed))
|
||||||
stalled_retries += 1
|
stalled_retries += 1
|
||||||
if stalled_retries > self.MAX_STALLED_RETRIES:
|
if stalled_retries > self.MAX_STALLED_RETRIES:
|
||||||
raise Exception('Too many retries for job {}'.format(request_id))
|
raise Exception('Too many retries for job {}'.format(request_id))
|
||||||
else:
|
else:
|
||||||
|
self._logger.debug('--> progress ({} != {})'.format(job_info.processed_count, last_processed))
|
||||||
stalled_retries = 0
|
stalled_retries = 0
|
||||||
last_processed = job_info.processed_count
|
last_processed = job_info.processed_count
|
||||||
|
|
||||||
self._logger.debug('--> Job poll check: {}'.format(job_info))
|
self._logger.debug('--> Job poll check ({}): {}'.format(
|
||||||
|
stalled_retries, job_info))
|
||||||
if job_info.status in self.JOB_FINAL_STATES:
|
if job_info.status in self.JOB_FINAL_STATES:
|
||||||
break
|
break
|
||||||
else:
|
else:
|
||||||
@ -95,7 +98,7 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
|
|||||||
request_params.update({
|
request_params.update({
|
||||||
'gen': 8,
|
'gen': 8,
|
||||||
'action': 'run',
|
'action': 'run',
|
||||||
#'mailto': 'juanignaciosl@carto.com',
|
# 'mailto': 'juanignaciosl@carto.com',
|
||||||
'header': 'true',
|
'header': 'true',
|
||||||
'inDelim': '|',
|
'inDelim': '|',
|
||||||
'outDelim': '|',
|
'outDelim': '|',
|
||||||
@ -121,8 +124,8 @@ class HereMapsBulkGeocoder(HereMapsGeocoder, StreetPointBulkGeocoder):
|
|||||||
timeout=(self.connect_timeout, self.read_timeout))
|
timeout=(self.connect_timeout, self.read_timeout))
|
||||||
polling_root = ET.fromstring(polling_r.text)
|
polling_root = ET.fromstring(polling_r.text)
|
||||||
return HereJobStatus(
|
return HereJobStatus(
|
||||||
total_count=polling_root.find('./Response/TotalCount').text,
|
total_count=int(polling_root.find('./Response/TotalCount').text),
|
||||||
processed_count=polling_root.find('./Response/ProcessedCount').text,
|
processed_count=int(polling_root.find('./Response/ProcessedCount').text),
|
||||||
status=polling_root.find('./Response/Status').text)
|
status=polling_root.find('./Response/Status').text)
|
||||||
|
|
||||||
def _download_results(self, job_id):
|
def _download_results(self, job_id):
|
||||||
|
@ -259,7 +259,8 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
|||||||
"""
|
"""
|
||||||
Useful just to test a good batch size
|
Useful just to test a good batch size
|
||||||
"""
|
"""
|
||||||
n = 50
|
n = 110
|
||||||
|
batch_size = 'NULL' # NULL for optimal
|
||||||
streets = []
|
streets = []
|
||||||
for i in range(0, n):
|
for i in range(0, n):
|
||||||
streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \
|
streets.append('{{"cartodb_id": {}, "address": "{} Yonge Street, ' \
|
||||||
@ -270,7 +271,7 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
|||||||
"'select * from jsonb_to_recordset(''[" \
|
"'select * from jsonb_to_recordset(''[" \
|
||||||
"{}" \
|
"{}" \
|
||||||
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
||||||
"'address', null, null, null, {})".format(','.join(streets), n)
|
"'address', null, null, null, {})".format(','.join(streets), batch_size)
|
||||||
response = self._run_authenticated(query)
|
response = self._run_authenticated(query)
|
||||||
assert_equal(n - 1, len(response['rows']))
|
assert_equal(n - 1, len(response['rows']))
|
||||||
|
|
||||||
@ -307,6 +308,20 @@ class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
|||||||
assert_close_enough(self._x_y_by_cartodb_id(response)[1],
|
assert_close_enough(self._x_y_by_cartodb_id(response)[1],
|
||||||
self.fixture_points['Plaza España 1, Barcelona'])
|
self.fixture_points['Plaza España 1, Barcelona'])
|
||||||
|
|
||||||
|
def _test_known_table(self):
|
||||||
|
subquery = 'select * from known_table where cartodb_id < 1100'
|
||||||
|
subquery_count = 'select count(1) from ({}) _x'.format(subquery)
|
||||||
|
count = self._run_authenticated(subquery_count)['rows'][0]['count']
|
||||||
|
|
||||||
|
query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \
|
||||||
|
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point(" \
|
||||||
|
"'{}' " \
|
||||||
|
", 'street', 'city', NULL, 'country')".format(subquery)
|
||||||
|
response = self._run_authenticated(query)
|
||||||
|
assert_equal(len(response['rows']), count)
|
||||||
|
assert_not_equal(response['rows'][0]['st_x'], None)
|
||||||
|
|
||||||
|
|
||||||
def _run_authenticated(self, query):
|
def _run_authenticated(self, query):
|
||||||
authenticated_query = "{}&api_key={}".format(query,
|
authenticated_query = "{}&api_key={}".format(query,
|
||||||
self.env_variables[
|
self.env_variables[
|
||||||
|
Loading…
Reference in New Issue
Block a user