318 lines
11 KiB
Ruby
318 lines
11 KiB
Ruby
|
require_relative 'bolt.rb'
|
||
|
|
||
|
module Carto
|
||
|
# Keeps a user's linked UserTables in step with the physical tables found in
# the user database: links new tables, relinks renamed/regenerated ones and
# unlinks tables that no longer exist.
class GhostTablesManager
  # Suffix of the per-user lock name handed to Carto::Bolt (see #get_bolt).
  MUTEX_REDIS_KEY = 'ghost_tables_working'.freeze
  # Value passed to Carto::Bolt as ttl_ms (600000 ms = 10 minutes).
  MUTEX_TTL_MS = 600000
  # An inline sync is only attempted when fewer than this many tables need work.
  MAX_TABLES_FOR_SYNC_RUN = 8

  # user_id: id of the user whose tables are managed.
  def initialize(user_id)
    @user_id = user_id
  end

  # Memoized ::User record for @user_id.
  def user
    @user ||= ::User[@user_id]
  end

  # Entry point: does nothing when everything is already linked; otherwise
  # syncs inline (see #should_run_synchronously?) or enqueues a Resque job.
  def link_ghost_tables
    return if user_tables_synced_with_db?

    if should_run_synchronously?
      link_ghost_tables_synchronously
    else
      ::Resque.enqueue(::Resque::UserDBJobs::UserDBMaintenance::LinkGhostTables, @user_id)
    end
  end

  # Runs the sync in the current process unless there is nothing to do.
  def link_ghost_tables_synchronously
    sync_user_tables_with_db unless user_tables_synced_with_db?
  end

  # Determine linked tables vs cartodbfied tables consistency, i.e. whether a
  # sync needs to run. Returns true only when both sets hold the same tables.
  def user_tables_synced_with_db?
    user_tables = fetch_user_tables
    cartodbfied_tables = fetch_cartodbfied_tables

    user_tables.length == cartodbfied_tables.length &&
      (user_tables - cartodbfied_tables).empty? &&
      (cartodbfied_tables - user_tables).empty?
  end

  # Builds the per-user Bolt lock used to serialize ghost tables work.
  def get_bolt
    Carto::Bolt.new("#{user.username}:#{MUTEX_REDIS_KEY}", ttl_ms: MUTEX_TTL_MS)
  end

  # Run a block of code exclusively with GhostTablesManager (using Bolt lock).
  # If warning_params is provided (with parameters for Logger.warning) then
  # the code is executed even if the lock is not acquired (in which case
  # a warning is emitted).
  #
  # attempts/timeout are forwarded untouched to Bolt#run_locked; a sync is
  # handed to it as rerun_func.
  def self.run_synchronized(user_id, attempts: 10, timeout: 30000, **warning_params)
    gtm = new(user_id)
    bolt = gtm.get_bolt
    rerun_func = lambda { gtm.send(:sync) }

    lock_acquired = bolt.run_locked(attempts: attempts, timeout: timeout, rerun_func: rerun_func) do
      yield
    end

    if !lock_acquired && warning_params.present?
      # Run even if the lock wasn't acquired
      CartoDB::Logger.warning(warning_params)
      yield
    end
  end

  private

  # It's nice to run sync if any unsafe stale (dropped or renamed) tables will be shown to the user but we can't block
  # the workers for more than 180 seconds.
  #
  # True when there is at least one dropped/stale table and the total amount
  # of tables needing work stays below MAX_TABLES_FOR_SYNC_RUN.
  def should_run_synchronously?
    cartodbfied_tables = fetch_cartodbfied_tables

    dropped_and_stale_tables = find_dropped_tables(cartodbfied_tables) + find_stale_tables(cartodbfied_tables)
    total_tables_to_be_linked = dropped_and_stale_tables + find_new_tables(cartodbfied_tables)

    dropped_and_stale_tables.any? && total_tables_to_be_linked.count < MAX_TABLES_FOR_SYNC_RUN
  end

  # Runs #sync under the Bolt lock; logs when run_locked reports that the
  # lock was not obtained.
  def sync_user_tables_with_db
    got_locked = get_bolt.run_locked(rerun_func: lambda { sync }) { sync }

    CartoDB::Logger.info(message: 'Ghost table race condition avoided', user: user) unless got_locked
  end

  # Applies every pending change. Each step re-reads the linked tables, so
  # the order below matters.
  def sync
    cartodbfied_tables = fetch_cartodbfied_tables

    # Update table_id on UserTables with physical tables with changed oid. Should go first.
    find_regenerated_tables(cartodbfied_tables).each(&:regenerate_user_table)

    # Relink tables that have been renamed through the SQL API
    find_renamed_tables(cartodbfied_tables).each(&:rename_user_table_vis)

    # Create UserTables for non linked Tables
    find_new_tables(cartodbfied_tables).each(&:create_user_table)

    # Unlink tables that have been dropped through the SQL API. Should go last.
    find_dropped_tables(cartodbfied_tables).each(&:drop_user_table)
  end

  # Any cartodbfied table whose UserTable has been renamed or regenerated.
  def find_stale_tables(cartodbfied_tables)
    find_regenerated_tables(cartodbfied_tables) | find_renamed_tables(cartodbfied_tables)
  end

  # Cartodbfied tables that coincide with a UserTable in id but not in name
  def find_renamed_tables(cartodbfied_tables)
    user_tables = fetch_user_tables

    cartodbfied_tables.select do |cartodbfied_table|
      user_tables.any? { |t| t.name != cartodbfied_table.name && t.id == cartodbfied_table.id }
    end
  end

  # Cartodbfied tables that coincide with a UserTable in name but not in id
  def find_regenerated_tables(cartodbfied_tables)
    user_tables = fetch_user_tables

    cartodbfied_tables.select do |cartodbfied_table|
      user_tables.any? { |t| t.name == cartodbfied_table.name && t.id != cartodbfied_table.id }
    end
  end

  # Cartodbfied tables that are not stale and are not linked as UserTables yet
  def find_new_tables(cartodbfied_tables)
    cartodbfied_tables - fetch_user_tables - find_stale_tables(cartodbfied_tables)
  end

  # UserTables that are not stale and have no cartodbfied table associated to it.
  #
  # A linked table sharing its id (renamed) or its name (regenerated) with a
  # physical table is stale rather than dropped. Subtracting the stale
  # facades is not enough to exclude it: those come from the cartodbfied set
  # and never equal a linked facade, so the match is done on id/name instead.
  def find_dropped_tables(cartodbfied_tables)
    fetch_user_tables.reject do |user_table|
      cartodbfied_tables.any? { |t| t.id == user_table.id || t.name == user_table.name }
    end
  end

  # Fetches all currently linked user tables
  def fetch_user_tables
    Carto::UserTable.select([:name, :table_id]).where(user_id: @user_id).map do |record|
      Carto::TableFacade.new(record[:table_id], record[:name], @user_id)
    end
  end

  # Fetches all linkable tables: non raster cartodbfied + raster
  def fetch_cartodbfied_tables
    (fetch_non_raster_cartodbfied_tables + fetch_raster_tables).uniq
  end

  # This method searches for tables with all the columns needed in a cartodb table.
  # It does not check column types, and only the latest cartodbfication trigger attached (test_quota_per_row)
  def fetch_non_raster_cartodbfied_tables
    required_columns = Table::CARTODB_REQUIRED_COLUMNS + [Table::THE_GEOM_WEBMERCATOR]
    cartodb_columns = required_columns.map { |col| "'#{col}'" }.join(',')

    sql = %{
      WITH cartodbfied_tables as (
        SELECT c.table_name,
               tg.tgrelid reloid,
               count(column_name::text) cdb_columns_count
        FROM information_schema.columns c, pg_tables t, pg_trigger tg
        WHERE
          t.tablename !~ '^importer_' AND
          t.tablename = c.table_name AND
          t.schemaname = c.table_schema AND
          c.table_schema = '#{user.database_schema}' AND
          t.tableowner in ('#{user.get_database_roles.join('\',\'')}') AND
          column_name IN (#{cartodb_columns}) AND
          tg.tgrelid = (quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass::oid AND
          tg.tgname = 'test_quota_per_row'
        GROUP BY reloid, 1)
      SELECT table_name, reloid FROM cartodbfied_tables WHERE cdb_columns_count = #{required_columns.length}
    }

    user.in_database(as: :superuser)[sql].all.map do |record|
      Carto::TableFacade.new(record[:reloid], record[:table_name], @user_id)
    end
  end

  # Find raster tables which won't appear as cartodbfied but MUST be linked
  def fetch_raster_tables
    sql = %{
      WITH cartodbfied_tables as (
        SELECT c.table_name,
               tg.tgrelid reloid,
               count(column_name::text) cdb_columns_count
        FROM information_schema.columns c, pg_tables t, pg_trigger tg
        WHERE
          t.tablename = c.table_name AND
          t.schemaname = c.table_schema AND
          c.table_schema = '#{user.database_schema}' AND
          t.tableowner in ('#{user.get_database_roles.join('\',\'')}') AND
          column_name IN ('cartodb_id', 'the_raster_webmercator') AND
          tg.tgrelid = (quote_ident(t.schemaname) || '.' || quote_ident(t.tablename))::regclass::oid AND
          tg.tgname = 'test_quota_per_row'
        GROUP BY reloid, 1)
      SELECT table_name, reloid FROM cartodbfied_tables WHERE cdb_columns_count = 2;
    }

    user.in_database(as: :superuser)[sql].all.map do |record|
      Carto::TableFacade.new(record[:reloid], record[:table_name], @user_id)
    end
  end
