You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

318 lines
11 KiB

require_relative 'bolt.rb'
module Carto
# Keeps the UserTables linked to a user in step with the cartodbfied (and
# raster) tables found in that user's database schema.
class GhostTablesManager
  MUTEX_REDIS_KEY = 'ghost_tables_working'.freeze
  # TTL, in milliseconds, passed to the Bolt lock built by #get_bolt.
  MUTEX_TTL_MS = 600_000
  # #should_run_synchronously? only allows an inline run below this many pending table changes.
  MAX_TABLES_FOR_SYNC_RUN = 8

  # @param user_id id of the user whose tables are managed
  def initialize(user_id)
    @user_id = user_id
  end

  # Lazily loaded (and memoized) user for the id given at construction.
  def user
    @user ||= ::User[@user_id]
  end

  # Does nothing when linked tables and database already agree. Otherwise runs
  # the sync inline if #should_run_synchronously? allows it, or enqueues the
  # LinkGhostTables Resque job.
  def link_ghost_tables
    return if user_tables_synced_with_db?

    if should_run_synchronously?
      link_ghost_tables_synchronously
    else
      ::Resque.enqueue(::Resque::UserDBJobs::UserDBMaintenance::LinkGhostTables, @user_id)
    end
  end

  # Runs the locked sync unless linked tables and database already agree.
  def link_ghost_tables_synchronously
    sync_user_tables_with_db unless user_tables_synced_with_db?
  end

  # Determine linked tables vs cartodbfied tables consistency; i.e.: whether a sync needs to run.
  # @return [Boolean] true when both lists hold exactly the same tables
  def user_tables_synced_with_db?
    user_tables = fetch_user_tables
    cartodbfied_tables = fetch_cartodbfied_tables

    user_tables.length == cartodbfied_tables.length &&
      (user_tables - cartodbfied_tables).empty? &&
      (cartodbfied_tables - user_tables).empty?
  end

  # Builds the per-user Bolt lock used to serialize ghost table work.
  def get_bolt
    Carto::Bolt.new("#{user.username}:#{MUTEX_REDIS_KEY}", ttl_ms: MUTEX_TTL_MS)
  end

  # Run a block of code exclusively with GhostTablesManager (using Bolt lock).
  # If warning_params is provided (with parameters for Logger.warning) then
  # the code is executed even if the lock is not acquired (in which case
  # a warning is emitted).
  def self.run_synchronized(user_id, attempts: 10, timeout: 30_000, **warning_params)
    gtm = new(user_id)
    bolt = gtm.get_bolt
    # sync is private, hence the send
    rerun_func = lambda { gtm.send(:sync) }

    lock_acquired = bolt.run_locked(attempts: attempts, timeout: timeout, rerun_func: rerun_func) do
      yield
    end

    if !lock_acquired && warning_params.present?
      # run even if lock wasn't acquired
      CartoDB::Logger.warning(warning_params)
      yield
    end
  end

  private

  # It's nice to run sync if any unsafe stale (dropped or renamed) tables will be shown to the user but we can't block
  # the workers for more than 180 seconds.
  # Both table lists are fetched once and shared by every check below, since
  # nothing is modified while the decision is being taken.
  # @return [Boolean]
  def should_run_synchronously?
    cartodbfied_tables = fetch_cartodbfied_tables
    user_tables = fetch_user_tables

    dropped_and_stale_tables = find_dropped_tables(cartodbfied_tables, user_tables) +
                               find_stale_tables(cartodbfied_tables, user_tables)
    return false if dropped_and_stale_tables.empty?

    pending_count = dropped_and_stale_tables.count + find_new_tables(cartodbfied_tables, user_tables).count
    pending_count < MAX_TABLES_FOR_SYNC_RUN
  end

  # Runs #sync under the Bolt lock; logs when the lock could not be obtained.
  def sync_user_tables_with_db
    got_locked = get_bolt.run_locked(rerun_func: lambda { sync }) { sync }

    CartoDB::Logger.info(message: 'Ghost table race condition avoided', user: user) unless got_locked
  end

  # Applies every pending change. Each step re-reads the linked user tables
  # (the find_* helpers fetch them when no snapshot is passed), so a step sees
  # what the previous ones changed.
  def sync
    cartodbfied_tables = fetch_cartodbfied_tables

    # Update table_id on UserTables with physical tables with changed oid. Should go first.
    find_regenerated_tables(cartodbfied_tables).each(&:regenerate_user_table)

    # Relink tables that have been renamed through the SQL API
    find_renamed_tables(cartodbfied_tables).each(&:rename_user_table_vis)

    # Create UserTables for non linked Tables
    find_new_tables(cartodbfied_tables).each(&:create_user_table)

    # Unlink tables that have been dropped through the SQL API. Should go last.
    find_dropped_tables(cartodbfied_tables).each(&:drop_user_table)
  end

  # Any cartodbfied table that has been renamed or regenerated.
  # @param user_tables linked tables to compare against (fetched when omitted)
  def find_stale_tables(cartodbfied_tables, user_tables = fetch_user_tables)
    find_regenerated_tables(cartodbfied_tables, user_tables) | find_renamed_tables(cartodbfied_tables, user_tables)
  end

  # Cartodbfied tables that coincide with a UserTable in id but not in name.
  # @param user_tables linked tables to compare against (fetched when omitted)
  def find_renamed_tables(cartodbfied_tables, user_tables = fetch_user_tables)
    cartodbfied_tables.select do |cartodbfied_table|
      user_tables.any? { |t| t.name != cartodbfied_table.name && t.id == cartodbfied_table.id }
    end
  end

  # Cartodbfied tables that coincide with a UserTable in name but not in id.
  # @param user_tables linked tables to compare against (fetched when omitted)
  def find_regenerated_tables(cartodbfied_tables, user_tables = fetch_user_tables)
    cartodbfied_tables.select do |cartodbfied_table|
      user_tables.any? { |t| t.name == cartodbfied_table.name && t.id != cartodbfied_table.id }
    end
  end

  # Cartodbfied tables that are not stale and are not linked as UserTables yet.
  # @param user_tables linked tables to compare against (fetched when omitted)
  def find_new_tables(cartodbfied_tables, user_tables = fetch_user_tables)
    cartodbfied_tables - user_tables - find_stale_tables(cartodbfied_tables, user_tables)
  end

  # UserTables that have no cartodbfied table associated to them (minus the stale list).
  # @param user_tables linked tables to compare against (fetched when omitted)
  def find_dropped_tables(cartodbfied_tables, user_tables = fetch_user_tables)
    user_tables - cartodbfied_tables - find_stale_tables(cartodbfied_tables, user_tables)
  end

  # Fetches all currently linked user tables
  def fetch_user_tables
    Carto::UserTable.select([:name, :table_id]).where(user_id: @user_id).map do |record|
      Carto::TableFacade.new(record[:table_id], record[:name], @user_id)
    end
  end

  # Fetches all linkable tables: non raster cartodbfied + raster
  def fetch_cartodbfied_tables
    (fetch_non_raster_cartodbfied_tables + fetch_raster_tables).uniq
  end

  # This method searches for tables with all the columns needed in a cartodb table.
  # It does not check column types, and only the latest cartodbfication trigger attached (test_quota_per_row)
  def fetch_non_raster_cartodbfied_tables
    cartodb_columns = (Table::CARTODB_REQUIRED_COLUMNS + [Table::THE_GEOM_WEBMERCATOR]).map { |col| "'#{col}'" }
                                                                                      .join(',')
    sql = %{
WITH cartodbfied_tables as (
SELECT c.table_name,
tg.tgrelid reloid,
count(column_name::text) cdb_columns_count
FROM information_schema.columns c, pg_tables t, pg_trigger tg
WHERE
t.tablename !~ '^importer_' AND
t.tablename = c.table_name AND
t.schemaname = c.table_schema AND
c.table_schema = '#{user.database_schema}' AND
t.tableowner in ('#{user.get_database_roles.join('\',\'')}') AND
column_name IN (#{cartodb_columns}) AND
tg.tgrelid = (quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass::oid AND
tg.tgname = 'test_quota_per_row'
GROUP BY reloid, 1)
SELECT table_name, reloid FROM cartodbfied_tables WHERE cdb_columns_count = #{cartodb_columns.split(',').length}
}
    fetch_table_facades(sql)
  end

  # Find raster tables which won't appear as cartodbfied but MUST be linked
  def fetch_raster_tables
    sql = %{
WITH cartodbfied_tables as (
SELECT c.table_name,
tg.tgrelid reloid,
count(column_name::text) cdb_columns_count
FROM information_schema.columns c, pg_tables t, pg_trigger tg
WHERE
t.tablename = c.table_name AND
t.schemaname = c.table_schema AND
c.table_schema = '#{user.database_schema}' AND
t.tableowner in ('#{user.get_database_roles.join('\',\'')}') AND
column_name IN ('cartodb_id', 'the_raster_webmercator') AND
tg.tgrelid = (quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass::oid AND
tg.tgname = 'test_quota_per_row'
GROUP BY reloid, 1)
SELECT table_name, reloid FROM cartodbfied_tables WHERE cdb_columns_count = 2;
}
    fetch_table_facades(sql)
  end

  # Runs the given SQL in the user database as superuser and wraps each
  # returned row (reloid, table_name) in a TableFacade.
  def fetch_table_facades(sql)
    user.in_database(as: :superuser)[sql].all.map do |record|
      Carto::TableFacade.new(record[:reloid], record[:table_name], @user_id)
    end
  end
end
# Value object describing one table (id, name, owning user id) together with
# the operations used to reconcile it with the user's linked UserTables.
# Two facades are equal when id, name and user_id all match, so they can be
# compared, de-duplicated and subtracted as array elements.
class TableFacade
  attr_reader :id, :name, :user_id

  def initialize(id, name, user_id)
    @id = id
    @name = name
    @user_id = user_id
  end

  # Lazily loaded (and memoized) owner of the table.
  def user
    @user ||= ::User[@user_id]
  end

  # First of the user's tables whose table_id equals this facade's id, if any.
  def user_table_with_matching_id
    user.tables.where(table_id: id).first
  end

  # First of the user's tables whose name equals this facade's name, if any.
  def user_table_with_matching_name
    user.tables.where(name: name).first
  end

  # Registers a UserTable for this table. Errors are logged, not raised.
  def create_user_table
    log_action('linking new table')

    user_table = Carto::UserTable.new
    user_table.user_id = user.id
    user_table.table_id = id
    user_table.name = name

    new_table = ::Table.new(user_table: user_table)
    new_table.register_table_only = true
    new_table.keep_user_database_table = true
    new_table.save
  rescue StandardError => exception
    log_error('Ghost tables: Error creating UserTable', exception)
  end

  # Stores this facade's name on the visualization of the user table that
  # shares its id. Errors (including a missing user table) are logged, not raised.
  def rename_user_table_vis
    log_action('relinking renamed table')

    user_table_vis = user_table_with_matching_id.table_visualization
    user_table_vis.register_table_only = true
    user_table_vis.name = name
    user_table_vis.store
  rescue StandardError => exception
    log_error('Ghost tables: Error renaming Visualization', exception)
  end

  # Destroys the user table matching both id and name, with
  # keep_user_database_table set on the Table. Errors are logged, not raised.
  def drop_user_table
    log_action('unlinking dropped table')

    user_table_to_drop = user.tables.where(table_id: id, name: name).first
    return unless user_table_to_drop # The table has already been deleted

    table_to_drop = ::Table.new(user_table: user_table_to_drop)
    table_to_drop.keep_user_database_table = true
    table_to_drop.destroy
  rescue StandardError => exception
    log_error('Ghost tables: Error dropping Table', exception)
  end

  # Saves this facade's id as the table_id of the user table that shares its
  # name. Errors (including a missing user table) are logged, not raised.
  def regenerate_user_table
    log_action('regenerating table_id')

    user_table_to_regenerate = user_table_with_matching_name
    user_table_to_regenerate.table_id = id
    user_table_to_regenerate.save
  rescue StandardError => exception
    log_error('Ghost tables: Error syncing table_id for UserTable', exception)
  end

  # Field-wise equality on id, name and user_id. user_id is compared directly
  # (not through #user) so that equality agrees with #hash and never has to
  # load the User record. Objects lacking any of the three readers are unequal.
  def eql?(other)
    return false unless %i[id name user_id].all? { |reader| other.respond_to?(reader) }

    id.eql?(other.id) && name.eql?(other.name) && user_id.eql?(other.user_id)
  end

  def ==(other)
    eql?(other)
  end

  # Built from the same three fields #eql? compares.
  def hash
    [id, name, user_id].hash
  end

  private

  # Fields attached to every log entry written by this facade.
  def log_context
    { user: user, table_name: name, table_id: id }
  end

  # Debug entry announcing the reconciliation step about to run.
  def log_action(action)
    CartoDB::Logger.debug(message: 'ghost tables', action: action, **log_context)
  end

  # Error entry for a failed reconciliation step.
  def log_error(message, exception)
    CartoDB::Logger.error(message: message, exception: exception, **log_context)
  end
end
end