Third iteration, expect a viable cartodb_id

This commit is contained in:
Carla Iriberri 2016-03-02 14:58:53 +01:00
parent 0d5f83b3c4
commit a18cbeb2cd

View File

@ -466,33 +466,20 @@ END;
$$ LANGUAGE 'plpgsql';
DO $$
BEGIN
IF NOT EXISTS (SELECT 1 FROM pg_type WHERE typname = '_cdb_has_usable_primary_id_record') THEN
CREATE TYPE _cdb_has_usable_primary_id_record
AS (has_usable_primary_key boolean,
text_key_column boolean);
END IF;
END$$;
-- Find out if the table already has a usable primary key
-- If the table has both a usable key and usable geometry
-- we can no-op on the table copy and just ensure that the
-- indexes and triggers are in place
CREATE OR REPLACE FUNCTION cartodb._CDB_Has_Usable_Primary_ID(reloid REGCLASS)
RETURNS _cdb_has_usable_primary_id_record
CREATE OR REPLACE FUNCTION _CDB_Has_Usable_Primary_ID(reloid REGCLASS)
RETURNS BOOLEAN
AS $$
DECLARE
rec RECORD;
const RECORD;
idc RECORD;
i INTEGER;
sql TEXT;
useable_key BOOLEAN = false;
has_usable_primary_key BOOLEAN = false;
-- In case cartodb_id is a text column
text_key_column BOOLEAN = false;
BEGIN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', 'entered function';
@ -503,138 +490,96 @@ BEGIN
-- Do we already have a properly named column?
SELECT a.attname, i.indisprimary, i.indisunique, a.attnotnull, a.atttypid
INTO rec
FROM pg_class c
JOIN pg_attribute a ON a.attrelid = c.oid
FROM pg_class c
JOIN pg_attribute a ON a.attrelid = c.oid
JOIN pg_type t ON a.atttypid = t.oid
LEFT JOIN pg_index i ON c.oid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE c.oid = reloid
WHERE c.oid = reloid
AND NOT a.attisdropped
AND a.attname = const.pkey;
-- Found something named right...
IF FOUND THEN
-- And it's an integer or varchar column...
IF rec.atttypid IN (20,21,23,1043) THEN
-- And it's a unique primary key! Done!
IF (rec.indisprimary OR rec.indisunique) AND rec.attnotnull THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', Format('found good ''%s''', const.pkey);
RETURN true;
-- And it's a unique primary key! Done!
IF (rec.indisprimary OR rec.indisunique) AND rec.attnotnull THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', Format('found good ''%s''', const.pkey);
SELECT true as has_usable_primary_key, text_key_column INTO idc;
RETURN idc;
-- Check and see if the column values are unique and not null,
-- if they are, we can use this column...
ELSE
-- Check and see if the column values are unique and not null,
-- if they are, we can use this column...
-- Assume things are OK until proven otherwise...
useable_key := true;
BEGIN
sql := Format('ALTER TABLE %s ADD CONSTRAINT %s_pk PRIMARY KEY (%s)', reloid::text, const.pkey, const.pkey);
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', sql;
EXECUTE sql;
EXCEPTION
-- Failed unique check...
WHEN unique_violation THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', Format('column %s is not unique', const.pkey);
useable_key := false;
-- Failed not null check...
WHEN not_null_violation THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', Format('column %s contains nulls', const.pkey);
useable_key := false;
-- Other fatal error
WHEN others THEN
PERFORM _CDB_Error(sql, '_CDB_Has_Usable_Primary_ID');
END;
-- Clean up test constraint
IF useable_key THEN
PERFORM _CDB_SQL(Format('ALTER TABLE %s DROP CONSTRAINT %s_pk', reloid::text, const.pkey));
-- Move non-unique column out of the way
ELSE
-- Assume things are OK until proven otherwise...
useable_key := true;
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %',
Format('found non-unique ''%s'', renaming it', const.pkey);
-- If the column is varchar, try to cast it to integer
IF rec.atttypid IN (1043) THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): Found text column %', rec.atttypid;
BEGIN
sql := Format('SELECT %I::integer FROM %s', rec.attname, reloid::text);
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): Running %', sql;
EXECUTE sql;
text_key_column := true
EXCEPTION
WHEN invalid_text_representation THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): Column % of type text is not a valid integer column', rec.attname;
useable_key := false;
WHEN others THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): Exception %', SQLSTATE;
useable_key := false;
END;
END IF;
IF useable_key THEN
BEGIN
sql := Format('ALTER TABLE %s ADD CONSTRAINT %s_pk PRIMARY KEY (%s)', reloid::text, const.pkey, const.pkey);
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', sql;
EXECUTE sql;
EXCEPTION
-- Failed unique check...
WHEN unique_violation THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', Format('column %s is not unique', const.pkey);
useable_key := false;
-- Failed not null check...
WHEN not_null_violation THEN
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', Format('column %s contains nulls', const.pkey);
useable_key := false;
-- Other fatal error
WHEN others THEN
PERFORM _CDB_Error(sql, '_CDB_Has_Usable_Primary_ID');
END;
END IF;
-- Clean up test constraint
IF useable_key THEN
PERFORM _CDB_SQL(Format('ALTER TABLE %s DROP CONSTRAINT %s_pk', reloid::text, const.pkey));
-- Move non-unique column out of the way
ELSE
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %',
Format('found non-unique ''%s'', renaming it', const.pkey);
PERFORM _CDB_SQL(
Format('ALTER TABLE %s RENAME COLUMN %s TO %I',
reloid::text, rec.attname,
cartodb._CDB_Unique_Column_Identifier(NULL, const.pkey, NULL, reloid)),
'_CDB_Has_Usable_Primary_ID');
END IF;
SELECT useable_key as has_usable_primary_key, text_key_column INTO idc;
RETURN idc;
PERFORM _CDB_SQL(
Format('ALTER TABLE %s RENAME COLUMN %s TO %I',
reloid::text, rec.attname,
cartodb._CDB_Unique_Column_Identifier(NULL, const.pkey, NULL, reloid)),
'_CDB_Has_Usable_Primary_ID');
END IF;
-- It's not an integer column, we have to rename it
ELSE
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %',
Format('found non-integer ''%s'', renaming it', const.pkey);
PERFORM _CDB_SQL(
Format('ALTER TABLE %s RENAME COLUMN %s TO %I',
reloid::text, rec.attname, cartodb._CDB_Unique_Column_Identifier(NULL, const.pkey, NULL, reloid)),
'_CDB_Has_Usable_Primary_ID');
RETURN useable_key;
END IF;
-- There's no column there named pkey
ELSE
-- Is there another suitable primary key already?
-- Is there another integer suitable primary key already?
SELECT a.attname
INTO rec
FROM pg_class c
JOIN pg_attribute a ON a.attrelid = c.oid
JOIN pg_attribute a ON a.attrelid = c.oid
JOIN pg_type t ON a.atttypid = t.oid
LEFT JOIN pg_index i ON c.oid = i.indrelid AND a.attnum = ANY(i.indkey)
WHERE c.oid = reloid AND NOT a.attisdropped
AND i.indisprimary AND i.indisunique AND a.attnotnull AND a.atttypid IN (20,21,23);
-- Yes! Ok, rename it.
IF FOUND THEN
PERFORM _CDB_SQL(Format('ALTER TABLE %s RENAME COLUMN %s TO %s', reloid::text, rec.attname, const.pkey),'_CDB_Has_Usable_Primary_ID');
SELECT true as has_usable_primary_key, text_key_column INTO idc;
RETURN idc;
RETURN true;
ELSE
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %',
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %',
Format('found no useful column for ''%s''', const.pkey);
END IF;
END IF;
RAISE DEBUG 'CDB(_CDB_Has_Usable_Primary_ID): %', 'function complete';
-- Didn't find re-usable key, so return FALSE
SELECT false as has_usable_primary_key, text_key_column INTO idc;
RETURN idc;
RETURN false;
END;
$$ LANGUAGE 'plpgsql';
@ -837,7 +782,7 @@ $$ LANGUAGE 'plpgsql';
-- a "good" one, and the same for the geometry columns. If all the required
-- columns are in place already, it no-ops and just renames the table to
-- the destination if necessary.
CREATE OR REPLACE FUNCTION cartodb._CDB_Rewrite_Table(reloid REGCLASS, destschema TEXT DEFAULT NULL)
CREATE OR REPLACE FUNCTION _CDB_Rewrite_Table(reloid REGCLASS, destschema TEXT DEFAULT NULL)
RETURNS BOOLEAN
AS $$
DECLARE
@ -858,15 +803,15 @@ DECLARE
rec RECORD;
const RECORD;
idc RECORD;
gc RECORD;
sql TEXT;
str TEXT;
table_srid INTEGER;
geom_srid INTEGER;
has_usable_primary_key BOOLEAN;
has_usable_pk_sequence BOOLEAN;
BEGIN
RAISE DEBUG 'CDB(_CDB_Rewrite_Table): %', 'entered function';
@ -877,7 +822,7 @@ BEGIN
-- Save the raw schema/table names for later
SELECT n.nspname, c.relname, c.relname
INTO STRICT relschema, relname, destname
FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid
FROM pg_class c JOIN pg_namespace n ON c.relnamespace = n.oid
WHERE c.oid = reloid;
-- Default the destination to current schema if unspecified
@ -888,16 +833,15 @@ BEGIN
-- See if there is a primary key column we need to carry along to the
-- new table. If this is true, it implies there is an indexed
-- primary key of integer type named (by default) cartodb_id
SELECT has_usable_primary_key, text_key_column
FROM _CDB_Has_Usable_Primary_ID(reloid)
INTO STRICT idc;
SELECT _CDB_Has_Usable_Primary_ID(reloid)
INTO STRICT has_usable_primary_key;
RAISE DEBUG 'CDB(_CDB_Rewrite_Table): has_usable_primary_key %', idc.has_usable_primary_key;
RAISE DEBUG 'CDB(_CDB_Rewrite_Table): has_usable_primary_key %', has_usable_primary_key;
-- See if the candidate primary key column has a sequence for default
-- values. No usable pk implies has_usable_pk_sequence = false.
has_usable_pk_sequence := false;
IF idc.has_usable_primary_key THEN
IF has_usable_primary_key THEN
SELECT _CDB_Has_Usable_PK_Sequence(reloid)
INTO STRICT has_usable_pk_sequence;
END IF;
@ -929,19 +873,12 @@ BEGIN
RAISE DEBUG 'CDB(_CDB_Rewrite_Table): has_usable_geoms %', gc.has_usable_geoms;
-- We can only avoid a rewrite if both the key and
-- We can only avoid a rewrite if both the key and
-- geometry are usable
-- If cartodb_id column is of type text, cast it
IF idc.text_key_column THEN
sql := Format('ALTER TABLE %s ALTER cartodb_id TYPE int USING %I::integer', reloid::text, 'cartodb_id');
PERFORM _CDB_SQL(sql,'_CDB_Rewrite_Table');
END IF;
RAISE DEBUG 'idc %', idc;
-- No table re-write is required, BUT a rename is required to
-- a destination schema, so do that now
IF idc.has_usable_primary_key AND has_usable_pk_sequence AND gc.has_usable_geoms THEN
IF has_usable_primary_key AND has_usable_pk_sequence AND gc.has_usable_geoms THEN
IF destschema != relschema THEN
RAISE DEBUG 'CDB(_CDB_Rewrite_Table): perfect table needs to be moved to schema (%)', destschema;
@ -979,8 +916,8 @@ BEGIN
sql := Format('CREATE TABLE %s AS SELECT ', copyname);
-- Add cartodb ID!
IF idc.has_usable_primary_key THEN
sql := sql || const.pkey;
IF has_usable_primary_key THEN
sql := sql || const.pkey || '::bigint ';
ELSE
sql := sql || 'nextval(''' || destseq || ''') AS ' || const.pkey;
END IF;