Merge pull request #267 from CartoDB/augment_revamp_analysis

Table-level OBS_GetMeasue revamp
This commit is contained in:
Carla 2016-09-01 17:21:56 +02:00 committed by GitHub
commit fc99f7aba9
7 changed files with 284 additions and 282 deletions

View File

@ -1,8 +1,53 @@
CREATE TYPE cdb_dataservices_client.ds_fdw_metadata as (schemaname text, tabname text, servername text);
CREATE TYPE cdb_dataservices_client.ds_return_metadata as (colnames text[], coltypes text[]);
CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_GetTable(table_name text, output_table_name text, function_name text, params json)
RETURNS boolean AS $$
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(
output_table_name text,
params json
) RETURNS boolean AS $$
DECLARE
username text;
user_db_role text;
orgname text;
user_schema text;
result boolean;
BEGIN
IF session_user = 'publicuser' OR session_user ~ 'cartodb_publicuser_*' THEN
RAISE EXCEPTION 'The api_key must be provided';
END IF;
SELECT session_user INTO user_db_role;
SELECT u, o INTO username, orgname FROM cdb_dataservices_client._cdb_entity_config() AS (u text, o text);
-- JSON value stored "" is taken as literal
IF username IS NULL OR username = '' OR username = '""' THEN
RAISE EXCEPTION 'Username is a mandatory argument';
END IF;
IF orgname IS NULL OR orgname = '' OR orgname = '""' THEN
user_schema := 'public';
ELSE
user_schema := username;
END IF;
SELECT cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
username,
orgname,
user_db_role,
user_schema,
output_table_name,
params
) INTO result;
RETURN result;
END;
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(
table_name text,
output_table_name text,
params json
) RETURNS boolean AS $$
DECLARE
username text;
user_db_role text;
@ -31,238 +76,200 @@ BEGIN
SELECT current_database() INTO dbname;
SELECT cdb_dataservices_client.__OBS_GetTable(username, orgname, user_db_role, user_schema, dbname, table_name, output_table_name, function_name, params) INTO result;
SELECT cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
username,
orgname,
user_db_role,
user_schema,
dbname,
table_name,
output_table_name,
params
) INTO result;
RETURN result;
END;
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_AugmentTable(table_name text, function_name text, params json)
RETURNS boolean AS $$
DECLARE
username text;
user_db_role text;
orgname text;
dbname text;
user_schema text;
result boolean;
BEGIN
IF session_user = 'publicuser' OR session_user ~ 'cartodb_publicuser_*' THEN
RAISE EXCEPTION 'The api_key must be provided';
END IF;
SELECT session_user INTO user_db_role;
SELECT u, o INTO username, orgname FROM cdb_dataservices_client._cdb_entity_config() AS (u text, o text);
-- JSON value stored "" is taken as literal
IF username IS NULL OR username = '' OR username = '""' THEN
RAISE EXCEPTION 'Username is a mandatory argument';
END IF;
IF orgname IS NULL OR orgname = '' OR orgname = '""' THEN
user_schema := 'public';
ELSE
user_schema := username;
END IF;
SELECT current_database() INTO dbname;
SELECT cdb_dataservices_client.__OBS_AugmentTable(username, orgname, user_db_role, user_schema, dbname, table_name, function_name, params) INTO result;
RETURN result;
END;
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__OBS_AugmentTable(username text, orgname text, user_db_role text, user_schema text, dbname text, table_name text, function_name text, params json)
RETURNS boolean AS $$
from time import strftime
try:
server_table_name = None
temporary_table_name = 'ds_tmp_' + str(strftime("%s")) + table_name
# Obtain return types for augmentation procedure
ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
"FROM cdb_dataservices_client._OBS_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), function_name=plpy.quote_literal(function_name), params=plpy.quote_literal(params))
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
username text,
orgname text,
user_db_role text,
user_schema text,
output_table_name text,
params json
) RETURNS boolean AS $$
function_name = 'GetMeasure'
# Obtain return types for augmentation procedure
ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
.format(
username=plpy.quote_nullable(username),
orgname=plpy.quote_nullable(orgname),
function_name=plpy.quote_literal(function_name),
params=plpy.quote_literal(params)
)
)
if ds_return_metadata[0]["colnames"]:
colnames_arr = ds_return_metadata[0]["colnames"]
coltypes_arr = ds_return_metadata[0]["coltypes"]
# Prepare column and type strings required in the SQL queries
colnames = ','.join(colnames_arr)
columns_with_types_arr = [colnames_arr[i] + ' ' + coltypes_arr[i] for i in range(0,len(colnames_arr))]
columns_with_types = ','.join(columns_with_types_arr)
else:
raise Exception('Error retrieving OBS_GetMeasure metadata')
# Instruct the OBS server side to establish a FDW
# The metadata is obtained as well in order to:
# - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
# - (b) be able to tell OBS to free resources when done.
ds_fdw_metadata = plpy.execute("SELECT schemaname, tabname, servername "
"FROM cdb_dataservices_client._OBS_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {user_schema}::text, {dbname}::text, {table_name}::text);"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), user_db_role=plpy.quote_literal(user_db_role), user_schema=plpy.quote_literal(user_schema), dbname=plpy.quote_literal(dbname), table_name=plpy.quote_literal(table_name))
)
# Prepare column and type strings required in the SQL queries
columns_with_types_arr = [colnames_arr[i] + ' ' + coltypes_arr[i] for i in range(0,len(colnames_arr))]
columns_with_types = ','.join(columns_with_types_arr)
server_schema = ds_fdw_metadata[0]["schemaname"]
server_table_name = ds_fdw_metadata[0]["tabname"]
server_name = ds_fdw_metadata[0]["servername"]
# Create temporary table with the augmented results
plpy.execute('CREATE UNLOGGED TABLE "{user_schema}".{temp_table_name} AS '
'(SELECT {columns}, cartodb_id '
'FROM cdb_dataservices_client._OBS_FetchJoinFdwTableData('
'{username}::text, {orgname}::text, {schema}::text, {table_name}::text, {function_name}::text, {params}::json) '
'AS results({columns_with_types}, cartodb_id int) )'
.format(columns=colnames, username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname),
user_schema=user_schema, schema=plpy.quote_literal(server_schema), table_name=plpy.quote_literal(server_table_name),
function_name=plpy.quote_literal(function_name), params=plpy.quote_literal(params), columns_with_types=columns_with_types,
temp_table_name=temporary_table_name)
)
# Wipe user FDW data from the server
wiped = plpy.execute("SELECT cdb_dataservices_client._OBS_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, {fdw_server}::text)"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), server_schema=plpy.quote_literal(server_schema), server_table_name=plpy.quote_literal(server_table_name), fdw_server=plpy.quote_literal(server_name))
)
# Add index to cartodb_id
plpy.execute('CREATE UNIQUE INDEX {temp_table_name}_pkey ON "{user_schema}".{temp_table_name} (cartodb_id)'
.format(user_schema=user_schema, temp_table_name=temporary_table_name)
)
# Prepare table to receive augmented results in new columns
for idx, column in enumerate(colnames_arr):
if colnames_arr[idx] is not 'the_geom':
plpy.execute('ALTER TABLE "{user_schema}".{table_name} ADD COLUMN {column_name} {column_type}'
.format(user_schema=user_schema, table_name=table_name, column_name=colnames_arr[idx], column_type=coltypes_arr[idx])
)
# Populate the user table with the augmented results
plpy.execute('UPDATE "{user_schema}".{table_name} SET {columns} = '
'(SELECT {columns} FROM "{user_schema}".{temporary_table_name} '
'WHERE "{user_schema}".{temporary_table_name}.cartodb_id = "{user_schema}".{table_name}.cartodb_id)'
.format(columns = colnames, username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname),
user_schema = user_schema, table_name=table_name, function_name=function_name, params=params, columns_with_types=columns_with_types,
temporary_table_name=temporary_table_name)
)
plpy.execute('DROP TABLE IF EXISTS "{user_schema}".{temporary_table_name}'
.format(user_schema=user_schema, table_name=table_name, temporary_table_name=temporary_table_name)
)
return True
except Exception as e:
plpy.warning('Error trying to augment table {0}'.format(e))
# Wipe user FDW data from the server in case of failure if the table was connected
if server_table_name:
# Wipe local temporary table
plpy.execute('DROP TABLE IF EXISTS "{user_schema}".{temporary_table_name}'
.format(user_schema=user_schema, table_name=table_name, temporary_table_name=temporary_table_name)
)
wiped = plpy.execute("SELECT cdb_dataservices_client._OBS_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, {fdw_server}::text)"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), server_schema=plpy.quote_literal(server_schema), server_table_name=plpy.quote_literal(server_table_name), fdw_server=plpy.quote_literal(server_name))
)
return False
$$ LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__OBS_GetTable(username text, orgname text, user_db_role text, user_schema text, dbname text, table_name text, output_table_name text, function_name text, params json)
RETURNS boolean AS $$
try:
server_table_name = None
# Obtain return types for augmentation procedure
ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
"FROM cdb_dataservices_client._OBS_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), function_name=plpy.quote_literal(function_name), params=plpy.quote_literal(params))
)
colnames_arr = ds_return_metadata[0]["colnames"]
coltypes_arr = ds_return_metadata[0]["coltypes"]
# Prepare column and type strings required in the SQL queries
colnames = ','.join(colnames_arr)
columns_with_types_arr = [colnames_arr[i] + ' ' + coltypes_arr[i] for i in range(0,len(colnames_arr))]
columns_with_types = ','.join(columns_with_types_arr)
# Instruct the OBS server side to establish a FDW
# The metadata is obtained as well in order to:
# - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
# - (b) be able to tell OBS to free resources when done.
ds_fdw_metadata = plpy.execute("SELECT schemaname, tabname, servername "
"FROM cdb_dataservices_client._OBS_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {table_name}::text);"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), user_db_role=plpy.quote_literal(user_db_role), schema=plpy.quote_literal(user_schema), dbname=plpy.quote_literal(dbname), table_name=plpy.quote_literal(table_name))
)
server_schema = ds_fdw_metadata[0]["schemaname"]
server_table_name = ds_fdw_metadata[0]["tabname"]
server_name = ds_fdw_metadata[0]["servername"]
# Get list of user columns to include in the new table
user_table_columns = ','.join(
plpy.execute('SELECT array_agg(\'user_table.\' || attname) AS columns '
'FROM pg_attribute WHERE attrelid = \'"{user_schema}".{table_name}\'::regclass '
'AND attnum > 0 AND NOT attisdropped AND attname NOT LIKE \'the_geom_webmercator\' '
'AND NOT attname LIKE ANY(string_to_array(\'{colnames}\',\',\'));'
.format(user_schema=user_schema, table_name=table_name, colnames=colnames)
)[0]["columns"]
# Create a new table with the required columns
plpy.execute('CREATE TABLE "{schema}".{table_name} ( '
'cartodb_id int, the_geom geometry, {columns_with_types} '
');'
.format(schema=user_schema, table_name=output_table_name, columns_with_types=columns_with_types)
)
# Populate a new table with the augmented results
plpy.execute('CREATE TABLE "{user_schema}".{output_table_name} AS '
'(SELECT results.{columns}, {user_table_columns} '
'FROM {table_name} AS user_table '
'LEFT JOIN cdb_dataservices_client._OBS_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, {function_name}::text, {params}::json) as results({columns_with_types}, cartodb_id int) '
'ON results.cartodb_id = user_table.cartodb_id)'
.format(output_table_name=output_table_name, columns=colnames, user_table_columns=user_table_columns, username=plpy.quote_nullable(username),
orgname=plpy.quote_nullable(orgname), user_schema=user_schema, server_schema=plpy.quote_literal(server_schema), server_table_name=plpy.quote_literal(server_table_name),
table_name=table_name, function_name=plpy.quote_literal(function_name), params=plpy.quote_literal(params), columns_with_types=columns_with_types)
)
plpy.execute('ALTER TABLE "{schema}".{table_name} OWNER TO "{user}";'
.format(schema=user_schema, table_name=output_table_name, user=user_db_role)
)
plpy.execute('ALTER TABLE "{schema}".{table_name} OWNER TO "{user}";'
.format(schema=user_schema, table_name=output_table_name, user=user_db_role)
)
# Wipe user FDW data from the server
wiped = plpy.execute("SELECT cdb_dataservices_client._OBS_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, {fdw_server}::text)"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), server_schema=plpy.quote_literal(server_schema), server_table_name=plpy.quote_literal(server_table_name), fdw_server=plpy.quote_literal(server_name))
)
return True
except Exception as e:
plpy.warning('Error trying to get table {0}'.format(e))
# Wipe user FDW data from the server in case of failure if the table was connected
if server_table_name:
wiped = plpy.execute("SELECT cdb_dataservices_client._OBS_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, {fdw_server}::text)"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), server_schema=plpy.quote_literal(server_schema), server_table_name=plpy.quote_literal(server_table_name), fdw_server=plpy.quote_literal(server_name))
)
return False
return True
$$ LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
username text,
orgname text,
user_db_role text,
user_schema text,
dbname text,
table_name text,
output_table_name text,
params json
) RETURNS boolean AS $$
function_name = 'GetMeasure'
# Obtain return types for augmentation procedure
ds_return_metadata = plpy.execute(
"SELECT colnames, coltypes "
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" .format(
username=plpy.quote_nullable(username),
orgname=plpy.quote_nullable(orgname),
function_name=plpy.quote_literal(function_name),
params=plpy.quote_literal(params)))
CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_ConnectUserTable(username text, orgname text, user_db_role text, user_schema text, dbname text, table_name text)
RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
CONNECT _server_conn_str();
TARGET cdb_dataservices_server._OBS_ConnectUserTable;
if ds_return_metadata[0]["colnames"]:
colnames_arr = ds_return_metadata[0]["colnames"]
coltypes_arr = ds_return_metadata[0]["coltypes"]
else:
raise Exception('Error retrieving OBS_GetMeasure metadata')
# Prepare column and type strings required in the SQL queries
columns_with_types_arr = [
colnames_arr[i] +
' ' +
coltypes_arr[i] for i in range(
0,
len(colnames_arr))]
columns_with_types = ','.join(columns_with_types_arr)
aliased_colname_list = ','.join(
['result.' + name for name in colnames_arr])
# Instruct the OBS server side to establish a FDW
# The metadata is obtained as well in order to:
# - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
# - (b) be able to tell OBS to free resources when done.
ds_fdw_metadata = plpy.execute(
"SELECT schemaname, tabname, servername "
"FROM cdb_dataservices_client._DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, "
"{schema}::text, {dbname}::text, {table_name}::text);" .format(
username=plpy.quote_nullable(username),
orgname=plpy.quote_nullable(orgname),
user_db_role=plpy.quote_literal(user_db_role),
schema=plpy.quote_literal(user_schema),
dbname=plpy.quote_literal(dbname),
table_name=plpy.quote_literal(table_name)))
if ds_fdw_metadata[0]["schemaname"]:
server_schema = ds_fdw_metadata[0]["schemaname"]
server_table_name = ds_fdw_metadata[0]["tabname"]
server_name = ds_fdw_metadata[0]["servername"]
else:
raise Exception('Error connecting dataset via FDW')
# Create a new table with the required columns
plpy.execute(
'INSERT INTO "{schema}".{analysis_table_name} '
'SELECT ut.cartodb_id, ut.the_geom, {colname_list} '
'FROM "{schema}".{table_name} ut '
'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, '
'{function_name}::text, {params}::json) '
'AS result ({columns_with_types}, cartodb_id int) '
'ON result.cartodb_id = ut.cartodb_id;' .format(
schema=user_schema,
analysis_table_name=output_table_name,
colname_list=aliased_colname_list,
table_name=table_name,
username=plpy.quote_nullable(username),
orgname=plpy.quote_nullable(orgname),
server_schema=plpy.quote_literal(server_schema),
server_table_name=plpy.quote_literal(server_table_name),
function_name=plpy.quote_literal(function_name),
params=plpy.quote_literal(params),
columns_with_types=columns_with_types))
# Wipe user FDW data from the server
wiped = plpy.execute(
"SELECT cdb_dataservices_client._DST_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, "
"{server_table_name}::text, {fdw_server}::text)" .format(
username=plpy.quote_nullable(username),
orgname=plpy.quote_nullable(orgname),
server_schema=plpy.quote_literal(server_schema),
server_table_name=plpy.quote_literal(server_table_name),
fdw_server=plpy.quote_literal(server_name)))
return True
$$ LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_ConnectUserTable(
username text,
orgname text,
user_db_role text,
user_schema text,
dbname text,
table_name text
)RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
CONNECT cdb_dataservices_client._server_conn_str();
TARGET cdb_dataservices_server._DST_ConnectUserTable;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_GetReturnMetadata(username text, orgname text, function_name text, params json)
RETURNS cdb_dataservices_client.ds_return_metadata AS $$
CONNECT _server_conn_str();
TARGET cdb_dataservices_server._OBS_GetReturnMetadata;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_GetReturnMetadata(
username text,
orgname text,
function_name text,
params json
) RETURNS cdb_dataservices_client.ds_return_metadata AS $$
CONNECT cdb_dataservices_client._server_conn_str();
TARGET cdb_dataservices_server._DST_GetReturnMetadata;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
RETURNS SETOF record AS $$
CONNECT _server_conn_str();
TARGET cdb_dataservices_server._OBS_FetchJoinFdwTableData;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_FetchJoinFdwTableData(
username text,
orgname text,
table_schema text,
table_name text,
function_name text,
params json
) RETURNS SETOF record AS $$
CONNECT cdb_dataservices_client._server_conn_str();
TARGET cdb_dataservices_server._DST_FetchJoinFdwTableData;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, server_name text)
RETURNS boolean AS $$
CONNECT _server_conn_str();
TARGET cdb_dataservices_server._OBS_DisconnectUserTable;
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
username text,
orgname text,
table_schema text,
table_name text,
server_name text
) RETURNS boolean AS $$
CONNECT cdb_dataservices_client._server_conn_str();
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
$$ LANGUAGE plproxy;

