Add populate table function and several fixes
This commit is contained in:
parent
fc291a7c63
commit
1e9b551160
@ -1,8 +1,11 @@
|
|||||||
CREATE TYPE cdb_dataservices_client.ds_fdw_metadata as (schemaname text, tabname text, servername text);
|
CREATE TYPE cdb_dataservices_client.ds_fdw_metadata as (schemaname text, tabname text, servername text);
|
||||||
CREATE TYPE cdb_dataservices_client.ds_return_metadata as (colnames text[], coltypes text[]);
|
CREATE TYPE cdb_dataservices_client.ds_return_metadata as (colnames text[], coltypes text[]);
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(
|
||||||
RETURNS boolean AS $$
|
output_table_name text,
|
||||||
|
function_name text,
|
||||||
|
params json
|
||||||
|
) RETURNS boolean AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
username text;
|
username text;
|
||||||
user_db_role text;
|
user_db_role text;
|
||||||
@ -28,14 +31,26 @@ BEGIN
|
|||||||
user_schema := username;
|
user_schema := username;
|
||||||
END IF;
|
END IF;
|
||||||
|
|
||||||
SELECT cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(username, orgname, user_db_role, user_schema, output_table_name, function_name, params) INTO result;
|
SELECT cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
|
||||||
|
username,
|
||||||
|
orgname,
|
||||||
|
user_db_role,
|
||||||
|
user_schema,
|
||||||
|
output_table_name,
|
||||||
|
function_name,
|
||||||
|
params
|
||||||
|
) INTO result;
|
||||||
|
|
||||||
RETURN result;
|
RETURN result;
|
||||||
END;
|
END;
|
||||||
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
|
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(
|
||||||
RETURNS boolean AS $$
|
table_name text,
|
||||||
|
output_table_name text,
|
||||||
|
function_name text,
|
||||||
|
params json
|
||||||
|
) RETURNS boolean AS $$
|
||||||
DECLARE
|
DECLARE
|
||||||
username text;
|
username text;
|
||||||
user_db_role text;
|
user_db_role text;
|
||||||
@ -64,15 +79,32 @@ BEGIN
|
|||||||
|
|
||||||
SELECT current_database() INTO dbname;
|
SELECT current_database() INTO dbname;
|
||||||
|
|
||||||
SELECT cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(username, orgname, user_db_role, user_schema, dbname, table_name, output_table_name, function_name, params) INTO result;
|
SELECT cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
|
||||||
|
username,
|
||||||
|
orgname,
|
||||||
|
user_db_role,
|
||||||
|
user_schema,
|
||||||
|
dbname,
|
||||||
|
table_name,
|
||||||
|
output_table_name,
|
||||||
|
function_name,
|
||||||
|
params
|
||||||
|
) INTO result;
|
||||||
|
|
||||||
RETURN result;
|
RETURN result;
|
||||||
END;
|
END;
|
||||||
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
|
$$ LANGUAGE 'plpgsql' SECURITY DEFINER;
|
||||||
|
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(username text, orgname text, user_db_role text, user_schema text, output_table_name text, function_name text, params json)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PrepareTableOBS_GetMeasure(
|
||||||
RETURNS boolean AS $$
|
username text,
|
||||||
|
orgname text,
|
||||||
|
user_db_role text,
|
||||||
|
user_schema text,
|
||||||
|
output_table_name text,
|
||||||
|
function_name text,
|
||||||
|
params json
|
||||||
|
) RETURNS boolean AS $$
|
||||||
# Obtain return types for augmentation procedure
|
# Obtain return types for augmentation procedure
|
||||||
ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
|
ds_return_metadata = plpy.execute("SELECT colnames, coltypes "
|
||||||
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
|
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);"
|
||||||
@ -105,27 +137,154 @@ RETURNS boolean AS $$
|
|||||||
return True
|
return True
|
||||||
$$ LANGUAGE plpythonu;
|
$$ LANGUAGE plpythonu;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client.__DST_PopulateTableOBS_GetMeasure(
|
||||||
|
username text,
|
||||||
|
orgname text,
|
||||||
|
user_db_role text,
|
||||||
|
user_schema text,
|
||||||
|
dbname text,
|
||||||
|
table_name text,
|
||||||
|
output_table_name text,
|
||||||
|
function_name text,
|
||||||
|
params json
|
||||||
|
) RETURNS boolean AS $$
|
||||||
|
# Obtain return types for augmentation procedure
|
||||||
|
ds_return_metadata = plpy.execute(
|
||||||
|
"SELECT colnames, coltypes "
|
||||||
|
"FROM cdb_dataservices_client._DST_GetReturnMetadata({username}::text, {orgname}::text, {function_name}::text, {params}::json);" .format(
|
||||||
|
username=plpy.quote_nullable(username),
|
||||||
|
orgname=plpy.quote_nullable(orgname),
|
||||||
|
function_name=plpy.quote_literal(function_name),
|
||||||
|
params=plpy.quote_literal(params)))
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_ConnectUserTable(username text, orgname text, user_db_role text, user_schema text, dbname text, table_name text)
|
colnames_arr = ds_return_metadata[0]["colnames"]
|
||||||
RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
|
coltypes_arr = ds_return_metadata[0]["coltypes"]
|
||||||
|
|
||||||
|
# Prepare column and type strings required in the SQL queries
|
||||||
|
columns_with_types_arr = [
|
||||||
|
colnames_arr[i] +
|
||||||
|
' ' +
|
||||||
|
coltypes_arr[i] for i in range(
|
||||||
|
0,
|
||||||
|
len(colnames_arr))]
|
||||||
|
columns_with_types = ','.join(columns_with_types_arr)
|
||||||
|
aliased_colname_list = ','.join(
|
||||||
|
['result.' + name for name in colnames_arr])
|
||||||
|
|
||||||
|
# Instruct the OBS server side to establish a FDW
|
||||||
|
# The metadata is obtained as well in order to:
|
||||||
|
# - (a) be able to write the query to grab the actual data to be executed in the remote server via pl/proxy,
|
||||||
|
# - (b) be able to tell OBS to free resources when done.
|
||||||
|
ds_fdw_metadata = plpy.execute(
|
||||||
|
"SELECT schemaname, tabname, servername "
|
||||||
|
"FROM cdb_dataservices_client._DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, "
|
||||||
|
"{schema}::text, {dbname}::text, {table_name}::text);" .format(
|
||||||
|
username=plpy.quote_nullable(username),
|
||||||
|
orgname=plpy.quote_nullable(orgname),
|
||||||
|
user_db_role=plpy.quote_literal(user_db_role),
|
||||||
|
schema=plpy.quote_literal(user_schema),
|
||||||
|
dbname=plpy.quote_literal(dbname),
|
||||||
|
table_name=plpy.quote_literal(table_name)))
|
||||||
|
|
||||||
|
server_schema = ds_fdw_metadata[0]["schemaname"]
|
||||||
|
server_table_name = ds_fdw_metadata[0]["tabname"]
|
||||||
|
server_name = ds_fdw_metadata[0]["servername"]
|
||||||
|
|
||||||
|
# Create a new table with the required columns
|
||||||
|
|
||||||
|
plpy.warning(
|
||||||
|
'INSERT INTO "{schema}".{analysis_table_name} '
|
||||||
|
'SELECT ut.cartodb_id, ut.the_geom, {colname_list} '
|
||||||
|
'FROM "{schema}".{table_name} ut '
|
||||||
|
'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, '
|
||||||
|
'{function_name}::text, {params}::json) '
|
||||||
|
'AS result ({columns_with_types}, cartodb_id int) '
|
||||||
|
'ON result.cartodb_id = ut.cartodb_id;' .format(
|
||||||
|
schema=user_schema,
|
||||||
|
analysis_table_name=output_table_name,
|
||||||
|
colname_list=aliased_colname_list,
|
||||||
|
table_name=table_name,
|
||||||
|
username=plpy.quote_nullable(username),
|
||||||
|
orgname=plpy.quote_nullable(orgname),
|
||||||
|
server_schema=plpy.quote_literal(server_schema),
|
||||||
|
server_table_name=plpy.quote_literal(server_table_name),
|
||||||
|
function_name=plpy.quote_literal(function_name),
|
||||||
|
params=plpy.quote_literal(params),
|
||||||
|
columns_with_types=columns_with_types))
|
||||||
|
plpy.execute(
|
||||||
|
'INSERT INTO "{schema}".{analysis_table_name} '
|
||||||
|
'SELECT ut.cartodb_id, ut.the_geom, {colname_list} '
|
||||||
|
'FROM "{schema}".{table_name} ut '
|
||||||
|
'LEFT JOIN _DST_FetchJoinFdwTableData({username}::text, {orgname}::text, {server_schema}::text, {server_table_name}::text, '
|
||||||
|
'{function_name}::text, {params}::json) '
|
||||||
|
'AS result ({columns_with_types}, cartodb_id int) '
|
||||||
|
'ON result.cartodb_id = ut.cartodb_id;' .format(
|
||||||
|
schema=user_schema,
|
||||||
|
analysis_table_name=output_table_name,
|
||||||
|
colname_list=aliased_colname_list,
|
||||||
|
table_name=table_name,
|
||||||
|
username=plpy.quote_nullable(username),
|
||||||
|
orgname=plpy.quote_nullable(orgname),
|
||||||
|
server_schema=plpy.quote_literal(server_schema),
|
||||||
|
server_table_name=plpy.quote_literal(server_table_name),
|
||||||
|
function_name=plpy.quote_literal(function_name),
|
||||||
|
params=plpy.quote_literal(params),
|
||||||
|
columns_with_types=columns_with_types))
|
||||||
|
|
||||||
|
# Wipe user FDW data from the server
|
||||||
|
wiped = plpy.execute(
|
||||||
|
"SELECT cdb_dataservices_client._DST_DisconnectUserTable({username}::text, {orgname}::text, {server_schema}::text, "
|
||||||
|
"{server_table_name}::text, {fdw_server}::text)" .format(
|
||||||
|
username=plpy.quote_nullable(username),
|
||||||
|
orgname=plpy.quote_nullable(orgname),
|
||||||
|
server_schema=plpy.quote_literal(server_schema),
|
||||||
|
server_table_name=plpy.quote_literal(server_table_name),
|
||||||
|
fdw_server=plpy.quote_literal(server_name)))
|
||||||
|
|
||||||
|
return True
|
||||||
|
$$ LANGUAGE plpythonu;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_ConnectUserTable(
|
||||||
|
username text,
|
||||||
|
orgname text,
|
||||||
|
user_db_role text,
|
||||||
|
user_schema text,
|
||||||
|
dbname text,
|
||||||
|
table_name text
|
||||||
|
)RETURNS cdb_dataservices_client.ds_fdw_metadata AS $$
|
||||||
CONNECT _server_conn_str();
|
CONNECT _server_conn_str();
|
||||||
TARGET cdb_dataservices_server._DST_ConnectUserTable;
|
TARGET cdb_dataservices_server._DST_ConnectUserTable;
|
||||||
$$ LANGUAGE plproxy;
|
$$ LANGUAGE plproxy;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_GetReturnMetadata(username text, orgname text, function_name text, params json)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_GetReturnMetadata(
|
||||||
RETURNS cdb_dataservices_client.ds_return_metadata AS $$
|
username text,
|
||||||
|
orgname text,
|
||||||
|
function_name text,
|
||||||
|
params json
|
||||||
|
) RETURNS cdb_dataservices_client.ds_return_metadata AS $$
|
||||||
CONNECT _server_conn_str();
|
CONNECT _server_conn_str();
|
||||||
TARGET cdb_dataservices_server._DST_GetReturnMetadata;
|
TARGET cdb_dataservices_server._DST_GetReturnMetadata;
|
||||||
$$ LANGUAGE plproxy;
|
$$ LANGUAGE plproxy;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_FetchJoinFdwTableData(
|
||||||
RETURNS SETOF record AS $$
|
username text,
|
||||||
|
orgname text,
|
||||||
|
table_schema text,
|
||||||
|
table_name text,
|
||||||
|
function_name text,
|
||||||
|
params json
|
||||||
|
) RETURNS SETOF record AS $$
|
||||||
CONNECT _server_conn_str();
|
CONNECT _server_conn_str();
|
||||||
TARGET cdb_dataservices_server._DST_FetchJoinFdwTableData;
|
TARGET cdb_dataservices_server._DST_FetchJoinFdwTableData;
|
||||||
$$ LANGUAGE plproxy;
|
$$ LANGUAGE plproxy;
|
||||||
|
|
||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, server_name text)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_client._DST_DisconnectUserTable(
|
||||||
RETURNS boolean AS $$
|
username text,
|
||||||
|
orgname text,
|
||||||
|
table_schema text,
|
||||||
|
table_name text,
|
||||||
|
server_name text
|
||||||
|
) RETURNS boolean AS $$
|
||||||
CONNECT _server_conn_str();
|
CONNECT _server_conn_str();
|
||||||
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
TARGET cdb_dataservices_server._DST_DisconnectUserTable;
|
||||||
$$ LANGUAGE plproxy;
|
$$ LANGUAGE plproxy;
|
||||||
|
@ -1 +1,2 @@
|
|||||||
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json) TO publicuser;
|
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PrepareTableOBS_GetMeasure(output_table_name text, function_name text, params json) TO publicuser;
|
||||||
|
GRANT EXECUTE ON FUNCTION cdb_dataservices_client._DST_PopulateTableOBS_GetMeasure(table_name text, output_table_name text, function_name text, params json) TO publicuser;
|
||||||
|
@ -5,7 +5,7 @@ CREATE TYPE cdb_dataservices_server.ds_return_metadata as (colnames text[], colt
|
|||||||
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
|
CREATE OR REPLACE FUNCTION cdb_dataservices_server._DST_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text)
|
||||||
RETURNS cdb_dataservices_server.ds_fdw_metadata AS $$
|
RETURNS cdb_dataservices_server.ds_fdw_metadata AS $$
|
||||||
host_addr = plpy.execute("SELECT split_part(inet_client_addr()::text, '/', 1) as user_host")[0]['user_host']
|
host_addr = plpy.execute("SELECT split_part(inet_client_addr()::text, '/', 1) as user_host")[0]['user_host']
|
||||||
return plpy.execute("SELECT * FROM cdb_dataservices_server.__OBS_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {host_addr}::text, {table_name}::text)"
|
return plpy.execute("SELECT * FROM cdb_dataservices_server.__DST_ConnectUserTable({username}::text, {orgname}::text, {user_db_role}::text, {schema}::text, {dbname}::text, {host_addr}::text, {table_name}::text)"
|
||||||
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), user_db_role=plpy.quote_literal(user_db_role), schema=plpy.quote_literal(input_schema), dbname=plpy.quote_literal(dbname), table_name=plpy.quote_literal(table_name), host_addr=plpy.quote_literal(host_addr))
|
.format(username=plpy.quote_nullable(username), orgname=plpy.quote_nullable(orgname), user_db_role=plpy.quote_literal(user_db_role), schema=plpy.quote_literal(input_schema), dbname=plpy.quote_literal(dbname), table_name=plpy.quote_literal(table_name), host_addr=plpy.quote_literal(host_addr))
|
||||||
)[0]
|
)[0]
|
||||||
$$ LANGUAGE plpythonu;
|
$$ LANGUAGE plpythonu;
|
||||||
|
Loading…
Reference in New Issue
Block a user