diff --git a/client/cdb_dataservices_client--0.25.0.sql b/client/cdb_dataservices_client--0.25.0.sql
index 4fa6f2b..f1346e9 100644
--- a/client/cdb_dataservices_client--0.25.0.sql
+++ b/client/cdb_dataservices_client--0.25.0.sql
@@ -1987,7 +1987,7 @@ CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
     TARGET cdb_dataservices_server._DST_DisconnectUserTable;
 $$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
 CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
-    country_column text, state_column text, city_column text, street_column text)
+    country_column text, state_column text, city_column text, street_column text, batch_size integer DEFAULT 100)
 RETURNS SETOF cdb_dataservices_client.geocoding AS $$
 DECLARE
     query_row_count integer;
@@ -1995,11 +1995,22 @@ DECLARE
 
     cartodb_id_batch integer;
     batches_n integer;
-    BATCHES_SIZE CONSTANT numeric := 100;
+    DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
+    MAX_BATCH_SIZE CONSTANT numeric := 1000;
     current_row_count integer ;
 
     temp_table_name text;
 BEGIN
+    -- NULL selects the default; anything outside 1..MAX_BATCH_SIZE is rejected
+    -- up front, since 0 would divide by zero when batches_n is computed below.
+    IF batch_size IS NULL THEN
+        batch_size := DEFAULT_BATCH_SIZE;
+    ELSIF batch_size < 1 THEN
+        RAISE EXCEPTION 'batch_size must be greater than 0';
+    ELSIF batch_size > MAX_BATCH_SIZE THEN
+        RAISE EXCEPTION 'batch_size must be lower than %', MAX_BATCH_SIZE + 1;
+    END IF;
+
     EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
 
     RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
@@ -2009,7 +2020,7 @@ BEGIN
         RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
     END IF;
 
-    EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n;
+    EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', batch_size, query) INTO batches_n;
 
     RAISE DEBUG 'batches_n: %', batches_n;
 
@@ -2019,6 +2030,13 @@ BEGIN
         '(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb)',
         temp_table_name);
 
+    -- Replace NULL column names with the two-character text '' before they are
+    -- spliced into the dynamic query below.
+    select
+        coalesce(street_column, ''''''), coalesce(city_column, ''''''),
+        coalesce(state_column, ''''''), coalesce(country_column, '''''')
+    into street_column, city_column, state_column, country_column;
+
     FOR cartodb_id_batch in 0..(batches_n - 1)
     LOOP
 
@@ -2032,7 +2050,7 @@ BEGIN
             'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
             'FROM geocoding_data ' ||
             'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
-        USING BATCHES_SIZE, cartodb_id_batch;
+        USING batch_size, cartodb_id_batch;
 
         GET DIAGNOSTICS current_row_count = ROW_COUNT;
         RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
diff --git a/client/sql/21_bulk_geocoding_functions.sql b/client/sql/21_bulk_geocoding_functions.sql
index cc15023..a733245 100644
--- a/client/sql/21_bulk_geocoding_functions.sql
+++ b/client/sql/21_bulk_geocoding_functions.sql
@@ -1,5 +1,5 @@
 CREATE OR REPLACE FUNCTION cdb_dataservices_client.cdb_bulk_geocode_street_point (query text,
-    country_column text, state_column text, city_column text, street_column text)
+    country_column text, state_column text, city_column text, street_column text, batch_size integer DEFAULT 100)
 RETURNS SETOF cdb_dataservices_client.geocoding AS $$
 DECLARE
     query_row_count integer;
@@ -7,11 +7,22 @@ DECLARE
 
     cartodb_id_batch integer;
     batches_n integer;
-    BATCHES_SIZE CONSTANT numeric := 100;
+    DEFAULT_BATCH_SIZE CONSTANT numeric := 100;
+    MAX_BATCH_SIZE CONSTANT numeric := 1000;
     current_row_count integer ;
 
     temp_table_name text;
 BEGIN
+    -- NULL selects the default; anything outside 1..MAX_BATCH_SIZE is rejected
+    -- up front, since 0 would divide by zero when batches_n is computed below.
+    IF batch_size IS NULL THEN
+        batch_size := DEFAULT_BATCH_SIZE;
+    ELSIF batch_size < 1 THEN
+        RAISE EXCEPTION 'batch_size must be greater than 0';
+    ELSIF batch_size > MAX_BATCH_SIZE THEN
+        RAISE EXCEPTION 'batch_size must be lower than %', MAX_BATCH_SIZE + 1;
+    END IF;
+
     EXECUTE format('SELECT COUNT(1) from (%s) _x', query) INTO query_row_count;
 
     RAISE DEBUG 'cdb_bulk_geocode_street_point --> query_row_count: %; query: %; country: %; state: %; city: %; street: %',
