2016-02-04 00:50:13 +08:00
---------------------------
-- FDW MANAGEMENT FUNCTIONS
--
-- All FDW settings are read from the JSON stored in the `cdb_conf.fdws` entry.
---------------------------
2019-05-31 21:29:28 +08:00
-- Sets up (or updates) a postgres_fdw foreign server described by `config`.
--
-- Parameters:
--   fdw_name: name given to the foreign server and to the local schema that
--             hosts its foreign tables.
--   config:   JSON object. `config->'server'` is read as key/value server
--             options; `config->'users'` maps a role name to a JSON object of
--             user-mapping options.
--
-- Effects: creates, only when missing, the postgres_fdw extension, the server,
-- the user mappings, the schema and the `<fdw_name>.cdb_tablemetadata` foreign
-- table; adds or overwrites every configured option; grants USAGE on the schema
-- and SELECT on the metadata table to the role returned by
-- CDB_Organization_Member_Group_Role_Member_Name().
CREATE OR REPLACE FUNCTION @extschema@._CDB_Setup_FDW(fdw_name text, config json)
RETURNS void
AS $$
DECLARE
  row record;
  option record;
  org_role text;
BEGIN
  -- This function tries to be as idempotent as possible, by not creating anything more than once
  -- (not even using IF NOT EXISTS, to avoid throwing warnings)
  IF NOT EXISTS (SELECT * FROM pg_extension WHERE extname = 'postgres_fdw') THEN
    CREATE EXTENSION postgres_fdw;
  END IF;

  -- Create the foreign server first if it does not exist
  IF NOT EXISTS (SELECT * FROM pg_foreign_server WHERE srvname = fdw_name) THEN
    EXECUTE FORMAT('CREATE SERVER %I FOREIGN DATA WRAPPER postgres_fdw', fdw_name);
  END IF;

  -- Set server options: ADD the ones not present yet, SET the ones already there
  -- (ALTER SERVER fails on ADD of an existing option and on SET of a missing one)
  FOR row IN SELECT p.key, p.value FROM LATERAL json_each_text(config->'server') p
  LOOP
    IF NOT EXISTS (
      WITH a AS (
        SELECT split_part(unnest(srvoptions), '=', 1) AS options
        FROM pg_foreign_server
        WHERE srvname = fdw_name
      )
      SELECT * FROM a WHERE options = row.key
    ) THEN
      EXECUTE FORMAT('ALTER SERVER %I OPTIONS (ADD %I %L)', fdw_name, row.key, row.value);
    ELSE
      EXECUTE FORMAT('ALTER SERVER %I OPTIONS (SET %I %L)', fdw_name, row.key, row.value);
    END IF;
  END LOOP;

  -- Create user mappings
  FOR row IN SELECT p.key, p.value FROM LATERAL json_each(config->'users') p
  LOOP
    -- Check if an entry on pg_user_mappings exists
    IF NOT EXISTS (SELECT * FROM pg_user_mappings WHERE srvname = fdw_name AND usename = row.key) THEN
      EXECUTE FORMAT('CREATE USER MAPPING FOR %I SERVER %I', row.key, fdw_name);
    END IF;

    -- Update user mapping settings (same ADD/SET split as for the server options)
    FOR option IN SELECT o.key, o.value FROM LATERAL json_each_text(row.value) o
    LOOP
      IF NOT EXISTS (
        WITH a AS (
          SELECT split_part(unnest(umoptions), '=', 1) AS options
          FROM pg_user_mappings
          WHERE srvname = fdw_name AND usename = row.key
        )
        SELECT * FROM a WHERE options = option.key
      ) THEN
        EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (ADD %I %L)', row.key, fdw_name, option.key, option.value);
      ELSE
        EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (SET %I %L)', row.key, fdw_name, option.key, option.value);
      END IF;
    END LOOP;
  END LOOP;

  -- Create schema if it does not exist.
  IF NOT EXISTS (SELECT * FROM pg_namespace WHERE nspname = fdw_name) THEN
    EXECUTE FORMAT('CREATE SCHEMA %I', fdw_name);
  END IF;

  -- Give the organization role usage permissions over the schema
  SELECT @extschema@.CDB_Organization_Member_Group_Role_Member_Name() INTO org_role;
  EXECUTE FORMAT('GRANT USAGE ON SCHEMA %I TO %I', fdw_name, org_role);

  -- Bring here the remote cdb_tablemetadata
  IF NOT EXISTS (
    SELECT * FROM pg_class
    WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname = fdw_name)
      AND relname = 'cdb_tablemetadata'
  ) THEN
    EXECUTE FORMAT('CREATE FOREIGN TABLE %I.cdb_tablemetadata (tabname text, updated_at timestamp with time zone) SERVER %I OPTIONS (table_name ''cdb_tablemetadata_text'', schema_name ''@extschema@'', updatable ''false'')', fdw_name, fdw_name);
  END IF;
  EXECUTE FORMAT('GRANT SELECT ON %I.cdb_tablemetadata TO %I', fdw_name, org_role);
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
2016-02-04 00:50:13 +08:00
2019-05-31 21:29:28 +08:00
-- Sets up every FDW found in the `fdws` configuration entry.
--
-- Iterates over the top-level keys of the JSON returned by
-- CDB_Conf_GetConf('fdws') and calls _CDB_Setup_FDW(key, value) for each one.
-- Does nothing when that JSON is NULL or has no keys.
CREATE OR REPLACE FUNCTION @extschema@._CDB_Setup_FDWS()
RETURNS VOID AS
$$
DECLARE
  row record;
