You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

368 lines
13 KiB

require 'uuidtools'
require 'carto/importer/table_setup'
require_relative '../models/visualization/support_tables'
require_relative '../../lib/carto/ghost_tables_manager'
require_dependency 'carto/db/user_schema'
require_dependency 'visualization/derived_creator'
module CartoDB
module Connector
class Importer
ORIGIN_SCHEMA = 'cdb_importer'
DESTINATION_SCHEMA = 'public'
MAX_RENAME_RETRIES = 20
# The following columns are not validated because we are comparing schemas of a cartodbfied table and one that
# is about to be imported.
COLUMNS_NOT_TO_VALIDATE = [:cartodb_id, :the_geom_webmercator].freeze
attr_reader :imported_table_visualization_ids, :rejected_layers
attr_accessor :table
# @param runner CartoDB::Importer2::Runner
# @param table_registrar CartoDB::TableRegistrar
# @param quota_checker CartoDB::QuotaChecker
# @param database
# @param data_import_id String UUID
# @param destination_schema String|nil
# @param public_user_roles Array|nil
def initialize(runner:, table_registrar:, quota_checker:, database:, data_import_id:, overviews_creator: nil,
destination_schema: DESTINATION_SCHEMA, public_user_roles: [CartoDB::PUBLIC_DB_USER],
collision_strategy: nil)
@aborted = false
@runner = runner
@table_registrar = table_registrar
@quota_checker = quota_checker
@database = database
@data_import_id = data_import_id
@overviews_creator = overviews_creator
@destination_schema = destination_schema
@support_tables_helper = CartoDB::Visualization::SupportTables.new(database,
{public_user_roles: public_user_roles})
@imported_table_visualization_ids = []
@rejected_layers = []
@collision_strategy = collision_strategy
@table_setup = ::Carto::Importer::TableSetup.new(
user: user,
overviews_creator: overviews_creator,
log: runner.log
)
end
def run(tracker)
runner.run(&tracker)
if quota_checker.will_be_over_table_quota?(results.length)
log('Results would set overquota')
@aborted = true
results.each { |result|
drop(result.table_name)
}
else
log('Checking public map quota')
vis = runner.visualizations.select do |v|
v.type == Carto::Visualization::TYPE_DERIVED && v.privacy != Carto::Visualization::PRIVACY_PRIVATE
end
if CartoDB::QuotaChecker.new(data_import.user).will_be_over_public_map_quota?(vis.count)
log('Results would set public map overquota')
raise CartoDB::Importer2::MapQuotaExceededError.new
end
log('Proceeding to register')
register_results(results)
results.select(&:success?).each { |result|
create_overviews(result)
}
create_visualization if data_import.create_visualization
end
self
end
def register_results(results)
Carto::GhostTablesManager.run_synchronized(
user.id, attempts: 10, timeout: 3000,
message: "Couldn't acquire bolt to register. Registering without bolt",
user: user,
import_id: @data_import_id
) do
results.select(&:success?).each do |result|
register(result)
end
end
end
def register(result)
@support_tables_helper.reset
log("Before renaming from #{result.table_name} to #{result.name}")
names_taken = overwrite_strategy? ? [] : taken_names
name = Carto::ValidTableNameProposer.new.propose_valid_table_name(result.name, taken_names: names_taken)
overwrite = overwrite_strategy? && taken_names.include?(name)
if overwrite
raise ::CartoDB::Importer2::IncompatibleSchemas.new unless compatible_schemas_for_overwrite?(name)
move_to_schema(result, result.table_name, ORIGIN_SCHEMA, @destination_schema)
overwrite_register(result, name) do
drop("\"#{@destination_schema}\".\"#{name}\"")
rename(result, result.table_name, name, @destination_schema)
end
else
new_register(name, result)
end
result.name = name
log("Before persisting metadata '#{name}' data_import_id: #{data_import_id}")
persist_metadata(name, data_import_id, overwrite)
log("Table '#{name}' registered")
rescue => exception
if exception.message =~ /canceling statement due to statement timeout/i
drop("#{ORIGIN_SCHEMA}.#{result.table_name}")
raise CartoDB::Importer2::StatementTimeoutError.new(
exception.message,
CartoDB::Importer2::ERRORS_MAP[CartoDB::Importer2::StatementTimeoutError]
)
else
raise exception
end
end
def create_overviews(result)
dataset = @overviews_creator.dataset(result.name)
dataset.create_overviews!
rescue => exception
# In case of overview creation failure we'll just omit the
# overviews creation and continue with the process.
# Since the actual creation is handled by a single SQL
# function, and thus executed in a transaction, we shouldn't
# need any clean up here. (Either all overviews were created
# or nothing changed)
log("Overviews creation failed: #{exception.message}")
CartoDB::Logger.error(
message: "Overviews creation failed",
exception: exception,
user: Carto::User.find(data_import.user_id),
table_name: result.name
)
end
def create_visualization
if runner.visualizations.empty?
create_default_visualization
else
user = Carto::User.find(data_import.user_id)
renamed_tables = results.map { |r| [r.original_name, r.name] }.to_h
runner.visualizations.each do |visualization|
persister = Carto::VisualizationsExportPersistenceService.new
vis = persister.save_import(user, visualization, renamed_tables: renamed_tables)
bind_visualization_to_data_import(vis)
end
end
end
def create_default_visualization
tables = get_imported_tables
if tables.length > 0
user = ::User.where(id: data_import.user_id).first
vis, @rejected_layers = CartoDB::Visualization::DerivedCreator.new(user, tables).create
bind_visualization_to_data_import(vis)
end
end
def bind_visualization_to_data_import(vis)
data_import.visualization_id = vis.id
data_import.save
data_import.reload
end
def get_imported_tables
@imported_table_visualization_ids.map do |table_id|
Carto::Visualization.find(table_id).table
end
end
def success?
!over_table_quota? && runner.success?
end
def drop_all(results)
results.each { |result| drop(result.qualified_table_name) }
end
def drop(table_name)
Carto::OverviewsService.new(database).delete_overviews table_name
database.execute(%(DROP TABLE #{table_name}))
rescue => exception
log("Couldn't drop table #{table_name}: #{exception}. Backtrace: #{exception.backtrace} ")
self
end
def move_to_schema(result, table_name, origin_schema, destination_schema)
return self if origin_schema == destination_schema
database.execute(%Q{
ALTER TABLE "#{origin_schema}"."#{table_name}"
SET SCHEMA "#{destination_schema}"
})
@support_tables_helper.tables = result.support_tables.map { |table|
{ schema: origin_schema, name: table }
}
@support_tables_helper.change_schema(destination_schema, table_name)
rescue => e
drop("#{origin_schema}.#{table_name}")
raise e
end
# Renames table from current_name to new_name.
# It doesn't check if `new_name` is valid. To get a valid name use `Carto::ValidTableNameProposer`
def rename(result, current_name, new_name, schema)
database.execute(%{
ALTER TABLE "#{schema}"."#{current_name}" RENAME TO "#{new_name}"
})
rename_the_geom_index_if_exists(current_name, new_name, schema)
@support_tables_helper.tables = result.support_tables.map { |table|
{ schema: schema, name: table }
}
# Delay recreation of constraints until schema change
results = @support_tables_helper.rename(current_name, new_name, false)
if results[:success]
result.update_support_tables(results[:names])
else
raise 'unsuccessful support tables renaming'
end
new_name
rescue => exception
drop("#{schema}.#{current_name}")
CartoDB::Logger.debug(message: 'Error in table rename: dropping importer table',
exception: exception,
table_name: current_name,
new_table_name: new_name,
data_import: @data_import_id)
raise exception
end
def rename_the_geom_index_if_exists(current_name, new_name, schema)
database.execute(%Q{
ALTER INDEX IF EXISTS "#{schema}"."#{current_name}_geom_idx"
RENAME TO "the_geom_#{UUIDTools::UUID.timestamp_create.to_s.gsub('-', '_')}"
})
rescue => exception
log("Silently failed rename_the_geom_index_if_exists from " +
"#{current_name} to #{new_name} with exception #{exception}. " +
"Backtrace: #{exception.backtrace}. ")
end
def persist_metadata(name, data_import_id, overwrite_table)
user_table = overwrite_strategy? ? Carto::UserTable.where(user_id: user.id, name: name).first : nil
table_registrar.register(name, data_import_id, user_table)
@table = table_registrar.table
@imported_table_visualization_ids << @table.table_visualization.id unless overwrite_table
table.update_bounding_box
self
end
def results
runner.results
end
def over_table_quota?
@aborted || quota_checker.over_table_quota?
end
def error_code
return 8002 if over_table_quota?
results.map(&:error_code).compact.first
end
def data_import
@data_import ||= DataImport[@data_import_id]
end
def overwrite_register(result, name)
index_statements = @table_setup.generate_index_statements(@destination_schema, name)
database.transaction do
log("Replacing #{name} with #{result.table_name}")
begin
# the logic inside the transaction may vary, so we let the caller to implement it
yield(database, @destination_schema)
rescue => e
log("Unable to replace #{name} with #{result.table_name}. Rollingback transaction and dropping #{result.table_name}: #{e}")
drop("\"#{@destination_schema}\".\"#{result.table_name}\"")
raise e
end
end
@table_setup.cartodbfy(name)
@table_setup.fix_oid(name)
@table_setup.run_index_statements(index_statements, @database)
@table_setup.recreate_overviews(name)
@table_setup.update_cdb_tablemetadata(name)
end
private
def new_register(name, result)
database.transaction do
rename(result, result.table_name, name, ORIGIN_SCHEMA)
begin
log("Before moving schema '#{name}' from #{ORIGIN_SCHEMA} to #{@destination_schema}")
move_to_schema(result, name, ORIGIN_SCHEMA, @destination_schema)
rescue => e
log("Error replacing data in import: #{e}: #{e.backtrace}")
raise e
end
end
end
def compatible_schemas_for_overwrite?(name)
orig_schema = user.in_database.schema(results.first.tables.first, reload: true, schema: ORIGIN_SCHEMA)
dest_schema = user.in_database.schema(name, reload: true, schema: user.database_schema)
dest_schema.each do |dest_row|
next if COLUMNS_NOT_TO_VALIDATE.include?(dest_row[0])
return false unless orig_schema.any? { |orig_row| rows_assignable?(dest_row, orig_row) }
end
true
end
def rows_assignable?(dest_row, orig_row)
(orig_row[0] == :the_geom && orig_row[0] == dest_row[0] && dest_row[1][:db_type].include?(orig_row[1][:db_type])) ||
(orig_row[0] == dest_row[0] && orig_row[1][:db_type].convert_to_cartodb_type == dest_row[1][:db_type].convert_to_cartodb_type)
end
def user
table_registrar.user
end
def overwrite_strategy?
@collision_strategy == Carto::DataImportConstants::COLLISION_STRATEGY_OVERWRITE
end
def log(message)
runner.log.append(message)
end
def exists_user_table_for_user_id(table_name, user_id)
!Carto::UserTable.where(name: table_name, user_id: user_id).first.nil?
end
def taken_names
Carto::Db::UserSchema.new(table_registrar.user).table_names
end
attr_reader :runner, :table_registrar, :quota_checker, :database, :data_import_id
end
end
end