951f257654
Simplify code by relying on automatic removal of temp tables with ON COMMIT DROP, so that we avoid the messy EXCEPTION management.
146 lines
4.8 KiB
PL/PgSQL
146 lines
4.8 KiB
PL/PgSQL
/*
|
|
Gets the column names of a given table.
|
|
|
|
Sample usage:
|
|
|
|
SELECT _CDB_GetColumns('public.films');
|
|
*/
|
|
CREATE OR REPLACE FUNCTION _CDB_GetColumns(src_table REGCLASS)
|
|
RETURNS SETOF NAME
|
|
AS $$
|
|
SELECT
|
|
a.attname as "colname"
|
|
FROM
|
|
pg_catalog.pg_attribute a
|
|
WHERE
|
|
a.attnum > 0
|
|
AND NOT a.attisdropped
|
|
AND a.attrelid = (
|
|
SELECT c.oid
|
|
FROM pg_catalog.pg_class c
|
|
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
|
WHERE c.oid = src_table::oid
|
|
AND pg_catalog.pg_table_is_visible(c.oid)
|
|
);
|
|
$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
|
|
|
|
|
|
/*
|
|
Given an array of quoted column names, it generates an UPDATE SET
|
|
clause with the following form:
|
|
|
|
the_geom = changed.the_geom,
|
|
id = changed.id,
|
|
elevation = changed.elevation
|
|
|
|
Example of usage:
|
|
|
|
SELECT __CDB_GetUpdateSetClause('{the_geom, id, elevation}', 'changed');
|
|
*/
|
|
CREATE OR REPLACE FUNCTION __CDB_GetUpdateSetClause(colnames TEXT[], update_source TEXT)
|
|
RETURNS TEXT
|
|
AS $$
|
|
DECLARE
|
|
set_clause_list TEXT[];
|
|
col TEXT;
|
|
BEGIN
|
|
FOREACH col IN ARRAY colnames
|
|
LOOP
|
|
set_clause_list := array_append(set_clause_list, format('%1$s = %2$s.%1$s', col, update_source));
|
|
END lOOP;
|
|
RETURN array_to_string(set_clause_list, ', ');
|
|
END;
|
|
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
|
|
|
|
/*
|
|
A Table Syncer
|
|
|
|
Assumptions:
|
|
- Both tables contain a consistent cartodb_id column
|
|
- Destination table has all columns of the source
|
|
|
|
Sample usage:
|
|
|
|
SELECT CDB_SyncTable('radar_stations', 'public', 'syncdest');
|
|
|
|
*/
|
|
CREATE OR REPLACE FUNCTION CDB_SyncTable(src_table REGCLASS, dst_schema REGNAMESPACE, dst_table NAME)
|
|
RETURNS void
|
|
AS $$
|
|
DECLARE
|
|
fq_dest_table TEXT;
|
|
|
|
colnames TEXT[];
|
|
quoted_colnames TEXT;
|
|
|
|
src_hash_table_name NAME;
|
|
dst_hash_table_name NAME;
|
|
|
|
update_set_clause TEXT;
|
|
|
|
num_rows BIGINT;
|
|
err_context text;
|
|
BEGIN
|
|
-- If the destination table does not exist, just copy the source table
|
|
fq_dest_table := format('%I.%I', dst_schema, dst_table);
|
|
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %I', fq_dest_table, src_table);
|
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
|
IF num_rows > 0 THEN
|
|
RAISE NOTICE 'INSERTED % row(s)', num_rows;
|
|
RETURN;
|
|
END IF;
|
|
|
|
-- Get the list of columns from the source table, excluding cartodb_id
|
|
SELECT ARRAY(SELECT quote_ident(c) FROM _CDB_GetColumns(src_table) as c WHERE c <> 'cartodb_id') INTO colnames;
|
|
quoted_colnames := array_to_string(colnames, ',');
|
|
RAISE NOTICE 'quoted_colnames = %', quoted_colnames;
|
|
|
|
src_hash_table_name := format('src_sync_%s', txid_current());
|
|
dst_hash_table_name := format('dst_sync_%s', txid_current());
|
|
|
|
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', src_hash_table_name);
|
|
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', dst_hash_table_name);
|
|
|
|
-- Compute hash for src_table h[cartodb_id] = hash(row)
|
|
-- It'll take the form of a temp table with an index (easy to run set operations)
|
|
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %I', src_hash_table_name, quoted_colnames, src_table);
|
|
|
|
-- Compute hash for dst_table, only for columns present in src_table
|
|
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', dst_hash_table_name, quoted_colnames, fq_dest_table);
|
|
|
|
-- TODO create indexes
|
|
|
|
-- Deal with deleted rows: ids in dest but not in source
|
|
EXECUTE format('DELETE FROM %s WHERE cartodb_id in (SELECT cartodb_id FROM %I WHERE cartodb_id NOT IN (SELECT cartodb_id FROM %I))', fq_dest_table, dst_hash_table_name, src_hash_table_name);
|
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
|
RAISE NOTICE 'DELETED % row(s)', num_rows;
|
|
|
|
-- Deal with inserted rows: ids in source but not in dest
|
|
EXECUTE format('
|
|
INSERT INTO %s (cartodb_id,%s)
|
|
SELECT h.cartodb_id,%s FROM %I h
|
|
LEFT JOIN %I s ON s.cartodb_id = h.cartodb_id
|
|
WHERE h.cartodb_id NOT IN (SELECT cartodb_id FROM %I);
|
|
', fq_dest_table, quoted_colnames, quoted_colnames, src_hash_table_name, src_table, dst_hash_table_name);
|
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
|
RAISE NOTICE 'INSERTED % row(s)', num_rows;
|
|
|
|
-- Deal with modified rows: ids in source and dest but different hashes
|
|
update_set_clause := __CDB_GetUpdateSetClause(colnames, 'changed');
|
|
EXECUTE format('
|
|
UPDATE %1$s dst SET %2$s
|
|
FROM (
|
|
SELECT *
|
|
FROM %3$s src
|
|
WHERE cartodb_id IN
|
|
(SELECT sh.cartodb_id FROM %4$I sh
|
|
LEFT JOIN %5$I dh ON sh.cartodb_id = dh.cartodb_id
|
|
WHERE sh.hash <> dh.hash)
|
|
) changed
|
|
WHERE dst.cartodb_id = changed.cartodb_id;
|
|
', fq_dest_table, update_set_clause, src_table, src_hash_table_name, dst_hash_table_name);
|
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
|
RAISE NOTICE 'MODIFIED % row(s)', num_rows;
|
|
END;
|
|
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;
|