Add federated server list remotes

Includes:
- CDB_Federated_Server_List_Remote_Schemas
- CDB_Federated_Server_List_Remote_Tables
This commit is contained in:
Raul Marin 2019-10-29 18:26:36 +01:00
parent e22aaee5f5
commit 144c4e722b
5 changed files with 286 additions and 29 deletions

View File

@ -11,20 +11,19 @@ AS $$
$$
LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- Produce a valid DB name for servers generated for the Federated Server
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Generate_Server_Name(input_name TEXT, check_existence BOOL)
RETURNS NAME
AS $$
DECLARE
object_name text := format('%s%s', @extschema@.__CDB_FS_Name_Pattern(), input_name);
internal_server_name text := format('%s%s', @extschema@.__CDB_FS_Name_Pattern(), input_name);
BEGIN
-- We discard anything that would be truncated
IF (char_length(object_name) < 64) THEN
IF (check_existence AND (NOT EXISTS (SELECT * FROM pg_foreign_server WHERE srvname = object_name))) THEN
IF (char_length(internal_server_name) < 64) THEN
IF (check_existence AND (NOT EXISTS (SELECT * FROM pg_foreign_server WHERE srvname = internal_server_name))) THEN
RAISE EXCEPTION 'Server "%" does not exist', input_name;
END IF;
RETURN object_name::name;
RETURN internal_server_name::name;
ELSE
RAISE EXCEPTION 'Server name is too long to be used as identifier';
END IF;
@ -32,21 +31,20 @@ END
$$
LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE;
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Extract_Server_Name(fdw_stored_name NAME)
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Extract_Server_Name(internal_server_name NAME)
RETURNS TEXT
AS $$
SELECT right(fdw_stored_name,
char_length(fdw_stored_name::TEXT) - char_length(@extschema@.__CDB_FS_Name_Pattern()))::TEXT;
SELECT right(internal_server_name,
char_length(internal_server_name::TEXT) - char_length(@extschema@.__CDB_FS_Name_Pattern()))::TEXT;
$$
LANGUAGE SQL IMMUTABLE PARALLEL SAFE;
-- Produce a valid name for a schema generated for the Federated Server
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Generate_Schema_Name(server_name TEXT, schema_name TEXT)
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Generate_Schema_Name(internal_server_name TEXT, schema_name TEXT)
RETURNS NAME
AS $$
DECLARE
server_full_name text := @extschema@.__CDB_FS_Generate_Server_Name(server_name, check_existence := true);
hash_value text := md5(server_full_name::text || '__' || schema_name::text);
hash_value text := md5(internal_server_name::text || '__' || schema_name::text);
schema_name text := format('%s%s%s', @extschema@.__CDB_FS_Name_Pattern(), 'schema_', hash_value);
BEGIN
RETURN schema_name::name;
@ -56,12 +54,11 @@ LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE;
-- Produce a valid name for a role generated for the Federated Server
-- This needs to include the current database in its hash to avoid collisions in clusters with more than one database
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Generate_Server_Role_Name(server_name TEXT)
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Generate_Server_Role_Name(internal_server_name TEXT)
RETURNS NAME
AS $$
DECLARE
server_full_name text := @extschema@.__CDB_FS_Generate_Server_Name(server_name, check_existence := true);
hash_value text := md5(current_database()::text || '__' || server_full_name::text);
hash_value text := md5(current_database()::text || '__' || internal_server_name::text);
role_name text := format('%s%s%s', @extschema@.__CDB_FS_Name_Pattern(), 'role_', hash_value);
BEGIN
RETURN role_name::name;
@ -70,22 +67,36 @@ $$
LANGUAGE PLPGSQL IMMUTABLE PARALLEL SAFE;
-- Creates (if not exist) a schema to place the objects for a remote schema
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Create_Schema(server_name TEXT, schema_name TEXT)
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_Create_Schema(internal_server_name TEXT, schema_name TEXT)
RETURNS NAME
AS $$
DECLARE
schema_name text := @extschema@.__CDB_FS_Generate_Schema_Name(server_name, schema_name);
role_name text := @extschema@.__CDB_FS_Generate_Server_Role_Name(server_name);
schema_name text := @extschema@.__CDB_FS_Generate_Schema_Name(internal_server_name, schema_name);
role_name text := @extschema@.__CDB_FS_Generate_Server_Role_Name(internal_server_name);
BEGIN
BEGIN
EXECUTE 'CREATE SCHEMA IF NOT EXISTS ' || quote_ident(schema_name) || ' AUTHORIZATION ' || quote_ident(role_name);
EXCEPTION WHEN OTHERS THEN
RAISE EXCEPTION 'TODO: This needs a better error handling after reviewing permissions';
END;
IF NOT EXISTS (SELECT oid FROM pg_namespace WHERE nspname = schema_name) THEN
BEGIN
EXECUTE 'CREATE SCHEMA IF NOT EXISTS ' || quote_ident(schema_name) || ' AUTHORIZATION ' || quote_ident(role_name);
EXCEPTION WHEN OTHERS THEN
RAISE EXCEPTION 'TODO: This needs a better error handling after reviewing permissions: %', SQLERRM;
END;
END IF;
RETURN schema_name;
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
-- Returns the type of a server by internal name
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_server_type(remote_server name)
RETURNS name
AS $$
SELECT f.fdwname
FROM pg_foreign_server s
JOIN pg_foreign_data_wrapper f ON s.srvfdw = f.oid
WHERE s.srvname = remote_server;
$$
LANGUAGE SQL VOLATILE PARALLEL UNSAFE;
-- List registered servers
-- TODO: Decide whether we want to show extra config (extensions, fetch_size, use_remote_estimate)
@ -109,7 +120,7 @@ BEGIN
@extschema@.__CDB_FS_Extract_Server_Name(s.srvname) AS "Name",
-- Which driver are we using (postgres_fdw, odbc_fdw...)
f.fdwname::text AS "Driver",
@extschema@.__CDB_FS_server_type(s.srvname)::text AS "Driver",
-- Read options from pg_foreign_server
(SELECT option_value FROM pg_options_to_table(s.srvoptions) WHERE option_name LIKE 'host') AS "Host",
@ -120,7 +131,6 @@ BEGIN
-- Read username from pg_user_mappings
(SELECT option_value FROM pg_options_to_table(u.umoptions) WHERE option_name LIKE 'user') AS "Username"
FROM pg_foreign_server s
JOIN pg_foreign_data_wrapper f ON f.oid=s.srvfdw
LEFT JOIN pg_user_mappings u
ON u.srvid = s.oid
WHERE s.srvname ILIKE server_name
@ -137,10 +147,13 @@ AS $$
DECLARE
user_mapping jsonb := json_build_object(
'user_mapping',
jsonb_build_object('user', input_config->'credentials'->'username',
'password', input_config->'credentials'->'password')
jsonb_build_object( 'user', input_config->'credentials'->'username',
'password', input_config->'credentials'->'password')
);
BEGIN
IF NOT (input_config ? 'credentials') THEN
RAISE EXCEPTION 'Credentials are mandatory';
END IF;
RETURN (input_config - 'credentials')::jsonb || user_mapping;
END
$$
@ -174,7 +187,7 @@ AS $$
DECLARE
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := server, check_existence := false);
final_config json := @extschema@.__CDB_FS_credentials_to_user_mapping(@extschema@.__CDB_FS_add_default_options(config));
role_name name;
role_name name := @extschema@.__CDB_FS_Generate_Server_Role_Name(server_internal);
row record;
option record;
BEGIN
@ -187,7 +200,6 @@ BEGIN
IF NOT EXISTS (SELECT * FROM pg_foreign_server WHERE srvname = server_internal) THEN
BEGIN
EXECUTE FORMAT('CREATE SERVER %I FOREIGN DATA WRAPPER postgres_fdw', server_internal);
role_name := @extschema@.__CDB_FS_Generate_Server_Role_Name(server);
EXECUTE FORMAT('CREATE ROLE %I NOLOGIN', role_name);
EXECUTE FORMAT('GRANT USAGE ON FOREIGN DATA WRAPPER postgres_fdw TO %I', role_name);
EXECUTE FORMAT('GRANT USAGE ON FOREIGN SERVER %I TO %I', server_internal, role_name);
@ -238,7 +250,7 @@ RETURNS void
AS $$
DECLARE
server_internal text := @extschema@.__CDB_FS_Generate_Server_Name(input_name := server, check_existence := true);
role_name name := @extschema@.__CDB_FS_Generate_Server_Role_Name(server);
role_name name := @extschema@.__CDB_FS_Generate_Server_Role_Name(server_internal);
BEGIN
EXECUTE FORMAT ('DROP USER MAPPING FOR public SERVER %I', server_internal);
EXECUTE FORMAT ('DROP OWNED BY %I CASCADE', role_name);

