diff --git a/client/sql/20_table_augmentation.sql b/client/sql/20_table_augmentation.sql index 0b9ea48..caa14db 100644 --- a/client/sql/20_table_augmentation.sql +++ b/client/sql/20_table_augmentation.sql @@ -1,8 +1,11 @@ CREATE TYPE cdb_dataservices_client.ds_fdw_metadata as (schemaname text, tabname text, servername text); CREATE TYPE cdb_dataservices_client.ds_return_metadata as (colnames text[], coltypes text[]); -CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json) -RETURNS boolean AS $$ +CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure( + output_table_name text, + function_name text, + params json +) RETURNS boolean AS $$ DECLARE username text; user_db_role text; @@ -28,14 +31,26 @@ BEGIN user_schema := username; END IF; - SELECT cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(username, orgname, user_db_role, user_schema, output_table_name, function_name, params) INTO result; + SELECT cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure( + username, + orgname, + user_db_role, + user_schema, + output_table_name, + function_name, + params + ) INTO result; RETURN result; END; $$ LANGUAGE 'plpgsql' SECURITY DEFINER; -CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json) -RETURNS boolean AS $$ +CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure( + table_name text, + output_table_name text, + function_name text, + params json +) RETURNS boolean AS $$ DECLARE username text; user_db_role text; @@ -64,15 +79,32 @@ BEGIN SELECT current_database() INTO dbname; - SELECT cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(username, orgname, user_db_role, user_schema, dbname, table_name, output_table_name, function_name, params) INTO result; + SELECT cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure( + username, + orgname, + user_db_role, + user_schema, + dbname, + table_name, + output_table_name, + function_name, + params + ) INTO result; RETURN result; END; $$ LANGUAGE 'plpgsql' SECURITY DEFINER; -CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(username text, orgname text, user_db_role text, user_schema text, output_table_name text, function_name text, params json) -RETURNS boolean AS $$ +CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure( + username text, + orgname text, + user_db_role text, + user_schema text, + output_table_name text, + function_name text, + params json +) RETURNS boolean AS $$ # Obtain return types for augmentation procedure ds_return_metadata = plpy.execute("SELECT colnames, coltypes " "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" @@ -105,27 +137,154 @@ RETURNS boolean AS $$ return True $$ LANGUAGE plpythonu; +CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure( + username text, + orgname text, + user_db_role text, + user_schema text, + dbname text, + table_name text, + output_table_name text, + function_name text, + params json +) RETURNS boolean AS $$ + # Obtain return types for augmentation procedure + ds_return_metadata = plpy.execute( + "SELECT colnames, coltypes " + "FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" .format( + username=plpy.quote_nullable(username), + orgname=plpy.quote_nullable(orgname), + function_name=plpy.quote_literal(function_name), + params=plpy.quote_literal(params))) -CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_ConnectUserTable(username text, orgname text, user_db_role text, user_schema text, dbname text, table_name text) -RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$ + colnames_arr = ds_return_metadata[0]["colnames"] + coltypes_arr = ds_return_metadata[0]["coltypes"] + + # Prepare column and type strings required in the SQL queries + columns_with_types_arr = [ + colnames_arr[i] + + ' ' + + coltypes_arr[i] for i in range( + 0, + len(colnames_arr))] + columns_with_types = ','.join(columns_with_types_arr) + aliased_colname_list = ','.join( + ['result.' + name for name in colnames_arr]) + + # Instruct the OBS server side to establish a FDW + # The metadata is obtained as well in order to: + # - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy, + # - (b) be able to tell OBS to free resources when done. + ds_fdw_metadata = plpy.execute( + "SELECT schemaname, tabname, servername " + "FROM cdb_dataservices_client._DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, " + "{schema}::text, {dbname}::text, {table_name}::text);" .format( + username=plpy.quote_nullable(username), + orgname=plpy.quote_nullable(orgname), + user_db_role=plpy.quote_literal(user_db_role), + schema=plpy.quote_literal(user_schema), + dbname=plpy.quote_literal(dbname), + table_name=plpy.quote_literal(table_name))) + + server_schema = ds_fdw_metadata[0]["schemaname"] + server_table_name = ds_fdw_metadata[0]["tabname"] + server_name = ds_fdw_metadata[0]["servername"] + + # Create a new table with the required columns + + plpy.warning( + 'INSERT INTO "{schema}".{analysis_table_name} ' + 'SELECT ut.cartodb_id, ut.the_geom, {colname_list} ' + 'FROM "{schema}".{table_name} ut ' + 'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, ' + '{function_name}::text, {params}::json) ' + 'AS result ({columns_with_types}, cartodb_id int) ' + 'ON result.cartodb_id = ut.cartodb_id;' .format( + schema=user_schema, + analysis_table_name=output_table_name, + colname_list=aliased_colname_list, + table_name=table_name, + username=plpy.quote_nullable(username), + orgname=plpy.quote_nullable(orgname), + server_schema=plpy.quote_literal(server_schema), + server_table_name=plpy.quote_literal(server_table_name), + function_name=plpy.quote_literal(function_name), + params=plpy.quote_literal(params), + columns_with_types=columns_with_types)) + plpy.execute( + 'INSERT INTO "{schema}".{analysis_table_name} ' + 'SELECT ut.cartodb_id, ut.the_geom, {colname_list} ' + 'FROM "{schema}".{table_name} ut ' + 'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, ' + '{function_name}::text, {params}::json) ' + 'AS result ({columns_with_types}, cartodb_id int) ' + 'ON result.cartodb_id = ut.cartodb_id;' .format( + schema=user_schema, + analysis_table_name=output_table_name, + colname_list=aliased_colname_list, + table_name=table_name, + username=plpy.quote_nullable(username), + orgname=plpy.quote_nullable(orgname), + server_schema=plpy.quote_literal(server_schema), + server_table_name=plpy.quote_literal(server_table_name), + function_name=plpy.quote_literal(function_name), + params=plpy.quote_literal(params), + columns_with_types=columns_with_types)) + + # Wipe user FDW data from the server + wiped = plpy.execute( + "SELECT cdb_dataservices_client._DST_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, " + "{server_table_name}::text, {fdw_server}::text)" .format( + username=plpy.quote_nullable(username), + orgname=plpy.quote_nullable(orgname), + server_schema=plpy.quote_literal(server_schema), + server_table_name=plpy.quote_literal(server_table_name), + fdw_server=plpy.quote_literal(server_name))) + + return True +$$ LANGUAGE plpythonu; + +CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_ConnectUserTable( + username text, + orgname text, + user_db_role text, + user_schema text, + dbname text, + table_name text +)RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$ CONNECT _server_conn_str(); TARGET cdb_dataservices_server._DST_ConnectUserTable; $$ LANGUAGE plproxy; -CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_GetReturnMetadata(username text, orgname text, function_name text, params json) -RETURNS cdb_dataservices_client.ds_return_metadata AS $$ +CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_GetReturnMetadata( + username text, + orgname text, + function_name text, + params json +) RETURNS cdb_dataservices_client.ds_return_metadata AS $$ CONNECT _server_conn_str(); TARGET cdb_dataservices_server._DST_GetReturnMetadata; $$ LANGUAGE plproxy; -CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json) -RETURNS SETOF record AS $$ +CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_FetchJoinFdwTableData( + username text, + orgname text, + table_schema text, + table_name text, + function_name text, + params json +) RETURNS SETOF record AS $$ CONNECT _server_conn_str(); TARGET cdb_dataservices_server._DST_FetchJoinFdwTableData; $$ LANGUAGE plproxy; -CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, server_name text) -RETURNS boolean AS $$ +CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable( + username text, + orgname text, + table_schema text, + table_name text, + server_name text +) RETURNS boolean AS $$ CONNECT _server_conn_str(); TARGET cdb_dataservices_server._DST_DisconnectUserTable; $$ LANGUAGE plproxy; diff --git a/client/sql/95_grant_execute_manual.sql b/client/sql/95_grant_execute_manual.sql index 5cfd1a0..1fd0f9a 100644 --- a/client/sql/95_grant_execute_manual.sql +++ b/client/sql/95_grant_execute_manual.sql @@ -1 +1,2 @@ -GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json) TO publicuser; +GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(output_table_name text, function_name text, params json) TO publicuser; +GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json) TO publicuser; diff --git a/server/extension/sql/125_data_observatory_table_augment.sql b/server/extension/sql/125_data_observatory_table_augment.sql index 1f72f3d..ca44e13 100644 --- a/server/extension/sql/125_data_observatory_table_augment.sql +++ b/server/extension/sql/125_data_observatory_table_augment.sql @@ -5,7 +5,7 @@ CREATE TYPE cdb_dataservices_server.ds_return_metadata as (colnames text[], colt CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text) RETURNS cdb_dataservices_server.ds_fdw_metadata AS $$ host_addr = plpy.execute("SELECT split_part(inet_client_addr()::text, '/', 1) as user_host")[0]['user_host'] - return plpy.execute("SELECT * FROM cdb_dataservices_server.__OBS_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {host_addr}::text, {table_name}::text)" + return plpy.execute("SELECT * FROM cdb_dataservices_server.__DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {host_addr}::text, {table_name}::text)" .format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), user_db_role=plpy.quote_literal(user_db_role), schema=plpy.quote_literal(input_schema), dbname=plpy.quote_literal(dbname), table_name=plpy.quote_literal(table_name), host_addr=plpy.quote_literal(host_addr)) )[0] $$ LANGUAGE plpythonu;