From 9ade6588e2f3157abcad9372a9935ad204a5d191 Mon Sep 17 00:00:00 2001 From: Carla Date: Wed, 22 Jun 2016 18:58:37 +0200 Subject: [PATCH 1/2] Add augment functions, add mocks for data retrieval, create own fdw functions to avoid cartodb dependency --- src/pg/sql/15_fdw_utilities.sql | 67 +++++++++++++ src/pg/sql/50_table_level_functions.sql | 121 ++++++++++++++++++++++++ 2 files changed, 188 insertions(+) create mode 100644 src/pg/sql/15_fdw_utilities.sql create mode 100644 src/pg/sql/50_table_level_functions.sql diff --git a/src/pg/sql/15_fdw_utilities.sql b/src/pg/sql/15_fdw_utilities.sql new file mode 100644 index 0000000..7a52cb3 --- /dev/null +++ b/src/pg/sql/15_fdw_utilities.sql @@ -0,0 +1,67 @@ +CREATE OR REPLACE FUNCTION cdb_observatory._OBS_ConnectRemoteTable(fdw_name text, schema_name text, user_dbname text, user_hostname text, username text, user_tablename text, user_schema text) +RETURNS void +AS $$ +DECLARE + row record; + option record; + connection_str json; +BEGIN + -- Build connection string + connection_str := '{"server":{"extensions":"postgis", "dbname":"' + || user_dbname ||'", "host":"' || user_hostname ||'", "port":"5432"}, "users":{"public"' + || ':{"user":"' || username ||'", "password":""} } }'; + + -- This function tries to be as idempotent as possible, by not creating anything more than once + -- (not even using IF NOT EXIST to avoid throwing warnings) + IF NOT EXISTS ( SELECT * FROM pg_extension WHERE extname = 'postgres_fdw') THEN + CREATE EXTENSION postgres_fdw; + END IF; + -- Create FDW first if it does not exist + IF NOT EXISTS ( SELECT * FROM pg_foreign_server WHERE srvname = fdw_name) + THEN + EXECUTE FORMAT('CREATE SERVER %I FOREIGN DATA WRAPPER postgres_fdw', fdw_name); + END IF; + + -- Set FDW settings + FOR row IN SELECT p.key, p.value from lateral json_each_text(connection_str->'server') p + LOOP + IF NOT EXISTS (WITH a AS (select split_part(unnest(srvoptions), '=', 1) as options from pg_foreign_server where srvname=fdw_name) SELECT * from a where options = row.key) + THEN + EXECUTE FORMAT('ALTER SERVER %I OPTIONS (ADD %I %L)', fdw_name, row.key, row.value); + ELSE + EXECUTE FORMAT('ALTER SERVER %I OPTIONS (SET %I %L)', fdw_name, row.key, row.value); + END IF; + END LOOP; + + -- Create user mappings + FOR row IN SELECT p.key, p.value from lateral json_each(connection_str->'users') p LOOP + -- Check if entry on pg_user_mappings exists + IF NOT EXISTS ( SELECT * FROM pg_user_mappings WHERE srvname = fdw_name AND usename = row.key ) THEN + EXECUTE FORMAT ('CREATE USER MAPPING FOR %I SERVER %I', row.key, fdw_name); + END IF; + + -- Update user mapping settings + FOR option IN SELECT o.key, o.value from lateral json_each_text(row.value) o LOOP + IF NOT EXISTS (WITH a AS (select split_part(unnest(umoptions), '=', 1) as options from pg_user_mappings WHERE srvname = fdw_name AND usename = row.key) SELECT * from a where options = option.key) THEN + EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (ADD %I %L)', row.key, fdw_name, option.key, option.value); + ELSE + EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (SET %I %L)', row.key, fdw_name, option.key, option.value); + END IF; + END LOOP; + END LOOP; + + -- Create schema if it does not exist. + IF NOT EXISTS ( SELECT * from pg_namespace WHERE nspname=fdw_name) THEN + EXECUTE FORMAT ('CREATE SCHEMA %I', fdw_name); + END IF; + + -- Bring the remote cdb_tablemetadata + IF NOT EXISTS ( SELECT * FROM PG_CLASS WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname=fdw_name) and relname='cdb_tablemetadata') THEN + EXECUTE FORMAT ('CREATE FOREIGN TABLE %I.cdb_tablemetadata (tabname text, updated_at timestamp with time zone) SERVER %I OPTIONS (table_name ''cdb_tablemetadata_text'', schema_name ''public'', updatable ''false'')', fdw_name, fdw_name); + END IF; + + -- Import target table + EXECUTE FORMAT ('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) from SERVER %I INTO %I', user_schema, user_tablename, fdw_name, schema_name); + +END; +$$ LANGUAGE PLPGSQL; diff --git a/src/pg/sql/50_table_level_functions.sql b/src/pg/sql/50_table_level_functions.sql new file mode 100644 index 0000000..7049cda --- /dev/null +++ b/src/pg/sql/50_table_level_functions.sql @@ -0,0 +1,121 @@ +CREATE TYPE cdb_observatory.ds_fdw_metadata as (schemaname text, tabname text, servername text); +CREATE TYPE cdb_observatory.ds_return_metadata as (colnames text[], coltypes text[]); + +CREATE OR REPLACE FUNCTION cdb_observatory._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text) +RETURNS cdb_observatory.ds_fdw_metadata +AS $$ +DECLARE + fdw_server text; + fdw_import_schema text; + connection_str json; + import_foreign_schema_q text; + epoch_timestamp text; + user_host text; +BEGIN + + SELECT extract(epoch from now() at time zone 'utc')::int INTO epoch_timestamp; + fdw_server := 'fdw_server_' || username || '_' || epoch_timestamp; + fdw_import_schema:= fdw_server; + SELECT split_part(inet_client_addr()::text, '/', 1) INTO user_host; + + -- Import foreign table + EXECUTE FORMAT ('SELECT cdb_observatory._OBS_ConnectRemoteTable(%L, %L, %L, %L, %L, %L, %L)', fdw_server, fdw_import_schema, dbname, user_host, user_db_role, table_name, input_schema); + + RETURN (fdw_import_schema::text, table_name::text, fdw_server::text); + +EXCEPTION + WHEN others THEN + -- Disconnect user imported table. Delete schema and FDW server. + EXECUTE 'DROP FOREIGN TABLE IF EXISTS ' || fdw_import_schema || '.' || table_name; + EXECUTE 'DROP SCHEMA IF EXISTS ' || fdw_import_schema || ' CASCADE'; + EXECUTE 'DROP SERVER IF EXISTS ' || fdw_server || ' CASCADE;'; + RETURN (null, null, null); +END; +$$ LANGUAGE plpgsql SECURITY DEFINER; + +CREATE OR REPLACE FUNCTION cdb_observatory._OBS_GetReturnMetadata(username text, orgname text, function_name text, params json) +RETURNS cdb_observatory.ds_return_metadata +AS $$ +DECLARE + colnames text[]; + coltypes text[]; + requested_measures text[]; + measure text; +BEGIN + + -- Simple mock, there should be real logic in here. + + IF $3 NOT ILIKE 'GetMeasure' OR $3 IS NULL THEN + RAISE 'This function is not supported yet: %', $3; + END IF; + + SELECT translate($4::json->>'tag_name','[]', '{}')::text[] INTO requested_measures; + + FOREACH measure IN ARRAY requested_measures + LOOP + IF NOT measure ILIKE ANY (Array['total_pop', 'pop_16_over']::text[]) THEN + RAISE 'This measure is not supported yet: %', measure; + END IF; + SELECT array_append(colnames, measure) INTO colnames; + SELECT array_append(coltypes, 'double precision'::text) INTO coltypes; + + END LOOP; + + RETURN (colnames::text[], coltypes::text[]); +END; +$$ LANGUAGE plpgsql; + +CREATE OR REPLACE FUNCTION cdb_observatory._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json) +RETURNS SETOF record +AS $$ +DECLARE + data_query text; + tag_name text[]; + tag text; + tags_list text; + tags_query text; + rec RECORD; +BEGIN + SELECT translate($6::json->>'tag_name','[]', '{}')::text[] INTO tag_name; + SELECT array_to_string(tag_name, ',') INTO tags_list; + tags_query := ''; + + FOREACH tag IN ARRAY tag_name + LOOP + SELECT tags_query || ' sum(' || tag || '/fraction)::double precision as ' || tag || ', ' INTO tags_query; + + END LOOP; + + -- Simple mock, there should be real logic in here. + data_query := '(WITH _areas AS(SELECT ST_Area(a.the_geom::geography)' + || '/ (1000 * 1000) as fraction, a.geoid, b.cartodb_id FROM ' + || 'observatory.obs_c6fb99c47d61289fbb8e561ff7773799d3fcc308 as a, ' + || table_schema || '.' || table_name || ' AS b ' + || 'WHERE b.the_geom && a.the_geom ), values AS (SELECT geoid, ' + || tags_list + || ' FROM observatory.obs_1a098da56badf5f32e336002b0a81708c40d29cd ) ' + || 'SELECT ' + || tags_query + || ' cartodb_id::int FROM _areas, values ' + || 'WHERE values.geoid = _areas.geoid GROUP BY cartodb_id);'; + + + FOR rec IN EXECUTE data_query + LOOP + RETURN NEXT rec; + END LOOP; + RETURN; +END; +$$ LANGUAGE plpgsql SECURITY DEFINER; + + +CREATE OR REPLACE FUNCTION cdb_observatory._OBS_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text) +RETURNS boolean +AS $$ +BEGIN + EXECUTE 'DROP FOREIGN TABLE IF EXISTS "' || table_schema || '".' || table_name; + EXECUTE 'DROP SCHEMA IF EXISTS ' || table_schema || ' CASCADE'; + EXECUTE 'DROP SERVER IF EXISTS ' || servername || ' CASCADE;'; + RETURN true; +END; +$$ LANGUAGE plpgsql SECURITY DEFINER; From d93752efa3b80eba801c5407c25a8f7590501ccb Mon Sep 17 00:00:00 2001 From: Carla Date: Fri, 15 Jul 2016 11:34:42 +0200 Subject: [PATCH 2/2] move addr_host as a parameter --- src/pg/sql/50_table_level_functions.sql | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/src/pg/sql/50_table_level_functions.sql b/src/pg/sql/50_table_level_functions.sql index 7049cda..27cf678 100644 --- a/src/pg/sql/50_table_level_functions.sql +++ b/src/pg/sql/50_table_level_functions.sql @@ -1,7 +1,7 @@ CREATE TYPE cdb_observatory.ds_fdw_metadata as (schemaname text, tabname text, servername text); CREATE TYPE cdb_observatory.ds_return_metadata as (colnames text[], coltypes text[]); -CREATE OR REPLACE FUNCTION cdb_observatory._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, table_name text) +CREATE OR REPLACE FUNCTION cdb_observatory._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, host_addr text, table_name text) RETURNS cdb_observatory.ds_fdw_metadata AS $$ DECLARE @@ -10,16 +10,14 @@ DECLARE connection_str json; import_foreign_schema_q text; epoch_timestamp text; - user_host text; BEGIN SELECT extract(epoch from now() at time zone 'utc')::int INTO epoch_timestamp; fdw_server := 'fdw_server_' || username || '_' || epoch_timestamp; fdw_import_schema:= fdw_server; - SELECT split_part(inet_client_addr()::text, '/', 1) INTO user_host; -- Import foreign table - EXECUTE FORMAT ('SELECT cdb_observatory._OBS_ConnectRemoteTable(%L, %L, %L, %L, %L, %L, %L)', fdw_server, fdw_import_schema, dbname, user_host, user_db_role, table_name, input_schema); + EXECUTE FORMAT ('SELECT cdb_observatory._OBS_ConnectRemoteTable(%L, %L, %L, %L, %L, %L, %L)', fdw_server, fdw_import_schema, dbname, host_addr, user_db_role, table_name, input_schema); RETURN (fdw_import_schema::text, table_name::text, fdw_server::text);