require 'carto/importer/table_setup'

module CartoDB
  module Synchronization
    # Orchestrates one synchronization run: imports remote data via the runner,
    # then swaps or merges the freshly imported table into the user's table,
    # re-applying CARTO metadata (cartodbfication, overviews, indexes).
    class Adapter

      include ::LoggerHelper

      # Statement timeout for direct-connection DDL/UPDATE work, in milliseconds.
      STATEMENT_TIMEOUT = (1.hour * 1000).freeze
      DESTINATION_SCHEMA = 'public'.freeze
      THE_GEOM = 'the_geom'.freeze
      # Error code reported when the overwrite step fails.
      OVERWRITE_ERROR = 2013

      # @param table_name [String] name of the user's existing (destination) table
      # @param runner     [Object] import runner; must respond to #run, #results,
      #                   #remote_data_updated?, #log, #etag, #checksum, ...
      # @param database   [Object] database connection for the user's database
      # @param user       [::User]
      # @param overviews_creator [Object] used by TableSetup to rebuild overviews
      # @param synchronization_id [String]
      def initialize(table_name, runner, database, user, overviews_creator, synchronization_id)
        @table_name = table_name
        @runner = runner
        @database = database
        @user = user
        @overviews_creator = overviews_creator
        @failed = false
        @table_setup = ::Carto::Importer::TableSetup.new(
          user: user,
          overviews_creator: overviews_creator,
          log: runner.log
        )
        @error_code = nil
        @synchronization_id = synchronization_id
      end

      # Executes the runner and, if remote data changed, replaces/merges the
      # destination table inside a ghost-tables bolt so the temporary import
      # table is never registered. Returns self; re-raises on failure after
      # dropping the leftover import table.
      def run(&tracker)
        runner.run(&tracker)
        result = runner.results.select(&:success?).first

        if runner.remote_data_updated?
          if result.nil?
            data_for_exception = "Expecting success data for table '#{table_name}'\nResults:#{runner.results}\n"
            data_for_exception << "1st result:#{runner.results.first.inspect}"
            raise data_for_exception
          end

          Carto::GhostTablesManager.run_synchronized(
            user.id,
            attempts: 10, timeout: 3000,
            message: "Couldn't acquire bolt to register. Registering sync without bolt",
            user: user, synchronization_id: @synchronization_id
          ) do
            move_to_schema(result)
            geo_type = fix_the_geom_type!(user.database_schema, result.table_name)
            import_cleanup(user.database_schema, result.table_name)
            @table_setup.cartodbfy(result.table_name)
            overwrite(user.database_schema, table_name, result, geo_type)
            setup_table(table_name, geo_type)
            @table_setup.recreate_overviews(table_name)
          end
        end
        self
      rescue StandardError => exception
        @failed = true
        puts '=================='
        puts exception.to_s
        puts exception.backtrace
        puts '=================='
        # Clean up the temporary import table so it doesn't linger in the user schema
        drop(result.table_name) if result && exists?(result.table_name)
        raise exception
      end

      def user
        @user
      end

      def overwrite(schema, table_name, result, geo_type)
        # Determine what kind of overwrite to perform
        # overwrite_replace substitutes the existing table by the new one,
        # so any modifications since previous import/sync are lost.
        # overwrite_sync will preserve columns added since the import/sync,
        # and the geometry as well if the new table doesn't have it (nil geo_type)
        # For the time being the latter method will only be used with tables
        # that have had the geocoder analysis applied, resulting in an column
        # named carto_geocode_hash being present.
        # TODO: we could perform the sync if there's any column named `_carto_*`
        # (carto_geocode_hash would need be renamed as _carto_geocode_hash)
        sync = has_column(schema, table_name, 'carto_geocode_hash')
        if sync
          overwrite_sync(schema, table_name, result, geo_type)
        else
          overwrite_replace(schema, table_name, result)
        end
      end

      # Merges the imported table into the destination using CDB_SyncTable,
      # preserving columns added after the previous sync. Skips geometry
      # columns when the import carries no geometry (geo_type nil).
      def overwrite_sync(schema, table_name, result, geo_type)
        return false unless runner.remote_data_updated?

        # NOTE the import table is already moved to the user schema;
        # this was done (#7543) because the cartodbfication performs
        # queries on CDB_UserQuotaSize and other functions expected
        # to exist in the schema of the table.
        qualified_result_table_name = %{"#{schema}"."#{result.table_name}"}
        skip_columns = '{the_geom, the_geom_webmercator}'

        database.transaction do
          if geo_type.nil?
            # If there's no geometry in the result table, not worth
            # syncing. Maybe those were added via geocoding
            database.execute(%{
              SELECT cartodb.CDB_SyncTable(
                '#{qualified_result_table_name}',
                '#{schema}', '#{table_name}',
                '#{skip_columns}'
              )})
          else
            database.execute(%{
              SELECT cartodb.CDB_SyncTable(
                '#{qualified_result_table_name}',
                '#{schema}', '#{table_name}'
              )})
          end
        end

        drop(result.table_name) if exists?(result.table_name)

        # TODO not sure whether these two are needed
        @table_setup.fix_oid(table_name)
        @table_setup.update_cdb_tablemetadata(table_name)
      rescue StandardError => exception
        @error_code = OVERWRITE_ERROR
        puts "Sync overwrite ERROR: #{exception.message}: #{exception.backtrace.join}"

        # Gets all attributes in the result except for 'log_trace', as it is too long for Rollbar
        result_hash = CartoDB::Importer2::Result::ATTRIBUTES.map { |m| [m, result.send(m)] if m != 'log_trace' }
                                                            .compact.to_h
        log_error(message: 'Error in sync overwrite', exception: exception, result: result_hash)
        drop(result.table_name) if exists?(result.table_name)
        raise exception
      end

      # Atomically replaces the destination table with the imported one
      # (rename old away, rename new in), then restores privileges, oid
      # metadata and indexes. Any changes made to the old table are lost.
      def overwrite_replace(schema, table_name, result)
        return false unless runner.remote_data_updated?

        @table_setup.copy_privileges(schema, table_name, schema, result.table_name)
        index_statements = @table_setup.generate_index_statements(schema, table_name)
        temporary_name = temporary_name_for(result.table_name)

        database.transaction do
          rename(table_name, temporary_name) if exists?(table_name)
          drop(temporary_name) if exists?(temporary_name)
          rename(result.table_name, table_name)
        end
        @table_setup.fix_oid(table_name)
        @table_setup.update_cdb_tablemetadata(table_name)
        @table_setup.run_index_statements(index_statements, @database)
      rescue StandardError => exception
        @error_code = OVERWRITE_ERROR
        puts "Sync overwrite ERROR: #{exception.message}: #{exception.backtrace.join}"

        # Gets all attributes in the result except for 'log_trace', as it is too long for Rollbar
        result_hash = CartoDB::Importer2::Result::ATTRIBUTES.map { |m| [m, result.send(m)] if m != 'log_trace' }
                                                            .compact.to_h
        log_error(message: 'Error in sync overwrite', exception: exception, result: result_hash)
        drop(result.table_name) if exists?(result.table_name)
        raise exception
      end

      # Registers/refreshes the CARTO table metadata after the overwrite.
      # geo_type is forwarded so set_the_geom_column! doesn't have to inspect
      # (and potentially manipulate) the geometries a second time.
      def setup_table(table_name, geo_type)
        table = Carto::UserTable.find(user.tables.where(name: table_name).first.id).service

        table.force_schema = true

        table.import_to_cartodb(table_name)
        table.schema(reload: true)
        table.reload

        # We send the detected geometry type to avoid manipulating geoms twice
        # set_the_geom_column! should just edit the metadata with the specified type
        table.send :set_the_geom_column!, geo_type

        table.save
      # BUGFIX: the exception must be bound here; the previous bare
      # `rescue StandardError` left `exception` undefined below, raising
      # NameError from inside the handler instead of logging.
      rescue StandardError => exception
        log_error(message: 'Error in setup cartodbfy', exception: exception)
      ensure
        @table_setup.fix_oid(table_name)
      end

      # True when the given table has a live (non-dropped) column named column_name.
      def has_column(schema_name, table_name, column_name)
        qualified_table_name = "\"#{schema_name}\".#{table_name}"
        sql = %{
          SELECT TRUE as has_column FROM pg_catalog.pg_attribute a
          WHERE a.attname = '#{column_name}'
            AND a.attnum > 0
            AND NOT a.attisdropped
            AND a.attrelid = '#{qualified_table_name}'::regclass::oid
          LIMIT 1
        }
        result = user.in_database[sql].first
        result && result[:has_column]
      end

      # From Table#get_the_geom_type!, adapted to unregistered tables
      # returns type to run Table#get_the_geom_type! afterwards again, which
      # saves the type in table metadata
      def fix_the_geom_type!(schema_name, table_name)
        qualified_table_name = "\"#{schema_name}\".#{table_name}"

        type = nil
        the_geom_data = user.in_database[%Q{
          SELECT a.attname, t.typname
          FROM pg_attribute a, pg_type t
          WHERE attrelid = '#{qualified_table_name}'::regclass
          AND attname = '#{THE_GEOM}'
          AND a.atttypid = t.oid
          AND a.attstattarget < 0
          LIMIT 1
        }].first
        return nil unless the_geom_data

        # A non-geometry the_geom (e.g. text) is renamed aside so cartodbfy
        # can create a proper geometry column later.
        if the_geom_data[:typname] != 'geometry'
          user.in_database.execute %{
            ALTER TABLE #{qualified_table_name} RENAME COLUMN "#{THE_GEOM}" TO "the_geom_str"
          }
          return nil
        end

        geom_type = user.in_database[%Q{
          SELECT GeometryType(#{THE_GEOM})
          FROM #{qualified_table_name}
          WHERE #{THE_GEOM} IS NOT null
          LIMIT 1
        }].first
        type = geom_type[:geometrytype].to_s.downcase if geom_type

        # if the geometry is MULTIPOINT we convert it to POINT
        if type == 'multipoint'
          user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_GeometryN(the_geom,1);")
          end
          type = 'point'
        end

        # if the geometry is LINESTRING or POLYGON we convert it to MULTILINESTRING or MULTIPOLYGON
        if %w(linestring polygon).include?(type)
          user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_Multi(the_geom);")
            type = user_database[%Q{
              SELECT GeometryType(#{THE_GEOM})
              FROM #{qualified_table_name}
              WHERE #{THE_GEOM} IS NOT null
              LIMIT 1
            }].first[:geometrytype]
          end
        end

        type
      end

      # From Table#import_cleanup, with column schema checks adapted to unregistered tables
      def import_cleanup(schema_name, table_name)
        qualified_table_name = "\"#{schema_name}\".#{table_name}"

        user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
          # For consistency with regular imports, also eases testing
          # The sanitization of @table_name is applied to the newly imported table_name
          # This should not be necessary, since setup_table, called after cartodbfication
          # also perfomes column sanitization via Table#import_to_cartodb
          # but is kept because there may be existing syncs for which this double sanitization
          # (version 1 sanitization which wasn't idemponent) had the effect of altering some
          # satinizated names (e.g. __1 -> _1).
          table = ::Table.new(name: @table_name, user_id: @user.id)
          # we could as well: table = Carto::UserTable.find(@user.tables.where(name: @table_name).first.id).service
          table.sanitize_columns(table_name: table_name, database_schema: schema_name, connection: user_database)

          # When tables are created using ogr2ogr they are added a ogc_fid or gid primary key
          # In that case:
          #  - If cartodb_id already exists, remove ogc_fid
          #  - If cartodb_id does not exist, treat this field as the auxiliary column
          aux_cartodb_id_column = [:ogc_fid, :gid].find do |col|
            valid_cartodb_id_candidate?(user, table_name, qualified_table_name, col)
          end

          # Remove primary key
          existing_pk = user_database[%Q{
            SELECT c.conname AS pk_name
            FROM pg_class r, pg_constraint c, pg_namespace n
            WHERE r.oid = c.conrelid AND contype='p' AND relname = '#{table_name}'
            AND r.relnamespace = n.oid and n.nspname= '#{schema_name}'
          }].first
          existing_pk = existing_pk[:pk_name] unless existing_pk.nil?
          user_database.run(%Q{
            ALTER TABLE #{qualified_table_name} DROP CONSTRAINT "#{existing_pk}"
          }) unless existing_pk.nil?

          # All normal fields casted to text
          varchar_columns = user_database[%Q{
            SELECT a.attname, t.typname
            FROM pg_attribute a, pg_type t
            WHERE attrelid = '#{qualified_table_name}'::regclass
            AND typname = 'varchar'
            AND a.atttypid = t.oid
          }].all

          varchar_columns.each do |column|
            user_database.run(%Q{ALTER TABLE #{qualified_table_name} ALTER COLUMN "#{column[:attname]}" TYPE text})
          end

          # If there's an auxiliary column, copy to cartodb_id and restart the sequence to the max(cartodb_id)+1
          if aux_cartodb_id_column.present?
            begin
              already_had_cartodb_id = false
              user_database.run(%Q{ALTER TABLE #{qualified_table_name} ADD COLUMN cartodb_id SERIAL})
            rescue StandardError
              # ADD COLUMN failing means the column was already there
              already_had_cartodb_id = true
            end

            unless already_had_cartodb_id
              user_database.run(%Q{UPDATE #{qualified_table_name} SET cartodb_id = CAST(#{aux_cartodb_id_column} AS INTEGER)})
              cartodb_id_sequence_name = user_database[
                "SELECT pg_get_serial_sequence('#{schema_name}.#{table_name}', 'cartodb_id')"
              ].first[:pg_get_serial_sequence]
              max_cartodb_id = user_database[%Q{SELECT max(cartodb_id) FROM #{qualified_table_name}}].first[:max]

              # only reset the sequence on real imports.
              if max_cartodb_id
                user_database.run("ALTER SEQUENCE #{cartodb_id_sequence_name} RESTART WITH #{max_cartodb_id + 1}")
              end
            end

            user_database.run(%Q{ALTER TABLE #{qualified_table_name} DROP COLUMN #{aux_cartodb_id_column}})
          end
        end
      end

      def success?
        (!@failed && runner.success?)
      end

      def etag
        runner.etag
      end

      def last_modified
        runner.last_modified
      end

      def checksum
        runner.checksum
      end

      def move_to_schema(result, schema=DESTINATION_SCHEMA)
        # The new table to sync is moved to user schema to allow CartoDBfication.
        # This temporary table should not be registered (check ghost_tables_manager.rb)
        return self if schema == result.schema

        database.execute(%Q{
          ALTER TABLE "#{result.schema}"."#{result.table_name}"
          SET SCHEMA "#{user.database_schema}"
        })
      end

      def rename(current_name, new_name)
        database.execute(%Q{
          ALTER TABLE "#{user.database_schema}"."#{current_name}"
          RENAME TO #{new_name}
        })
      end

      def drop(table_name)
        database.execute(%Q(DROP TABLE "#{user.database_schema}"."#{table_name}"))
      end

      def exists?(table_name)
        database.table_exists?(table_name)
      end

      def results
        runner.results
      end

      # First explicit error code, falling back to the first error reported
      # by the runner's results.
      def error_code
        @error_code || runner.results.map(&:error_code).compact.first
      end

      def runner_log_trace
        runner.results.map(&:log_trace).compact.first
      end

      def error_message
        ''
      end

      def temporary_name_for(table_name)
        "#{table_name}_to_be_deleted"
      end

      private

      # A column qualifies as cartodb_id source when it exists and has no NULLs.
      def valid_cartodb_id_candidate?(user, table_name, qualified_table_name, col_name)
        return false unless column_names(user, table_name).include?(col_name)
        user.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |db|
          return db["SELECT 1 FROM #{qualified_table_name} WHERE #{col_name} IS NULL LIMIT 1"].first.nil?
        end
      end

      # Best effort: an empty list on error keeps callers working.
      def column_names(user, table_name)
        user.in_database.schema(table_name, schema: user.database_schema).map { |row| row[0] }
      rescue StandardError => e
        log_error(message: 'Error in column_names from sync adapter', exception: e)
        []
      end

      attr_reader :table_name, :runner, :database, :user

      def log_context
        super.merge(table: { name: table_name }, current_user: user)
      end
    end
  end
end