@@ -21,7 +32,7 @@ BEGIN
         RAISE EXCEPTION 'Remaining quota: %. Estimated cost: %', remaining_quota, query_row_count;
     END IF;
 
-    EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', BATCHES_SIZE, query) INTO batches_n;
+    EXECUTE format('SELECT ceil(max(cartodb_id)::float/%s) FROM (%s) _x', batch_size, query) INTO batches_n;
 
     RAISE DEBUG 'batches_n: %', batches_n;
 
@@ -31,6 +42,13 @@ BEGIN
         '(cartodb_id integer, the_geom geometry(Multipolygon,4326), metadata jsonb)',
         temp_table_name);
 
+    -- Replace NULL column names with the two-character text '' before they are
+    -- spliced into the dynamic query below.
+    select
+        coalesce(street_column, ''''''), coalesce(city_column, ''''''),
+        coalesce(state_column, ''''''), coalesce(country_column, '''''')
+    into street_column, city_column, state_column, country_column;
+
     FOR cartodb_id_batch in 0..(batches_n - 1)
     LOOP
 
@@ -44,7 +62,7 @@ BEGIN
             'INSERT INTO %s SELECT (cdb_dataservices_client._cdb_bulk_geocode_street_point(jsonb_agg(data))).* ' ||
             'FROM geocoding_data ' ||
             'WHERE batch = $2', street_column, city_column, state_column, country_column, query, temp_table_name)
-        USING BATCHES_SIZE, cartodb_id_batch;
+        USING batch_size, cartodb_id_batch;
 
         GET DIAGNOSTICS current_row_count = ROW_COUNT;
         RAISE DEBUG 'Batch % --> %', cartodb_id_batch, current_row_count;
diff --git a/test/helpers/integration_test_helper.py b/test/helpers/integration_test_helper.py
index 1c51533..7fdcd38 100644
--- a/test/helpers/integration_test_helper.py
+++ b/test/helpers/integration_test_helper.py
@@ -22,13 +22,20 @@ class IntegrationTestHelper:
         }
 
     @classmethod
-    def execute_query(cls, sql_api_url, query):
+    def execute_query_raw(cls, sql_api_url, query):
+        """Send query to sql_api_url and return the decoded JSON response.
+
+        Raises Exception with the response's 'error' value on a non-200 status.
+        """
         requests.packages.urllib3.disable_warnings()
         query_url = "{0}?q={1}".format(sql_api_url, query)
         print "Executing query: {0}".format(query_url)
         query_response = requests.get(query_url)
         if query_response.status_code != 200:
             raise Exception(json.loads(query_response.text)['error'])
-        query_response_data = json.loads(query_response.text)
+        return json.loads(query_response.text)
 
-        return query_response_data['rows'][0]
+    @classmethod
+    def execute_query(cls, sql_api_url, query):
+        """Run query and return only the first row of the response."""
+        return cls.execute_query_raw(sql_api_url, query)['rows'][0]
diff --git a/test/integration/test_street_functions.py b/test/integration/test_street_functions.py
index a409864..180cc70 100644
--- a/test/integration/test_street_functions.py
+++ b/test/integration/test_street_functions.py
@@ -1,10 +1,13 @@
+#!/usr/local/bin/python
+# -*- coding: utf-8 -*-
+
 from unittest import TestCase
 from nose.tools import assert_raises
 from nose.tools import assert_not_equal, assert_equal
 from ..helpers.integration_test_helper import IntegrationTestHelper
 
 
-class TestStreetFunctions(TestCase):
+class TestStreetFunctionsSetUp(TestCase):
 
     def setUp(self):
         self.env_variables = IntegrationTestHelper.get_environment_variables()
@@ -15,19 +18,110 @@ class TestStreetFunctions(TestCase):
             self.env_variables['api_key']
         )
 
+
+class TestStreetFunctions(TestStreetFunctionsSetUp):
+
     def test_if_select_with_street_point_is_ok(self):
         query = "SELECT cdb_geocode_street_point(street) " \
             "as geometry FROM {0} LIMIT 1&api_key={1}".format(
-            self.env_variables['table_name'],
-            self.env_variables['api_key'])
+                self.env_variables['table_name'],
+                self.env_variables['api_key'])
         geometry = IntegrationTestHelper.execute_query(self.sql_api_url, query)
         assert_not_equal(geometry['geometry'], None)
 
     def test_if_select_with_street_without_api_key_raise_error(self):
         query = "SELECT cdb_geocode_street_point(street) " \
