-- cartodb-postgresql/scripts-available/CDB_SyncTable.sql
-- Dialect: PostgreSQL (SQL and PL/pgSQL functions), not MySQL.

/*
    Returns the names of the columns of a given table, in attribute-number
    order.

    Parameters:
      src_table  table to inspect, as a REGCLASS.

    Returns:
      One NAME row per column that has a positive attribute number and has
      not been dropped.

    The lookup is keyed directly on the REGCLASS OID, so the result does not
    depend on whether the table's schema is in the caller's search_path.

    Sample usage:
      SELECT cartodb._CDB_GetColumns('public.films');
*/
CREATE OR REPLACE FUNCTION cartodb._CDB_GetColumns(src_table REGCLASS)
RETURNS SETOF NAME
AS $$
    SELECT
        a.attname AS "colname"
    FROM pg_catalog.pg_attribute AS a
    WHERE a.attrelid = src_table::oid
        AND a.attnum > 0            -- system columns have attnum <= 0
        AND NOT a.attisdropped      -- dropped columns keep a pg_attribute row
    ORDER BY a.attnum;
$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
/*
    Builds the assignment list of an UPDATE ... SET clause from an array of
    (already quoted) column names. Every column is assigned from the column
    of the same name in update_source:

        the_geom = changed.the_geom, id = changed.id, elevation = changed.elevation

    Parameters:
      colnames       quoted column names, interpolated verbatim.
      update_source  alias of the relation supplying the new values,
                     interpolated verbatim.

    Returns:
      The assignments joined with ', ', or NULL when colnames has no elements.

    Example of usage:
      SELECT cartodb.__CDB_GetUpdateSetClause('{the_geom, id, elevation}', 'changed');
*/
CREATE OR REPLACE FUNCTION cartodb.__CDB_GetUpdateSetClause(colnames TEXT[], update_source TEXT)
RETURNS TEXT
AS $$
DECLARE
    set_clause TEXT;    -- stays NULL until the first assignment is appended
    assignment TEXT;
    colname TEXT;
BEGIN
    FOREACH colname IN ARRAY colnames
    LOOP
        assignment := format('%1$s = %2$s.%1$s', colname, update_source);
        IF set_clause IS NULL THEN
            set_clause := assignment;
        ELSE
            set_clause := set_clause || ', ' || assignment;
        END IF;
    END LOOP;

    RETURN set_clause;
END;
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
/*
    Generates a name of the form <prefix>_<txid>_<n>, where <txid> is the
    current transaction id and <n> is a random integer derived from
    random() * 1000000. Intended for naming temp tables.

    Parameters:
      prefix  leading part of the generated name.

    Returns:
      The generated text cast to NAME.

    Example of usage:
      SELECT cartodb.__CDB_GenerateUniqueName('src_sync'); --> src_sync_718794_120106
*/
CREATE OR REPLACE FUNCTION cartodb.__CDB_GenerateUniqueName(prefix TEXT)
RETURNS NAME
AS $$
    SELECT CAST(
        concat(prefix, '_', txid_current(), '_', (random() * 1000000)::int)
        AS NAME
    );
$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE;
-- (blame timestamp from the web view: 2019-06-28 19:49:15 +08:00)
/*
    Prefixes every column name in an array with a table name (or alias) and
    a dot, keeping the elements in their original order.

    Parameters:
      tablename  qualifier placed in front of each column name.
      colnames   column names to qualify.

    Returns:
      Array of '<tablename>.<colname>' strings; NULL when colnames has no
      elements.

    Example of usage:
      SELECT cartodb.__CDB_QualifyColumns('t', ARRAY['a','b']); --> ARRAY['t.a','t.b']
*/
CREATE OR REPLACE FUNCTION cartodb.__CDB_QualifyColumns(tablename TEXT, colnames TEXT[])
RETURNS TEXT[]
AS $$
    SELECT
        array_agg(tablename || '.' || col.colname ORDER BY col.ord)
    FROM unnest(colnames) WITH ORDINALITY AS col(colname, ord);