end
|
||
|
|
||
|
# Lightweight (id, name, user_id) view of a table, used to compare linked
# UserTables against physical tables and to apply the linking changes.
# Two facades are equal when id, name and user_id all match.
class TableFacade
  attr_reader :id, :name, :user_id

  # id: table oid / UserTable table_id; name: table name; user_id: owner id.
  def initialize(id, name, user_id)
    @id = id
    @name = name
    @user_id = user_id
  end

  # Memoized ::User record for user_id.
  def user
    @user ||= ::User[@user_id]
  end

  # First of the user's tables whose table_id equals this facade's id (or nil).
  def user_table_with_matching_id
    user.tables.where(table_id: id).first
  end

  # First of the user's tables whose name equals this facade's name (or nil).
  def user_table_with_matching_name
    user.tables.where(name: name).first
  end

  # Links this physical table by saving a new UserTable wrapped in a ::Table.
  # Any StandardError is logged instead of raised.
  def create_user_table
    log_action('linking new table')

    user_table = Carto::UserTable.new
    user_table.user_id = user.id
    user_table.table_id = id
    user_table.name = name

    new_table = ::Table.new(user_table: user_table)
    # NOTE(review): presumably these flags make the save register metadata
    # only and leave the physical table alone — confirm in ::Table.
    new_table.register_table_only = true
    new_table.keep_user_database_table = true

    new_table.save
  rescue StandardError => exception
    log_failure('Ghost tables: Error creating UserTable', exception)
  end

  # Renames the table visualization of the UserTable sharing this facade's
  # id. Any StandardError (including a missing UserTable) is logged.
  def rename_user_table_vis
    log_action('relinking renamed table')

    user_table_vis = user_table_with_matching_id.table_visualization

    user_table_vis.register_table_only = true
    user_table_vis.name = name

    user_table_vis.store
  rescue StandardError => exception
    log_failure('Ghost tables: Error renaming Visualization', exception)
  end

  # Destroys the UserTable matching both id and name, if it still exists.
  # Any StandardError is logged instead of raised.
  def drop_user_table
    log_action('unlinking dropped table')

    user_table_to_drop = user.tables.where(table_id: id, name: name).first
    return unless user_table_to_drop # The table has already been deleted

    table_to_drop = ::Table.new(user_table: user_table_to_drop)
    # NOTE(review): presumably prevents dropping the physical table — confirm in ::Table.
    table_to_drop.keep_user_database_table = true
    table_to_drop.destroy
  rescue StandardError => exception
    log_failure('Ghost tables: Error dropping Table', exception)
  end

  # Points the UserTable sharing this facade's name at this facade's id.
  # Any StandardError (including a missing UserTable) is logged.
  def regenerate_user_table
    log_action('regenerating table_id')

    user_table_to_regenerate = user_table_with_matching_name

    user_table_to_regenerate.table_id = id
    user_table_to_regenerate.save
  rescue StandardError => exception
    log_failure('Ghost tables: Error syncing table_id for UserTable', exception)
  end

  # Value equality on (id, name, user_id). Compares the stored user_id, the
  # same field #hash uses, so no ::User lookup is needed per comparison.
  # Returns false for anything that is not a TableFacade.
  def eql?(other)
    other.is_a?(TableFacade) &&
      id.eql?(other.id) &&
      name.eql?(other.name) &&
      user_id.eql?(other.user_id)
  end

  def ==(other)
    eql?(other)
  end

  # Consistent with #eql?, so facades work with Array#-, Array#| and #uniq.
  def hash
    [id, name, user_id].hash
  end

  private

  # Debug-logs the ghost tables action about to be applied to this table.
  def log_action(action)
    CartoDB::Logger.debug(message: 'ghost tables',
                          action: action,
                          user: user,
                          table_name: name,
                          table_id: id)
  end

  # Error-logs a failed action together with the rescued exception.
  def log_failure(message, exception)
    CartoDB::Logger.error(message: message,
                          exception: exception,
                          user: user,
                          table_name: name,
                          table_id: id)
  end
end
|
||
|
end
|