# cartodb-4.42/app/models/synchronization/adapter.rb
# (repository-viewer header left by extraction: 417 lines, 16 KiB, Ruby, 2024-04-06 13:25:13 +08:00)
require 'carto/importer/table_setup'
module CartoDB
module Synchronization
class Adapter
# Logging mixin; presumably the source of log_error and of the log_context
# that this class extends further down -- TODO confirm.
include ::LoggerHelper
# Statement timeout handed to the direct connections / transactions opened by
# this class: one hour (`1.hour`, not core Ruby -- presumably ActiveSupport)
# multiplied by 1000, i.e. presumably milliseconds -- TODO confirm the unit.
STATEMENT_TIMEOUT = (1.hour * 1000).freeze
# Default schema that move_to_schema compares the import result's schema against.
DESTINATION_SCHEMA = 'public'.freeze
# Name of the geometry column inspected by fix_the_geom_type!.
THE_GEOM = 'the_geom'.freeze
# Error code stored in @error_code when overwrite_sync / overwrite_replace fail.
OVERWRITE_ERROR = 2013
# Builds an adapter that synchronizes the data fetched by +runner+ into
# +table_name+.
#
# table_name         - name of the destination table.
# runner             - import runner; must respond to #log here, and to #run,
#                      #results, #remote_data_updated?, ... in the other methods.
# database           - connection used for the raw SQL statements of this class.
# user               - owner of the table.
# overviews_creator  - forwarded to Carto::Importer::TableSetup.
# synchronization_id - forwarded to the ghost tables manager in #run.
def initialize(table_name, runner, database, user, overviews_creator, synchronization_id)
  # Collaborators supplied by the caller.
  @table_name, @runner, @database = table_name, runner, database
  @user = user
  @overviews_creator = overviews_creator
  @synchronization_id = synchronization_id

  # State of the current run.
  @failed = false
  @error_code = nil

  setup_options = { user: user, overviews_creator: overviews_creator, log: runner.log }
  @table_setup = ::Carto::Importer::TableSetup.new(**setup_options)
end
# Runs the import and, when the remote data changed, replaces / syncs the
# destination table with the freshly imported one.
#
# tracker - optional block forwarded untouched to runner.run.
#
# Returns self. Raises a RuntimeError when the remote data was updated but no
# successful import result is available. Any StandardError marks the adapter
# as failed, is printed to stdout, triggers a drop of the imported table if
# it still exists, and is then re-raised.
def run(&tracker)
  runner.run(&tracker)
  successful_results = runner.results.select(&:success?)
  result = successful_results.first
  return self unless runner.remote_data_updated?

  if result.nil?
    raise "Expecting success data for table '#{table_name}'\nResults:#{runner.results.to_s}\n" \
          "1st result:#{runner.results.first.inspect}"
  end

  lock_options = {
    attempts: 10, timeout: 3000,
    message: "Couldn't acquire bolt to register. Registering sync without bolt",
    user: user,
    synchronization_id: @synchronization_id
  }
  Carto::GhostTablesManager.run_synchronized(user.id, **lock_options) do
    imported_name = result.table_name
    user_schema = user.database_schema

    # The imported table is first brought into the user schema and prepared
    # there; only then does it take the place of the destination table.
    move_to_schema(result)
    geo_type = fix_the_geom_type!(user_schema, imported_name)
    import_cleanup(user_schema, imported_name)
    @table_setup.cartodbfy(imported_name)
    overwrite(user_schema, table_name, result, geo_type)
    setup_table(table_name, geo_type)
    @table_setup.recreate_overviews(table_name)
  end

  self
rescue StandardError => error
  @failed = true
  puts '==================', error.to_s, error.backtrace, '=================='
  # Do not leave the temporary import table behind.
  drop(result.table_name) if result && exists?(result.table_name)
  raise error
end
# Returns the user given to the constructor.
# NOTE(review): `attr_reader ..., :user` is declared again further down, after
# `private`, which redefines this method as a private reader -- confirm which
# visibility is intended.
def user
@user
end
# Chooses how the destination table is overwritten with the imported one.
#
# * overwrite_replace swaps the existing table for the new one, so any change
#   made since the previous import/sync is lost.
# * overwrite_sync keeps columns added since the previous import/sync, and the
#   geometry too when the new table has none (nil geo_type).
#
# For the time being the sync strategy is only used for tables carrying a
# `carto_geocode_hash` column (added by the geocoder analysis).
# TODO: the sync could be performed whenever a column named `_carto_*` exists
# (carto_geocode_hash would need to be renamed to _carto_geocode_hash).
#
# Returns whatever the selected strategy returns.
def overwrite(schema, table_name, result, geo_type)
  if has_column(schema, table_name, 'carto_geocode_hash')
    overwrite_sync(schema, table_name, result, geo_type)
  else
    overwrite_replace(schema, table_name, result)
  end
