require 'carto/importer/table_setup'

module CartoDB
  module Synchronization
    class Adapter

      include ::LoggerHelper

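      # PostgreSQL statement timeouts are given in milliseconds, so
      # (1.hour * 1000) allows statements to run for up to one hour.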
      STATEMENT_TIMEOUT = (1.hour * 1000).freeze
      DESTINATION_SCHEMA = 'public'.freeze
      THE_GEOM = 'the_geom'.freeze
      OVERWRITE_ERROR = 2013

      def initialize(table_name, runner, database, user, overviews_creator, synchronization_id)
        @table_name = table_name
        @runner = runner
        @database = database
        @user = user
        @overviews_creator = overviews_creator
        @failed = false
        @table_setup = ::Carto::Importer::TableSetup.new(
          user: user,
          overviews_creator: overviews_creator,
          log: runner.log
        )
        @error_code = nil
        @synchronization_id = synchronization_id
      end

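      # Runs the import pipeline and, when the remote data has changed,
      # performs the whole table swap under the Ghost Tables bolt so that
      # registration cannot race with other processes touching user tables.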
      def run(&tracker)
        runner.run(&tracker)
        result = runner.results.select(&:success?).first

        if runner.remote_data_updated?
          if result.nil?
            data_for_exception = "Expecting success data for table '#{table_name}'\nResults:#{runner.results.to_s}\n"
            data_for_exception << "1st result:#{runner.results.first.inspect}"
            raise data_for_exception
          end

          Carto::GhostTablesManager.run_synchronized(
            user.id, attempts: 10, timeout: 3000,
            message: "Couldn't acquire bolt to register. Registering sync without bolt",
            user: user,
            synchronization_id: @synchronization_id
          ) do
            move_to_schema(result)
            geo_type = fix_the_geom_type!(user.database_schema, result.table_name)
            import_cleanup(user.database_schema, result.table_name)
            @table_setup.cartodbfy(result.table_name)
            overwrite(user.database_schema, table_name, result, geo_type)
            setup_table(table_name, geo_type)
            @table_setup.recreate_overviews(table_name)
          end
        end
        self
      rescue StandardError => exception
        @failed = true
        puts '=================='
        puts exception.to_s
        puts exception.backtrace
        puts '=================='
        drop(result.table_name) if result && exists?(result.table_name)
        raise exception
      end

      def user
        @user
      end

      def overwrite(schema, table_name, result, geo_type)
        # Determine what kind of overwrite to perform.
        # overwrite_replace substitutes the existing table with the new one,
        # so any modifications since the previous import/sync are lost.
        # overwrite_sync preserves columns added since the import/sync,
        # and the geometry as well if the new table doesn't have it (nil geo_type).

        # For the time being the latter method will only be used with tables
        # that have had the geocoder analysis applied, resulting in a column
        # named carto_geocode_hash being present.
        # TODO: we could perform the sync if there's any column named `_carto_*`
        # (carto_geocode_hash would need to be renamed to _carto_geocode_hash)
        sync = has_column(schema, table_name, 'carto_geocode_hash')

        if sync
          overwrite_sync(schema, table_name, result, geo_type)
        else
          overwrite_replace(schema, table_name, result)
        end
      end

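      # Incremental overwrite: delegates the row-level sync to the cartodb
      # PostgreSQL extension's CDB_SyncTable. When the new table has no
      # geometry (nil geo_type), the_geom/the_geom_webmercator are skipped
      # so geometries added via geocoding are preserved.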
      def overwrite_sync(schema, table_name, result, geo_type)
        return false unless runner.remote_data_updated?

        # NOTE: the import table has already been moved to the user schema;
        # this was done (#7543) because the cartodbfication performs
        # queries on CDB_UserQuotaSize and other functions expected
        # to exist in the schema of the table.
        qualified_result_table_name = %{"#{schema}"."#{result.table_name}"}
        skip_columns = '{the_geom, the_geom_webmercator}'

        database.transaction do
          if geo_type.nil?
            # If there's no geometry in the result table, it's not worth
            # syncing the geometry columns; maybe they were added via geocoding.
            database.execute(%{
              SELECT cartodb.CDB_SyncTable(
                '#{qualified_result_table_name}',
                '#{schema}', '#{table_name}',
                '#{skip_columns}'
              )})
          else
            database.execute(%{
              SELECT cartodb.CDB_SyncTable(
                '#{qualified_result_table_name}',
                '#{schema}', '#{table_name}'
              )})
          end
        end

        drop(result.table_name) if exists?(result.table_name)

        # TODO: not sure whether these two are needed
        @table_setup.fix_oid(table_name)
        @table_setup.update_cdb_tablemetadata(table_name)
      rescue StandardError => exception
        @error_code = OVERWRITE_ERROR
        puts "Sync overwrite ERROR: #{exception.message}: #{exception.backtrace.join}"

        # Gets all attributes in the result except for 'log_trace', as it is too long for Rollbar
        result_hash = CartoDB::Importer2::Result::ATTRIBUTES.map { |m| [m, result.send(m)] if m != 'log_trace' }
                                                            .compact.to_h
        log_error(message: 'Error in sync overwrite', exception: exception, result: result_hash)
        drop(result.table_name) if exists?(result.table_name)
        raise exception
      end

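      # Full overwrite: swaps the imported table into place inside a single
      # transaction (rename the current table aside, drop it, rename the new
      # one in), then restores the indexes and privileges captured beforehand.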
      def overwrite_replace(schema, table_name, result)
        return false unless runner.remote_data_updated?

        @table_setup.copy_privileges(schema, table_name, schema, result.table_name)
        index_statements = @table_setup.generate_index_statements(schema, table_name)

        temporary_name = temporary_name_for(result.table_name)
        database.transaction do
          rename(table_name, temporary_name) if exists?(table_name)
          drop(temporary_name) if exists?(temporary_name)
          rename(result.table_name, table_name)
        end
        @table_setup.fix_oid(table_name)
        @table_setup.update_cdb_tablemetadata(table_name)
        @table_setup.run_index_statements(index_statements, @database)
      rescue StandardError => exception
        @error_code = OVERWRITE_ERROR
        puts "Sync overwrite ERROR: #{exception.message}: #{exception.backtrace.join}"

        # Gets all attributes in the result except for 'log_trace', as it is too long for Rollbar
        result_hash = CartoDB::Importer2::Result::ATTRIBUTES.map { |m| [m, result.send(m)] if m != 'log_trace' }
                                                            .compact.to_h
        log_error(message: 'Error in sync overwrite', exception: exception, result: result_hash)
        drop(result.table_name) if exists?(result.table_name)
        raise exception
      end

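      # Registers the swapped-in table with CARTO's metadata: re-runs schema
      # sanitization via Table#import_to_cartodb and stores the geometry type
      # detected earlier so geometries are not rewritten a second time.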
      def setup_table(table_name, geo_type)
        table = Carto::UserTable.find(user.tables.where(name: table_name).first.id).service

        table.force_schema = true

        table.import_to_cartodb(table_name)
        table.schema(reload: true)
        table.reload

        # We send the detected geometry type to avoid manipulating geoms twice;
        # set_the_geom_column! should just edit the metadata with the specified type
        table.send :set_the_geom_column!, geo_type
        table.save
      rescue StandardError => exception
        log_error(message: 'Error in setup cartodbfy', exception: exception)
      ensure
        @table_setup.fix_oid(table_name)
      end

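      # Queries pg_catalog directly rather than model metadata, so it works
      # for tables that are not (yet) registered with CARTO.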
      def has_column(schema_name, table_name, column_name)
        qualified_table_name = "\"#{schema_name}\".#{table_name}"
        sql = %{
          SELECT TRUE as has_column FROM pg_catalog.pg_attribute a
          WHERE
            a.attname = '#{column_name}'
            AND a.attnum > 0
            AND NOT a.attisdropped
            AND a.attrelid = '#{qualified_table_name}'::regclass::oid
          LIMIT 1
        }
        result = user.in_database[sql].first
        result && result[:has_column]
      end

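      # Geometry normalization: a non-geometry the_geom is renamed aside to
      # the_geom_str, MULTIPOINT is collapsed to POINT, and LINESTRING/POLYGON
      # are promoted to their MULTI variants, so downstream code sees a single
      # consistent geometry type per table.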
      # From Table#get_the_geom_type!, adapted to unregistered tables.
      # Returns the type so that Table#get_the_geom_type! can be run again
      # afterwards, which saves the type in the table metadata.
      def fix_the_geom_type!(schema_name, table_name)
        qualified_table_name = "\"#{schema_name}\".#{table_name}"

        type = nil
        the_geom_data = user.in_database[%Q{
          SELECT a.attname, t.typname
          FROM pg_attribute a, pg_type t
          WHERE attrelid = '#{qualified_table_name}'::regclass
            AND attname = '#{THE_GEOM}'
            AND a.atttypid = t.oid
            AND a.attstattarget < 0
          LIMIT 1
        }].first
        return nil unless the_geom_data

        if the_geom_data[:typname] != 'geometry'
          user.in_database.execute %{
            ALTER TABLE #{qualified_table_name} RENAME COLUMN "#{THE_GEOM}" TO "the_geom_str"
          }
          return nil
        end

        geom_type = user.in_database[%Q{
          SELECT GeometryType(#{THE_GEOM})
          FROM #{qualified_table_name}
          WHERE #{THE_GEOM} IS NOT null
          LIMIT 1
        }].first

        type = geom_type[:geometrytype].to_s.downcase if geom_type

        # If the geometry is MULTIPOINT we convert it to POINT
        if type == 'multipoint'
          user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_GeometryN(the_geom,1);")
          end
          type = 'point'
        end

        # If the geometry is LINESTRING or POLYGON we convert it to MULTILINESTRING or MULTIPOLYGON
        if %w(linestring polygon).include?(type)
          user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
            user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_Multi(the_geom);")

            type = user_database[%Q{
              SELECT GeometryType(#{THE_GEOM})
              FROM #{qualified_table_name}
              WHERE #{THE_GEOM} IS NOT null
              LIMIT 1
            }].first[:geometrytype]
          end
        end

        type
      end

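      # Post-import cleanup: re-applies column name sanitization, drops any
      # imported primary key, widens varchar columns to text, and backfills
      # cartodb_id from an ogr2ogr-generated key column when one is present.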
      # From Table#import_cleanup, with column schema checks adapted to unregistered tables
      def import_cleanup(schema_name, table_name)
        qualified_table_name = "\"#{schema_name}\".#{table_name}"

        user.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|

          # For consistency with regular imports; this also eases testing.
          # The sanitization of @table_name is applied to the newly imported table_name.
          # This should not be necessary, since setup_table, called after cartodbfication,
          # also performs column sanitization via Table#import_to_cartodb,
          # but it is kept because there may be existing syncs for which this double
          # sanitization (version 1 sanitization, which wasn't idempotent) had the
          # effect of altering some sanitized names (e.g. __1 -> _1).
          table = ::Table.new(name: @table_name, user_id: @user.id)
          # we could as well: table = Carto::UserTable.find(@user.tables.where(name: @table_name).first.id).service
          table.sanitize_columns(table_name: table_name, database_schema: schema_name, connection: user_database)

          # When tables are created using ogr2ogr they get an ogc_fid or gid
          # primary key added. In that case:
          # - If cartodb_id already exists, remove ogc_fid
          # - If cartodb_id does not exist, treat this field as the auxiliary column
          aux_cartodb_id_column = [:ogc_fid, :gid].find do |col|
            valid_cartodb_id_candidate?(user, table_name, qualified_table_name, col)
          end

          # Remove primary key
          existing_pk = user_database[%Q{
            SELECT c.conname AS pk_name
            FROM pg_class r, pg_constraint c, pg_namespace n
            WHERE r.oid = c.conrelid AND contype='p' AND relname = '#{table_name}'
              AND r.relnamespace = n.oid AND n.nspname = '#{schema_name}'
          }].first
          existing_pk = existing_pk[:pk_name] unless existing_pk.nil?
          user_database.run(%Q{
            ALTER TABLE #{qualified_table_name} DROP CONSTRAINT "#{existing_pk}"
          }) unless existing_pk.nil?

          # All normal fields are cast to text
          varchar_columns = user_database[%Q{
            SELECT a.attname, t.typname
            FROM pg_attribute a, pg_type t
            WHERE attrelid = '#{qualified_table_name}'::regclass
              AND typname = 'varchar'
              AND a.atttypid = t.oid
          }].all

          varchar_columns.each do |column|
            user_database.run(%Q{ALTER TABLE #{qualified_table_name} ALTER COLUMN "#{column[:attname]}" TYPE text})
          end

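          # Example: an ogr2ogr import whose ogc_fid holds values 1..100 gets
          # those values copied into cartodb_id and the backing sequence
          # restarted at 101, so subsequent inserts cannot collide.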
          # If there's an auxiliary column, copy it to cartodb_id and restart the
          # sequence at max(cartodb_id) + 1
          if aux_cartodb_id_column.present?
            begin
              already_had_cartodb_id = false
              user_database.run(%Q{ALTER TABLE #{qualified_table_name} ADD COLUMN cartodb_id SERIAL})
            rescue StandardError
              already_had_cartodb_id = true
            end
            unless already_had_cartodb_id
              user_database.run(%Q{UPDATE #{qualified_table_name} SET cartodb_id = CAST(#{aux_cartodb_id_column} AS INTEGER)})
              cartodb_id_sequence_name = user_database["SELECT pg_get_serial_sequence('#{schema_name}.#{table_name}', 'cartodb_id')"].first[:pg_get_serial_sequence]
              max_cartodb_id = user_database[%Q{SELECT max(cartodb_id) FROM #{qualified_table_name}}].first[:max]

              # Only reset the sequence on real imports (max is nil for empty tables).
              if max_cartodb_id
                user_database.run("ALTER SEQUENCE #{cartodb_id_sequence_name} RESTART WITH #{max_cartodb_id + 1}")
              end
            end
            user_database.run(%Q{ALTER TABLE #{qualified_table_name} DROP COLUMN #{aux_cartodb_id_column}})
          end

        end
      end

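      # State and delegation accessors consumed by the synchronization
      # machinery that drives this adapter.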
      def success?
        !@failed && runner.success?
      end

      def etag
        runner.etag
      end

      def last_modified
        runner.last_modified
      end

      def checksum
        runner.checksum
      end

      def move_to_schema(result, schema = DESTINATION_SCHEMA)
        # The new table to sync is moved to the user schema to allow CartoDBfication.
        # This temporary table should not be registered (check ghost_tables_manager.rb)
        return self if schema == result.schema

        database.execute(%Q{
          ALTER TABLE "#{result.schema}"."#{result.table_name}"
          SET SCHEMA "#{user.database_schema}"
        })
      end

      def rename(current_name, new_name)
        database.execute(%Q{
          ALTER TABLE "#{user.database_schema}"."#{current_name}"
          RENAME TO #{new_name}
        })
      end

      def drop(table_name)
        database.execute(%Q(DROP TABLE "#{user.database_schema}"."#{table_name}"))
      end

      def exists?(table_name)
        database.table_exists?(table_name)
      end

      def results
        runner.results
      end

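      # Errors set locally (e.g. OVERWRITE_ERROR) take precedence over error
      # codes reported by the importer results.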
      def error_code
        @error_code || runner.results.map(&:error_code).compact.first
      end

      def runner_log_trace
        runner.results.map(&:log_trace).compact.first
      end

      def error_message
        ''
      end

      def temporary_name_for(table_name)
        "#{table_name}_to_be_deleted"
      end

      private

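      # A column can seed cartodb_id only if it exists and has no NULL values;
      # the check runs as superuser with a statement timeout because the NULL
      # scan can be expensive on large tables.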
      def valid_cartodb_id_candidate?(user, table_name, qualified_table_name, col_name)
        return false unless column_names(user, table_name).include?(col_name)

        user.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |db|
          return db["SELECT 1 FROM #{qualified_table_name} WHERE #{col_name} IS NULL LIMIT 1"].first.nil?
        end
      end

      def column_names(user, table_name)
        user.in_database.schema(table_name, schema: user.database_schema).map { |row| row[0] }
      rescue StandardError => e
        log_error(message: 'Error in column_names from sync adapter', exception: e)
        []
      end

      attr_reader :table_name, :runner, :database, :user

      def log_context
        super.merge(table: { name: table_name }, current_user: user)
      end
    end
  end
end