BEGIN
  FOR row IN SELECT p.key, p.value FROM LATERAL json_each(@extschema@.CDB_Conf_GetConf('fdws')) p
  LOOP
    -- The result is void; PERFORM runs the call and discards it
    PERFORM @extschema@._CDB_Setup_FDW(row.key, row.value);
  END LOOP;
END
$$
LANGUAGE PLPGSQL VOLATILE PARALLEL UNSAFE;
2016-02-04 00:50:13 +08:00
2016-02-04 01:07:23 +08:00
2019-05-31 21:29:28 +08:00
-- Sets up a single FDW by name, using its entry in the `fdws` configuration.
--
-- Parameters:
--   fdw_name: key to look up in the JSON returned by CDB_Conf_GetConf('fdws').
--
-- Raises an exception when no entry exists for `fdw_name`; otherwise delegates
-- to _CDB_Setup_FDW(fdw_name, config).
CREATE OR REPLACE FUNCTION @extschema@._CDB_Setup_FDW(fdw_name text)
RETURNS void AS
$BODY$
DECLARE
  config json;
BEGIN
  SELECT p.value
    INTO config
    FROM LATERAL json_each(@extschema@.CDB_Conf_GetConf('fdws')) p
   WHERE p.key = fdw_name;

  -- Without this guard a missing or misspelled name would still create a
  -- server with no connection options, plus its schema and grants.
  IF config IS NULL THEN
    RAISE EXCEPTION 'No configuration found for FDW "%" in the fdws configuration entry', fdw_name;
  END IF;

  PERFORM @extschema@._CDB_Setup_FDW(fdw_name, config);
END
$BODY$
LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;
2016-02-04 01:07:23 +08:00
2019-05-31 21:29:28 +08:00
-- Imports one remote table as a local foreign table and makes it readable.
--
-- Parameters:
--   source:     FDW name. It is used as the foreign server name, as the remote
--               schema to import from and as the local schema to import into.
--   table_name: name of the remote table to import.
--
-- Runs _CDB_Setup_FDW(source) first, then IMPORT FOREIGN SCHEMA limited to
-- `table_name`, and finally grants SELECT on the imported table to `publicuser`.
CREATE OR REPLACE FUNCTION @extschema@.CDB_Add_Remote_Table(source text, table_name text)
RETURNS void AS
$$
BEGIN
  PERFORM @extschema@._CDB_Setup_FDW(source);
  EXECUTE FORMAT('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) FROM SERVER %I INTO %I;', source, table_name, source, source);
  -- Grant SELECT to publicuser
  EXECUTE FORMAT('GRANT SELECT ON %I.%I TO publicuser;', source, table_name);
