commit
83ab128a01
179
scripts-available/CDB_SyncTable.sql
Normal file
179
scripts-available/CDB_SyncTable.sql
Normal file
@ -0,0 +1,179 @@
|
|||||||
|
/*
|
||||||
|
Gets the column names of a given table.
|
||||||
|
|
||||||
|
Sample usage:
|
||||||
|
|
||||||
|
SELECT cartodb._CDB_GetColumns('public.films');
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION cartodb._CDB_GetColumns(src_table REGCLASS)
|
||||||
|
RETURNS SETOF NAME
|
||||||
|
AS $$
|
||||||
|
SELECT
|
||||||
|
a.attname as "colname"
|
||||||
|
FROM
|
||||||
|
pg_catalog.pg_attribute a
|
||||||
|
WHERE
|
||||||
|
a.attnum > 0
|
||||||
|
AND NOT a.attisdropped
|
||||||
|
AND a.attrelid = (
|
||||||
|
SELECT c.oid
|
||||||
|
FROM pg_catalog.pg_class c
|
||||||
|
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
|
||||||
|
WHERE c.oid = src_table::oid
|
||||||
|
AND pg_catalog.pg_table_is_visible(c.oid)
|
||||||
|
);
|
||||||
|
$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Given an array of quoted column names, it generates an UPDATE SET
|
||||||
|
clause with the following form:
|
||||||
|
|
||||||
|
the_geom = changed.the_geom,
|
||||||
|
id = changed.id,
|
||||||
|
elevation = changed.elevation
|
||||||
|
|
||||||
|
Example of usage:
|
||||||
|
|
||||||
|
SELECT cartodb.__CDB_GetUpdateSetClause('{the_geom, id, elevation}', 'changed');
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION cartodb.__CDB_GetUpdateSetClause(colnames TEXT[], update_source TEXT)
|
||||||
|
RETURNS TEXT
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
set_clause_list TEXT[];
|
||||||
|
col TEXT;
|
||||||
|
BEGIN
|
||||||
|
FOREACH col IN ARRAY colnames
|
||||||
|
LOOP
|
||||||
|
set_clause_list := array_append(set_clause_list, format('%1$s = %2$s.%1$s', col, update_source));
|
||||||
|
END lOOP;
|
||||||
|
RETURN array_to_string(set_clause_list, ', ');
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
Given a prefix, generate a safe unique NAME for a temp table.
|
||||||
|
|
||||||
|
Example of usage:
|
||||||
|
|
||||||
|
SELECT cartodb.__CDB_GenerateUniqueName('src_sync'); --> src_sync_718794_120106
|
||||||
|
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION cartodb.__CDB_GenerateUniqueName(prefix TEXT)
|
||||||
|
RETURNS NAME
|
||||||
|
AS $$
|
||||||
|
SELECT format('%s_%s_%s', prefix, txid_current(), (random()*1000000)::int)::NAME;
|
||||||
|
$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE;
|
||||||
|
|
||||||
|
|
||||||
|
/*
|
||||||
|
A Table Syncer
|
||||||
|
|
||||||
|
Assumptions:
|
||||||
|
- Both tables contain a consistent cartodb_id column
|
||||||
|
- Destination table has all columns of the source or does not exist
|
||||||
|
|
||||||
|
Sample usage:
|
||||||
|
|
||||||
|
SELECT cartodb.CDB_SyncTable('radar_stations', 'public', 'syncdest');
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
|
||||||
|
|
||||||
|
*/
|
||||||
|
CREATE OR REPLACE FUNCTION cartodb.CDB_SyncTable(src_table REGCLASS, dst_schema REGNAMESPACE, dst_table NAME, skip_cols NAME[] = '{}')
|
||||||
|
RETURNS void
|
||||||
|
AS $$
|
||||||
|
DECLARE
|
||||||
|
fq_dest_table TEXT;
|
||||||
|
|
||||||
|
colnames TEXT[];
|
||||||
|
quoted_colnames TEXT;
|
||||||
|
|
||||||
|
src_hash_table_name NAME;
|
||||||
|
dst_hash_table_name NAME;
|
||||||
|
|
||||||
|
update_set_clause TEXT;
|
||||||
|
|
||||||
|
num_rows BIGINT;
|
||||||
|
err_context text;
|
||||||
|
|
||||||
|
t timestamptz;
|
||||||
|
BEGIN
|
||||||
|
-- If the destination table does not exist, just copy the source table
|
||||||
|
fq_dest_table := format('%I.%I', dst_schema, dst_table);
|
||||||
|
EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %I', fq_dest_table, src_table);
|
||||||
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||||
|
IF num_rows > 0 THEN
|
||||||
|
RAISE NOTICE 'INSERTED % row(s)', num_rows;
|
||||||
|
RETURN;
|
||||||
|
END IF;
|
||||||
|
|
||||||
|
skip_cols := skip_cols || '{cartodb_id}';
|
||||||
|
|
||||||
|
-- Get the list of columns from the source table, excluding skip_cols
|
||||||
|
SELECT ARRAY(SELECT quote_ident(c) FROM cartodb._CDB_GetColumns(src_table) as c EXCEPT SELECT unnest(skip_cols)) INTO colnames;
|
||||||
|
quoted_colnames := array_to_string(colnames, ',');
|
||||||
|
|
||||||
|
src_hash_table_name := cartodb.__CDB_GenerateUniqueName('src_sync');
|
||||||
|
dst_hash_table_name := cartodb.__CDB_GenerateUniqueName('dst_sync');
|
||||||
|
|
||||||
|
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', src_hash_table_name);
|
||||||
|
EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', dst_hash_table_name);
|
||||||
|
|
||||||
|
-- Compute hash tables for src_table and dst_table h[cartodb_id] = hash(row)
|
||||||
|
-- It'll take the form of a temp table with an index (easy to run set operations)
|
||||||
|
t := clock_timestamp();
|
||||||
|
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %I', src_hash_table_name, quoted_colnames, src_table);
|
||||||
|
EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', dst_hash_table_name, quoted_colnames, fq_dest_table);
|
||||||
|
RAISE DEBUG 'Populate hash tables time (s): %', clock_timestamp() - t;
|
||||||
|
|
||||||
|
-- Create indexes
|
||||||
|
-- We use hash indexes as they are fit for id comparison.
|
||||||
|
t := clock_timestamp();
|
||||||
|
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', src_hash_table_name);
|
||||||
|
EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', dst_hash_table_name);
|
||||||
|
RAISE DEBUG 'Index creation on hash tables time (s): %', clock_timestamp() - t;
|
||||||
|
|
||||||
|
-- Deal with deleted rows: ids in dest but not in source
|
||||||
|
t := clock_timestamp();
|
||||||
|
EXECUTE format(
|
||||||
|
'DELETE FROM %s WHERE cartodb_id IN (SELECT cartodb_id FROM %I EXCEPT SELECT cartodb_id FROM %I)',
|
||||||
|
fq_dest_table,
|
||||||
|
dst_hash_table_name,
|
||||||
|
src_hash_table_name);
|
||||||
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||||
|
RAISE NOTICE 'DELETED % row(s)', num_rows;
|
||||||
|
RAISE DEBUG 'DELETE time (s): %', clock_timestamp() - t;
|
||||||
|
|
||||||
|
-- Deal with inserted rows: ids in source but not in dest
|
||||||
|
t := clock_timestamp();
|
||||||
|
EXECUTE format('
|
||||||
|
INSERT INTO %1$s (cartodb_id,%2$s)
|
||||||
|
SELECT h.cartodb_id,%2$s FROM (SELECT cartodb_id FROM %3$I EXCEPT SELECT cartodb_id FROM %4$I) h
|
||||||
|
LEFT JOIN %5$I s ON s.cartodb_id = h.cartodb_id;
|
||||||
|
', fq_dest_table, quoted_colnames, src_hash_table_name, dst_hash_table_name, src_table);
|
||||||
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||||
|
RAISE NOTICE 'INSERTED % row(s)', num_rows;
|
||||||
|
RAISE DEBUG 'INSERT time (s): %', clock_timestamp() - t;
|
||||||
|
|
||||||
|
-- Deal with modified rows: ids in source and dest but different hashes
|
||||||
|
t := clock_timestamp();
|
||||||
|
update_set_clause := cartodb.__CDB_GetUpdateSetClause(colnames, 'changed');
|
||||||
|
EXECUTE format('
|
||||||
|
UPDATE %1$s dst SET %2$s
|
||||||
|
FROM (
|
||||||
|
SELECT *
|
||||||
|
FROM %3$s src
|
||||||
|
WHERE cartodb_id IN
|
||||||
|
(SELECT sh.cartodb_id FROM %4$I sh
|
||||||
|
LEFT JOIN %5$I dh ON sh.cartodb_id = dh.cartodb_id
|
||||||
|
WHERE sh.hash <> dh.hash)
|
||||||
|
) changed
|
||||||
|
WHERE dst.cartodb_id = changed.cartodb_id;
|
||||||
|
', fq_dest_table, update_set_clause, src_table, src_hash_table_name, dst_hash_table_name);
|
||||||
|
GET DIAGNOSTICS num_rows = ROW_COUNT;
|
||||||
|
RAISE NOTICE 'MODIFIED % row(s)', num_rows;
|
||||||
|
RAISE DEBUG 'UPDATE time (s): %', clock_timestamp() - t;
|
||||||
|
END;
|
||||||
|
$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;
|
1
scripts-enabled/CDB_SyncTable.sql
Symbolic link
1
scripts-enabled/CDB_SyncTable.sql
Symbolic link
@ -0,0 +1 @@
|
|||||||
|
../scripts-available/CDB_SyncTable.sql
|
@ -7,7 +7,7 @@ SELECT _CDB_AnalysisDataSize('public');
|
|||||||
CREATE TABLE analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5(id int);
|
CREATE TABLE analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5(id int);
|
||||||
CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94(id int);
|
CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94(id int);
|
||||||
CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da9(id int);
|
CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da9(id int);
|
||||||
SELECT _CDB_AnalysisTablesInSchema('public');
|
SELECT _CDB_AnalysisTablesInSchema('public') t ORDER BY t;
|
||||||
SELECT _CDB_AnalysisDataSize('public');
|
SELECT _CDB_AnalysisDataSize('public');
|
||||||
SELECT CDB_CheckAnalysisQuota('analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94');
|
SELECT CDB_CheckAnalysisQuota('analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94');
|
||||||
SELECT CDB_SetUserQuotaInBytes(1);
|
SELECT CDB_SetUserQuotaInBytes(1);
|
||||||
|
58
test/CDB_SyncTableTest.sql
Normal file
58
test/CDB_SyncTableTest.sql
Normal file
@ -0,0 +1,58 @@
|
|||||||
|
-- Setup: create and populate a table to test the syncs
|
||||||
|
\set QUIET on
|
||||||
|
BEGIN;
|
||||||
|
SET client_min_messages TO error;
|
||||||
|
CREATE TABLE test_sync_source (
|
||||||
|
cartodb_id bigint,
|
||||||
|
lat double precision,
|
||||||
|
lon double precision,
|
||||||
|
name text
|
||||||
|
);
|
||||||
|
INSERT INTO test_sync_source VALUES
|
||||||
|
(1, 1.0, 1.0, 'foo'),
|
||||||
|
(2, 2.0, 2.0, 'bar'),
|
||||||
|
(3, 3.0, 3.0, 'patata'),
|
||||||
|
(4, 4.0, 4.0, 'melon');
|
||||||
|
SET client_min_messages TO notice;
|
||||||
|
\set QUIET off
|
||||||
|
|
||||||
|
|
||||||
|
\echo 'First table sync: it should be simply just copied to the destination'
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
|
||||||
|
|
||||||
|
\echo 'Next table sync: there shall be no changes'
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
|
||||||
|
|
||||||
|
\echo 'Remove a row from the source and check it is deleted from the dest table'
|
||||||
|
DELETE FROM test_sync_source WHERE cartodb_id = 3;
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
|
||||||
|
|
||||||
|
\echo 'Insert a new row and check that it is inserted in the dest table'
|
||||||
|
INSERT INTO test_sync_source VALUES (5, 5.0, 5.0, 'sandia');
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
|
||||||
|
|
||||||
|
\echo 'Modify row and check that it is modified in the dest table'
|
||||||
|
UPDATE test_sync_source SET name = 'cantaloupe' WHERE cartodb_id = 4;
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
|
||||||
|
|
||||||
|
\echo 'Sanity check: the end result is the same source table'
|
||||||
|
SELECT * FROM test_sync_source ORDER BY cartodb_id;
|
||||||
|
SELECT * FROM test_sync_dest ORDER BY cartodb_id;
|
||||||
|
|
||||||
|
|
||||||
|
\echo 'It shall exclude geom columns if instructed to do so'
|
||||||
|
\set QUIET on
|
||||||
|
SET client_min_messages TO error;
|
||||||
|
SELECT cartodb.CDB_SetUserQuotaInBytes(0); -- Set user quota to infinite
|
||||||
|
SELECT cartodb.CDB_CartodbfyTable('test_sync_source');
|
||||||
|
SELECT cartodb.CDB_CartodbfyTable('test_sync_dest');
|
||||||
|
UPDATE test_sync_dest SET the_geom = cartodb.CDB_LatLng(lat, lon); -- A "gecoding"
|
||||||
|
\set QUIET off
|
||||||
|
SET client_min_messages TO notice;
|
||||||
|
SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
|
||||||
|
SELECT * FROM test_sync_source;
|
||||||
|
SELECT * FROM test_sync_dest;
|
||||||
|
|
||||||
|
|
||||||
|
-- Cleanup
|
||||||
|
ROLLBACK;
|
61
test/CDB_SyncTableTest_expect
Normal file
61
test/CDB_SyncTableTest_expect
Normal file
@ -0,0 +1,61 @@
|
|||||||
|
First table sync: it should be simply just copied to the destination
|
||||||
|
NOTICE: INSERTED 4 row(s)
|
||||||
|
|
||||||
|
Next table sync: there shall be no changes
|
||||||
|
NOTICE: relation "test_sync_dest" already exists, skipping
|
||||||
|
NOTICE: DELETED 0 row(s)
|
||||||
|
NOTICE: INSERTED 0 row(s)
|
||||||
|
NOTICE: MODIFIED 0 row(s)
|
||||||
|
|
||||||
|
Remove a row from the source and check it is deleted from the dest table
|
||||||
|
DELETE 1
|
||||||
|
NOTICE: relation "test_sync_dest" already exists, skipping
|
||||||
|
NOTICE: DELETED 1 row(s)
|
||||||
|
NOTICE: INSERTED 0 row(s)
|
||||||
|
NOTICE: MODIFIED 0 row(s)
|
||||||
|
|
||||||
|
Insert a new row and check that it is inserted in the dest table
|
||||||
|
INSERT 0 1
|
||||||
|
NOTICE: relation "test_sync_dest" already exists, skipping
|
||||||
|
NOTICE: DELETED 0 row(s)
|
||||||
|
NOTICE: INSERTED 1 row(s)
|
||||||
|
NOTICE: MODIFIED 0 row(s)
|
||||||
|
|
||||||
|
Modify row and check that it is modified in the dest table
|
||||||
|
UPDATE 1
|
||||||
|
NOTICE: relation "test_sync_dest" already exists, skipping
|
||||||
|
NOTICE: DELETED 0 row(s)
|
||||||
|
NOTICE: INSERTED 0 row(s)
|
||||||
|
NOTICE: MODIFIED 1 row(s)
|
||||||
|
|
||||||
|
Sanity check: the end result is the same source table
|
||||||
|
1|1|1|foo
|
||||||
|
2|2|2|bar
|
||||||
|
4|4|4|cantaloupe
|
||||||
|
5|5|5|sandia
|
||||||
|
1|1|1|foo
|
||||||
|
2|2|2|bar
|
||||||
|
4|4|4|cantaloupe
|
||||||
|
5|5|5|sandia
|
||||||
|
It shall exclude geom columns if instructed to do so
|
||||||
|
0
|
||||||
|
test_sync_source
|
||||||
|
test_sync_dest
|
||||||
|
SET
|
||||||
|
NOTICE: relation "test_sync_dest" already exists, skipping
|
||||||
|
NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
|
||||||
|
NOTICE: DELETED 0 row(s)
|
||||||
|
NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
|
||||||
|
NOTICE: INSERTED 0 row(s)
|
||||||
|
NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
|
||||||
|
NOTICE: MODIFIED 0 row(s)
|
||||||
|
|
||||||
|
1|||1|1|foo
|
||||||
|
2|||2|2|bar
|
||||||
|
5|||5|5|sandia
|
||||||
|
4|||4|4|cantaloupe
|
||||||
|
1|0101000020E6100000000000000000F03F000000000000F03F|0101000020110F0000DB0B4ADA772DFB402B432E49D22DFB40|1|1|foo
|
||||||
|
2|0101000020E610000000000000000000400000000000000040|0101000020110F00003C0C4ADA772D0B4177F404ABE12E0B41|2|2|bar
|
||||||
|
5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
|
||||||
|
4|0101000020E610000000000000000010400000000000001040|0101000020110F00003C0C4ADA772D1B4160AB497020331B41|4|4|cantaloupe
|
||||||
|
ROLLBACK
|
Loading…
Reference in New Issue
Block a user