View File

@ -1,2 +1,2 @@
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._obs_augmenttable(table_name text, function_name text, params json) TO publicuser;
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._obs_gettable(table_name text, output_table_name text, function_name text, params json) TO publicuser;
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(output_table_name text, params json) TO publicuser;
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(table_name text, output_table_name text, params json) TO publicuser;

View File

@ -3,65 +3,63 @@ SET search_path TO public,cartodb,cdb_dataservices_client;
CREATE TABLE my_table(cartodb_id int);
INSERT INTO my_table (cartodb_id) VALUES (1);
-- Mock the server functions
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
BEGIN
RETURN ('dummy_schema'::text, 'dummy_table'::text, 'dummy_server'::text);
END;
$$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_GetReturnMetadata(username text, orgname text, function_name text, params json)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_GetReturnMetadata(username text, orgname text, function_name text, params json)
RETURNS cdb_dataservices_client.ds_return_metadata AS $$
BEGIN
RETURN (Array['total_pop'], Array['double precision']);
END;
$$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
RETURNS RECORD AS $$
BEGIN
RETURN (23.4::double precision, 1::int);
END;
$$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
RETURNS boolean AS $$
BEGIN
RETURN true;
END;
$$ LANGUAGE 'plpgsql';
-- Augment a table with the total_pop column
SELECT cdb_dataservices_client._OBS_AugmentTable('my_table', 'dummy', '{"dummy":"dummy"}'::json);
_obs_augmenttable
-------------------
-- Create a sample user table
CREATE TABLE user_table (cartodb_id int, the_geom geometry);
INSERT INTO user_table(cartodb_id, the_geom) VALUES (1, '0101000020E6100000F74FC902E07D52C05FE24CC7654B4440');
INSERT INTO user_table(cartodb_id, the_geom) VALUES (2, '0101000020E6100000F74FC902E07D52C05FE24CC7654B4440');
INSERT INTO user_table(cartodb_id, the_geom) VALUES (3, '0101000020E6100000F74FC902E07D52C05FE24CC7654B4440');
-- Prepare a table with the total_pop column
SELECT cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure('my_table_dst', '{"dummy":"dummy"}'::json);
_dst_preparetableobs_getmeasure
---------------------------------
t
(1 row)
-- The results of the table should return the mocked value of 23.4 in the total_pop column
SELECT * FROM my_table;
cartodb_id | total_pop
------------+-----------
1 | 23.4
(1 row)
-- The table should now exist and be empty
SELECT * FROM my_table_dst;
cartodb_id | the_geom | total_pop
------------+----------+-----------
(0 rows)
-- Mock again the function for it to return a different value now
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
RETURNS RECORD AS $$
BEGIN
RETURN (577777.4::double precision, 1::int);
END;
$$ LANGUAGE 'plpgsql';
-- Augment a new table with total_pop
SELECT cdb_dataservices_client._OBS_GetTable('my_table', 'my_table_new', 'dummy', '{"dummy":"dummy"}'::json);
_obs_gettable
---------------
-- Populate the table with measurement data
SELECT cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure('user_table', 'my_table_dst', '{"dummy":"dummy"}'::json);
_dst_populatetableobs_getmeasure
----------------------------------
t
(1 row)
-- Check that the table contains the new value for total_pop and not the value already existent in the table
SELECT * FROM my_table_new;
total_pop | cartodb_id
-----------+------------
577777.4 | 1
(1 row)
-- The table should now show the results
SELECT * FROM my_table_dst;
cartodb_id | the_geom | total_pop
------------+----------------------------------------------------+-----------
1 | 0101000020E6100000F74FC902E07D52C05FE24CC7654B4440 | 23.4
2 | 0101000020E6100000F74FC902E07D52C05FE24CC7654B4440 |
3 | 0101000020E6100000F74FC902E07D52C05FE24CC7654B4440 |
(3 rows)
-- Clean tables
DROP TABLE my_table;
DROP TABLE my_table_new;
DROP TABLE my_table_dst;

