Batching, better support for null columns, and bulk geocoding integration tests
This commit is contained in:
parent
e280444479
commit
c2a207b1cd
@ -1987,7 +1987,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
|
|||||||
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
||||||
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
||||||
country_column text, state_column text, city_column text, street_column text)
|
country_column text, state_column text, city_column text, street_column text, batch_size integer DEFAULT 100)
|
||||||
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
query_row_count integer;
|
query_row_count integer;
|
||||||
@ -1995,11 +1995,18 @@ DECLARE
|
|||||||
|
|
||||||
cartodb_id_batch integer;
|
cartodb_id_batch integer;
|
||||||
batches_n integer;
|
batches_n integer;
|
||||||
BATCHES_SIZE CONSTANT numeric := 100;
|
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
||||||
|
MAX_BATCH_SIZE CONSTANT numeric := 1000;
|
||||||
current_row_count integer ;
|
current_row_count integer ;
|
||||||
|
|
||||||
temp_table_name text;
|
temp_table_name text;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
IF batch_size IS NULL THEN
|
||||||
|
batch_size := DEFAULT_BATCH_SIZE;
|
||||||
|
ELSIF batch_size > MAX_BATCH_SIZE THEN
|
||||||
|
RAISE EXCEPTION 'batch_size must be lower than %', MAX_BATCH_SIZE + 1;
|
||||||
|
END IF;
|
||||||
|
|
||||||
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
||||||
|
|
||||||
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
||||||
@ -2009,7 +2016,7 @@ BEGIN
|
|||||||
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n;
|
EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', batch_size, query) INTO batches_n;
|
||||||
|
|
||||||
RAISE DEBUG 'batches_n: %', batches_n;
|
RAISE DEBUG 'batches_n: %', batches_n;
|
||||||
|
|
||||||
@ -2019,6 +2026,11 @@ BEGIN
|
|||||||
'(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb)',
|
'(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb)',
|
||||||
temp_table_name);
|
temp_table_name);
|
||||||
|
|
||||||
|
select
|
||||||
|
coalesce(street_column, ''''''), coalesce(city_column, ''''''),
|
||||||
|
coalesce(state_column, ''''''), coalesce(country_column, '''''')
|
||||||
|
into street_column, city_column, state_column, country_column;
|
||||||
|
|
||||||
FOR cartodb_id_batch in 0..(batches_n - 1)
|
FOR cartodb_id_batch in 0..(batches_n - 1)
|
||||||
LOOP
|
LOOP
|
||||||
|
|
||||||
@ -2032,7 +2044,7 @@ BEGIN
|
|||||||
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
||||||
'FROM geocoding_data ' ||
|
'FROM geocoding_data ' ||
|
||||||
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
||||||
USING BATCHES_SIZE, cartodb_id_batch;
|
USING batch_size, cartodb_id_batch;
|
||||||
|
|
||||||
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
||||||
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
|
||||||
country_column text, state_column text, city_column text, street_column text)
|
country_column text, state_column text, city_column text, street_column text, batch_size integer DEFAULT 100)
|
||||||
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
RETURNS SETOF cdb_dataservices_client.geocoding AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
query_row_count integer;
|
query_row_count integer;
|
||||||
@ -7,11 +7,18 @@ DECLARE
|
|||||||
|
|
||||||
cartodb_id_batch integer;
|
cartodb_id_batch integer;
|
||||||
batches_n integer;
|
batches_n integer;
|
||||||
BATCHES_SIZE CONSTANT numeric := 100;
|
DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
|
||||||
|
MAX_BATCH_SIZE CONSTANT numeric := 1000;
|
||||||
current_row_count integer ;
|
current_row_count integer ;
|
||||||
|
|
||||||
temp_table_name text;
|
temp_table_name text;
|
||||||
BEGIN
|
BEGIN
|
||||||
|
IF batch_size IS NULL THEN
|
||||||
|
batch_size := DEFAULT_BATCH_SIZE;
|
||||||
|
ELSIF batch_size > MAX_BATCH_SIZE THEN
|
||||||
|
RAISE EXCEPTION 'batch_size must be lower than %', MAX_BATCH_SIZE + 1;
|
||||||
|
END IF;
|
||||||
|
|
||||||
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
|
||||||
|
|
||||||
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
|
||||||
@ -21,7 +28,7 @@ BEGIN
|
|||||||
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n;
|
EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', batch_size, query) INTO batches_n;
|
||||||
|
|
||||||
RAISE DEBUG 'batches_n: %', batches_n;
|
RAISE DEBUG 'batches_n: %', batches_n;
|
||||||
|
|
||||||
@ -31,6 +38,11 @@ BEGIN
|
|||||||
'(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb)',
|
'(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb)',
|
||||||
temp_table_name);
|
temp_table_name);
|
||||||
|
|
||||||
|
select
|
||||||
|
coalesce(street_column, ''''''), coalesce(city_column, ''''''),
|
||||||
|
coalesce(state_column, ''''''), coalesce(country_column, '''''')
|
||||||
|
into street_column, city_column, state_column, country_column;
|
||||||
|
|
||||||
FOR cartodb_id_batch in 0..(batches_n - 1)
|
FOR cartodb_id_batch in 0..(batches_n - 1)
|
||||||
LOOP
|
LOOP
|
||||||
|
|
||||||
@ -44,7 +56,7 @@ BEGIN
|
|||||||
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
|
||||||
'FROM geocoding_data ' ||
|
'FROM geocoding_data ' ||
|
||||||
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
|
||||||
USING BATCHES_SIZE, cartodb_id_batch;
|
USING batch_size, cartodb_id_batch;
|
||||||
|
|
||||||
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
GET DIAGNOSTICS current_row_count = ROW_COUNT;
|
||||||
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
|
||||||
|
@ -22,13 +22,15 @@ class IntegrationTestHelper:
|
|||||||
}
|
}
|
||||||
|
|
||||||
@classmethod
|
@classmethod
|
||||||
def execute_query(cls, sql_api_url, query):
|
def execute_query_raw(cls, sql_api_url, query):
|
||||||
requests.packages.urllib3.disable_warnings()
|
requests.packages.urllib3.disable_warnings()
|
||||||
query_url = "{0}?q={1}".format(sql_api_url, query)
|
query_url = "{0}?q={1}".format(sql_api_url, query)
|
||||||
print "Executing query: {0}".format(query_url)
|
print "Executing query: {0}".format(query_url)
|
||||||
query_response = requests.get(query_url)
|
query_response = requests.get(query_url)
|
||||||
if query_response.status_code != 200:
|
if query_response.status_code != 200:
|
||||||
raise Exception(json.loads(query_response.text)['error'])
|
raise Exception(json.loads(query_response.text)['error'])
|
||||||
query_response_data = json.loads(query_response.text)
|
return json.loads(query_response.text)
|
||||||
|
|
||||||
return query_response_data['rows'][0]
|
@classmethod
|
||||||
|
def execute_query(cls, sql_api_url, query):
|
||||||
|
return cls.execute_query_raw(sql_api_url, query)['rows'][0]
|
||||||
|
@ -1,10 +1,13 @@
|
|||||||
|
#!/usr/local/bin/python
|
||||||
|
# -*- coding: utf-8 -*-
|
||||||
|
|
||||||
from unittest import TestCase
|
from unittest import TestCase
|
||||||
from nose.tools import assert_raises
|
from nose.tools import assert_raises
|
||||||
from nose.tools import assert_not_equal, assert_equal
|
from nose.tools import assert_not_equal, assert_equal
|
||||||
from ..helpers.integration_test_helper import IntegrationTestHelper
|
from ..helpers.integration_test_helper import IntegrationTestHelper
|
||||||
|
|
||||||
|
|
||||||
class TestStreetFunctions(TestCase):
|
class TestStreetFunctionsSetUp(TestCase):
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
self.env_variables = IntegrationTestHelper.get_environment_variables()
|
||||||
@ -15,19 +18,107 @@ class TestStreetFunctions(TestCase):
|
|||||||
self.env_variables['api_key']
|
self.env_variables['api_key']
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
class TestStreetFunctions(TestStreetFunctionsSetUp):
|
||||||
|
|
||||||
def test_if_select_with_street_point_is_ok(self):
|
def test_if_select_with_street_point_is_ok(self):
|
||||||
query = "SELECT cdb_geocode_street_point(street) " \
|
query = "SELECT cdb_geocode_street_point(street) " \
|
||||||
"as geometry FROM {0} LIMIT 1&api_key={1}".format(
|
"as geometry FROM {0} LIMIT 1&api_key={1}".format(
|
||||||
self.env_variables['table_name'],
|
self.env_variables['table_name'],
|
||||||
self.env_variables['api_key'])
|
self.env_variables['api_key'])
|
||||||
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||||
assert_not_equal(geometry['geometry'], None)
|
assert_not_equal(geometry['geometry'], None)
|
||||||
|
|
||||||
def test_if_select_with_street_without_api_key_raise_error(self):
|
def test_if_select_with_street_without_api_key_raise_error(self):
|
||||||
query = "SELECT cdb_geocode_street_point(street) " \
|
query = "SELECT cdb_geocode_street_point(street) " \
|
||||||
"as geometry FROM {0} LIMIT 1".format(
|
"as geometry FROM {0} LIMIT 1".format(
|
||||||
self.env_variables['table_name'])
|
self.env_variables['table_name'])
|
||||||
try:
|
try:
|
||||||
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
IntegrationTestHelper.execute_query(self.sql_api_url, query)
|
||||||
except Exception as e:
|
except Exception as e:
|
||||||
assert_equal(e.message[0], "The api_key must be provided")
|
assert_equal(e.message[0], "The api_key must be provided")
|
||||||
|
|
||||||
|
|
||||||
|
class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
|
||||||
|
|
||||||
|
def test_full_spec(self):
|
||||||
|
query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \
|
||||||
|
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point(" \
|
||||||
|
"'select 1 as cartodb_id, ''Spain'' as country, " \
|
||||||
|
"''Castilla y León'' as state, ''Valladolid'' as city, " \
|
||||||
|
"''Plaza Mayor'' as street " \
|
||||||
|
"UNION " \
|
||||||
|
"select 2 as cartodb_id, ''Spain'' as country, " \
|
||||||
|
"''Castilla y León'' as state, ''Valladolid'' as city, " \
|
||||||
|
"''Paseo Zorrilla'' as street' " \
|
||||||
|
", 'country', 'state', 'city', 'street')"
|
||||||
|
response = self._run_authenticated(query)
|
||||||
|
|
||||||
|
assert_equal(response['total_rows'], 2)
|
||||||
|
|
||||||
|
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[1], -3.7074009, 40.415511)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[2], -4.7404453, 41.6314339)
|
||||||
|
|
||||||
|
def test_empty_columns(self):
|
||||||
|
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||||
|
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
|
||||||
|
"'select * from jsonb_to_recordset(''[" \
|
||||||
|
"{\"cartodb_id\": 1, \"address\": \"1901 amphitheatre parkway, mountain view, ca, us\"}" \
|
||||||
|
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
||||||
|
"'''''', '''''', '''''', 'address')"
|
||||||
|
response = self._run_authenticated(query)
|
||||||
|
|
||||||
|
assert_equal(response['total_rows'], 1)
|
||||||
|
|
||||||
|
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657)
|
||||||
|
|
||||||
|
def test_null_columns(self):
|
||||||
|
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||||
|
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
|
||||||
|
"'select * from jsonb_to_recordset(''[" \
|
||||||
|
"{\"cartodb_id\": 1, \"address\": \"1901 amphitheatre parkway, mountain view, ca, us\"}" \
|
||||||
|
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
||||||
|
"null, null, null, 'address')"
|
||||||
|
response = self._run_authenticated(query)
|
||||||
|
|
||||||
|
assert_equal(response['total_rows'], 1)
|
||||||
|
|
||||||
|
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657)
|
||||||
|
|
||||||
|
def test_batching(self):
|
||||||
|
query = "select *, st_x(the_geom), st_y(the_geom) " \
|
||||||
|
"FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
|
||||||
|
"'select * from jsonb_to_recordset(''[" \
|
||||||
|
"{\"cartodb_id\": 1, \"address\": \"1900 amphitheatre parkway, mountain view, ca, us\"}," \
|
||||||
|
"{\"cartodb_id\": 2, \"address\": \"1901 amphitheatre parkway, mountain view, ca, us\"}," \
|
||||||
|
"{\"cartodb_id\": 3, \"address\": \"1902 amphitheatre parkway, mountain view, ca, us\"}" \
|
||||||
|
"]''::jsonb) as (cartodb_id integer, address text)', " \
|
||||||
|
"null, null, null, 'address', 2)"
|
||||||
|
response = self._run_authenticated(query)
|
||||||
|
# from nose.tools import set_trace; set_trace()
|
||||||
|
|
||||||
|
assert_equal(response['total_rows'], 3)
|
||||||
|
|
||||||
|
row_by_cartodb_id = self._row_by_cartodb_id(response)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[1], -122.0875324, 37.4227968)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[2], -122.0885504, 37.4238657)
|
||||||
|
self._assert_x_y(row_by_cartodb_id[3], -122.0876674, 37.4235729)
|
||||||
|
|
||||||
|
|
||||||
|
def _run_authenticated(self, query):
|
||||||
|
authenticated_query = "{}&api_key={}".format(query,
|
||||||
|
self.env_variables[
|
||||||
|
'api_key'])
|
||||||
|
return IntegrationTestHelper.execute_query_raw(self.sql_api_url,
|
||||||
|
authenticated_query)
|
||||||
|
|
||||||
|
def _row_by_cartodb_id(self, response):
|
||||||
|
return {r['cartodb_id']: r for r in response['rows']}
|
||||||
|
|
||||||
|
def _assert_x_y(self, row, expected_x, expected_y):
|
||||||
|
assert_equal(row['st_x'], expected_x)
|
||||||
|
assert_equal(row['st_y'], expected_y)
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user