View File

@ -0,0 +1,114 @@
--------------------------------------------------------------------------------
-- Private functions
--------------------------------------------------------------------------------
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_List_Foreign_Schemas_PG(server_internal name)
RETURNS TABLE(remote_schema name)
AS $$
DECLARE
-- Import schemata from the information schema
--
-- "The view schemata contains all schemas in the current database
-- that the current user has access to (by way of being the owner
-- or having some privilege)."
-- See https://www.postgresql.org/docs/11/infoschema-schemata.html
--
-- "The information schema is defined in the SQL standard and can
-- therefore be expected to be portable and remain stable"
-- See https://www.postgresql.org/docs/11/information-schema.html
inf_schema name := 'information_schema';
remote_table name := 'schemata';
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, inf_schema);
BEGIN
-- Import the foreign schemata if not done
IF NOT EXISTS (
SELECT * FROM pg_class
WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = local_schema)
AND relname = remote_table
) THEN
EXECUTE format('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) FROM SERVER %I INTO %I', inf_schema, remote_table, server_internal, local_schema);
END IF;
-- Return the result we're interested in
RETURN QUERY EXECUTE format('SELECT schema_name::name AS remote_schema FROM %I.%I', local_schema, remote_table);
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
CREATE OR REPLACE FUNCTION @extschema@.__CDB_FS_List_Foreign_Tables_PG(server_internal name, remote_schema name)
RETURNS TABLE(remote_table name)
AS $func$
DECLARE
-- Import `tables` from the information schema
--
-- "The view tables contains all tables and views defined in the
-- current database. Only those tables and views are shown that
-- the current user has access to (by way of being the owner or
-- having some privilege)."
-- https://www.postgresql.org/docs/11/infoschema-tables.html
-- Create local target schema if it does not exists
inf_schema name := 'information_schema';
remote_table name := 'tables';
local_schema name := @extschema@.__CDB_FS_Create_Schema(server_internal, inf_schema);
BEGIN
-- Import the foreign `tables` if not done
IF NOT EXISTS (
SELECT * FROM pg_class
WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = local_schema)
AND relname = 'tables'
) THEN
EXECUTE format('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) FROM SERVER %I INTO %I', inf_schema, remote_table, server_internal, local_schema);
END IF;
-- Return the result we're interested in
-- Note: in this context, schema names are not to be quoted
RETURN QUERY EXECUTE format($q$SELECT table_name::name AS remote_table FROM %I.%I WHERE table_schema = '%s'$q$, local_schema, remote_table, remote_schema);
END
$func$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
--------------------------------------------------------------------------------
-- Public functions
--------------------------------------------------------------------------------
--
-- List remote schemas in a federated server that the current user has access to.
--
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_List_Remote_Schemas(remote_server name)
RETURNS TABLE(remote_schema name)
AS $$
DECLARE
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := remote_server, check_existence := true);
server_type name := @extschema@.__CDB_FS_server_type(server_internal);
BEGIN
CASE server_type
WHEN 'postgres_fdw' THEN
RETURN QUERY SELECT @extschema@.__CDB_FS_List_Foreign_Schemas_PG(server_internal);
ELSE
RAISE EXCEPTION 'Not implemented server type % for remote server %', server_type, remote_server;
END CASE;
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
--
-- List remote tables in a federated server that the current user has access to.
--
CREATE OR REPLACE FUNCTION @extschema@.CDB_Federated_Server_List_Remote_Tables(remote_server name, remote_schema name)
RETURNS TABLE(remote_table name)
AS $$
DECLARE
server_internal name := @extschema@.__CDB_FS_Generate_Server_Name(input_name := remote_server, check_existence := true);
server_type name := @extschema@.__CDB_FS_server_type(server_internal);
BEGIN
CASE server_type
WHEN 'postgres_fdw' THEN
RETURN QUERY SELECT @extschema@.__CDB_FS_List_Foreign_Tables_PG(server_internal, remote_schema);
ELSE
RAISE EXCEPTION 'Not implemented server type % for remote server %', server_type, remote_server;
END CASE;
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;