View File

@ -6,54 +6,51 @@ CREATE TABLE my_table(cartodb_id int);
INSERT INTO my_table (cartodb_id) VALUES (1);
-- Mock the server functions
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
BEGIN
RETURN ('dummy_schema'::text, 'dummy_table'::text, 'dummy_server'::text);
END;
$$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_GetReturnMetadata(username text, orgname text, function_name text, params json)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_GetReturnMetadata(username text, orgname text, function_name text, params json)
RETURNS cdb_dataservices_client.ds_return_metadata AS $$
BEGIN
RETURN (Array['total_pop'], Array['double precision']);
END;
$$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
RETURNS RECORD AS $$
BEGIN
RETURN (23.4::double precision, 1::int);
END;
$$ LANGUAGE 'plpgsql';
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
RETURNS boolean AS $$
BEGIN
RETURN true;
END;
$$ LANGUAGE 'plpgsql';
-- Augment a table with the total_pop column
SELECT cdb_dataservices_client._OBS_AugmentTable('my_table', 'dummy', '{"dummy":"dummy"}'::json);
-- Create a sample user table
CREATE TABLE user_table (cartodb_id int, the_geom geometry);
INSERT INTO user_table(cartodb_id, the_geom) VALUES (1, '0101000020E6100000F74FC902E07D52C05FE24CC7654B4440');
INSERT INTO user_table(cartodb_id, the_geom) VALUES (2, '0101000020E6100000F74FC902E07D52C05FE24CC7654B4440');
INSERT INTO user_table(cartodb_id, the_geom) VALUES (3, '0101000020E6100000F74FC902E07D52C05FE24CC7654B4440');
-- The results of the table should return the mocked value of 23.4 in the total_pop column
SELECT * FROM my_table;
-- Prepare a table with the total_pop column
SELECT cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure('my_table_dst', '{"dummy":"dummy"}'::json);
-- Mock again the function for it to return a different value now
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
RETURNS RECORD AS $$
BEGIN
RETURN (577777.4::double precision, 1::int);
END;
$$ LANGUAGE 'plpgsql';
-- The table should now exist and be empty
SELECT * FROM my_table_dst;
-- Augment a new table with total_pop
SELECT cdb_dataservices_client._OBS_GetTable('my_table', 'my_table_new', 'dummy', '{"dummy":"dummy"}'::json);
-- Populate the table with measurement data
SELECT cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure('user_table', 'my_table_dst', '{"dummy":"dummy"}'::json);
-- Check that the table contains the new value for total_pop and not the value already existent in the table
SELECT * FROM my_table_new;
-- The table should now show the results
SELECT * FROM my_table_dst;
-- Clean tables
DROP TABLE my_table;
DROP TABLE my_table_new;
DROP TABLE my_table_dst;

