b8527514b7
I'm not 100% sure what's appropiate for plproxy functions so, for now, I've set them up as VOLATILE and PARALLEL UNSAFE
276 lines
10 KiB
PL/PgSQL
276 lines
10 KiB
PL/PgSQL
CREATE TYPE cdb_dataservices_client.ds_fdw_metadata as (schemaname text, tabname text, servername text);
|
|
CREATE TYPE cdb_dataservices_client.ds_return_metadata as (colnames text[], coltypes text[]);
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(
|
|
output_table_name text,
|
|
params json
|
|
) RETURNS boolean AS $$
|
|
DECLARE
|
|
username text;
|
|
user_db_role text;
|
|
orgname text;
|
|
user_schema text;
|
|
result boolean;
|
|
BEGIN
|
|
IF session_user = 'publicuser' OR session_user ~ 'cartodb_publicuser_*' THEN
|
|
RAISE EXCEPTION 'The api_key must be provided';
|
|
END IF;
|
|
|
|
SELECT session_user INTO user_db_role;
|
|
|
|
SELECT u, o INTO username, orgname FROM cdb_dataservices_client._cdb_entity_config() AS (u text, o text);
|
|
-- JSON value stored "" is taken as literal
|
|
IF username IS NULL OR username = '' OR username = '""' THEN
|
|
RAISE EXCEPTION 'Username is a mandatory argument';
|
|
END IF;
|
|
|
|
IF orgname IS NULL OR orgname = '' OR orgname = '""' THEN
|
|
user_schema := 'public';
|
|
ELSE
|
|
user_schema := username;
|
|
END IF;
|
|
|
|
SELECT cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
|
|
username,
|
|
orgname,
|
|
user_db_role,
|
|
user_schema,
|
|
output_table_name,
|
|
params
|
|
) INTO result;
|
|
|
|
RETURN result;
|
|
END;
|
|
$$ LANGUAGE 'plpgsql' SECURITY DEFINER VOLATILE PARALLEL UNSAFE;
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(
|
|
table_name text,
|
|
output_table_name text,
|
|
params json
|
|
) RETURNS boolean AS $$
|
|
DECLARE
|
|
username text;
|
|
user_db_role text;
|
|
orgname text;
|
|
dbname text;
|
|
user_schema text;
|
|
result boolean;
|
|
BEGIN
|
|
IF session_user = 'publicuser' OR session_user ~ 'cartodb_publicuser_*' THEN
|
|
RAISE EXCEPTION 'The api_key must be provided';
|
|
END IF;
|
|
|
|
SELECT session_user INTO user_db_role;
|
|
|
|
SELECT u, o INTO username, orgname FROM cdb_dataservices_client._cdb_entity_config() AS (u text, o text);
|
|
-- JSON value stored "" is taken as literal
|
|
IF username IS NULL OR username = '' OR username = '""' THEN
|
|
RAISE EXCEPTION 'Username is a mandatory argument';
|
|
END IF;
|
|
|
|
IF orgname IS NULL OR orgname = '' OR orgname = '""' THEN
|
|
user_schema := 'public';
|
|
ELSE
|
|
user_schema := username;
|
|
END IF;
|
|
|
|
SELECT current_database() INTO dbname;
|
|
|
|
SELECT cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
|
|
username,
|
|
orgname,
|
|
user_db_role,
|
|
user_schema,
|
|
dbname,
|
|
table_name,
|
|
output_table_name,
|
|
params
|
|
) INTO result;
|
|
|
|
RETURN result;
|
|
END;
|
|
$$ LANGUAGE 'plpgsql' SECURITY DEFINER VOLATILE PARALLEL UNSAFE;
|
|
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
|
|
username text,
|
|
orgname text,
|
|
user_db_role text,
|
|
user_schema text,
|
|
output_table_name text,
|
|
params json
|
|
) RETURNS boolean AS $$
|
|
function_name = 'OBS_GetMeasure'
|
|
# Obtain return types for augmentation procedure
|
|
ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
|
|
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
|
|
.format(
|
|
username=plpy.quote_nullable(username),
|
|
orgname=plpy.quote_nullable(orgname),
|
|
function_name=plpy.quote_literal(function_name),
|
|
params=plpy.quote_literal(params)
|
|
)
|
|
)
|
|
if ds_return_metadata[0]["colnames"]:
|
|
colnames_arr = ds_return_metadata[0]["colnames"]
|
|
coltypes_arr = ds_return_metadata[0]["coltypes"]
|
|
else:
|
|
raise Exception('Error retrieving OBS_GetMeasure metadata')
|
|
|
|
|
|
# Prepare column and type strings required in the SQL queries
|
|
columns_with_types_arr = [colnames_arr[i] + ' ' + coltypes_arr[i] for i in range(0,len(colnames_arr))]
|
|
columns_with_types = ','.join(columns_with_types_arr)
|
|
|
|
# Create a new table with the required columns
|
|
plpy.execute('CREATE TABLE "{schema}".{table_name} ( '
|
|
'cartodb_id int, the_geom geometry, {columns_with_types} '
|
|
');'
|
|
.format(schema=user_schema, table_name=output_table_name, columns_with_types=columns_with_types)
|
|
)
|
|
|
|
plpy.execute('ALTER TABLE "{schema}".{table_name} OWNER TO "{user}";'
|
|
.format(schema=user_schema, table_name=output_table_name, user=user_db_role)
|
|
)
|
|
|
|
return True
|
|
$$ LANGUAGE plpythonu VOLATILE PARALLEL UNSAFE;
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
|
|
username text,
|
|
orgname text,
|
|
user_db_role text,
|
|
user_schema text,
|
|
dbname text,
|
|
table_name text,
|
|
output_table_name text,
|
|
params json
|
|
) RETURNS boolean AS $$
|
|
function_name = 'OBS_GetMeasure'
|
|
# Obtain return types for augmentation procedure
|
|
ds_return_metadata = plpy.execute(
|
|
"SELECT colnames, coltypes "
|
|
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" .format(
|
|
username=plpy.quote_nullable(username),
|
|
orgname=plpy.quote_nullable(orgname),
|
|
function_name=plpy.quote_literal(function_name),
|
|
params=plpy.quote_literal(params)))
|
|
|
|
if ds_return_metadata[0]["colnames"]:
|
|
colnames_arr = ds_return_metadata[0]["colnames"]
|
|
coltypes_arr = ds_return_metadata[0]["coltypes"]
|
|
else:
|
|
raise Exception('Error retrieving OBS_GetMeasure metadata')
|
|
|
|
# Prepare column and type strings required in the SQL queries
|
|
columns_with_types_arr = [
|
|
colnames_arr[i] +
|
|
' ' +
|
|
coltypes_arr[i] for i in range(
|
|
0,
|
|
len(colnames_arr))]
|
|
columns_with_types = ','.join(columns_with_types_arr)
|
|
aliased_colname_list = ','.join(
|
|
['result.' + name for name in colnames_arr])
|
|
|
|
# Instruct the OBS server side to establish a FDW
|
|
# The metadata is obtained as well in order to:
|
|
# - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
|
|
# - (b) be able to tell OBS to free resources when done.
|
|
ds_fdw_metadata = plpy.execute(
|
|
"SELECT schemaname, tabname, servername "
|
|
"FROM cdb_dataservices_client._DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, "
|
|
"{schema}::text, {dbname}::text, {table_name}::text);" .format(
|
|
username=plpy.quote_nullable(username),
|
|
orgname=plpy.quote_nullable(orgname),
|
|
user_db_role=plpy.quote_literal(user_db_role),
|
|
schema=plpy.quote_literal(user_schema),
|
|
dbname=plpy.quote_literal(dbname),
|
|
table_name=plpy.quote_literal(table_name)))
|
|
|
|
if ds_fdw_metadata[0]["schemaname"]:
|
|
server_schema = ds_fdw_metadata[0]["schemaname"]
|
|
server_table_name = ds_fdw_metadata[0]["tabname"]
|
|
server_name = ds_fdw_metadata[0]["servername"]
|
|
else:
|
|
raise Exception('Error connecting dataset via FDW')
|
|
|
|
# Create a new table with the required columns
|
|
plpy.execute(
|
|
'INSERT INTO "{schema}".{analysis_table_name} '
|
|
'SELECT ut.cartodb_id, ut.the_geom, {colname_list} '
|
|
'FROM "{schema}".{table_name} ut '
|
|
'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, '
|
|
'{function_name}::text, {params}::json) '
|
|
'AS result ({columns_with_types}, cartodb_id int) '
|
|
'ON result.cartodb_id = ut.cartodb_id;' .format(
|
|
schema=user_schema,
|
|
analysis_table_name=output_table_name,
|
|
colname_list=aliased_colname_list,
|
|
table_name=table_name,
|
|
username=plpy.quote_nullable(username),
|
|
orgname=plpy.quote_nullable(orgname),
|
|
server_schema=plpy.quote_literal(server_schema),
|
|
server_table_name=plpy.quote_literal(server_table_name),
|
|
function_name=plpy.quote_literal(function_name),
|
|
params=plpy.quote_literal(params),
|
|
columns_with_types=columns_with_types))
|
|
|
|
# Wipe user FDW data from the server
|
|
wiped = plpy.execute(
|
|
"SELECT cdb_dataservices_client._DST_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, "
|
|
"{server_table_name}::text, {fdw_server}::text)" .format(
|
|
username=plpy.quote_nullable(username),
|
|
orgname=plpy.quote_nullable(orgname),
|
|
server_schema=plpy.quote_literal(server_schema),
|
|
server_table_name=plpy.quote_literal(server_table_name),
|
|
fdw_server=plpy.quote_literal(server_name)))
|
|
|
|
return True
|
|
$$ LANGUAGE plpythonu VOLATILE PARALLEL UNSAFE;
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_ConnectUserTable(
|
|
username text,
|
|
orgname text,
|
|
user_db_role text,
|
|
user_schema text,
|
|
dbname text,
|
|
table_name text
|
|
)RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
|
|
CONNECT cdb_dataservices_client._server_conn_str();
|
|
TARGET cdb_dataservices_server._DST_ConnectUserTable;
|
|
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_GetReturnMetadata(
|
|
username text,
|
|
orgname text,
|
|
function_name text,
|
|
params json
|
|
) RETURNS cdb_dataservices_client.ds_return_metadata AS $$
|
|
CONNECT cdb_dataservices_client._server_conn_str();
|
|
TARGET cdb_dataservices_server._DST_GetReturnMetadata;
|
|
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_FetchJoinFdwTableData(
|
|
username text,
|
|
orgname text,
|
|
table_schema text,
|
|
table_name text,
|
|
function_name text,
|
|
params json
|
|
) RETURNS SETOF record AS $$
|
|
CONNECT cdb_dataservices_client._server_conn_str();
|
|
TARGET cdb_dataservices_server._DST_FetchJoinFdwTableData;
|
|
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|
|
|
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
|
|
username text,
|
|
orgname text,
|
|
table_schema text,
|
|
table_name text,
|
|
server_name text
|
|
) RETURNS boolean AS $$
|
|
CONNECT cdb_dataservices_client._server_conn_str();
|
|
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
|
$$ LANGUAGE plproxy VOLATILE PARALLEL UNSAFE;
|