Merge pull request #199 from CartoDB/fdw-function-calls-with-schema
Fdw function calls with schema
This commit is contained in:
commit
eb4564ecee
3
Makefile
3
Makefile
@ -92,7 +92,8 @@ $(EXTENSION)--$(EXTVERSION).sql: $(CDBSCRIPTS) cartodb_version.sql Makefile
|
||||
echo '\echo Use "CREATE EXTENSION $(EXTENSION)" to load this file. \quit' > $@
|
||||
cat $(CDBSCRIPTS) | \
|
||||
$(SED) -e 's/public\./cartodb./g' \
|
||||
-e 's/:DATABASE_USERNAME/cdb_org_admin/g' >> $@
|
||||
-e 's/:DATABASE_USERNAME/cdb_org_admin/g' \
|
||||
-e "s/''public''/''cartodb''/g" >> $@
|
||||
echo "GRANT USAGE ON SCHEMA cartodb TO public;" >> $@
|
||||
cat cartodb_version.sql >> $@
|
||||
|
||||
|
199
scripts-available/CDB_ForeignTable.sql
Normal file
199
scripts-available/CDB_ForeignTable.sql
Normal file
@ -0,0 +1,199 @@
|
||||
---------------------------
|
||||
-- FDW MANAGEMENT FUNCTIONS
|
||||
--
|
||||
-- All the FDW settings are read from the `cdb_conf.fdws` entry json file.
|
||||
---------------------------
|
||||
|
||||
CREATE OR REPLACE FUNCTION cartodb._CDB_Setup_FDW(fdw_name text, config json)
|
||||
RETURNS void
|
||||
AS $$
|
||||
DECLARE
|
||||
row record;
|
||||
option record;
|
||||
org_role text;
|
||||
BEGIN
|
||||
-- This function tries to be as idempotent as possible, by not creating anything more than once
|
||||
-- (not even using IF NOT EXIST to avoid throwing warnings)
|
||||
IF NOT EXISTS ( SELECT * FROM pg_extension WHERE extname = 'postgres_fdw') THEN
|
||||
CREATE EXTENSION postgres_fdw;
|
||||
END IF;
|
||||
-- Create FDW first if it does not exist
|
||||
IF NOT EXISTS ( SELECT * FROM pg_foreign_server WHERE srvname = fdw_name)
|
||||
THEN
|
||||
EXECUTE FORMAT('CREATE SERVER %I FOREIGN DATA WRAPPER postgres_fdw', fdw_name);
|
||||
END IF;
|
||||
|
||||
-- Set FDW settings
|
||||
FOR row IN SELECT p.key, p.value from lateral json_each_text(config->'server') p
|
||||
LOOP
|
||||
IF NOT EXISTS (WITH a AS (select split_part(unnest(srvoptions), '=', 1) as options from pg_foreign_server where srvname=fdw_name) SELECT * from a where options = row.key)
|
||||
THEN
|
||||
EXECUTE FORMAT('ALTER SERVER %I OPTIONS (ADD %I %L)', fdw_name, row.key, row.value);
|
||||
ELSE
|
||||
EXECUTE FORMAT('ALTER SERVER %I OPTIONS (SET %I %L)', fdw_name, row.key, row.value);
|
||||
END IF;
|
||||
END LOOP;
|
||||
|
||||
-- Create user mappings
|
||||
FOR row IN SELECT p.key, p.value from lateral json_each(config->'users') p LOOP
|
||||
-- Check if entry on pg_user_mappings exists
|
||||
|
||||
IF NOT EXISTS ( SELECT * FROM pg_user_mappings WHERE srvname = fdw_name AND usename = row.key ) THEN
|
||||
EXECUTE FORMAT ('CREATE USER MAPPING FOR %I SERVER %I', row.key, fdw_name);
|
||||
END IF;
|
||||
|
||||
-- Update user mapping settings
|
||||
FOR option IN SELECT o.key, o.value from lateral json_each_text(row.value) o LOOP
|
||||
IF NOT EXISTS (WITH a AS (select split_part(unnest(umoptions), '=', 1) as options from pg_user_mappings WHERE srvname = fdw_name AND usename = row.key) SELECT * from a where options = option.key) THEN
|
||||
EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (ADD %I %L)', row.key, fdw_name, option.key, option.value);
|
||||
ELSE
|
||||
EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (SET %I %L)', row.key, fdw_name, option.key, option.value);
|
||||
END IF;
|
||||
END LOOP;
|
||||
END LOOP;
|
||||
|
||||
-- Create schema if it does not exist.
|
||||
IF NOT EXISTS ( SELECT * from pg_namespace WHERE nspname=fdw_name) THEN
|
||||
EXECUTE FORMAT ('CREATE SCHEMA %I', fdw_name);
|
||||
END IF;
|
||||
|
||||
-- Give the organization role usage permisions over the schema
|
||||
SELECT cartodb.CDB_Organization_Member_Group_Role_Member_Name() INTO org_role;
|
||||
EXECUTE FORMAT ('GRANT USAGE ON SCHEMA %I TO %I', fdw_name, org_role);
|
||||
|
||||
-- Bring here the remote cdb_tablemetadata
|
||||
IF NOT EXISTS ( SELECT * FROM PG_CLASS WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname=fdw_name) and relname='cdb_tablemetadata') THEN
|
||||
EXECUTE FORMAT ('CREATE FOREIGN TABLE %I.cdb_tablemetadata (tabname text, updated_at timestamp with time zone) SERVER %I OPTIONS (table_name ''cdb_tablemetadata_text'', schema_name ''public'', updatable ''false'')', fdw_name, fdw_name);
|
||||
END IF;
|
||||
EXECUTE FORMAT ('GRANT SELECT ON %I.cdb_tablemetadata TO %I', fdw_name, org_role);
|
||||
|
||||
END
|
||||
$$
|
||||
LANGUAGE PLPGSQL;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cartodb._CDB_Setup_FDWS()
|
||||
RETURNS VOID AS
|
||||
$$
|
||||
DECLARE
|
||||
row record;
|
||||
BEGIN
|
||||
FOR row IN SELECT p.key, p.value from lateral json_each(cartodb.CDB_Conf_GetConf('fdws')) p LOOP
|
||||
EXECUTE 'SELECT cartodb._CDB_Setup_FDW($1, $2)' USING row.key, row.value;
|
||||
END LOOP;
|
||||
END
|
||||
$$
|
||||
LANGUAGE PLPGSQL;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION cartodb._CDB_Setup_FDW(fdw_name text)
|
||||
RETURNS void AS
|
||||
$BODY$
|
||||
DECLARE
|
||||
config json;
|
||||
BEGIN
|
||||
SELECT p.value FROM LATERAL json_each(cartodb.CDB_Conf_GetConf('fdws')) p WHERE p.key = fdw_name INTO config;
|
||||
EXECUTE 'SELECT cartodb._CDB_Setup_FDW($1, $2)' USING fdw_name, config;
|
||||
END
|
||||
$BODY$
|
||||
LANGUAGE plpgsql VOLATILE;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cartodb.CDB_Add_Remote_Table(source text, table_name text)
|
||||
RETURNS void AS
|
||||
$$
|
||||
BEGIN
|
||||
PERFORM cartodb._CDB_Setup_FDW(source);
|
||||
EXECUTE FORMAT ('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) FROM SERVER %I INTO %I;', source, table_name, source, source);
|
||||
--- Grant SELECT to publicuser
|
||||
EXECUTE FORMAT ('GRANT SELECT ON %I.%I TO publicuser;', source, table_name);
|
||||
END
|
||||
$$
|
||||
LANGUAGE plpgsql;
|
||||
|
||||
CREATE OR REPLACE FUNCTION cartodb.CDB_Get_Foreign_Updated_At(foreign_table regclass)
|
||||
RETURNS timestamp with time zone AS
|
||||
$$
|
||||
DECLARE
|
||||
remote_table_name text;
|
||||
fdw_schema_name text;
|
||||
time timestamp with time zone;
|
||||
BEGIN
|
||||
-- This will turn a local foreign table (referenced as regclass) to its fully qualified text remote table reference.
|
||||
WITH a AS (SELECT ftoptions FROM pg_foreign_table WHERE ftrelid=foreign_table LIMIT 1),
|
||||
b as (SELECT (pg_options_to_table(ftoptions)).* FROM a)
|
||||
SELECT FORMAT('%I.%I', (SELECT option_value FROM b WHERE option_name='schema_name'), (SELECT option_value FROM b WHERE option_name='table_name'))
|
||||
INTO remote_table_name;
|
||||
|
||||
-- We assume that the remote cdb_tablemetadata is called cdb_tablemetadata and is on the same schema as the queried table.
|
||||
SELECT nspname FROM pg_class c, pg_namespace n WHERE c.oid=foreign_table AND c.relnamespace = n.oid INTO fdw_schema_name;
|
||||
EXECUTE FORMAT('SELECT updated_at FROM %I.cdb_tablemetadata WHERE tabname=%L ORDER BY updated_at DESC LIMIT 1', fdw_schema_name, remote_table_name) INTO time;
|
||||
RETURN time;
|
||||
END
|
||||
$$
|
||||
LANGUAGE plpgsql;
|
||||
|
||||
|
||||
CREATE OR REPLACE FUNCTION cartodb._cdb_dbname_of_foreign_table(reloid oid)
|
||||
RETURNS TEXT AS $$
|
||||
SELECT option_value FROM pg_options_to_table((
|
||||
|
||||
SELECT fs.srvoptions
|
||||
FROM pg_foreign_table ft
|
||||
LEFT JOIN pg_foreign_server fs ON ft.ftserver = fs.oid
|
||||
WHERE ft.ftrelid = reloid
|
||||
|
||||
)) WHERE option_name='dbname';
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
|
||||
-- Return a set of (dbname, schema_name, table_name, updated_at)
|
||||
-- It is aware of foreign tables
|
||||
-- It assumes the local (schema_name, table_name) map to the remote ones with the same name
|
||||
-- Note: dbname is never quoted whereas schema and table names are when needed.
|
||||
CREATE OR REPLACE FUNCTION cartodb.CDB_QueryTables_Updated_At(query text)
|
||||
RETURNS TABLE(dbname text, schema_name text, table_name text, updated_at timestamptz)
|
||||
AS $$
|
||||
WITH query_tables AS (
|
||||
SELECT unnest(CDB_QueryTablesText(query)) schema_table_name
|
||||
), query_tables_oid AS (
|
||||
SELECT schema_table_name, schema_table_name::regclass::oid AS reloid
|
||||
FROM query_tables
|
||||
),
|
||||
fqtn AS (
|
||||
SELECT
|
||||
(CASE WHEN c.relkind = 'f' THEN cartodb._cdb_dbname_of_foreign_table(query_tables_oid.reloid)
|
||||
ELSE current_database()
|
||||
END)::text AS dbname,
|
||||
quote_ident(n.nspname::text) schema_name,
|
||||
quote_ident(c.relname::text) table_name,
|
||||
c.relkind,
|
||||
query_tables_oid.reloid
|
||||
FROM query_tables_oid, pg_catalog.pg_class c
|
||||
LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
|
||||
WHERE c.oid = query_tables_oid.reloid
|
||||
)
|
||||
SELECT fqtn.dbname, fqtn.schema_name, fqtn.table_name,
|
||||
(CASE WHEN relkind = 'f' THEN cartodb.CDB_Get_Foreign_Updated_At(reloid)
|
||||
ELSE (SELECT md.updated_at FROM CDB_TableMetadata md WHERE md.tabname = reloid)
|
||||
END) AS updated_at
|
||||
FROM fqtn;
|
||||
$$ LANGUAGE SQL;
|
||||
|
||||
|
||||
-- Return the last updated time of a set of tables
|
||||
-- It is aware of foreign tables
|
||||
-- It assumes the local (schema_name, table_name) map to the remote ones with the same name
|
||||
CREATE OR REPLACE FUNCTION cartodb.CDB_Last_Updated_Time(tables text[])
|
||||
RETURNS timestamptz AS $$
|
||||
WITH t AS (
|
||||
SELECT unnest(tables) AS schema_table_name
|
||||
), t_oid AS (
|
||||
SELECT (t.schema_table_name)::regclass::oid as reloid FROM t
|
||||
), t_updated_at AS (
|
||||
SELECT
|
||||
(CASE WHEN relkind = 'f' THEN cartodb.CDB_Get_Foreign_Updated_At(reloid)
|
||||
ELSE (SELECT md.updated_at FROM CDB_TableMetadata md WHERE md.tabname = reloid)
|
||||
END) AS updated_at
|
||||
FROM t_oid
|
||||
LEFT JOIN pg_catalog.pg_class c ON c.oid = reloid
|
||||
) SELECT max(updated_at) FROM t_updated_at;
|
||||
$$ LANGUAGE SQL;
|
@ -5,6 +5,11 @@ CREATE TABLE IF NOT EXISTS
|
||||
updated_at timestamp with time zone not null default now()
|
||||
);
|
||||
|
||||
CREATE OR REPLACE VIEW public.CDB_TableMetadata_Text AS
|
||||
SELECT FORMAT('%I.%I', n.nspname::text, c.relname::text) tabname, updated_at
|
||||
FROM public.CDB_TableMetadata, pg_catalog.pg_class c
|
||||
LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid;
|
||||
|
||||
-- No one can see this
|
||||
-- Updates are only possible trough the security definer trigger
|
||||
-- GRANT SELECT ON public.CDB_TableMetadata TO public;
|
||||
|
1
scripts-enabled/250-CDB_ForeignTable.sql
Symbolic link
1
scripts-enabled/250-CDB_ForeignTable.sql
Symbolic link
@ -0,0 +1 @@
|
||||
../scripts-available/CDB_ForeignTable.sql
|
@ -172,8 +172,7 @@ function drop_raster_table() {
|
||||
sql ${ROLE} "DROP TABLE ${ROLE}.${TABLENAME};"
|
||||
}
|
||||
|
||||
|
||||
function setup() {
|
||||
function setup_database() {
|
||||
${CMD} -c "CREATE DATABASE ${DATABASE}"
|
||||
sql "CREATE SCHEMA cartodb;"
|
||||
sql "GRANT USAGE ON SCHEMA cartodb TO public;"
|
||||
@ -184,7 +183,10 @@ function setup() {
|
||||
${CMD} -d ${DATABASE} -f scripts-available/CDB_Organizations.sql
|
||||
# trick to allow forcing a schema when loading SQL files (see: http://bit.ly/1HeLnhL)
|
||||
${CMD} -d ${DATABASE} -f test/extension/run_at_cartodb_schema.sql
|
||||
}
|
||||
|
||||
function setup() {
|
||||
setup_database
|
||||
|
||||
log_info "############################# SETUP #############################"
|
||||
create_role_and_schema cdb_testmember_1
|
||||
@ -199,6 +201,10 @@ function setup() {
|
||||
sql cdb_testmember_2 'SELECT * FROM cdb_testmember_2.bar;'
|
||||
}
|
||||
|
||||
|
||||
function tear_down_database() {
|
||||
${CMD} -c "DROP DATABASE ${DATABASE}"
|
||||
}
|
||||
function tear_down() {
|
||||
log_info "########################### USER TEAR DOWN ###########################"
|
||||
sql cdb_testmember_1 "SELECT * FROM cartodb.CDB_Organization_Remove_Access_Permission('cdb_testmember_1', 'foo', 'cdb_testmember_2');"
|
||||
@ -219,9 +225,10 @@ function tear_down() {
|
||||
sql 'DROP ROLE cdb_testmember_1;'
|
||||
sql 'DROP ROLE cdb_testmember_2;'
|
||||
|
||||
${CMD} -c "DROP DATABASE ${DATABASE}"
|
||||
tear_down_database
|
||||
}
|
||||
|
||||
|
||||
function run_tests() {
|
||||
local FAILED_TESTS=()
|
||||
|
||||
@ -429,6 +436,98 @@ function test_cdb_querytables_happy_cases() {
|
||||
sql postgres 'DROP SCHEMA foo;'
|
||||
}
|
||||
|
||||
function test_foreign_tables() {
|
||||
${CMD} -d ${DATABASE} -f scripts-available/CDB_QueryStatements.sql
|
||||
${CMD} -d ${DATABASE} -f scripts-available/CDB_QueryTables.sql
|
||||
${CMD} -d ${DATABASE} -f scripts-available/CDB_TableMetadata.sql
|
||||
${CMD} -d ${DATABASE} -f scripts-available/CDB_Conf.sql
|
||||
${CMD} -d ${DATABASE} -f scripts-available/CDB_ForeignTable.sql
|
||||
|
||||
|
||||
DATABASE=fdw_target setup_database
|
||||
${CMD} -d fdw_target -f scripts-available/CDB_QueryStatements.sql
|
||||
${CMD} -d fdw_target -f scripts-available/CDB_QueryTables.sql
|
||||
${CMD} -d fdw_target -f scripts-available/CDB_TableMetadata.sql
|
||||
|
||||
DATABASE=fdw_target sql postgres 'CREATE SCHEMA test_fdw;'
|
||||
DATABASE=fdw_target sql postgres 'CREATE TABLE test_fdw.foo (a int);'
|
||||
DATABASE=fdw_target sql postgres 'INSERT INTO test_fdw.foo (a) values (42);'
|
||||
DATABASE=fdw_target sql postgres 'CREATE TABLE test_fdw.foo2 (a int);'
|
||||
DATABASE=fdw_target sql postgres 'INSERT INTO test_fdw.foo2 (a) values (42);'
|
||||
DATABASE=fdw_target sql postgres "CREATE USER fdw_user WITH PASSWORD 'foobarino';"
|
||||
DATABASE=fdw_target sql postgres 'GRANT USAGE ON SCHEMA test_fdw TO fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'GRANT SELECT ON TABLE test_fdw.foo TO fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'GRANT SELECT ON TABLE test_fdw.foo2 TO fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'GRANT SELECT ON cdb_tablemetadata_text TO fdw_user;'
|
||||
|
||||
DATABASE=fdw_target sql postgres "SELECT cdb_tablemetadatatouch('test_fdw.foo'::regclass);"
|
||||
DATABASE=fdw_target sql postgres "SELECT cdb_tablemetadatatouch('test_fdw.foo2'::regclass);"
|
||||
|
||||
sql postgres "SELECT cartodb.CDB_Conf_SetConf('fdws', '{\"test_fdw\": {\"server\": {\"host\": \"localhost\", \"dbname\": \"fdw_target\"},
|
||||
\"users\": {\"public\": {\"user\": \"fdw_user\", \"password\": \"foobarino\"}}}}')"
|
||||
|
||||
sql postgres "SELECT cartodb._CDB_Setup_FDW('test_fdw')"
|
||||
|
||||
sql postgres "SHOW server_version_num"
|
||||
if [ "$RESULT" -gt 90499 ]
|
||||
then
|
||||
sql postgres "SELECT cartodb.CDB_Add_Remote_Table('test_fdw', 'foo')"
|
||||
sql postgres "SELECT * from test_fdw.foo;"
|
||||
else
|
||||
echo "NOTICE: PostgreSQL version is less than 9.5 ($RESULT). Skipping CDB_Add_Remote_Table."
|
||||
sql postgres "CREATE FOREIGN TABLE test_fdw.foo (a int) SERVER test_fdw OPTIONS (table_name 'foo', schema_name 'test_fdw')"
|
||||
fi
|
||||
|
||||
sql postgres "SELECT n.nspname,
|
||||
c.relname,
|
||||
s.srvname FROM pg_catalog.pg_foreign_table ft
|
||||
INNER JOIN pg_catalog.pg_class c ON c.oid = ft.ftrelid
|
||||
INNER JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
||||
INNER JOIN pg_catalog.pg_foreign_server s ON s.oid = ft.ftserver
|
||||
ORDER BY 1, 2" should "test_fdw|cdb_tablemetadata|test_fdw
|
||||
test_fdw|foo|test_fdw"
|
||||
|
||||
sql postgres "SELECT cartodb.CDB_Get_Foreign_Updated_At('test_fdw.foo'::regclass) < NOW()" should 't'
|
||||
|
||||
sql postgres "SELECT a from test_fdw.foo LIMIT 1;" should 42
|
||||
|
||||
# Check function CDB_QueryTables_Updated_At
|
||||
sql postgres 'CREATE TABLE local (b int);'
|
||||
sql postgres 'INSERT INTO local (b) VALUES (43);'
|
||||
sql postgres "SELECT cdb_tablemetadatatouch('public.local'::regclass);"
|
||||
local query='$query$ SELECT * FROM test_fdw.foo, local $query$::text'
|
||||
sql postgres "SELECT dbname, schema_name, table_name FROM cartodb.CDB_QueryTables_Updated_At(${query}) ORDER BY dbname;" should 'fdw_target|test_fdw|foo
|
||||
test_extension|public|local'
|
||||
sql postgres "SELECT table_name FROM cartodb.CDB_QueryTables_Updated_At(${query}) order by updated_at;" should 'foo
|
||||
local'
|
||||
|
||||
# Check function CDB_Last_Updated_Time
|
||||
sql postgres "SELECT cartodb.CDB_Last_Updated_Time('{test_fdw.foo,public.local}'::text[]) < now()" should 't'
|
||||
sql postgres "SELECT cartodb.CDB_Last_Updated_Time('{test_fdw.foo,public.local}'::text[]) > (now() - interval '1 minute')" should 't'
|
||||
|
||||
# Check we quote names on output as needed (as CDB_QueryTablesText does)
|
||||
sql postgres 'CREATE TABLE "local-table-with-dashes" (c int)';
|
||||
sql postgres 'INSERT INTO "local-table-with-dashes" (c) VALUES (44)';
|
||||
sql postgres "SELECT cdb_tablemetadatatouch('public.local-table-with-dashes'::regclass);"
|
||||
query='$query$ SELECT * FROM test_fdw.foo, local, public."local-table-with-dashes" $query$::text'
|
||||
sql postgres "SELECT dbname, schema_name, table_name FROM cartodb.CDB_QueryTables_Updated_At(${query}) ORDER BY dbname, schema_name, table_name;" should 'fdw_target|test_fdw|foo
|
||||
test_extension|public|local
|
||||
test_extension|public|"local-table-with-dashes"'
|
||||
|
||||
# Check CDB_Last_Updated_Time supports quoted identifiers
|
||||
sql postgres "SELECT cartodb.CDB_Last_Updated_Time(ARRAY['test_extension.public.\"local-table-with-dashes\"']::text[]) < now()" should 't'
|
||||
sql postgres "SELECT cartodb.CDB_Last_Updated_Time(ARRAY['test_extension.public.\"local-table-with-dashes\"']::text[]) > (now() - interval '1 minute')" should 't'
|
||||
|
||||
DATABASE=fdw_target sql postgres 'REVOKE USAGE ON SCHEMA test_fdw FROM fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'REVOKE SELECT ON test_fdw.foo FROM fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'REVOKE SELECT ON test_fdw.foo2 FROM fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'REVOKE SELECT ON cdb_tablemetadata_text FROM fdw_user;'
|
||||
DATABASE=fdw_target sql postgres 'DROP ROLE fdw_user;'
|
||||
|
||||
sql postgres "select pg_terminate_backend(pid) from pg_stat_activity where datname='fdw_target';"
|
||||
DATABASE=fdw_target tear_down_database
|
||||
}
|
||||
|
||||
#################################################### TESTS END HERE ####################################################
|
||||
|
||||
run_tests $@
|
||||
|
Loading…
Reference in New Issue
Block a user