View File

@ -2,34 +2,34 @@ CREATE TYPE cdb_dataservices_server.ds_fdw_metadata as (schemaname text, tabname
CREATE TYPE cdb_dataservices_server.ds_return_metadata as (colnames text[], coltypes text[]);
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
RETURNS cdb_dataservices_server.ds_fdw_metadata AS $$
host_addr = plpy.execute("SELECT split_part(inet_client_addr()::text, '/', 1) as user_host")[0]['user_host']
return plpy.execute("SELECT * FROM cdb_dataservices_server.__OBS_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {host_addr}::text, {table_name}::text)"
return plpy.execute("SELECT * FROM cdb_dataservices_server.__DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {host_addr}::text, {table_name}::text)"
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), user_db_role=plpy.quote_literal(user_db_role), schema=plpy.quote_literal(input_schema), dbname=plpy.quote_literal(dbname), table_name=plpy.quote_literal(table_name), host_addr=plpy.quote_literal(host_addr))
)[0]
$$ LANGUAGE plpythonu;
CREATE OR REPLACE FUNCTION cdb_dataservices_server.__OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, host_addr text, table_name text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server.__DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, host_addr text, table_name text)
RETURNS cdb_dataservices_server.ds_fdw_metadata AS $$
CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname);
TARGET cdb_observatory._OBS_ConnectUserTable;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_GetReturnMetadata(username text, orgname text, function_name text, params json)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_GetReturnMetadata(username text, orgname text, function_name text, params json)
RETURNS cdb_dataservices_server.ds_return_metadata AS $$
CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname);
TARGET cdb_observatory._OBS_GetReturnMetadata;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
RETURNS SETOF record AS $$
CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname);
TARGET cdb_observatory._OBS_FetchJoinFdwTableData;
$$ LANGUAGE plproxy;
CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
RETURNS boolean AS $$
CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname);
TARGET cdb_observatory._OBS_DisconnectUserTable;

