You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

302 lines
10 KiB

6 months ago
require_relative 'bolt.rb'
module Carto
class GhostTablesManager
include ::LoggerHelper
extend ::LoggerHelper
MUTEX_REDIS_KEY = 'ghost_tables_working'.freeze
MUTEX_TTL_MS = 600000
MAX_TABLES_FOR_SYNC_RUN = 8
MAX_USERTABLES_FOR_SYNC_CHECK = 128
def initialize(user_id)
@user_id = user_id
end
def user
@user ||= ::User[@user_id]
end
def link_ghost_tables
user_tables = fetch_user_tables
if (user_tables.length > MAX_USERTABLES_FOR_SYNC_CHECK)
# When the user has a big amount of tables, we don't even attempt to check if we
# need to run ghost tables, and instead we request an async link.
# We do this because doing comparisons with the arrays scales pretty badly
link_ghost_tables_asynchronously
else
regenerated_tables, renamed_tables, new_tables, dropped_tables = fetch_altered_tables
return if user_tables_synced_with_db?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
if should_run_synchronously?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
link_ghost_tables_synchronously
else
link_ghost_tables_asynchronously
end
end
end
def link_ghost_tables_synchronously
sync_user_tables_with_db
end
def link_ghost_tables_asynchronously
::Resque.dequeue(::Resque::UserDBJobs::UserDBMaintenance::LinkGhostTablesByUsername, user.username)
::Resque.enqueue(::Resque::UserDBJobs::UserDBMaintenance::LinkGhostTablesByUsername, user.username)
end
# determine linked tables vs cartodbfied tables consistency; that is, check whether it needs to run
def user_tables_synced_with_db?(*tables)
tables.all?(&:blank?)
end
# Helper for tests that will fetch the tables and do the checks
def fetch_user_tables_synced_with_db?
regenerated_tables, renamed_tables, new_tables, dropped_tables = fetch_altered_tables
user_tables_synced_with_db?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
end
def get_bolt
Carto::Bolt.new("#{user.username}:#{MUTEX_REDIS_KEY}", ttl_ms: MUTEX_TTL_MS)
end
# run a block of code exclusively with GhostTablesManager (using Bolt lock)
# if warning_params is provided (with paramters for Logger.warning) then
# the code is executed even if the lock is not acquired (in which case
# a warning is emmitted)
def self.run_synchronized(user_id, attempts: 10, timeout: 30000, **warning_params)
gtm = new(user_id)
bolt = gtm.get_bolt
lock_acquired = bolt.run_locked(attempts: attempts, timeout: timeout) do
yield
end
if !lock_acquired && warning_params.present?
# run even if lock wasn't aquired
log_warning(warning_params)
yield
end
end
private
# It's nice to run sync if any unsafe stale (dropped or renamed) tables will be shown to the user but we can't block
# the workers for more that 180 seconds
def should_run_synchronously?(regenerated_tables, renamed_tables, new_tables, dropped_tables)
(regenerated_tables.count + renamed_tables.count + new_tables.count + dropped_tables.count) < MAX_TABLES_FOR_SYNC_RUN
end
def sync_user_tables_with_db
got_locked = get_bolt.run_locked(fail_function: lambda { link_ghost_tables_asynchronously }) { sync }
end
def fetch_altered_tables
cartodbfied_tables = fetch_cartodbfied_tables.sort_by(&:id)
user_tables = fetch_user_tables.sort_by(&:id)
cartodb_table_it = 0
user_tables_it = 0
new_cartodbfied_ids = []
missing_user_tables_ids = []
renamed_tables = []
# Find which ids are new, which existed but have changed names, and which one have dissapeared
while (cartodb_table_it < cartodbfied_tables.size && user_tables_it < user_tables.size)
cdb_table = cartodbfied_tables[cartodb_table_it]
user_table = user_tables[user_tables_it]
if (cdb_table.id < user_table.id)
new_cartodbfied_ids << cdb_table
cartodb_table_it += 1
elsif (cdb_table.id == user_table.id)
if (cdb_table.name != user_table.name)
renamed_tables << cdb_table
end
cartodb_table_it += 1
user_tables_it += 1
else
missing_user_tables_ids << user_table
user_tables_it += 1
end
end
new_cartodbfied_ids += cartodbfied_tables[cartodb_table_it, cartodbfied_tables.size - cartodb_table_it] if cartodb_table_it < cartodbfied_tables.size
missing_user_tables_ids += user_tables[user_tables_it, user_tables.size - user_tables_it] if user_tables_it < user_tables.size
# Out of the extracted ids we need to know which one are truly new tables, which ones are
# regenerated tables (the underlying ids have changed, but the name remains) and which ones
# have been completely deleted
regenerated_tables = new_cartodbfied_ids.select do |cartodbfied_table|
missing_user_tables_ids.any?{|t| t.name == cartodbfied_table.name}
end
new_tables = new_cartodbfied_ids - regenerated_tables
dropped_tables = missing_user_tables_ids.reject do |dropped_table|
regenerated_tables.any?{|t| t.name == dropped_table.name}
end
return regenerated_tables, renamed_tables, new_tables, dropped_tables
end
def sync
regenerated_tables, renamed_tables, new_tables, dropped_tables = fetch_altered_tables
# Update table_id on UserTables with physical tables with changed oid. Should go first.
regenerated_tables.each(&:regenerate_user_table)
# Relink tables that have been renamed through the SQL API
renamed_tables.each(&:rename_user_table_vis)
# Create UserTables for non linked Tables
new_tables.each(&:create_user_table)
# Unlink tables that have been created through the SQL API. Should go last.
dropped_tables.each(&:drop_user_table)
end
# Fetches all currently linked user tables
def fetch_user_tables
Carto::UserTable.select([:name, :table_id]).where(user_id: @user_id).map do |record|
Carto::TableFacade.new(record[:table_id], record[:name], @user_id)
end
end
# Fetches all linkable tables: non raster cartodbfied + raster
def fetch_cartodbfied_tables
sql = %{
WITH tables_with_proper_owner_and_schema AS
(
SELECT c.oid, c.relname
FROM pg_catalog.pg_class c
LEFT JOIN pg_catalog.pg_namespace n ON n.oid = c.relnamespace
WHERE c.relowner IN (#{user.get_database_roles.map{ |r| "to_regrole(\'#{r}\')" }.join(',')})
AND c.relkind = 'r'
AND n.nspname = '#{user.database_schema}'
),
tables_with_columns AS
(
SELECT attrelid FROM
(
SELECT attrelid,
SUM(
CASE WHEN attname = 'cartodb_id' THEN 100
WHEN attname = 'the_geom' THEN 1
WHEN attname = 'the_geom_webmercator' THEN 1
WHEN attname = 'the_raster_webmercator' THEN 10
ELSE 0
END) as ncolumns
FROM pg_attribute
WHERE attname IN ('cartodb_id','the_geom','the_geom_webmercator', 'the_raster_webmercator')
AND attnum > 0
AND NOT attisdropped
GROUP BY attrelid
) all_tables WHERE ncolumns = 102 OR ncolumns >= 110
),
tables_with_trigger AS
(
SELECT tgrelid
FROM pg_trigger
WHERE tgname = 'test_quota_per_row'
)
SELECT t1.relname as table_name, t1.oid as reloid
FROM tables_with_proper_owner_and_schema t1
INNER JOIN tables_with_columns t2 ON (t1.oid = t2.attrelid)
INNER JOIN tables_with_trigger t3 ON (t1.oid = t3.tgrelid)
}
user.in_database(as: :superuser)[sql].all.map do |record|
Carto::TableFacade.new(record[:reloid], record[:table_name], @user_id)
end
end
end
class TableFacade
include ::LoggerHelper
attr_reader :id, :name, :user_id
def initialize(id, name, user_id)
@id = id
@name = name
@user_id = user_id
end
def user
@user ||= ::User[@user_id]
end
def user_table_with_matching_id
user.tables.where(table_id: id).first
end
def user_table_with_matching_name
user.tables.where(name: name).first
end
def create_user_table
user_table = Carto::UserTable.new
user_table.user_id = user.id
user_table.table_id = id
user_table.name = name
new_table = ::Table.new(user_table: user_table)
new_table.register_table_only = true
new_table.keep_user_database_table = true
new_table.save
rescue StandardError => exception
log_error(message: 'Ghost tables: Error creating UserTable', exception: exception)
end
def rename_user_table_vis
user_table_vis = user_table_with_matching_id.table_visualization
user_table_vis.register_table_only = true
user_table_vis.name = name
user_table_vis.store
rescue StandardError => exception
log_error(message: 'Ghost tables: Error renaming Visualization', exception: exception)
end
def drop_user_table
user_table_to_drop = user.tables.where(table_id: id, name: name).first
return unless user_table_to_drop # The table has already been deleted
table_to_drop = ::Table.new(user_table: user_table_to_drop)
table_to_drop.visualizations.map(&:backup_visualization)
table_to_drop.keep_user_database_table = true
table_to_drop.destroy
rescue StandardError => exception
log_error(message: 'Ghost tables: Error dropping Table', exception: exception)
end
def regenerate_user_table
user_table_to_regenerate = user_table_with_matching_name
user_table_to_regenerate.table_id = id
user_table_to_regenerate.save
rescue StandardError => exception
log_error(message: 'Ghost tables: Error syncing table_id for UserTable', exception: exception)
end
def eql?(other)
id.eql?(other.id) && name.eql?(other.name) && user.id.eql?(other.user_id)
end
def ==(other)
eql?(other)
end
def hash
[id, name, user_id].hash
end
private
def log_context
super.merge(target_user: user, table: { id: id, name: name })
end
end
end