END
$$
LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;
2016-02-05 01:26:43 +08:00
2019-05-31 21:29:28 +08:00
-- Returns the last update time recorded for a foreign table.
--
-- Parameters:
--   foreign_table: the local foreign table.
--
-- Returns: the most recent `updated_at` found in the `cdb_tablemetadata`
-- relation living in the same local schema as `foreign_table`, for the row
-- whose `tabname` equals the remote "schema.table" reference. Returns NULL when
-- no row matches, or (with a NOTICE) when that `cdb_tablemetadata` relation
-- does not exist.
CREATE OR REPLACE FUNCTION @extschema@.CDB_Get_Foreign_Updated_At(foreign_table regclass)
RETURNS timestamp with time zone AS
$$
DECLARE
  remote_table_name text;
  fdw_schema_name text;
  local_table_name text;
  time timestamp with time zone;
BEGIN
  -- We assume that the remote cdb_tablemetadata is called cdb_tablemetadata and is on the same schema as the queried table.
  SELECT n.nspname, c.relname
    INTO fdw_schema_name, local_table_name
    FROM pg_class c
   INNER JOIN pg_namespace n ON n.oid = c.relnamespace
   WHERE c.oid = foreign_table;

  -- This will turn a local foreign table (referenced as regclass) to its fully qualified text remote table reference.
  -- When the `schema_name` / `table_name` options are not set, the local names
  -- are used instead (postgres_fdw applies the same default); FORMAT('%I')
  -- would otherwise raise an error on a NULL identifier.
  WITH opts AS (
    SELECT o.option_name, o.option_value
      FROM pg_foreign_table ft
     CROSS JOIN LATERAL pg_options_to_table(ft.ftoptions) o
     WHERE ft.ftrelid = foreign_table
  )
  SELECT FORMAT('%I.%I',
                COALESCE((SELECT option_value FROM opts WHERE option_name = 'schema_name'), fdw_schema_name),
                COALESCE((SELECT option_value FROM opts WHERE option_name = 'table_name'), local_table_name))
    INTO remote_table_name;

  BEGIN
    EXECUTE FORMAT('SELECT updated_at FROM %I.cdb_tablemetadata WHERE tabname=%L ORDER BY updated_at DESC LIMIT 1', fdw_schema_name, remote_table_name) INTO time;
  EXCEPTION
    WHEN undefined_table THEN
      -- If you add a GET STACKED DIAGNOSTICS text_var = RETURNED_SQLSTATE
      -- you get a code 42P01 which corresponds to undefined_table
      RAISE NOTICE 'CDB_Get_Foreign_Updated_At: could not find %.cdb_tablemetadata while checking % updated_at, returning NULL timestamp', fdw_schema_name, foreign_table;
  END;

  RETURN time;
END
$$
LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;
2016-02-09 20:40:18 +08:00
2019-05-31 21:29:28 +08:00
-- Returns the `dbname` option of the foreign server backing a foreign table.
--
-- Parameters:
--   reloid: oid of the foreign table (matched against pg_foreign_table.ftrelid).
--
-- Returns NULL when `reloid` is not a foreign table or its server has no
-- `dbname` option.
CREATE OR REPLACE FUNCTION @extschema@._cdb_dbname_of_foreign_table(reloid oid)
RETURNS TEXT AS $$
    SELECT option_value FROM pg_options_to_table((
        SELECT fs.srvoptions
        FROM pg_foreign_table ft
        LEFT JOIN pg_foreign_server fs ON ft.ftserver = fs.oid
        WHERE ft.ftrelid = reloid
    )) WHERE option_name = 'dbname';