View File

@ -2,7 +2,7 @@ SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_connectusertable'
AND proname = '_dst_connectusertable'
AND oidvectortypes(p.proargtypes) = 'text, text, text, text, text, text');
exists
--------
@ -13,7 +13,7 @@ SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_getreturnmetadata'
AND proname = '_dst_getreturnmetadata'
AND oidvectortypes(p.proargtypes) = 'text, text, text, json');
exists
--------
@ -24,7 +24,7 @@ SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_fetchjoinfdwtabledata'
AND proname = '_dst_fetchjoinfdwtabledata'
AND oidvectortypes(p.proargtypes) = 'text, text, text, text, text, json');
exists
--------
@ -35,7 +35,7 @@ SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_disconnectusertable'
AND proname = '_dst_disconnectusertable'
AND oidvectortypes(p.proargtypes) = 'text, text, text, text, text');
exists
--------

View File

@ -2,27 +2,27 @@ SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_connectusertable'
AND proname = '_dst_connectusertable'
AND oidvectortypes(p.proargtypes) = 'text, text, text, text, text, text');
SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_getreturnmetadata'
AND proname = '_dst_getreturnmetadata'
AND oidvectortypes(p.proargtypes) = 'text, text, text, json');
SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_fetchjoinfdwtabledata'
AND proname = '_dst_fetchjoinfdwtabledata'
AND oidvectortypes(p.proargtypes) = 'text, text, text, text, text, json');
SELECT exists(SELECT *
FROM pg_proc p
INNER JOIN pg_namespace ns ON (p.pronamespace = ns.oid)
WHERE ns.nspname = 'cdb_dataservices_server'
AND proname = '_obs_disconnectusertable'
AND proname = '_dst_disconnectusertable'
AND oidvectortypes(p.proargtypes) = 'text, text, text, text, text');