end
# Syncs the imported table into the destination table through
# cartodb.CDB_SyncTable, then drops the imported table.
#
# schema     - schema holding both tables.
# table_name - destination table.
# result     - import result; its #table_name is the imported table.
# geo_type   - detected geometry type, nil when the import has no geometry.
#
# Returns false when the remote data was not updated. On any StandardError the
# error code is set to OVERWRITE_ERROR, the failure is logged, the imported
# table is dropped if it still exists, and the error is re-raised.
def overwrite_sync(schema, table_name, result, geo_type)
  return false unless runner.remote_data_updated?

  # NOTE the import table has already been moved to the user schema (#7543):
  # cartodbfication queries CDB_UserQuotaSize and other functions that are
  # expected to live in the schema of the table.
  source_table = %{"#{schema}"."#{result.table_name}"}
  geometry_columns = '{the_geom, the_geom_webmercator}'
  import_has_geometry = !geo_type.nil?

  database.transaction do
    if import_has_geometry
      database.execute(%{
SELECT cartodb.CDB_SyncTable(
'#{source_table}',
'#{schema}', '#{table_name}'
)})
    else
      # The import carries no geometry, so the geometry columns are left out
      # of the sync; the existing ones may have been added via geocoding.
      database.execute(%{
SELECT cartodb.CDB_SyncTable(
'#{source_table}',
'#{schema}', '#{table_name}',
'#{geometry_columns}'
)})
    end
  end

  drop(result.table_name) if exists?(result.table_name)

  # TODO not sure whether these two are needed
  @table_setup.fix_oid(table_name)
  @table_setup.update_cdb_tablemetadata(table_name)
rescue StandardError => error
  @error_code = OVERWRITE_ERROR
  puts "Sync overwrite ERROR: #{error.message}: #{error.backtrace.join}"

  # Every result attribute except 'log_trace', which is too long for Rollbar
  reported_attributes = CartoDB::Importer2::Result::ATTRIBUTES.reject { |attribute| attribute == 'log_trace' }
  result_hash = reported_attributes.map { |attribute| [attribute, result.send(attribute)] }.to_h
  log_error(message: 'Error in sync overwrite', exception: error, result: result_hash)

  drop(result.table_name) if exists?(result.table_name)
  raise error
end
# Replaces the destination table with the imported one: privileges are copied
# over, index statements of the current table are captured, the tables are
# swapped inside a transaction and the indexes are re-run afterwards.
#
# schema     - schema holding both tables.
# table_name - destination table.
# result     - import result; its #table_name is the imported table.
#
# Returns false when the remote data was not updated. On any StandardError the
# error code is set to OVERWRITE_ERROR, the failure is logged, the imported
# table is dropped if it still exists, and the error is re-raised.
def overwrite_replace(schema, table_name, result)
  return false unless runner.remote_data_updated?

  @table_setup.copy_privileges(schema, table_name, schema, result.table_name)
  saved_indexes = @table_setup.generate_index_statements(schema, table_name)
  parking_name = temporary_name_for(result.table_name)

  database.transaction do
    # Park the current table under the temporary name, remove it, and let the
    # imported table take over the destination name.
    rename(table_name, parking_name) if exists?(table_name)
    drop(parking_name) if exists?(parking_name)
    rename(result.table_name, table_name)
  end

  @table_setup.fix_oid(table_name)
  @table_setup.update_cdb_tablemetadata(table_name)
  @table_setup.run_index_statements(saved_indexes, @database)
rescue StandardError => error
  @error_code = OVERWRITE_ERROR
  puts "Sync overwrite ERROR: #{error.message}: #{error.backtrace.join}"

  # Every result attribute except 'log_trace', which is too long for Rollbar
  reported_attributes = CartoDB::Importer2::Result::ATTRIBUTES.reject { |attribute| attribute == 'log_trace' }
  result_hash = reported_attributes.map { |attribute| [attribute, result.send(attribute)] }.to_h
  log_error(message: 'Error in sync overwrite', exception: error, result: result_hash)

  drop(result.table_name) if exists?(result.table_name)
  raise error
