require 'uuidtools'
require 'carto/importer/table_setup'

require_relative '../models/visualization/support_tables'
require_relative '../../lib/carto/ghost_tables_manager'

require_dependency 'carto/db/user_schema'
require_dependency 'visualization/derived_creator'

# NOTE(review): this file was recovered from a copy in which line breaks had
# been collapsed and a number of `receiver.method` tokens had been stripped.
# Every token that had to be reconstructed is flagged below with
# `NOTE(review)` (high-confidence, implied by surrounding code) or
# `FIXME(review)` (a guess). Confirm each one against version control before
# relying on this file.
module CartoDB
  module Connector
    # Takes the tables produced by an import runner in the importer schema and
    # registers them for the user: renames them to a valid final name, moves
    # them to the destination schema (or overwrites an existing table when the
    # collision strategy says so), persists their metadata and optionally
    # creates visualizations for them.
    class Importer
      ORIGIN_SCHEMA       = 'cdb_importer'
      DESTINATION_SCHEMA  = 'public'
      MAX_RENAME_RETRIES  = 20

      # The following columns are not validated because we are comparing schemas of a cartodbfied table and one that
      # is about to be imported.
      COLUMNS_NOT_TO_VALIDATE = [:cartodb_id, :the_geom_webmercator].freeze

      attr_reader :imported_table_visualization_ids, :rejected_layers
      attr_accessor :table

      # @param runner CartoDB::Importer2::Runner
      # @param table_registrar CartoDB::TableRegistrar
      # @param quota_checker CartoDB::QuotaChecker
      # @param database
      # @param data_import_id String UUID
      # @param overviews_creator object responding to `dataset(name)` (may be nil)
      # @param destination_schema String|nil
      # @param public_user_roles Array|nil
      # @param collision_strategy String|nil compared against
      #   Carto::DataImportConstants::COLLISION_STRATEGY_OVERWRITE
      def initialize(runner:, table_registrar:, quota_checker:, database:, data_import_id:,
                     overviews_creator: nil,
                     destination_schema: DESTINATION_SCHEMA,
                     public_user_roles: [CartoDB::PUBLIC_DB_USER],
                     collision_strategy: nil)
        @aborted                = false
        @runner                 = runner
        @table_registrar        = table_registrar
        @quota_checker          = quota_checker
        @database               = database
        @data_import_id         = data_import_id
        @overviews_creator      = overviews_creator
        @destination_schema     = destination_schema
        # NOTE(review): constructor reconstructed from the
        # 'visualization/support_tables' require and the surviving arguments.
        @support_tables_helper  = CartoDB::Visualization::SupportTables.new(database,
                                                                            {public_user_roles: public_user_roles})

        @imported_table_visualization_ids = []
        @rejected_layers = []
        @collision_strategy = collision_strategy
        # NOTE(review): class name reconstructed from the
        # 'carto/importer/table_setup' require.
        @table_setup = ::Carto::Importer::TableSetup.new(
          user: user,
          overviews_creator: overviews_creator,
          log: runner.log
        )
      end

      # Runs the import and, if quotas allow, registers the resulting tables,
      # creates their overviews and (when requested by the data import) the
      # visualization.
      #
      # @param tracker progress callback handed to the runner
      # @return [Importer] self
      def run(tracker)
        # NOTE(review): reconstructed. `tracker` is otherwise unused and
        # `results`/`success?` read the runner's state, so the runner must be
        # executed here.
        runner.run(&tracker)

        if quota_checker.will_be_over_table_quota?(results.length)
          log('Results would set overquota')
          @aborted = true
          results.each { |result| drop(result.table_name) }
        else
          log('Checking public map quota')
          # FIXME(review): the collection being filtered was lost; assumed to
          # be the user's visualizations.
          vis = user.visualizations.select do |v|
            v.type == Carto::Visualization::TYPE_DERIVED && v.privacy != Carto::Visualization::PRIVACY_PRIVATE
          end
          # FIXME(review): the whole condition was lost; this is a guess at a
          # "non-private derived maps already at quota" check. Replace with
          # the original expression.
          if user.public_map_quota && vis.length >= user.public_map_quota
            log('Results would set public map overquota')
            # FIXME(review): the exception class/instance that followed
            # `raise` was lost; a bare `raise` is what the recovered text
            # contains and it raises a generic RuntimeError.
            raise
          end
          log('Proceeding to register')
          register_results(results)
          # NOTE(review): receiver reconstructed to mirror register_results.
          results.select(&:success?).each { |result| create_overviews(result) }

          create_visualization if data_import.create_visualization
        end

        self
      end

      # Registers every successful result while holding the ghost tables
      # manager lock for the user.
      def register_results(results)
        Carto::GhostTablesManager.run_synchronized(
          user.id, # NOTE(review): first argument reconstructed.
          attempts: 10,
          timeout: 3000,
          message: "Couldn't acquire bolt to register. Registering without bolt",
          user: user,
          import_id: @data_import_id
        ) do
          # NOTE(review): iteration target reconstructed.
          results.select(&:success?).each do |result|
            register(result)
          end
        end
      end

      # Gives the imported table its final name and location and persists its
      # metadata. With the overwrite strategy and an existing table of the same
      # name, the existing table is replaced (after a schema compatibility
      # check); otherwise the table is registered as a new one.
      #
      # A statement timeout is translated into
      # CartoDB::Importer2::StatementTimeoutError after dropping the imported
      # table; any other error is re-raised untouched.
      def register(result)
        @support_tables_helper.reset

        # NOTE(review): `result.name` reconstructed here and below.
        log("Before renaming from #{result.table_name} to #{result.name}")

        names_taken = overwrite_strategy? ? [] : taken_names
        # NOTE(review): proposer call reconstructed from the comment on #rename
        # (`Carto::ValidTableNameProposer`) and the surviving keyword argument.
        name = Carto::ValidTableNameProposer.new.propose_valid_table_name(result.name, taken_names: names_taken)

        overwrite = overwrite_strategy? && taken_names.include?(name)

        if overwrite
          raise unless compatible_schemas_for_overwrite?(name)

          move_to_schema(result, result.table_name, ORIGIN_SCHEMA, @destination_schema)

          overwrite_register(result, name) do
            drop("\"#{@destination_schema}\".\"#{name}\"")
            rename(result, result.table_name, name, @destination_schema)
          end
        else
          new_register(name, result)
        end

        result.name = name # NOTE(review): assignment target reconstructed.

        log("Before persisting metadata '#{name}' data_import_id: #{data_import_id}")
        persist_metadata(name, data_import_id, overwrite)

        log("Table '#{name}' registered")
      rescue => exception
        if exception.message =~ /canceling statement due to statement timeout/i
          drop("#{ORIGIN_SCHEMA}.#{result.table_name}")
          # NOTE(review): exception class reconstructed from the ERRORS_MAP key.
          raise CartoDB::Importer2::StatementTimeoutError.new(
            exception.message,
            CartoDB::Importer2::ERRORS_MAP[CartoDB::Importer2::StatementTimeoutError]
          )
        else
          raise exception
        end
      end

      # Creates overviews for an imported table; failures are logged and
      # swallowed on purpose.
      def create_overviews(result)
        dataset = @overviews_creator.dataset(result.name) # NOTE(review): argument reconstructed.
        dataset.create_overviews!
      rescue => exception
        # In case of overview creation failure we'll just omit the
        # overviews creation and continue with the process.
        # Since the actual creation is handled by a single SQL
        # function, and thus executed in a transaction, we shouldn't
        # need any clean up here. (Either all overviews were created
        # or nothing changed)
        log("Overviews creation failed: #{exception.message}")
        CartoDB::Logger.error(
          message:    "Overviews creation failed",
          exception:  exception,
          user:       Carto::User.find(data_import.user_id),
          table_name: result.name # NOTE(review): value reconstructed.
        )
      end

      # Creates the visualizations shipped with the import, or a default one
      # when the runner produced none.
      def create_visualization
        if runner.visualizations.empty?
          create_default_visualization
        else
          user = Carto::User.find(data_import.user_id)
          # Maps each table's original name to its final name.
          # NOTE(review): `results.map` and `r.name` reconstructed.
          renamed_tables = results.map { |r| [r.original_name, r.name] }.to_h
          runner.visualizations.each do |visualization|
            # NOTE(review): persister class reconstructed; only `save_import`
            # survives in the recovered text.
            persister = Carto::VisualizationsExportPersistenceService.new
            vis = persister.save_import(user, visualization, renamed_tables: renamed_tables)
            bind_visualization_to_data_import(vis)
          end
        end
      end

      # Builds a derived visualization out of all imported tables and records
      # the layers that were rejected.
      def create_default_visualization
        tables = get_imported_tables
        if tables.length > 0
          user = ::User.where(id: data_import.user_id).first
          # NOTE(review): creator class reconstructed from the
          # 'visualization/derived_creator' require.
          vis, @rejected_layers = CartoDB::Visualization::DerivedCreator.new(user, tables).create
          bind_visualization_to_data_import(vis)
        end
      end

      # Stores the visualization id on the data import record.
      def bind_visualization_to_data_import(vis)
        data_import.visualization_id = vis.id # NOTE(review): value reconstructed.
        data_import.save                      # NOTE(review): reconstructed; reload alone would discard the id.
        data_import.reload
      end

      # @return tables behind the visualizations registered by this import
      def get_imported_tables
        # NOTE(review): iteration target reconstructed.
        imported_table_visualization_ids.map do |table_id|
          Carto::Visualization.find(table_id).table
        end
      end

      def success?
        !over_table_quota? && runner.success?
      end

      def drop_all(results)
        results.each { |result| drop(result.qualified_table_name) }
      end

      # Drops a table, logging (not raising) on failure.
      def drop(table_name)
        # NOTE(review): only the trailing `table_name` argument survives in the
        # recovered text; the overview-deletion call is reconstructed.
        Carto::OverviewsService.new(database).delete_overviews table_name
        database.execute(%(DROP TABLE #{table_name}))
      rescue => exception
        log("Couldn't drop table #{table_name}: #{exception}. Backtrace: #{exception.backtrace} ")
        self
      end

      # Moves a table and its support tables to another schema. On failure the
      # table is dropped from the origin schema and the error is re-raised.
      def move_to_schema(result, table_name, origin_schema, destination_schema)
        return self if origin_schema == destination_schema

        database.execute(%Q{
          ALTER TABLE "#{origin_schema}"."#{table_name}"
          SET SCHEMA "#{destination_schema}"
        })

        # NOTE(review): `result.support_tables.map` reconstructed.
        @support_tables_helper.tables = result.support_tables.map { |table|
          { schema: origin_schema, name: table }
        }
        @support_tables_helper.change_schema(destination_schema, table_name)
      rescue => e
        drop("#{origin_schema}.#{table_name}")
        raise e
      end

      # Renames table from current_name to new_name.
      # It doesn't check if `new_name` is valid. To get a valid name use `Carto::ValidTableNameProposer`
      def rename(result, current_name, new_name, schema)
        database.execute(%{
          ALTER TABLE "#{schema}"."#{current_name}" RENAME TO "#{new_name}"
        })

        rename_the_geom_index_if_exists(current_name, new_name, schema)

        # NOTE(review): `result.support_tables.map` reconstructed.
        @support_tables_helper.tables = result.support_tables.map { |table|
          { schema: schema, name: table }
        }

        # Delay recreation of constraints until schema change
        results = @support_tables_helper.rename(current_name, new_name, false)

        if results[:success]
          result.update_support_tables(results[:names])
        else
          raise 'unsuccessful support tables renaming'
        end

        new_name
      rescue => exception
        drop("#{schema}.#{current_name}")
        CartoDB::Logger.debug(message: 'Error in table rename: dropping importer table',
                              exception: exception,
                              table_name: current_name,
                              new_table_name: new_name,
                              data_import: @data_import_id)
        raise exception
      end

      # Renames the geometry index to a unique, timestamp-UUID based name.
      # Failures are logged and ignored.
      def rename_the_geom_index_if_exists(current_name, new_name, schema)
        database.execute(%Q{
          ALTER INDEX IF EXISTS "#{schema}"."#{current_name}_geom_idx"
          RENAME TO "the_geom_#{UUIDTools::UUID.timestamp_create.to_s.gsub('-', '_')}"
        })
      rescue => exception
        log("Silently failed rename_the_geom_index_if_exists from " +
            "#{current_name} to #{new_name} with exception #{exception}. " +
            "Backtrace: #{exception.backtrace}. ")
      end

      # Registers the table through the table registrar and remembers the
      # resulting table (and, for new tables, its visualization id).
      def persist_metadata(name, data_import_id, overwrite_table)
        # NOTE(review): `user.id` reconstructed.
        user_table = overwrite_strategy? ? Carto::UserTable.where(user_id: user.id, name: name).first : nil
        table_registrar.register(name, data_import_id, user_table)
        @table = table_registrar.table
        # NOTE(review): appended value reconstructed.
        @imported_table_visualization_ids << @table.table_visualization.id unless overwrite_table
        table.update_bounding_box
        self
      end

      def results
        runner.results
      end

      def over_table_quota?
        @aborted || quota_checker.over_table_quota?
      end

      # @return 8002 when over table quota, otherwise the first error code
      #   reported by a result (nil when there is none)
      def error_code
        return 8002 if over_table_quota?
        # NOTE(review): fallback reconstructed; without it the method would
        # return nil whenever the quota is fine.
        results.map(&:error_code).compact.first
      end

      def data_import
        @data_import ||= DataImport[@data_import_id]
      end

      # Replaces table `name` inside a transaction and then re-applies the
      # table setup (cartodbfy, oid, indexes, overviews, metadata).
      #
      # @yield [database, destination_schema] the replacement logic
      def overwrite_register(result, name)
        # Captured before the swap so the indexes can be recreated afterwards.
        index_statements = @table_setup.generate_index_statements(@destination_schema, name)

        database.transaction do
          log("Replacing #{name} with #{result.table_name}")
          begin
            # the logic inside the transaction may vary, so we let the caller to implement it
            yield(database, @destination_schema)
          rescue => e
            log("Unable to replace #{name} with #{result.table_name}. Rollingback transaction and dropping #{result.table_name}: #{e}")
            drop("\"#{@destination_schema}\".\"#{result.table_name}\"")
            raise e
          end
        end

        @table_setup.cartodbfy(name)
        @table_setup.fix_oid(name)
        @table_setup.run_index_statements(index_statements, @database)
        @table_setup.recreate_overviews(name)
        @table_setup.update_cdb_tablemetadata(name)
      end

      private

      # Renames the imported table and moves it to the destination schema in a
      # single transaction.
      def new_register(name, result)
        database.transaction do
          rename(result, result.table_name, name, ORIGIN_SCHEMA)
          begin
            log("Before moving schema '#{name}' from #{ORIGIN_SCHEMA} to #{@destination_schema}")
            move_to_schema(result, name, ORIGIN_SCHEMA, @destination_schema)
          rescue => e
            log("Error replacing data in import: #{e}: #{e.backtrace}")
            raise e
          end
        end
      end

      # True when every column of the existing table (except
      # COLUMNS_NOT_TO_VALIDATE) has an assignable counterpart in the first
      # imported table.
      def compatible_schemas_for_overwrite?(name)
        orig_schema = user.in_database.schema(results.first.tables.first, reload: true, schema: ORIGIN_SCHEMA)
        dest_schema = user.in_database.schema(name, reload: true, schema: user.database_schema)

        dest_schema.each do |dest_row|
          next if COLUMNS_NOT_TO_VALIDATE.include?(dest_row[0])
          return false unless orig_schema.any? { |orig_row| rows_assignable?(dest_row, orig_row) }
        end

        true
      end

      # Rows are [column_name, column_info] pairs. `the_geom` only needs the
      # destination db_type to contain the origin one; any other column needs
      # the same name and the same converted type.
      def rows_assignable?(dest_row, orig_row)
        (orig_row[0] == :the_geom && orig_row[0] == dest_row[0] &&
          dest_row[1][:db_type].include?(orig_row[1][:db_type])) ||
          (orig_row[0] == dest_row[0] &&
            orig_row[1][:db_type].convert_to_cartodb_type == dest_row[1][:db_type].convert_to_cartodb_type)
      end

      def user
        table_registrar.user
      end

      def overwrite_strategy?
        @collision_strategy == Carto::DataImportConstants::COLLISION_STRATEGY_OVERWRITE
      end

      def log(message)
        runner.log.append(message)
      end

      def exists_user_table_for_user_id(table_name, user_id)
        !Carto::UserTable.where(name: table_name, user_id: user_id).first.nil?
      end

      # @return names of the tables already present for the user
      def taken_names
        # NOTE(review): body reconstructed from the 'carto/db/user_schema'
        # require; the recovered text had an empty method here.
        Carto::Db::UserSchema.new(table_registrar.user).table_names
      end

      attr_reader :runner, :table_registrar, :quota_checker, :database, :data_import_id
    end
  end
end