@ -11,7 +11,7 @@ module CartoDB
THE_GEOM = 'the_geom' . freeze
THE_GEOM = 'the_geom' . freeze
OVERWRITE_ERROR = 2013
OVERWRITE_ERROR = 2013
def initialize ( table_name , runner , database , user , overviews_creator , synchronization_id )
def initialize ( table_name , runner , database , user , overviews_creator , synchronization_id , logger = nil )
@table_name = table_name
@table_name = table_name
@runner = runner
@runner = runner
@database = database
@database = database
@ -25,6 +25,7 @@ module CartoDB
)
)
@error_code = nil
@error_code = nil
@synchronization_id = synchronization_id
@synchronization_id = synchronization_id
@logger = logger
end
end
def run ( & tracker )
def run ( & tracker )
@ -140,11 +141,7 @@ module CartoDB
table_statements = @table_setup . generate_table_statements ( schema , table_name )
table_statements = @table_setup . generate_table_statements ( schema , table_name )
temporary_name = temporary_name_for ( result . table_name )
temporary_name = temporary_name_for ( result . table_name )
database . transaction do
swap_tables ( table_name , temporary_name , result )
rename ( table_name , temporary_name ) if exists? ( table_name )
drop ( temporary_name ) if exists? ( temporary_name )
rename ( result . table_name , table_name )
end
@table_setup . fix_oid ( table_name )
@table_setup . fix_oid ( table_name )
@table_setup . update_cdb_tablemetadata ( table_name )
@table_setup . update_cdb_tablemetadata ( table_name )
@table_setup . run_table_statements ( table_statements , @database )
@table_setup . run_table_statements ( table_statements , @database )
@ -389,6 +386,43 @@ module CartoDB
private
private
def swap_tables ( table_name , temporary_name , result )
database . transaction do
rename ( table_name , temporary_name ) if exists? ( table_name )
drop ( temporary_name ) if exists? ( temporary_name )
rename ( result . table_name , table_name )
end
rescue Exception = > exception
if exception . message . include? ( 'canceling statement due to statement timeout' )
# Check if the table has any lock and cancel locking queries
locks = user . in_database ( as : :superuser ) . fetch ( %Q{
SELECT pid , query
FROM pg_stat_activity
WHERE pid in (
SELECT pid FROM pg_locks l
JOIN pg_class t ON l . relation = t . oid
AND t . relkind = 'r'
WHERE t . relname IN ( '#{table_name}' )
) ;
} ) . all
@logger . append_and_store " Transaction timed out as the table is blocked by other queries. Terminating locking queries and retrying in 60 seconds... " if @logger && locks . present?
locks . each do | lock |
@logger . append_and_store " Terminating query: #{ lock [ :query ] } " if @logger
user . in_database ( as : :superuser ) . execute %Q{
SELECT pg_terminate_backend ( #{lock[:pid]});
}
end
sleep ( 60 ) # wait 60 seconds and retry the swap
database . transaction do
rename ( table_name , temporary_name ) if exists? ( table_name )
drop ( temporary_name ) if exists? ( temporary_name )
rename ( result . table_name , table_name )
end
else
raise exception
end
end
def valid_cartodb_id_candidate? ( user , table_name , qualified_table_name , col_name )
def valid_cartodb_id_candidate? ( user , table_name , qualified_table_name , col_name )
return false unless column_names ( user , table_name ) . include? ( col_name )
return false unless column_names ( user , table_name ) . include? ( col_name )
user . transaction_with_timeout ( statement_timeout : STATEMENT_TIMEOUT , as : :superuser ) do | db |
user . transaction_with_timeout ( statement_timeout : STATEMENT_TIMEOUT , as : :superuser ) do | db |