Merge pull request #127 from CartoDB/table_level_functions
Add table level functions and mocks
This commit is contained in:
commit
876515f9aa
67
src/pg/sql/15_fdw_utilities.sql
Normal file
67
src/pg/sql/15_fdw_utilities.sql
Normal file
@ -0,0 +1,67 @@
|
|||||||
|
CREATE OR REPLACE FUNCTION cdb_observatory._OBS_ConnectRemoteTable(fdw_name text, schema_name text, user_dbname text, user_hostname text, username text, user_tablename text, user_schema text)
|
||||||
|
RETURNS void
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
row record;
|
||||||
|
option record;
|
||||||
|
connection_str json;
|
||||||
|
BEGIN
|
||||||
|
-- Build connection string
|
||||||
|
connection_str := '{"server":{"extensions":"postgis", "dbname":"'
|
||||||
|
|| user_dbname ||'", "host":"' || user_hostname ||'", "port":"5432"}, "users":{"public"'
|
||||||
|
|| ':{"user":"' || username ||'", "password":""} } }';
|
||||||
|
|
||||||
|
-- This function tries to be as idempotent as possible, by not creating anything more than once
|
||||||
|
-- (not even using IF NOT EXIST to avoid throwing warnings)
|
||||||
|
IF NOT EXISTS ( SELECT * FROM pg_extension WHERE extname = 'postgres_fdw') THEN
|
||||||
|
CREATE EXTENSION postgres_fdw;
|
||||||
|
END IF;
|
||||||
|
-- Create FDW first if it does not exist
|
||||||
|
IF NOT EXISTS ( SELECT * FROM pg_foreign_server WHERE srvname = fdw_name)
|
||||||
|
THEN
|
||||||
|
EXECUTE FORMAT('CREATE SERVER %I FOREIGN DATA WRAPPER postgres_fdw', fdw_name);
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Set FDW settings
|
||||||
|
FOR row IN SELECT p.key, p.value from lateral json_each_text(connection_str->'server') p
|
||||||
|
LOOP
|
||||||
|
IF NOT EXISTS (WITH a AS (select split_part(unnest(srvoptions), '=', 1) as options from pg_foreign_server where srvname=fdw_name) SELECT * from a where options = row.key)
|
||||||
|
THEN
|
||||||
|
EXECUTE FORMAT('ALTER SERVER %I OPTIONS (ADD %I %L)', fdw_name, row.key, row.value);
|
||||||
|
ELSE
|
||||||
|
EXECUTE FORMAT('ALTER SERVER %I OPTIONS (SET %I %L)', fdw_name, row.key, row.value);
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
-- Create user mappings
|
||||||
|
FOR row IN SELECT p.key, p.value from lateral json_each(connection_str->'users') p LOOP
|
||||||
|
-- Check if entry on pg_user_mappings exists
|
||||||
|
IF NOT EXISTS ( SELECT * FROM pg_user_mappings WHERE srvname = fdw_name AND usename = row.key ) THEN
|
||||||
|
EXECUTE FORMAT ('CREATE USER MAPPING FOR %I SERVER %I', row.key, fdw_name);
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Update user mapping settings
|
||||||
|
FOR option IN SELECT o.key, o.value from lateral json_each_text(row.value) o LOOP
|
||||||
|
IF NOT EXISTS (WITH a AS (select split_part(unnest(umoptions), '=', 1) as options from pg_user_mappings WHERE srvname = fdw_name AND usename = row.key) SELECT * from a where options = option.key) THEN
|
||||||
|
EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (ADD %I %L)', row.key, fdw_name, option.key, option.value);
|
||||||
|
ELSE
|
||||||
|
EXECUTE FORMAT('ALTER USER MAPPING FOR %I SERVER %I OPTIONS (SET %I %L)', row.key, fdw_name, option.key, option.value);
|
||||||
|
END IF;
|
||||||
|
END LOOP;
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
-- Create schema if it does not exist.
|
||||||
|
IF NOT EXISTS ( SELECT * from pg_namespace WHERE nspname=fdw_name) THEN
|
||||||
|
EXECUTE FORMAT ('CREATE SCHEMA %I', fdw_name);
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Bring the remote cdb_tablemetadata
|
||||||
|
IF NOT EXISTS ( SELECT * FROM PG_CLASS WHERE relnamespace = (SELECT oid FROM pg_namespace WHERE nspname=fdw_name) and relname='cdb_tablemetadata') THEN
|
||||||
|
EXECUTE FORMAT ('CREATE FOREIGN TABLE %I.cdb_tablemetadata (tabname text, updated_at timestamp with time zone) SERVER %I OPTIONS (table_name ''cdb_tablemetadata_text'', schema_name ''public'', updatable ''false'')', fdw_name, fdw_name);
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
-- Import target table
|
||||||
|
EXECUTE FORMAT ('IMPORT FOREIGN SCHEMA %I LIMIT TO (%I) from SERVER %I INTO %I', user_schema, user_tablename, fdw_name, schema_name);
|
||||||
|
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE PLPGSQL;
|
119
src/pg/sql/50_table_level_functions.sql
Normal file
119
src/pg/sql/50_table_level_functions.sql
Normal file
@ -0,0 +1,119 @@
|
|||||||
|
CREATE TYPE cdb_observatory.ds_fdw_metadata as (schemaname text, tabname text, servername text);
|
||||||
|
CREATE TYPE cdb_observatory.ds_return_metadata as (colnames text[], coltypes text[]);
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cdb_observatory._OBS_ConnectUserTable(username text, orgname text, user_db_role text, input_schema text, dbname text, host_addr text, table_name text)
|
||||||
|
RETURNS cdb_observatory.ds_fdw_metadata
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
fdw_server text;
|
||||||
|
fdw_import_schema text;
|
||||||
|
connection_str json;
|
||||||
|
import_foreign_schema_q text;
|
||||||
|
epoch_timestamp text;
|
||||||
|
BEGIN
|
||||||
|
|
||||||
|
SELECT extract(epoch from now() at time zone 'utc')::int INTO epoch_timestamp;
|
||||||
|
fdw_server := 'fdw_server_' || username || '_' || epoch_timestamp;
|
||||||
|
fdw_import_schema:= fdw_server;
|
||||||
|
|
||||||
|
-- Import foreign table
|
||||||
|
EXECUTE FORMAT ('SELECT cdb_observatory._OBS_ConnectRemoteTable(%L, %L, %L, %L, %L, %L, %L)', fdw_server, fdw_import_schema, dbname, host_addr, user_db_role, table_name, input_schema);
|
||||||
|
|
||||||
|
RETURN (fdw_import_schema::text, table_name::text, fdw_server::text);
|
||||||
|
|
||||||
|
EXCEPTION
|
||||||
|
WHEN others THEN
|
||||||
|
-- Disconnect user imported table. Delete schema and FDW server.
|
||||||
|
EXECUTE 'DROP FOREIGN TABLE IF EXISTS ' || fdw_import_schema || '.' || table_name;
|
||||||
|
EXECUTE 'DROP SCHEMA IF EXISTS ' || fdw_import_schema || ' CASCADE';
|
||||||
|
EXECUTE 'DROP SERVER IF EXISTS ' || fdw_server || ' CASCADE;';
|
||||||
|
RETURN (null, null, null);
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql SECURITY DEFINER;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cdb_observatory._OBS_GetReturnMetadata(username text, orgname text, function_name text, params json)
|
||||||
|
RETURNS cdb_observatory.ds_return_metadata
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
colnames text[];
|
||||||
|
coltypes text[];
|
||||||
|
requested_measures text[];
|
||||||
|
measure text;
|
||||||
|
BEGIN
|
||||||
|
|
||||||
|
-- Simple mock, there should be real logic in here.
|
||||||
|
|
||||||
|
IF $3 NOT ILIKE 'GetMeasure' OR $3 IS NULL THEN
|
||||||
|
RAISE 'This function is not supported yet: %', $3;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
SELECT translate($4::json->>'tag_name','[]', '{}')::text[] INTO requested_measures;
|
||||||
|
|
||||||
|
FOREACH measure IN ARRAY requested_measures
|
||||||
|
LOOP
|
||||||
|
IF NOT measure ILIKE ANY (Array['total_pop', 'pop_16_over']::text[]) THEN
|
||||||
|
RAISE 'This measure is not supported yet: %', measure;
|
||||||
|
END IF;
|
||||||
|
SELECT array_append(colnames, measure) INTO colnames;
|
||||||
|
SELECT array_append(coltypes, 'double precision'::text) INTO coltypes;
|
||||||
|
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
RETURN (colnames::text[], coltypes::text[]);
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql;
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cdb_observatory._OBS_FetchJoinFdwTableData(username text, orgname text, table_schema text, table_name text, function_name text, params json)
|
||||||
|
RETURNS SETOF record
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
data_query text;
|
||||||
|
tag_name text[];
|
||||||
|
tag text;
|
||||||
|
tags_list text;
|
||||||
|
tags_query text;
|
||||||
|
rec RECORD;
|
||||||
|
BEGIN
|
||||||
|
SELECT translate($6::json->>'tag_name','[]', '{}')::text[] INTO tag_name;
|
||||||
|
SELECT array_to_string(tag_name, ',') INTO tags_list;
|
||||||
|
tags_query := '';
|
||||||
|
|
||||||
|
FOREACH tag IN ARRAY tag_name
|
||||||
|
LOOP
|
||||||
|
SELECT tags_query || ' sum(' || tag || '/fraction)::double precision as ' || tag || ', ' INTO tags_query;
|
||||||
|
|
||||||
|
END LOOP;
|
||||||
|
|
||||||
|
-- Simple mock, there should be real logic in here.
|
||||||
|
data_query := '(WITH _areas AS(SELECT ST_Area(a.the_geom::geography)'
|
||||||
|
|| '/ (1000 * 1000) as fraction, a.geoid, b.cartodb_id FROM '
|
||||||
|
|| 'observatory.obs_c6fb99c47d61289fbb8e561ff7773799d3fcc308 as a, '
|
||||||
|
|| table_schema || '.' || table_name || ' AS b '
|
||||||
|
|| 'WHERE b.the_geom && a.the_geom ), values AS (SELECT geoid, '
|
||||||
|
|| tags_list
|
||||||
|
|| ' FROM observatory.obs_1a098da56badf5f32e336002b0a81708c40d29cd ) '
|
||||||
|
|| 'SELECT '
|
||||||
|
|| tags_query
|
||||||
|
|| ' cartodb_id::int FROM _areas, values '
|
||||||
|
|| 'WHERE values.geoid = _areas.geoid GROUP BY cartodb_id);';
|
||||||
|
|
||||||
|
|
||||||
|
FOR rec IN EXECUTE data_query
|
||||||
|
LOOP
|
||||||
|
RETURN NEXT rec;
|
||||||
|
END LOOP;
|
||||||
|
RETURN;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql SECURITY DEFINER;
|
||||||
|
|
||||||
|
|
||||||
|
CREATE OR REPLACE FUNCTION cdb_observatory._OBS_DisconnectUserTable(username text, orgname text, table_schema text, table_name text, servername text)
|
||||||
|
RETURNS boolean
|
||||||
|
AS $$
|
||||||
|
BEGIN
|
||||||
|
EXECUTE 'DROP FOREIGN TABLE IF EXISTS "' || table_schema || '".' || table_name;
|
||||||
|
EXECUTE 'DROP SCHEMA IF EXISTS ' || table_schema || ' CASCADE';
|
||||||
|
EXECUTE 'DROP SERVER IF EXISTS ' || servername || ' CASCADE;';
|
||||||
|
RETURN true;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql SECURITY DEFINER;
|
Loading…
Reference in New Issue
Block a user