diff --git a/client/sql/20_table_augmentation.sql b/client/sql/20_table_augmentation.sql new file mode 100644 index 0000000..16a595b --- /dev/null +++ b/client/sql/20_table_augmentation.sql @@ -0,0 +1,111 @@ +CREATE TYPE ds_fdw_metadata as (schemaname text, tabname text, servername text); +CREATE TYPE ds_return_metadata as (colnames text[], coltypes text[]); + +CREATE OR REPLACE FUNCTION cdb_dataservices_client.OBS_ProcessTable(table_name text, output_table_name text, params json) +RETURNS boolean AS $$ +DECLARE + username text; + useruuid text; + orgname text; + dbname text; + input_schema text; + result boolean; +BEGIN + IF session_user = 'publicuser' OR session_user ~ 'cartodb_publicuser_*' THEN + RAISE EXCEPTION 'The api_key must be provided'; + END IF; + + SELECT session_user INTO useruuid; + + SELECT u, o INTO username, orgname FROM cdb_dataservices_client._cdb_entity_config() AS (u text, o text); + -- JSON value stored "" is taken as literal + IF username IS NULL OR username = '' OR username = '""' THEN + RAISE EXCEPTION 'Username is a mandatory argument'; + END IF; + + IF orgname IS NULL OR orgname = '' OR orgname = '""' THEN + input_schema := 'public'; + ELSE + input_schema := username; + END IF; + + SELECT current_database() INTO dbname; + + SELECT cdb_dataservices_client._OBS_ProcessTable(username::text, useruuid::text, input_schema::text, dbname::text,table_name::text, output_table_name::text, params::json) INTO result; + + RETURN true; +END; +$$ LANGUAGE 'plpgsql' SECURITY DEFINER; + + + +CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_ProcessTable(username text, useruuid text, input_schema text, dbname text, table_name text, output_table_name text, params json) +RETURNS boolean AS $$ + try: + tabname = None + # Obtain metadata for FDW connection + ds_fdw_metadata = plpy.execute("SELECT schemaname, tabname, servername " + "FROM cdb_dataservices_client._OBS_ConnectUserTable('{0}'::text, '{1}'::text, '{2}'::text, '{3}'::text, '{4}'::text);" + .format(username, useruuid, input_schema, dbname, table_name)) + + schemaname = ds_fdw_metadata[0]["schemaname"] + tabname = ds_fdw_metadata[0]["tabname"] + servername = ds_fdw_metadata[0]["servername"] + + # Obtain return types for augmentation procedure + ds_return_metadata = plpy.execute("SELECT colnames, coltypes " + "FROM cdb_dataservices_client._OBS_GetReturnMetadata('{0}'::json);" + .format(params)) + + colnames_array = ds_return_metadata[0]["colnames"] + coltypes_array = ds_return_metadata[0]["coltypes"] + + # Populate a new table with the augmented results + plpy.execute("CREATE TABLE {0} AS " + "(SELECT results.{1}, user_table.* " + "FROM {3} as user_table, " + "cdb_dataservices_client._OBS_GetProcessedData('{2}'::text, '{3}'::text, '{4}'::json) as results({1} numeric, cartodb_id int) " + "WHERE results.cartodb_id = user_table.cartodb_id)" + .format(output_table_name, colnames_array[0] ,schemaname, tabname, params)) + + plpy.execute('ALTER TABLE {0} OWNER TO "{1}";' + .format(output_table_name, useruuid)) + + # Wipe user FDW data from the server + wiped = plpy.execute("SELECT cdb_dataservices_client._OBS_DisconnectUserTable('{0}'::text, '{1}'::text, '{2}'::text)" + .format(schemaname, tabname, servername)) + + return True + except Exception as e: + plpy.warning('Error trying to augment table {0}'.format(e)) + # Wipe user FDW data from the server in case of failure + if tabname: + wiped = plpy.execute("SELECT cdb_dataservices_client._OBS_DisconnectUserTable('{0}'::text, '{1}'::text, '{2}'::text)" + .format(schemaname, tabname, servername)) + return False +$$ LANGUAGE plpythonu; + + +CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_ConnectUserTable(username text, useruuid text, input_schema text, dbname text, table_name text) +RETURNS ds_fdw_metadata AS $$ + CONNECT _server_conn_str(); + SELECT cdb_dataservices_server.OBS_ConnectUserTable(username text, useruuid text, input_schema text, dbname text, table_name text); +$$ LANGUAGE plproxy; + +CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_GetReturnMetadata(params json) +RETURNS ds_return_metadata AS $$ + CONNECT _server_conn_str(); + SELECT cdb_dataservices_server.OBS_GetReturnMetadata(params json); +$$ LANGUAGE plproxy; + +CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_GetProcessedData(table_schema text, table_name text, params json) +RETURNS SETOF record AS $$ + CONNECT _server_conn_str(); + SELECT cdb_dataservices_server.OBS_GetProcessedData(table_schema text, table_name text, params json); +$$ LANGUAGE plproxy; + +CREATE OR REPLACE FUNCTION cdb_dataservices_client._OBS_DisconnectUserTable(table_schema text, table_name text, server_name text) +RETURNS boolean AS $$ + CONNECT _server_conn_str(); + SELECT cdb_dataservices_server.OBS_DisconnectUserTable(table_schema text, table_name text, server_name text); +$$ LANGUAGE plproxy; diff --git a/server/extension/sql/125_data_observatory_table_augment.sql b/server/extension/sql/125_data_observatory_table_augment.sql new file mode 100644 index 0000000..ba32c3c --- /dev/null +++ b/server/extension/sql/125_data_observatory_table_augment.sql @@ -0,0 +1,54 @@ +CREATE TYPE ds_fdw_metadata as (schemaname text, tabname text, servername text); +CREATE TYPE ds_return_metadata as (colnames text[], coltypes text[]); + + +CREATE OR REPLACE FUNCTION cdb_dataservices_server.OBS_ConnectUserTable(username text, useruuid text, input_schema text, dbname text, table_name text) +RETURNS ds_fdw_metadata AS $$ + ---- quota/checks and internal call to function + --TODO: need username and orgname +$$ LANGUAGE plpythonu; + +CREATE OR REPLACE FUNCTION cdb_dataservices_server.OBS_GetReturnMetadata(params json) +RETURNS ds_return_metadata AS $$ + ---- quota/checks and internal call to function + --TODO: need username and orgname +$$ LANGUAGE plpythonu; + +CREATE OR REPLACE FUNCTION cdb_dataservices_server.OBS_GetAugmentedColumns(table_schema text, table_name text, params json) +RETURNS SETOF record AS $$ + ---- quota/checks and internal call to function + --TODO: need username and orgname +$$ LANGUAGE plpythonu; + + +CREATE OR REPLACE FUNCTION cdb_dataservices_server.OBS_DisconnectUserTable(table_schema text, table_name text, servername text) +RETURNS boolean AS $$ + ---- quota/checks and internal call to function + --TODO: need username and orgname +$$ LANGUAGE plpythonu; + + +CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_ConnectUserTable(username text, useruuid text, input_schema text, dbname text, table_name text) +RETURNS ds_fdw_metadata AS $$ + CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname); + SELECT cdb_observatory._OBS_ConnectUserTable(username text, useruuid text, input_schema text, dbname text, host text, table_name text); +$$ LANGUAGE plproxy; + +CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_GetReturnMetadata(params json) +RETURNS ds_return_metadata AS $$ + CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname); + SELECT cdb_observatory._OBS_GetReturnMetadata(params json); +$$ LANGUAGE plproxy; + +CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_GetAugmentedColumns(table_schema text, table_name text, params json) +RETURNS SETOF record AS $$ + CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname); + SELECT cdb_observatory._OBS_GetReturnMetadata(params json); +$$ LANGUAGE plproxy; + + +CREATE OR REPLACE FUNCTION cdb_dataservices_server._OBS_DisconnectUserTable(table_schema text, table_name text, servername text) +RETURNS boolean AS $$ + CONNECT cdb_dataservices_server._obs_server_conn_str(username, orgname); + SELECT cdb_observatory._OBS_DisconnectUserTable(table_schema text, table_name text, servername text); +$$ LANGUAGE plproxy;