View File

@ -0,0 +1 @@
../scripts-available/CDB_FederatedServer_ListRemote.sql

View File

@ -0,0 +1,117 @@
-- ===================================================================
-- create FDW objects
-- ===================================================================
\set QUIET on
SET client_min_messages TO error;
\set VERBOSITY terse
SET SESSION AUTHORIZATION postgres;
CREATE EXTENSION postgres_fdw;
CREATE ROLE cdb_fs_tester SUPERUSER LOGIN PASSWORD 'cdb_fs_passwd';
CREATE DATABASE cdb_fs_tester OWNER cdb_fs_tester;
SELECT 'C1', cartodb.CDB_Federated_Server_Register_PG(server := 'loopback'::text, config := '{
"server": {
"host": "localhost",
"port": @@PGPORT@@
},
"credentials": {
"username": "cdb_fs_tester",
"password": "cdb_fs_passwd"
}
}'::jsonb);
SELECT 'C2', cartodb.CDB_Federated_Server_Register_PG(server := 'loopback2'::text, config := '{
"server": {
"host": "localhost",
"port": @@PGPORT@@
},
"credentials": {
"username": "cdb_fs_tester",
"password": "cdb_fs_passwd"
}
}'::jsonb);
-- ===================================================================
-- create objects used through FDW loopback server
-- ===================================================================
\c cdb_fs_tester cdb_fs_tester
CREATE TYPE user_enum AS ENUM ('foo', 'bar', 'buz');
CREATE SCHEMA "S 1";
CREATE TABLE "S 1"."T 1" (
"C 1" int NOT NULL,
c2 int NOT NULL,
c3 text,
c4 timestamptz,
c5 timestamp,
c6 varchar(10),
c7 char(10),
c8 user_enum,
CONSTRAINT t1_pkey PRIMARY KEY ("C 1")
);
CREATE TABLE "S 1"."T 2" (
c1 int NOT NULL,
c2 text,
CONSTRAINT t2_pkey PRIMARY KEY (c1)
);
CREATE TABLE "S 1"."T 3" (
c1 int NOT NULL,
c2 int NOT NULL,
c3 text,
CONSTRAINT t3_pkey PRIMARY KEY (c1)
);
CREATE TABLE "S 1"."T 4" (
c1 int NOT NULL,
c2 int NOT NULL,
c3 text,
CONSTRAINT t4_pkey PRIMARY KEY (c1)
);
-- Disable autovacuum for these tables to avoid unexpected effects of that
ALTER TABLE "S 1"."T 1" SET (autovacuum_enabled = 'false');
ALTER TABLE "S 1"."T 2" SET (autovacuum_enabled = 'false');
ALTER TABLE "S 1"."T 3" SET (autovacuum_enabled = 'false');
ALTER TABLE "S 1"."T 4" SET (autovacuum_enabled = 'false');
\c contrib_regression postgres
SET client_min_messages TO notice;
\set VERBOSITY terse
\set QUIET off
-- ===================================================================
-- Test the listing functions
-- ===================================================================
\echo 'Test listing of remote schemas (sunny day)'
SELECT * FROM cartodb.CDB_Federated_Server_List_Remote_Schemas(remote_server => 'loopback')
WHERE remote_schema NOT LIKE 'pg_%' -- Exclude toast and temp schemas
ORDER BY remote_schema;
\echo 'Test listing of remote tables (sunny day)'
SELECT * FROM cartodb.CDB_Federated_Server_List_Remote_Tables(remote_server => 'loopback', remote_schema => 'S 1')
ORDER BY remote_table;
-- ===================================================================
-- Cleanup
-- ===================================================================
\set QUIET on
\c cdb_fs_tester cdb_fs_tester
DROP TABLE "S 1". "T 1";
DROP TABLE "S 1". "T 2";
DROP TABLE "S 1". "T 3";
DROP TABLE "S 1". "T 4";
DROP SCHEMA "S 1";
DROP TYPE user_enum;
\c contrib_regression postgres
\set QUIET on
SET client_min_messages TO error;
\set VERBOSITY terse
SELECT 'D1', cartodb.CDB_Federated_Server_Unregister(server := 'loopback'::text);
SELECT 'D2', cartodb.CDB_Federated_Server_Unregister(server := 'loopback2'::text);
DROP DATABASE cdb_fs_tester;
DROP ROLE cdb_fs_tester;
\set QUIET off

View File

@ -0,0 +1,13 @@
C1|
C2|
Test listing of remote schemas (sunny day)
S 1
information_schema
public
Test listing of remote tables (sunny day)
T 1
T 2
T 3
T 4
D1|
D2|