require_relative 'bolt.rb'

module Carto
  # Keeps Carto::UserTable records in sync with the physical "cartodbfied" tables in the
  # user database, linking tables that have been created, renamed, regenerated or dropped
  # outside the editor (e.g. through the SQL API).
  class GhostTablesManager
    include ::LoggerHelper
    extend ::LoggerHelper

    MUTEX_REDIS_KEY = 'ghost_tables_working'.freeze
    MUTEX_TTL_MS = 600000
    MAX_TABLES_FOR_SYNC_RUN = 8
    MAX_USERTABLES_FOR_SYNC_CHECK = 128

    def initialize(user_id)
      @user_id = user_id
    end

    def user
      @user ||= ::User[@user_id]
    end

    def link_ghost_tables
      user_tables = fetch_user_tables

      if user_tables.length > MAX_USERTABLES_FOR_SYNC_CHECK
        # When the user has a large number of tables we don't even attempt to check whether
        # ghost tables needs to run and request an async link directly, because comparing
        # the table arrays scales badly.
        link_ghost_tables_asynchronously
      else
        regenerated_tables, renamed_tables, new_tables, dropped_tables = fetch_altered_tables
        return if user_tables_synced_with_db?(regenerated_tables, renamed_tables, new_tables, dropped_tables)

        if should_run_synchronously?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
          link_ghost_tables_synchronously
        else
          link_ghost_tables_asynchronously
        end
      end
    end
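
    # Illustrative usage (assumed caller; not part of this file): the manager is built
    # with a user id and link_ghost_tables decides between an inline sync and a queued
    # Resque job, e.g.
    #
    #   Carto::GhostTablesManager.new(user.id).link_ghost_tables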

    def link_ghost_tables_synchronously
      sync_user_tables_with_db
    end

    def link_ghost_tables_asynchronously
      # Dequeue any job already queued for this user before enqueueing, so at most one
      # LinkGhostTablesByUsername job is pending per user at any time.
      ::Resque.dequeue(::Resque::UserDBJobs::UserDBMaintenance::LinkGhostTablesByUsername, user.username)
      ::Resque.enqueue(::Resque::UserDBJobs::UserDBMaintenance::LinkGhostTablesByUsername, user.username)
    end

    # Checks whether the linked UserTables are consistent with the cartodbfied tables in
    # the database; that is, whether Ghost Tables needs to run at all.
    def user_tables_synced_with_db?(*tables)
      tables.all?(&:blank?)
    end

    # Helper for tests that fetches the tables and runs the consistency check
    def fetch_user_tables_synced_with_db?
      regenerated_tables, renamed_tables, new_tables, dropped_tables = fetch_altered_tables
      user_tables_synced_with_db?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
    end

    def get_bolt
      Carto::Bolt.new("#{user.username}:#{MUTEX_REDIS_KEY}", ttl_ms: MUTEX_TTL_MS)
    end

    # Runs a block of code exclusively with GhostTablesManager (using a Bolt lock).
    # If warning_params is provided (with parameters for Logger.warning), the block is
    # executed even when the lock is not acquired, in which case a warning is emitted.
    def self.run_synchronized(user_id, attempts: 10, timeout: 30000, **warning_params)
      gtm = new(user_id)
      bolt = gtm.get_bolt
      lock_acquired = bolt.run_locked(attempts: attempts, timeout: timeout) do
        yield
      end
      if !lock_acquired && warning_params.present?
        # Run even though the lock wasn't acquired
        log_warning(warning_params)
        yield
      end
    end
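
    # Illustrative usage of run_synchronized (hypothetical caller and warning message;
    # any keyword arguments besides attempts/timeout are forwarded to log_warning when
    # the lock cannot be acquired, and the block still runs in that case):
    #
    #   Carto::GhostTablesManager.run_synchronized(user.id, message: 'Could not lock ghost tables') do
    #     import_tables_for(user) # hypothetical work that must not overlap Ghost Tables
    #   end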

    private

    # It's nice to run the sync inline when unsafe stale (dropped or renamed) tables would
    # otherwise be shown to the user, but we can't block the workers for more than 180
    # seconds, so we only do it for a small number of altered tables.
    def should_run_synchronously?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
      (regenerated_tables.count + renamed_tables.count + new_tables.count + dropped_tables.count) < MAX_TABLES_FOR_SYNC_RUN
    end

    def sync_user_tables_with_db
      get_bolt.run_locked(fail_function: lambda { link_ghost_tables_asynchronously }) { sync }
    end

    def fetch_altered_tables
      cartodbfied_tables = fetch_cartodbfied_tables.sort_by(&:id)
      user_tables = fetch_user_tables.sort_by(&:id)

      cartodb_table_it = 0
      user_tables_it = 0
      new_cartodbfied_ids = []
      missing_user_tables_ids = []
      renamed_tables = []

      # Find which ids are new, which existed but have changed names, and which ones have disappeared
      while cartodb_table_it < cartodbfied_tables.size && user_tables_it < user_tables.size
        cdb_table = cartodbfied_tables[cartodb_table_it]
        user_table = user_tables[user_tables_it]

        if cdb_table.id < user_table.id
          new_cartodbfied_ids << cdb_table
          cartodb_table_it += 1
        elsif cdb_table.id == user_table.id
          renamed_tables << cdb_table if cdb_table.name != user_table.name
          cartodb_table_it += 1
          user_tables_it += 1
        else
          missing_user_tables_ids << user_table
          user_tables_it += 1
        end
      end

      new_cartodbfied_ids += cartodbfied_tables[cartodb_table_it, cartodbfied_tables.size - cartodb_table_it] if cartodb_table_it < cartodbfied_tables.size
      missing_user_tables_ids += user_tables[user_tables_it, user_tables.size - user_tables_it] if user_tables_it < user_tables.size

      # Out of the extracted ids we need to know which ones are truly new tables, which ones are
      # regenerated tables (the underlying id has changed but the name remains) and which ones
      # have been completely deleted
      regenerated_tables = new_cartodbfied_ids.select do |cartodbfied_table|
        missing_user_tables_ids.any? { |t| t.name == cartodbfied_table.name }
      end
      new_tables = new_cartodbfied_ids - regenerated_tables
      dropped_tables = missing_user_tables_ids.reject do |dropped_table|
        regenerated_tables.any? { |t| t.name == dropped_table.name }
      end

      [regenerated_tables, renamed_tables, new_tables, dropped_tables]
    end
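
    # Worked example (illustrative oids and names) of the classification above: with
    # cartodbfied tables [(1, "a"), (3, "b"), (4, "c")] and linked user tables
    # [(1, "a"), (2, "b")], the merge walk leaves (3, "b") and (4, "c") as candidate new
    # ids and (2, "b") as a missing user table; (3, "b") is then classified as regenerated
    # (same name, new oid), (4, "c") as a new table, and nothing is reported as dropped
    # because the disappearance of (2, "b") is explained by the regeneration.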

    def sync
      regenerated_tables, renamed_tables, new_tables, dropped_tables = fetch_altered_tables

      # Update table_id on UserTables whose physical tables got a new oid. Should go first.
      regenerated_tables.each(&:regenerate_user_table)

      # Relink tables that have been renamed through the SQL API
      renamed_tables.each(&:rename_user_table_vis)

      # Create UserTables for non-linked Tables
      new_tables.each(&:create_user_table)

      # Unlink tables that have been dropped through the SQL API. Should go last.
      dropped_tables.each(&:drop_user_table)
    end

    # Fetches all currently linked user tables
    def fetch_user_tables
      Carto::UserTable.select([:name, :table_id]).where(user_id: @user_id).map do |record|
        Carto::TableFacade.new(record[:table_id], record[:name], @user_id)
      end
    end

    # Fetches all linkable tables: non-raster cartodbfied + raster
    def fetch_cartodbfied_tables
      sql = %{
        WITH tables_with_proper_owner_and_schema AS
        (
          SELECT c.oid, c.relname
          FROM pg_catalog.pg_class c
          LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
          WHERE c.relowner IN (#{user.get_database_roles.map { |r| "to_regrole('#{r}')" }.join(',')})
          AND c.relkind = 'r'
          AND n.nspname = '#{user.database_schema}'
        ),
        tables_with_columns AS
        (
          SELECT attrelid FROM
          (
            SELECT attrelid,
            SUM(
              CASE WHEN attname = 'cartodb_id' THEN 100
                   WHEN attname = 'the_geom' THEN 1
                   WHEN attname = 'the_geom_webmercator' THEN 1
                   WHEN attname = 'the_raster_webmercator' THEN 10
                   ELSE 0
              END) AS ncolumns
            FROM pg_attribute
            WHERE attname IN ('cartodb_id', 'the_geom', 'the_geom_webmercator', 'the_raster_webmercator')
            AND attnum > 0
            AND NOT attisdropped
            GROUP BY attrelid
          ) all_tables WHERE ncolumns = 102 OR ncolumns >= 110
        ),
        tables_with_trigger AS
        (
          SELECT tgrelid
          FROM pg_trigger
          WHERE tgname = 'test_quota_per_row'
        )
        SELECT t1.relname AS table_name, t1.oid AS reloid
        FROM tables_with_proper_owner_and_schema t1
        INNER JOIN tables_with_columns t2 ON (t1.oid = t2.attrelid)
        INNER JOIN tables_with_trigger t3 ON (t1.oid = t3.tgrelid)
      }
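
      # The ncolumns filter above is a column-presence score: cartodb_id weighs 100,
      # the_geom and the_geom_webmercator weigh 1 each, and the_raster_webmercator weighs
      # 10, so a cartodbfied vector table scores 100 + 1 + 1 = 102 and a raster table
      # scores at least 100 + 10 = 110. Anything else is ignored.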

      user.in_database(as: :superuser)[sql].all.map do |record|
        Carto::TableFacade.new(record[:reloid], record[:table_name], @user_id)
      end
    end
  end

  class TableFacade
    include ::LoggerHelper

    attr_reader :id, :name, :user_id

    def initialize(id, name, user_id)
      @id = id
      @name = name
      @user_id = user_id
    end

    def user
      @user ||= ::User[@user_id]
    end

    def user_table_with_matching_id
      user.tables.where(table_id: id).first
    end

    def user_table_with_matching_name
      user.tables.where(name: name).first
    end

    def create_user_table
      user_table = Carto::UserTable.new
      user_table.user_id = user.id
      user_table.table_id = id
      user_table.name = name
      new_table = ::Table.new(user_table: user_table)

      new_table.register_table_only = true
      new_table.keep_user_database_table = true

      new_table.save
    rescue StandardError => exception
      log_error(message: 'Ghost tables: Error creating UserTable', exception: exception)
    end

    def rename_user_table_vis
      user_table_vis = user_table_with_matching_id.table_visualization

      user_table_vis.register_table_only = true
      user_table_vis.name = name

      user_table_vis.store
    rescue StandardError => exception
      log_error(message: 'Ghost tables: Error renaming Visualization', exception: exception)
    end

    def drop_user_table
      user_table_to_drop = user.tables.where(table_id: id, name: name).first
      return unless user_table_to_drop # The table has already been deleted

      table_to_drop = ::Table.new(user_table: user_table_to_drop)

      table_to_drop.visualizations.map(&:backup_visualization)
      table_to_drop.keep_user_database_table = true
      table_to_drop.destroy
    rescue StandardError => exception
      log_error(message: 'Ghost tables: Error dropping Table', exception: exception)
    end

    def regenerate_user_table
      user_table_to_regenerate = user_table_with_matching_name

      user_table_to_regenerate.table_id = id
      user_table_to_regenerate.save
    rescue StandardError => exception
      log_error(message: 'Ghost tables: Error syncing table_id for UserTable', exception: exception)
    end

    def eql?(other)
      id.eql?(other.id) && name.eql?(other.name) && user.id.eql?(other.user_id)
    end

    def ==(other)
      eql?(other)
    end

    def hash
      [id, name, user_id].hash
    end
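
    # eql?/== and hash make facades compare by (id, name, user) rather than by object
    # identity, which is what GhostTablesManager#fetch_altered_tables relies on when it
    # subtracts arrays of facades (Array#- uses eql? and hash).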

    private

    def log_context
      super.merge(target_user: user, table: { id: id, name: name })
    end
  end
end