diff --git a/scripts-available/CDB_SyncTable.sql b/scripts-available/CDB_SyncTable.sql
new file mode 100644
index 0000000..521495a
--- /dev/null
+++ b/scripts-available/CDB_SyncTable.sql
@@ -0,0 +1,192 @@
+/*
+  Gets the column names of a given table, ordered by column position.
+
+  The table is looked up by OID, so it does not need to be visible in the
+  current search_path. System and dropped columns are excluded.
+
+  Sample usage:
+
+    SELECT cartodb._CDB_GetColumns('public.films');
+*/
+CREATE OR REPLACE FUNCTION cartodb._CDB_GetColumns(src_table REGCLASS)
+RETURNS SETOF NAME
+AS $$
+  SELECT
+    a.attname AS "colname"
+  FROM
+    pg_catalog.pg_attribute a
+  WHERE
+    a.attnum > 0
+    AND NOT a.attisdropped
+    AND a.attrelid = src_table::oid
+  ORDER BY
+    a.attnum;
+$$ LANGUAGE sql STABLE PARALLEL UNSAFE;
+
+
+/*
+  Given an array of quoted column names, it generates an UPDATE SET
+  clause with the following form:
+
+    the_geom = changed.the_geom,
+    id = changed.id,
+    elevation = changed.elevation
+
+  The column names are interpolated verbatim, so the caller must have
+  quoted them already. update_source is the alias of the relation that
+  provides the new values.
+
+  Example of usage:
+
+    SELECT cartodb.__CDB_GetUpdateSetClause('{the_geom, id, elevation}', 'changed');
+*/
+CREATE OR REPLACE FUNCTION cartodb.__CDB_GetUpdateSetClause(colnames TEXT[], update_source TEXT)
+RETURNS TEXT
+AS $$
+DECLARE
+  set_clause_list TEXT[];
+  col TEXT;
+BEGIN
+  FOREACH col IN ARRAY colnames
+  LOOP
+    set_clause_list := array_append(set_clause_list, format('%1$s = %2$s.%1$s', col, update_source));
+  END LOOP;
+  RETURN array_to_string(set_clause_list, ', ');
+END;
+$$ LANGUAGE plpgsql IMMUTABLE PARALLEL SAFE;
+
+
+/*
+  Given a prefix, generate a safe unique NAME for a temp table.
+
+  Example of usage:
+
+    SELECT cartodb.__CDB_GenerateUniqueName('src_sync'); --> src_sync_718794_120106
+
+*/
+CREATE OR REPLACE FUNCTION cartodb.__CDB_GenerateUniqueName(prefix TEXT)
+RETURNS NAME
+AS $$
+  SELECT format('%s_%s_%s', prefix, txid_current(), (random()*1000000)::int)::NAME;
+$$ LANGUAGE sql VOLATILE PARALLEL UNSAFE;
+
+
+/*
+  A Table Syncer
+
+  Assumptions:
+    - Both tables contain a consistent cartodb_id column
+    - Destination table has all columns of the source or does not exist
+    - At least one column other than cartodb_id is left after skip_cols
+
+  Sample usage:
+
+    SELECT cartodb.CDB_SyncTable('radar_stations', 'public', 'syncdest');
+    SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
+
+*/
+CREATE OR REPLACE FUNCTION cartodb.CDB_SyncTable(src_table REGCLASS, dst_schema REGNAMESPACE, dst_table NAME, skip_cols NAME[] = '{}')
+RETURNS void
+AS $$
+DECLARE
+  fq_dest_table TEXT;
+
+  colnames TEXT[];
+  quoted_colnames TEXT;
+
+  src_hash_table_name NAME;
+  dst_hash_table_name NAME;
+
+  update_set_clause TEXT;
+
+  num_rows BIGINT;
+
+  t timestamptz;
+BEGIN
+  -- REGCLASS and REGNAMESPACE values are converted to text already quoted
+  -- (and schema-qualified when needed), so they must be interpolated with %s:
+  -- %I would quote them a second time and break any name that needs quoting
+  -- or schema qualification.
+  fq_dest_table := format('%s.%I', dst_schema, dst_table);
+
+  -- If the destination table does not exist, just copy the source table
+  EXECUTE format('CREATE TABLE IF NOT EXISTS %s as TABLE %s', fq_dest_table, src_table);
+  GET DIAGNOSTICS num_rows = ROW_COUNT;
+  IF num_rows > 0 THEN
+    RAISE NOTICE 'INSERTED % row(s)', num_rows;
+    RETURN;
+  END IF;
+
+  skip_cols := skip_cols || '{cartodb_id}';
+
+  -- Get the list of columns from the source table, excluding skip_cols.
+  -- The exclusion compares raw names; quoting is applied afterwards.
+  SELECT ARRAY(
+    SELECT quote_ident(c)
+    FROM cartodb._CDB_GetColumns(src_table) AS c
+    WHERE NOT EXISTS (SELECT 1 FROM unnest(skip_cols) AS skipped WHERE skipped = c)
+  ) INTO colnames;
+  quoted_colnames := array_to_string(colnames, ',');
+
+  src_hash_table_name := cartodb.__CDB_GenerateUniqueName('src_sync');
+  dst_hash_table_name := cartodb.__CDB_GenerateUniqueName('dst_sync');
+
+  EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', src_hash_table_name);
+  EXECUTE format('CREATE TEMP TABLE %I(cartodb_id BIGINT, hash TEXT) ON COMMIT DROP', dst_hash_table_name);
+
+  -- Compute hash tables for src_table and dst_table h[cartodb_id] = hash(row)
+  -- It'll take the form of a temp table with an index (easy to run set operations)
+  t := clock_timestamp();
+  EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', src_hash_table_name, quoted_colnames, src_table);
+  EXECUTE format('INSERT INTO %I SELECT cartodb_id, md5(ROW(%s)::text) hash FROM %s', dst_hash_table_name, quoted_colnames, fq_dest_table);
+  RAISE DEBUG 'Populate hash tables time (s): %', clock_timestamp() - t;
+
+  -- Create indexes
+  -- We use hash indexes as they are fit for id comparison.
+  t := clock_timestamp();
+  EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', src_hash_table_name);
+  EXECUTE format('CREATE INDEX ON %I USING HASH (cartodb_id)', dst_hash_table_name);
+  RAISE DEBUG 'Index creation on hash tables time (s): %', clock_timestamp() - t;
+
+  -- Deal with deleted rows: ids in dest but not in source
+  t := clock_timestamp();
+  EXECUTE format(
+    'DELETE FROM %s WHERE cartodb_id IN (SELECT cartodb_id FROM %I EXCEPT SELECT cartodb_id FROM %I)',
+    fq_dest_table,
+    dst_hash_table_name,
+    src_hash_table_name);
+  GET DIAGNOSTICS num_rows = ROW_COUNT;
+  RAISE NOTICE 'DELETED % row(s)', num_rows;
+  RAISE DEBUG 'DELETE time (s): %', clock_timestamp() - t;
+
+  -- Deal with inserted rows: ids in source but not in dest
+  t := clock_timestamp();
+  EXECUTE format('
+    INSERT INTO %1$s (cartodb_id,%2$s)
+    SELECT h.cartodb_id,%2$s FROM (SELECT cartodb_id FROM %3$I EXCEPT SELECT cartodb_id FROM %4$I) h
+    LEFT JOIN %5$s s ON s.cartodb_id = h.cartodb_id;
+    ', fq_dest_table, quoted_colnames, src_hash_table_name, dst_hash_table_name, src_table);
+  GET DIAGNOSTICS num_rows = ROW_COUNT;
+  RAISE NOTICE 'INSERTED % row(s)', num_rows;
+  RAISE DEBUG 'INSERT time (s): %', clock_timestamp() - t;
+
+  -- Deal with modified rows: ids in source and dest but different hashes
+  t := clock_timestamp();
+  update_set_clause := cartodb.__CDB_GetUpdateSetClause(colnames, 'changed');
+  EXECUTE format('
+    UPDATE %1$s dst SET %2$s
+      FROM (
+        SELECT *
+        FROM %3$s src
+        WHERE cartodb_id IN
+          (SELECT sh.cartodb_id FROM %4$I sh
+           INNER JOIN %5$I dh ON sh.cartodb_id = dh.cartodb_id
+           WHERE sh.hash <> dh.hash)
+      ) changed
+      WHERE dst.cartodb_id = changed.cartodb_id;
+    ', fq_dest_table, update_set_clause, src_table, src_hash_table_name, dst_hash_table_name);
+  GET DIAGNOSTICS num_rows = ROW_COUNT;
+  RAISE NOTICE 'MODIFIED % row(s)', num_rows;
+  RAISE DEBUG 'UPDATE time (s): %', clock_timestamp() - t;
+END;
+$$ LANGUAGE plpgsql VOLATILE PARALLEL UNSAFE;
diff --git a/scripts-enabled/CDB_SyncTable.sql b/scripts-enabled/CDB_SyncTable.sql
new file mode 120000
index 0000000..c258915
--- /dev/null
+++ b/scripts-enabled/CDB_SyncTable.sql
@@ -0,0 +1 @@
+../scripts-available/CDB_SyncTable.sql
\ No newline at end of file
diff --git a/test/CDB_AnalysisCheckTest.sql b/test/CDB_AnalysisCheckTest.sql
index a1aa527..1f85e7a 100644
--- a/test/CDB_AnalysisCheckTest.sql
+++ b/test/CDB_AnalysisCheckTest.sql
@@ -7,7 +7,7 @@ SELECT _CDB_AnalysisDataSize('public');
 CREATE TABLE analysis_2f13a3dbd7_41bd92976fc6dd97072afe4ee450054f4c0715d5(id int);
 CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94(id int);
 CREATE TABLE analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da9(id int);
-SELECT _CDB_AnalysisTablesInSchema('public');
+SELECT _CDB_AnalysisTablesInSchema('public') t ORDER BY t;
 SELECT _CDB_AnalysisDataSize('public');
 SELECT CDB_CheckAnalysisQuota('analysis_2f13a3dbd7_f00cee44e9e6152b450bde3a92eb9ae0d099da94');
 SELECT CDB_SetUserQuotaInBytes(1);
diff --git a/test/CDB_SyncTableTest.sql b/test/CDB_SyncTableTest.sql
new file mode 100644
index 0000000..a945640
--- /dev/null
+++ b/test/CDB_SyncTableTest.sql
@@ -0,0 +1,58 @@
+-- Setup: create and populate a table to test the syncs
+\set QUIET on
+BEGIN;
+SET client_min_messages TO error;
+CREATE TABLE test_sync_source (
+  cartodb_id bigint,
+  lat double precision,
+  lon double precision,
+  name text
+);
+INSERT INTO test_sync_source VALUES
+  (1, 1.0, 1.0, 'foo'),
+  (2, 2.0, 2.0, 'bar'),
+  (3, 3.0, 3.0, 'patata'),
+  (4, 4.0, 4.0, 'melon');
+SET client_min_messages TO notice;
+\set QUIET off
+
+
+\echo 'First table sync: it should be simply just copied to the destination'
+SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
+
+\echo 'Next table sync: there shall be no changes'
+SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
+
+\echo 'Remove a row from the source and check it is deleted from the dest table'
+DELETE FROM test_sync_source WHERE cartodb_id = 3;
+SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
+
+\echo 'Insert a new row and check that it is inserted in the dest table'
+INSERT INTO test_sync_source VALUES (5, 5.0, 5.0, 'sandia');
+SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
+
+\echo 'Modify row and check that it is modified in the dest table'
+UPDATE test_sync_source SET name = 'cantaloupe' WHERE cartodb_id = 4;
+SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest');
+
+\echo 'Sanity check: the end result is the same source table'
+SELECT * FROM test_sync_source ORDER BY cartodb_id;
+SELECT * FROM test_sync_dest ORDER BY cartodb_id;
+
+
+\echo 'It shall exclude geom columns if instructed to do so'
+\set QUIET on
+SET client_min_messages TO error;
+SELECT cartodb.CDB_SetUserQuotaInBytes(0); -- Set user quota to infinite
+SELECT cartodb.CDB_CartodbfyTable('test_sync_source');
+SELECT cartodb.CDB_CartodbfyTable('test_sync_dest');
+UPDATE test_sync_dest SET the_geom = cartodb.CDB_LatLng(lat, lon); -- A "geocoding"
+\set QUIET off
+SET client_min_messages TO notice;
+SELECT cartodb.CDB_SyncTable('test_sync_source', 'public', 'test_sync_dest', '{the_geom, the_geom_webmercator}');
+SELECT * FROM test_sync_source ORDER BY cartodb_id; -- ordered: row order is otherwise unspecified
+SELECT * FROM test_sync_dest ORDER BY cartodb_id; -- ordered: row order is otherwise unspecified
+
+
+-- Cleanup
+ROLLBACK;
diff --git a/test/CDB_SyncTableTest_expect b/test/CDB_SyncTableTest_expect
new file mode 100644
index 0000000..ffe8ee2
--- /dev/null
+++ b/test/CDB_SyncTableTest_expect
@@ -0,0 +1,61 @@
+First table sync: it should be simply just copied to the destination
+NOTICE: INSERTED 4 row(s)
+
+Next table sync: there shall be no changes
+NOTICE: relation "test_sync_dest" already exists, skipping
+NOTICE: DELETED 0 row(s)
+NOTICE: INSERTED 0 row(s)
+NOTICE: MODIFIED 0 row(s)
+
+Remove a row from the source and check it is deleted from the dest table
+DELETE 1
+NOTICE: relation "test_sync_dest" already exists, skipping
+NOTICE: DELETED 1 row(s)
+NOTICE: INSERTED 0 row(s)
+NOTICE: MODIFIED 0 row(s)
+
+Insert a new row and check that it is inserted in the dest table
+INSERT 0 1
+NOTICE: relation "test_sync_dest" already exists, skipping
+NOTICE: DELETED 0 row(s)
+NOTICE: INSERTED 1 row(s)
+NOTICE: MODIFIED 0 row(s)
+
+Modify row and check that it is modified in the dest table
+UPDATE 1
+NOTICE: relation "test_sync_dest" already exists, skipping
+NOTICE: DELETED 0 row(s)
+NOTICE: INSERTED 0 row(s)
+NOTICE: MODIFIED 1 row(s)
+
+Sanity check: the end result is the same source table
+1|1|1|foo
+2|2|2|bar
+4|4|4|cantaloupe
+5|5|5|sandia
+1|1|1|foo
+2|2|2|bar
+4|4|4|cantaloupe
+5|5|5|sandia
+It shall exclude geom columns if instructed to do so
+0
+test_sync_source
+test_sync_dest
+SET
+NOTICE: relation "test_sync_dest" already exists, skipping
+NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
+NOTICE: DELETED 0 row(s)
+NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
+NOTICE: INSERTED 0 row(s)
+NOTICE: cdb_invalidate_varnish(public.test_sync_dest) called
+NOTICE: MODIFIED 0 row(s)
+
+1|||1|1|foo
+2|||2|2|bar
+4|||4|4|cantaloupe
+5|||5|5|sandia
+1|0101000020E6100000000000000000F03F000000000000F03F|0101000020110F0000DB0B4ADA772DFB402B432E49D22DFB40|1|1|foo
+2|0101000020E610000000000000000000400000000000000040|0101000020110F00003C0C4ADA772D0B4177F404ABE12E0B41|2|2|bar
+4|0101000020E610000000000000000010400000000000001040|0101000020110F00003C0C4ADA772D1B4160AB497020331B41|4|4|cantaloupe
+5|0101000020E610000000000000000014400000000000001440|0101000020110F000099476EE86AFC20413E7EB983F2012141|5|5|sandia
+ROLLBACK