end
# Registers the freshly overwritten table: loads its UserTable service,
# re-imports it into CartoDB, reloads it and stores the geometry type.
#
# table_name - name of the destination table (looked up among user.tables).
# geo_type   - geometry type detected earlier, forwarded to
#              set_the_geom_column!.
#
# Failures are deliberately not propagated: any StandardError is logged via
# log_error and swallowed. The table OID is fixed in every case.
def setup_table(table_name, geo_type)
  table = Carto::UserTable.find(user.tables.where(name: table_name).first.id).service
  table.force_schema = true
  table.import_to_cartodb(table_name)
  table.schema(reload: true)
  table.reload
  # We send the detected geometry type to avoid manipulating geoms twice
  # set_the_geom_column! should just edit the metadata with the specified type
  table.send :set_the_geom_column!, geo_type
  table.save
rescue StandardError => exception
  # The exception must be bound here: without `=> exception` the reference
  # below raised NameError from inside the handler instead of logging.
  log_error(message: 'Error in setup cartodbfy', exception: exception)
ensure
  @table_setup.fix_oid(table_name)
end
# Tells whether the given table has a live (not dropped, non-system) column
# called +column_name+, by querying pg_catalog.pg_attribute.
#
# schema_name - schema of the table (double-quoted in the query).
# table_name  - table name (interpolated unquoted).
# column_name - column to look for.
#
# Returns the `has_column` value of the first row (true), or nil when the
# query yields no row.
def has_column(schema_name, table_name, column_name)
  relation = %("#{schema_name}".#{table_name})
  lookup = %{
SELECT TRUE as has_column FROM pg_catalog.pg_attribute a
WHERE
a.attname = '#{column_name}'
AND a.attnum > 0
AND NOT a.attisdropped
AND a.attrelid = '#{relation}'::regclass::oid
LIMIT 1
}
  row = user.in_database[lookup].first
  row && row[:has_column]
end
# Adapted from Table#get_the_geom_type! so that it works on tables that are
# not registered yet. Inspects the `the_geom` column of the given table,
# normalizes its contents and returns the detected geometry type, so the
# caller can hand it to the code that later saves the type in the table
# metadata.
#
# schema_name - schema of the table (double-quoted in the generated SQL).
# table_name  - table name (interpolated unquoted).
#
# Returns nil when the catalog query finds no `the_geom` column, when that
# column is not of type `geometry` (it is then renamed to `the_geom_str`), or
# when every geometry is NULL; otherwise the geometry type as a string.
# NOTE(review): the type is downcased, except after the ST_Multi conversion,
# where the raw GeometryType() value is returned -- confirm that callers do
# not depend on the case.
def fix_the_geom_type!(schema_name, table_name)
qualified_table_name = "\"#{schema_name}\".#{table_name}"
type = nil
# Look up the type of the `the_geom` column in the catalog.
the_geom_data = user.in_database[%Q{
SELECT a.attname, t.typname
FROM pg_attribute a, pg_type t
WHERE attrelid = '#{qualified_table_name}'::regclass
AND attname = '#{THE_GEOM}'
AND a.atttypid = t.oid
AND a.attstattarget < 0
LIMIT 1
}].first
return nil unless the_geom_data
# A `the_geom` column that is not a real geometry is moved out of the way.
if the_geom_data[:typname] != 'geometry'
user.in_database.execute %{
ALTER TABLE #{qualified_table_name} RENAME COLUMN "#{THE_GEOM}" TO "the_geom_str"
}
return nil
end
# The type is sampled from a single non-null geometry.
geom_type = user.in_database[%Q{
SELECT GeometryType(#{THE_GEOM})
FROM #{qualified_table_name}
WHERE #{THE_GEOM} IS NOT null
LIMIT 1
}].first
type = geom_type[:geometrytype].to_s.downcase if geom_type
# if the geometry is MULTIPOINT we convert it to POINT
if type == 'multipoint'
user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
# Only the first point of each multipoint is kept.
user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_GeometryN(the_geom,1);")
end
type = 'point'
end
# if the geometry is LINESTRING or POLYGON we convert it to MULTILINESTRING or MULTIPOLYGON
if %w(linestring polygon).include?(type)
user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_Multi(the_geom);")
# Re-read the type after the conversion (value is not downcased here).
type = user_database[%Q{
SELECT GeometryType(#{THE_GEOM})
FROM #{qualified_table_name}
WHERE #{THE_GEOM} IS NOT null
LIMIT 1
}].first[:geometrytype]
end
end
type
end
# Adapted from Table#import_cleanup, with the column schema checks reworked
# for tables that are not registered yet.
#
# Over a direct connection opened with STATEMENT_TIMEOUT it:
#   1. sanitizes the column names of the table;
#   2. picks an auxiliary id column (ogc_fid or gid) if one qualifies;
#   3. drops the existing primary key constraint, if any;
#   4. converts every varchar column to text;
#   5. when an auxiliary id column was found, tries to add a cartodb_id
#      column, fills it from the auxiliary column, restarts its sequence and
#      finally drops the auxiliary column.
#
# schema_name - schema of the table (double-quoted in the generated SQL).
# table_name  - name of the newly imported table (interpolated unquoted).
def import_cleanup(schema_name, table_name)
qualified_table_name = "\"#{schema_name}\".#{table_name}"
user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
# For consistency with regular imports, also eases testing
# The sanitization of @table_name is applied to the newly imported table_name
# This should not be necessary, since setup_table, called after cartodbfication
# also performs column sanitization via Table#import_to_cartodb
# but is kept because there may be existing syncs for which this double sanitization
# (version 1 sanitization which wasn't idempotent) had the effect of altering some
# sanitized names (e.g. __1 -> _1).
table = ::Table.new(name: @table_name, user_id: @user.id)
# we could as well: table = Carto::UserTable.find(@user.tables.where(name: @table_name).first.id).service
table.sanitize_columns(table_name: table_name, database_schema: schema_name, connection: user_database)
# When tables are created using ogr2ogr they are given an ogc_fid or gid primary key
# In that case:
# - If cartodb_id already exists, remove ogc_fid
# - If cartodb_id does not exist, treat this field as the auxiliary column
# The first of ogc_fid / gid that exists and contains no NULL is chosen.
aux_cartodb_id_column = [:ogc_fid, :gid].find do |col|
valid_cartodb_id_candidate?(user, table_name, qualified_table_name, col)
end
# Remove primary key
existing_pk = user_database[%Q{
SELECT c.conname AS pk_name
FROM pg_class r, pg_constraint c, pg_namespace n
WHERE r.oid = c.conrelid AND contype='p' AND relname = '#{table_name}'
AND r.relnamespace = n.oid and n.nspname= '#{schema_name}'
}].first
existing_pk = existing_pk[:pk_name] unless existing_pk.nil?
user_database.run(%Q{
ALTER TABLE #{qualified_table_name} DROP CONSTRAINT "#{existing_pk}"
}) unless existing_pk.nil?
# All normal fields casted to text
varchar_columns = user_database[%Q{
SELECT a.attname, t.typname
FROM pg_attribute a, pg_type t
WHERE attrelid = '#{qualified_table_name}'::regclass
AND typname = 'varchar'
AND a.atttypid = t.oid
}].all
varchar_columns.each do |column|
user_database.run(%Q{ALTER TABLE #{qualified_table_name} ALTER COLUMN "#{column[:attname]}" TYPE text})
end
# If there's an auxiliary column, copy to cartodb_id and restart the sequence to the max(cartodb_id)+1
if aux_cartodb_id_column.present?
begin
already_had_cartodb_id = false
user_database.run(%Q{ALTER TABLE #{qualified_table_name} ADD COLUMN cartodb_id SERIAL})
rescue StandardError
# Any failure to add the column is taken to mean cartodb_id already exists.
already_had_cartodb_id = true
end
unless already_had_cartodb_id
user_database.run(%Q{UPDATE #{qualified_table_name} SET cartodb_id = CAST(#{aux_cartodb_id_column} AS INTEGER)})
cartodb_id_sequence_name = user_database["SELECT pg_get_serial_sequence('#{schema_name}.#{table_name}', 'cartodb_id')"].first[:pg_get_serial_sequence]
max_cartodb_id = user_database[%Q{SELECT max(cartodb_id) FROM #{qualified_table_name}}].first[:max]
# only reset the sequence on real imports.
if max_cartodb_id
user_database.run("ALTER SEQUENCE #{cartodb_id_sequence_name} RESTART WITH #{max_cartodb_id + 1}")
end
end
# The auxiliary column is dropped whether or not cartodb_id was just created.
user_database.run(%Q{ALTER TABLE #{qualified_table_name} DROP COLUMN #{aux_cartodb_id_column}})
end
end
end
# Tells whether the synchronization succeeded: the adapter must not have been
# flagged as failed by #run, and the runner must report success.
def success?
  return false if @failed

  runner.success?
end
# Delegates to runner.etag.
def etag
runner.etag
end
# Delegates to runner.last_modified.
def last_modified
runner.last_modified
end
# Delegates to runner.checksum.
def checksum
runner.checksum
end
# Moves the newly imported table into the user's schema so that it can be
# cartodbfied there. This temporary table should not be registered (check
# ghost_tables_manager.rb).
#
# result - import result exposing #schema and #table_name.
# schema - schema the result is compared against (defaults to
#          DESTINATION_SCHEMA); when the result already lives there nothing is
#          executed. NOTE(review): the table is always moved to
#          user.database_schema, not to +schema+ -- confirm this is intended.
#
# Returns self when no move is needed, otherwise the value of database.execute.
def move_to_schema(result, schema = DESTINATION_SCHEMA)
  return self if schema == result.schema

  statement = %Q{
ALTER TABLE "#{result.schema}"."#{result.table_name}"
SET SCHEMA "#{user.database_schema}"
}
  database.execute(statement)
end
# Renames a table inside the user's schema.
#
# current_name - present name of the table.
# new_name     - name the table should get.
#
# Both names are emitted as double-quoted identifiers, matching the quoting
# used by #drop and #move_to_schema; an unquoted new name would be case-folded
# (or rejected) by PostgreSQL and then no longer match the quoted name that
# #drop uses afterwards.
#
# Returns the value of database.execute.
def rename(current_name, new_name)
  database.execute(%Q{
    ALTER TABLE "#{user.database_schema}"."#{current_name}"
    RENAME TO "#{new_name}"
  })
end
# Drops +table_name+ from the user's schema.
#
# Returns the value of database.execute.
def drop(table_name)
  qualified_name = %Q("#{user.database_schema}"."#{table_name}")
  database.execute("DROP TABLE #{qualified_name}")
end
# Returns whether +table_name+ exists according to database.table_exists?.
# Only the bare name is passed on; no schema is given here.
def exists?(table_name)
database.table_exists?(table_name)
end
# Delegates to runner.results.
def results
runner.results
end
# Returns the adapter's own error code when one was recorded (see the
# overwrite methods), otherwise the first non-nil error code among the
# runner's results, or nil when there is none.
def error_code
  return @error_code if @error_code

  reported_codes = runner.results.map(&:error_code)
  reported_codes.compact.first
end
# Returns the first non-nil log trace among the runner's results, or nil when
# no result carries one.
def runner_log_trace
  traces = runner.results.map(&:log_trace)
  traces.compact.first
end
# Always returns an empty string.
def error_message
''
end
# Returns the name under which a table is parked before being deleted:
# +table_name+ followed by "_to_be_deleted".
def temporary_name_for(table_name)
  table_name.to_s + '_to_be_deleted'
end
private
# Tells whether +col_name+ can stand in for cartodb_id: the column must exist
# in the table and must not contain any NULL.
#
# user                 - user whose database is queried (superuser transaction
#                        with STATEMENT_TIMEOUT).
# table_name           - bare table name, used for the column lookup.
# qualified_table_name - schema-qualified name, used in the NULL probe.
# col_name             - candidate column.
#
# Returns true or false. The result is returned from inside the transaction
# block, so the return value of transaction_with_timeout itself is not used.
def valid_cartodb_id_candidate?(user, table_name, qualified_table_name, col_name)
  known_columns = column_names(user, table_name)
  return false unless known_columns.include?(col_name)

  null_probe = "SELECT 1 FROM #{qualified_table_name} WHERE #{col_name} IS NULL LIMIT 1"
  user.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |connection|
    null_row = connection[null_probe].first
    return null_row.nil?
  end
end
# Lists the column names of +table_name+ in the user's schema, taking the
# first element of every entry returned by the connection's #schema call.
#
# Returns an array of names; on any StandardError the failure is logged and
# an empty array is returned.
def column_names(user, table_name)
  columns = user.in_database.schema(table_name, schema: user.database_schema)
  columns.map(&:first)
rescue StandardError => error
  log_error(message: 'Error in column_names from sync adapter', exception: error)
  []
end
# Private readers for the collaborators set in the constructor.
# NOTE(review): :user is also defined as an explicit `def user` earlier in the
# class; declaring it here, after `private`, redefines it as a private method
# -- confirm that is intended.
attr_reader :table_name, :runner, :database, :user
# Extends the inherited logging context with the destination table name and
# the current user.
def log_context
  inherited_context = super
  inherited_context.merge(table: { name: table_name }, current_user: user)
end
end
end
end