-            "as geometry FROM {0} LIMIT 1".format(
-            self.env_variables['table_name'])
+                "as geometry FROM {0} LIMIT 1".format(
+                    self.env_variables['table_name'])
         try:
             IntegrationTestHelper.execute_query(self.sql_api_url, query)
         except Exception as e:
             assert_equal(e.message[0], "The api_key must be provided")
+        else:
+            self.fail("Expected an error when no api_key is provided")
+
+
+class TestBulkStreetFunctions(TestStreetFunctionsSetUp):
+    """Integration tests for cdb_bulk_geocode_street_point."""
+
+    def test_full_spec(self):
+        query = "select cartodb_id, st_x(the_geom), st_y(the_geom) " \
+                "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point(" \
+                "'select 1 as cartodb_id, ''Spain'' as country, " \
+                "''Castilla y León'' as state, ''Valladolid'' as city, " \
+                "''Plaza Mayor'' as street " \
+                "UNION " \
+                "select 2 as cartodb_id, ''Spain'' as country, " \
+                "''Castilla y León'' as state, ''Valladolid'' as city, " \
+                "''Paseo Zorrilla'' as street' " \
+                ", 'country', 'state', 'city', 'street')"
+        response = self._run_authenticated(query)
+
+        assert_equal(response['total_rows'], 2)
+
+        row_by_cartodb_id = self._row_by_cartodb_id(response)
+        self._assert_x_y(row_by_cartodb_id[1], -3.7074009, 40.415511)
+        self._assert_x_y(row_by_cartodb_id[2], -4.7404453, 41.6314339)
+
+    def test_empty_columns(self):
+        query = "select *, st_x(the_geom), st_y(the_geom) " \
+                "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
+                "'select * from jsonb_to_recordset(''[" \
+                "{\"cartodb_id\": 1, \"address\": \"1901 amphitheatre parkway, mountain view, ca, us\"}" \
+                "]''::jsonb) as (cartodb_id integer, address text)', " \
+                "'''''', '''''', '''''', 'address')"
+        response = self._run_authenticated(query)
+
+        assert_equal(response['total_rows'], 1)
+
+        row_by_cartodb_id = self._row_by_cartodb_id(response)
+        self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657)
+
+    def test_null_columns(self):
+        query = "select *, st_x(the_geom), st_y(the_geom) " \
+                "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
+                "'select * from jsonb_to_recordset(''[" \
+                "{\"cartodb_id\": 1, \"address\": \"1901 amphitheatre parkway, mountain view, ca, us\"}" \
+                "]''::jsonb) as (cartodb_id integer, address text)', " \
+                "null, null, null, 'address')"
+        response = self._run_authenticated(query)
+
+        assert_equal(response['total_rows'], 1)
+
+        row_by_cartodb_id = self._row_by_cartodb_id(response)
+        self._assert_x_y(row_by_cartodb_id[1], -122.0885504, 37.4238657)
+
+    def test_batching(self):
+        query = "select *, st_x(the_geom), st_y(the_geom) " \
+                "FROM cdb_dataservices_client.cdb_bulk_geocode_street_point( " \
+                "'select * from jsonb_to_recordset(''[" \
+                "{\"cartodb_id\": 1, \"address\": \"1900 amphitheatre parkway, mountain view, ca, us\"}," \
+                "{\"cartodb_id\": 2, \"address\": \"1901 amphitheatre parkway, mountain view, ca, us\"}," \
+                "{\"cartodb_id\": 3, \"address\": \"1902 amphitheatre parkway, mountain view, ca, us\"}" \
+                "]''::jsonb) as (cartodb_id integer, address text)', " \
+                "null, null, null, 'address', 2)"
+        response = self._run_authenticated(query)
+
+        assert_equal(response['total_rows'], 3)
+
+        row_by_cartodb_id = self._row_by_cartodb_id(response)
+        self._assert_x_y(row_by_cartodb_id[1], -122.0875324, 37.4227968)
+        self._assert_x_y(row_by_cartodb_id[2], -122.0885504, 37.4238657)
+        self._assert_x_y(row_by_cartodb_id[3], -122.0876674, 37.4235729)
+
+    def _run_authenticated(self, query):
+        """Append the api_key to query and return the raw API response."""
+        authenticated_query = "{}&api_key={}".format(query,
+                                                     self.env_variables[
+                                                         'api_key'])
+        return IntegrationTestHelper.execute_query_raw(self.sql_api_url,
+                                                       authenticated_query)
+
+    def _row_by_cartodb_id(self, response):
+        """Index the response rows by their cartodb_id."""
+        return {r['cartodb_id']: r for r in response['rows']}
+
+    def _assert_x_y(self, row, expected_x, expected_y):
+        """Assert that the row's st_x and st_y equal the expected values."""
+        assert_equal(row['st_x'], expected_x)
+        assert_equal(row['st_y'], expected_y)