$$ LANGUAGE sql IMMUTABLE PARALLEL SAFE;
/*
    A Table Syncer

    Makes dst_schema.dst_table mirror src_table, matching rows on cartodb_id:
      1. If the destination table does not exist it is created as a copy of
         the source; when that copies at least one row the function returns.
      2. Destination rows whose cartodb_id is absent from the source are
         deleted.
      3. Source rows whose cartodb_id is absent from the destination are
         inserted.
      4. Rows present on both sides are updated when the md5 of the text
         form of their synced columns differs.
    A NOTICE reports the row count of each step; step timings are raised at
    DEBUG level.

    Parameters:
      src_table   source table.
      dst_schema  schema of the destination table.
      dst_table   unqualified name of the destination table.
      skip_cols   columns to leave out of the sync (default: none).
                  cartodb_id is always appended to this list because it is
                  the join key and is handled explicitly.
                  NOTE(review): entries are compared against the
                  quote_ident() form of the source column names, so a column
                  whose name needs quoting has to be passed in quoted form.

    Assumptions:
      - Both tables contain a consistent cartodb_id column
      - Destination table has all columns of the source or does not exist

    Sample usage:
      SELECT cartodb.CDB_SyncTable('radar_stations', 'public', 'syncdest');
      SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
*/
CREATE OR REPLACE FUNCTION cartodb.CDB_SyncTable(src_table REGCLASS, dst_schema REGNAMESPACE, dst_table NAME, skip_cols NAME[] = '{}')
RETURNS void
AS $$
DECLARE
    fq_dest_table TEXT;
    colnames TEXT[];
    insert_colnames TEXT;
    dst_colnames TEXT;
    src_colnames TEXT;
    update_set_clause TEXT;
    num_rows BIGINT;
    t timestamptz;
BEGIN
    -- REGNAMESPACE and REGCLASS values already render as quoted (and, when
    -- needed, schema-qualified) identifiers, so they are interpolated with
    -- %s. Using %I would quote them a second time and break names that need
    -- quoting or a schema qualifier. dst_table is a plain NAME and still
    -- needs %I.
    fq_dest_table := format('%s.%I', dst_schema, dst_table);

    -- If the destination table does not exist, just copy the source table
    EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %s', fq_dest_table, src_table);
    GET DIAGNOSTICS num_rows = ROW_COUNT;
    IF num_rows > 0 THEN
        RAISE NOTICE 'INSERTED % row(s)', num_rows;
        RETURN;
    END IF;

    skip_cols := skip_cols || '{cartodb_id}';

    -- Get the list of (quoted) columns from the source table, excluding skip_cols
    SELECT ARRAY(SELECT quote_ident(c) FROM cartodb._CDB_GetColumns(src_table) as c EXCEPT SELECT unnest(skip_cols)) INTO colnames;

    -- Deal with deleted rows: ids in dest but not in source
    t := clock_timestamp();
    EXECUTE format(
        'DELETE FROM %1$s _dst WHERE NOT EXISTS (SELECT * FROM %2$s _src WHERE _src.cartodb_id=_dst.cartodb_id)',
        fq_dest_table, src_table);
    GET DIAGNOSTICS num_rows = ROW_COUNT;
    RAISE NOTICE 'DELETED % row(s)', num_rows;
    RAISE DEBUG 'DELETE time (s): %', clock_timestamp() - t;

    -- Deal with inserted rows: ids in source but not in dest.
    -- cartodb_id is prepended to the column list here so that an empty
    -- colnames array still yields a valid column list.
    t := clock_timestamp();
    insert_colnames := array_to_string(array_prepend('cartodb_id', colnames), ',');
    EXECUTE format('
        INSERT INTO %1$s(%2$s)
        SELECT %2$s FROM %3$s _src WHERE NOT EXISTS (SELECT * FROM %1$s _dst WHERE _src.cartodb_id=_dst.cartodb_id)
    ', fq_dest_table, insert_colnames, src_table);
    GET DIAGNOSTICS num_rows = ROW_COUNT;
    RAISE NOTICE 'INSERTED % row(s)', num_rows;
    RAISE DEBUG 'INSERT time (s): %', clock_timestamp() - t;

    -- Deal with modified rows: ids in source and dest but different hashes
    t := clock_timestamp();
    IF cardinality(colnames) > 0 THEN
        update_set_clause := cartodb.__CDB_GetUpdateSetClause(colnames, '_changed');
        dst_colnames := array_to_string(cartodb.__CDB_QualifyColumns('_dst', colnames), ',');
        src_colnames := array_to_string(cartodb.__CDB_QualifyColumns('_src', colnames), ',');
        EXECUTE format('
            UPDATE %1$s _update SET %2$s
            FROM (
                SELECT _src.* FROM %3$s _src JOIN %1$s _dst ON (_dst.cartodb_id = _src.cartodb_id)
                WHERE md5(ROW(%4$s)::text) <> md5(ROW(%5$s)::text)
            ) _changed
            WHERE _update.cartodb_id = _changed.cartodb_id;
        ', fq_dest_table, update_set_clause, src_table, dst_colnames, src_colnames);
        GET DIAGNOSTICS num_rows = ROW_COUNT;
    ELSE
        -- No synced columns besides cartodb_id: nothing can differ, and an
        -- UPDATE with an empty SET list would be a syntax error.
        num_rows := 0;
    END IF;
    RAISE NOTICE 'MODIFIED % row(s)', num_rows;
    RAISE DEBUG 'UPDATE time (s): %', clock_timestamp() - t;
END;
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;