$$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE;
2016-02-10 00:11:16 +08:00
2016-02-09 20:40:18 +08:00
-- Return a set of (dbname, schema_name, table_name, updated_at)
-- It is aware of foreign tables
-- It assumes each local (schema_name, table_name) maps to the remote one with the same name
-- Note: dbname is never quoted, whereas schema and table names are quoted when needed.
2019-05-31 21:29:28 +08:00
-- For every table name returned by CDB_QueryTablesText(query), returns its
-- database name, quoted schema and table names, and last update time.
--
-- Parameters:
--   query: SQL text handed to CDB_QueryTablesText().
--
-- Returns one row per table:
--   dbname:      the server `dbname` option for foreign tables, otherwise
--                current_database(); never quoted.
--   schema_name: quote_ident() of the table's schema.
--   table_name:  quote_ident() of the table's name.
--   updated_at:  CDB_Get_Foreign_Updated_At() for foreign tables, otherwise the
--                matching CDB_TableMetadata.updated_at (NULL when absent).
CREATE OR REPLACE FUNCTION @extschema@.CDB_QueryTables_Updated_At(query text)
RETURNS TABLE(dbname text, schema_name text, table_name text, updated_at timestamptz)
AS $$
    WITH query_tables AS (
        SELECT unnest(@extschema@.CDB_QueryTablesText(query)) schema_table_name
    ), query_tables_oid AS (
        -- The regclass cast resolves each name; it raises if a table does not exist
        SELECT schema_table_name, schema_table_name::regclass::oid AS reloid
        FROM query_tables
    ),
    fqtn AS (
        SELECT
            (CASE WHEN c.relkind = 'f' THEN @extschema@._cdb_dbname_of_foreign_table(query_tables_oid.reloid)
                  ELSE current_database()
             END)::text AS dbname,
            quote_ident(n.nspname::text) schema_name,
            quote_ident(c.relname::text) table_name,
            c.relkind,
            query_tables_oid.reloid
        FROM query_tables_oid
        INNER JOIN pg_catalog.pg_class c ON c.oid = query_tables_oid.reloid
        LEFT JOIN pg_catalog.pg_namespace n ON c.relnamespace = n.oid
    )
    SELECT fqtn.dbname, fqtn.schema_name, fqtn.table_name,
        (CASE WHEN relkind = 'f' THEN @extschema@.CDB_Get_Foreign_Updated_At(reloid)
              ELSE (SELECT md.updated_at FROM @extschema@.CDB_TableMetadata md WHERE md.tabname = reloid)
         END) AS updated_at
    FROM fqtn;
$$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE;
2016-02-09 20:40:18 +08:00
-- Return the last updated time of a set of tables
-- It is aware of foreign tables
-- It assumes the local (schema_name, table_name) map to the remote ones with the same name
2019-05-31 21:29:28 +08:00
-- Returns the most recent update time among a set of tables.
--
-- Parameters:
--   tables: array of table names; each one is cast to regclass, so a name that
--           does not resolve raises an error.
--
-- Returns: max(updated_at) over the tables, where updated_at comes from
-- CDB_Get_Foreign_Updated_At() for foreign tables and from CDB_TableMetadata
-- otherwise. NULL when no table has a recorded update time.
CREATE OR REPLACE FUNCTION @extschema@.CDB_Last_Updated_Time(tables text[])
RETURNS timestamptz AS $$
    WITH t AS (
        SELECT unnest(tables) AS schema_table_name
    ), t_oid AS (
        SELECT (t.schema_table_name)::regclass::oid AS reloid FROM t
    ), t_updated_at AS (
        SELECT
            (CASE WHEN relkind = 'f' THEN @extschema@.CDB_Get_Foreign_Updated_At(reloid)
                  ELSE (SELECT md.updated_at FROM @extschema@.CDB_TableMetadata md WHERE md.tabname = reloid)
             END) AS updated_at
        FROM t_oid
        LEFT JOIN pg_catalog.pg_class c ON c.oid = reloid
    ) SELECT max(updated_at) FROM t_updated_at;
$$ LANGUAGE SQL VOLATILE PARALLEL UNSAFE;
2019-11-07 01:26:37 +08:00
--------------------------------------------------------------------------------
-- Deprecated
--------------------------------------------------------------------------------
-- Remove functions from earlier versions of this file; IF EXISTS keeps the
-- script re-runnable when they are already gone.
DROP FUNCTION IF EXISTS @extschema@.__CDB_User_FDW_Object_Names(name);
DROP FUNCTION IF EXISTS @extschema@._CDB_SetUp_User_PG_FDW_Server(name, json);
DROP FUNCTION IF EXISTS @extschema@._CDB_Drop_User_PG_FDW_Server(name, boolean);
DROP FUNCTION IF EXISTS @extschema@.CDB_SetUp_User_PG_FDW_Table(name, name, name);