1528 lines
52 KiB
Ruby
1528 lines
52 KiB
Ruby
|
# Proxies management of a table in the users database
|
||
|
require 'forwardable'
|
||
|
|
||
|
require_relative './table/column_typecaster'
|
||
|
require_relative './table/privacy_manager'
|
||
|
require_relative './table/relator'
|
||
|
require_relative './table/user_table'
|
||
|
require_relative './visualization/member'
|
||
|
require_relative './visualization/collection'
|
||
|
require_relative './visualization/overlays'
|
||
|
require_relative './visualization/table_blender'
|
||
|
require_relative '../../services/importer/lib/importer/query_batcher'
|
||
|
require_relative '../../services/importer/lib/importer/cartodbfy_time'
|
||
|
require_relative '../../services/importer/lib/importer/column'
|
||
|
require_relative '../../services/datasources/lib/datasources/decorators/factory'
|
||
|
require_relative '../../services/table-geocoder/lib/internal-geocoder/latitude_longitude'
|
||
|
require_relative '../model_factories/layer_factory'
|
||
|
require_relative '../model_factories/map_factory'
|
||
|
require_relative '../../lib/cartodb/stats/user_tables'
|
||
|
require_relative '../../lib/cartodb/stats/importer'
|
||
|
require_dependency 'carto/table_utils'
|
||
|
require_dependency 'carto/valid_table_name_proposer'
|
||
|
|
||
|
class Table
|
||
|
extend Forwardable
|
||
|
include Carto::TableUtils
|
||
|
include ::LoggerHelper
|
||
|
|
||
|
# TODO Part of a service along with schema
|
||
|
# INFO: created_at and updated_at cannot be dropped from existing tables without dropping the triggers first
|
||
|
CARTODB_REQUIRED_COLUMNS = %w{cartodb_id the_geom}.freeze
|
||
|
CARTODB_COLUMNS = %w{cartodb_id created_at updated_at the_geom}.freeze
|
||
|
THE_GEOM_WEBMERCATOR = :the_geom_webmercator
|
||
|
THE_GEOM = :the_geom
|
||
|
CARTODB_ID = :cartodb_id
|
||
|
DATATYPE_DATE = 'date'.freeze
|
||
|
|
||
|
NO_GEOMETRY_TYPES_CACHING_TIMEOUT = 5.minutes
|
||
|
GEOMETRY_TYPES_PRESENT_CACHING_TIMEOUT = 24.hours
|
||
|
STATEMENT_TIMEOUT = 1.hour * 1000
|
||
|
|
||
|
DEFAULT_THE_GEOM_TYPE = 'geometry'
|
||
|
|
||
|
VALID_GEOMETRY_TYPES = %W{ geometry multipolygon point multilinestring }
|
||
|
|
||
|
def_delegators :relator, *CartoDB::TableRelator::INTERFACE
|
||
|
def_delegators :@user_table, *::UserTable::INTERFACE
|
||
|
|
||
|
attr_accessor :user_table
|
||
|
|
||
|
def initialize(args = {})
|
||
|
if args[:user_table].nil?
|
||
|
# TODO: This won't work, you need to UserTable.new.set_fields(args, args.keys)
|
||
|
@user_table = model_class.new(args)
|
||
|
else
|
||
|
@user_table = args[:user_table]
|
||
|
end
|
||
|
self.user_id = args[:user_id] if args[:user_id].present?
|
||
|
# TODO: this probably makes sense only if user_table is not passed as argument
|
||
|
@user_table.set_service(self)
|
||
|
end
|
||
|
|
||
|
# This is here just for testing purposes (being able to test this service against both models)
|
||
|
def model_class
|
||
|
Carto::UserTable
|
||
|
end
|
||
|
|
||
|
# forwardable does not work well with this one
|
||
|
def layers
|
||
|
@user_table.layers
|
||
|
end
|
||
|
|
||
|
def save
|
||
|
# TODO: kept for compatibility reasons on tests on both models, until 100% removal of ::UserTable support
|
||
|
if @user_table.respond_to?(:save!)
|
||
|
@user_table.save!
|
||
|
else
|
||
|
@user_table.save
|
||
|
end
|
||
|
|
||
|
self
|
||
|
end
|
||
|
|
||
|
def update(args)
|
||
|
# Sequel and ActiveRecord #update don't behave equally, we need this workaround for compatibility reasons
|
||
|
if @user_table.is_a?(Carto::UserTable)
|
||
|
@user_table.update_attributes(args)
|
||
|
else
|
||
|
@user_table.update(args)
|
||
|
end
|
||
|
self
|
||
|
end
|
||
|
|
||
|
def reload
|
||
|
@user_table.reload
|
||
|
self
|
||
|
end
|
||
|
|
||
|
# ----------------------------------------------------------------------------
|
||
|
|
||
|
def geometry_types_key
|
||
|
"#{redis_key}:geometry_types"
|
||
|
end
|
||
|
|
||
|
def geometry_types
|
||
|
# default return value
|
||
|
types = []
|
||
|
|
||
|
types_str = cache.get geometry_types_key
|
||
|
if types_str.present?
|
||
|
# cache hit
|
||
|
types = JSON.parse(types_str)
|
||
|
else
|
||
|
# cache miss, query and store
|
||
|
types = query_geometry_types
|
||
|
timeout = types.empty? ? NO_GEOMETRY_TYPES_CACHING_TIMEOUT : GEOMETRY_TYPES_PRESENT_CACHING_TIMEOUT
|
||
|
cache.setex(geometry_types_key, timeout, types)
|
||
|
end
|
||
|
|
||
|
types
|
||
|
end
|
||
|
|
||
|
def is_raster?
|
||
|
schema.select { |key, value| value == 'raster' }.length > 0
|
||
|
end
|
||
|
|
||
|
attr_accessor :force_schema,
|
||
|
:import_from_file,
|
||
|
:import_from_url,
|
||
|
:import_from_query,
|
||
|
:import_from_table_copy,
|
||
|
:importing_encoding,
|
||
|
:the_geom_type_value,
|
||
|
:migrate_existing_table,
|
||
|
# this flag is used to register table changes only without doing operations on in the database
|
||
|
# for example when the table is renamed or created. For remove see keep_user_database_table
|
||
|
:register_table_only,
|
||
|
:new_table,
|
||
|
# Handy for rakes and custom ghost table registers, won't delete user table in case of error
|
||
|
:keep_user_database_table
|
||
|
|
||
|
# Getter by table uuid using canonical visualizations
|
||
|
# @param table_id String
|
||
|
# @param viewer_user ::User
|
||
|
def self.get_by_id(table_id, viewer_user)
|
||
|
table = nil
|
||
|
return table unless viewer_user
|
||
|
|
||
|
table_temp = Carto::UserTable.where(id: table_id).first.service
|
||
|
unless table_temp.nil?
|
||
|
vis = CartoDB::Visualization::Collection.new.fetch(
|
||
|
user_id: viewer_user.id,
|
||
|
map_id: table_temp.map_id,
|
||
|
type: Carto::Visualization::TYPE_CANONICAL
|
||
|
).first
|
||
|
table = vis.table unless vis.nil?
|
||
|
end
|
||
|
table
|
||
|
end
|
||
|
|
||
|
# Getter by table uuid using canonical visualizations. No privacy checks
|
||
|
# @param table_id String
|
||
|
def self.get_by_table_id(table_id)
|
||
|
table_temp = Carto::UserTable.where(id: table_id).first
|
||
|
table_temp.service unless table_temp.nil?
|
||
|
end
|
||
|
|
||
|
# Get a list of tables given an array with the names
|
||
|
# (can be fully qualified).
|
||
|
# it also needs the user used to search a table when the
|
||
|
# name is not qualified
|
||
|
def self.get_all_by_names(names, viewer_user)
|
||
|
names.map { |t|
|
||
|
user_id = viewer_user.id
|
||
|
table_name, table_schema = Table.table_and_schema(t)
|
||
|
unless table_schema.nil?
|
||
|
owner = ::User.where(username:table_schema).first
|
||
|
unless owner.nil?
|
||
|
user_id = owner.id
|
||
|
end
|
||
|
end
|
||
|
::UserTable.where(user_id: user_id, name: table_name).first
|
||
|
}
|
||
|
end
|
||
|
|
||
|
# TODO: REFACTOR THIS patch introduced to continue with #3664
|
||
|
def self.get_all_user_tables_by_names(names, viewer_user)
|
||
|
names.map { |t|
|
||
|
user_id = viewer_user.id
|
||
|
table_name, table_schema = Table.table_and_schema(t)
|
||
|
unless table_schema.nil?
|
||
|
owner = ::User.where(username:table_schema).first
|
||
|
unless owner.nil?
|
||
|
user_id = owner.id
|
||
|
end
|
||
|
end
|
||
|
Carto::UserTable.where(user_id: user_id, name: table_name).first
|
||
|
}
|
||
|
end
|
||
|
|
||
|
def self.table_and_schema(table_name)
|
||
|
if table_name =~ /\./
|
||
|
table_name, schema = table_name.split('.').reverse
|
||
|
# remove quotes from schema
|
||
|
schema = schema.delete('"')
|
||
|
[table_name, (schema if schema != 'public')]
|
||
|
else
|
||
|
[table_name, nil]
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def data_import
|
||
|
@data_import ||= DataImport.where(id: @user_table.data_import_id).first || DataImport.new(user_id: owner.id)
|
||
|
end
|
||
|
|
||
|
## Callbacks
|
||
|
|
||
|
def import_to_cartodb(uniname = nil)
|
||
|
if migrate_existing_table.present? || uniname
|
||
|
data_import.data_type = DataImport::TYPE_EXTERNAL_TABLE if data_import.data_type.nil?
|
||
|
data_import.data_source = migrate_existing_table || uniname
|
||
|
data_import.save
|
||
|
|
||
|
# ensure unique name, also ensures self.name can override any imported table name
|
||
|
uniname = get_valid_name(name ? name : migrate_existing_table) unless uniname
|
||
|
|
||
|
# Make sure column names are sanitized. Make it consistently.
|
||
|
sanitize_columns(table_name: uniname, database_schema: owner.database_schema, connection: owner.in_database)
|
||
|
|
||
|
# with table #{uniname} table created now run migrator to CartoDBify
|
||
|
hash_in = ::SequelRails.configuration.environment_for(Rails.env).merge(
|
||
|
'host' => owner.database_host,
|
||
|
'database' => owner.database_name,
|
||
|
:logger => ::Rails.logger,
|
||
|
'username' => owner.database_username,
|
||
|
'password' => owner.database_password,
|
||
|
:schema => owner.database_schema,
|
||
|
:current_name => migrate_existing_table || uniname,
|
||
|
:suggested_name => uniname,
|
||
|
:debug => Rails.env.development?,
|
||
|
:remaining_quota => owner.remaining_quota,
|
||
|
:remaining_tables => owner.remaining_table_quota,
|
||
|
:data_import_id => data_import.id
|
||
|
).symbolize_keys
|
||
|
importer = CartoDB::Migrator.new(hash_in)
|
||
|
imported_name = importer.migrate!
|
||
|
data_import.reload
|
||
|
data_import.save
|
||
|
imported_name
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# TODO: basically most if not all of what the import_cleanup does is done by cartodbfy.
|
||
|
# Consider deletion.
|
||
|
def import_cleanup
|
||
|
# When tables are created using ogr2ogr they are added a ogc_fid or gid primary key
|
||
|
# In that case:
|
||
|
# - If cartodb_id already exists, remove ogc_fid
|
||
|
# - If cartodb_id does not exist, treat this field as the auxiliary column
|
||
|
aux_cartodb_id_column = [:ogc_fid, :gid].find { |col| valid_cartodb_id_candidate?(col) }
|
||
|
|
||
|
# Remove primary key
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |user_database|
|
||
|
existing_pk = user_database[%Q{
|
||
|
SELECT c.conname AS pk_name
|
||
|
FROM pg_class r, pg_constraint c, pg_namespace n
|
||
|
WHERE r.oid = c.conrelid AND contype='p' AND relname = '#{name}'
|
||
|
AND r.relnamespace = n.oid and n.nspname= '#{owner.database_schema}'
|
||
|
}].first
|
||
|
|
||
|
existing_pk = existing_pk[:pk_name] unless existing_pk.nil?
|
||
|
|
||
|
user_database.run(%{
|
||
|
ALTER TABLE #{qualified_table_name} DROP CONSTRAINT "#{existing_pk}"
|
||
|
}) unless existing_pk.nil?
|
||
|
end
|
||
|
|
||
|
# All normal fields casted to text
|
||
|
self.schema(reload: true, cartodb_types: false).each do |column|
|
||
|
if column[1] =~ /^character varying/
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
|
||
|
user_database.run(%{ALTER TABLE #{qualified_table_name} ALTER COLUMN "#{column[0]}" TYPE text})
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# If there's an auxiliary column, copy to cartodb_id and restart the sequence to the max(cartodb_id)+1
|
||
|
if aux_cartodb_id_column.present?
|
||
|
begin
|
||
|
already_had_cartodb_id = false
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
|
||
|
user_database.run(%{ALTER TABLE #{qualified_table_name} ADD COLUMN cartodb_id SERIAL})
|
||
|
end
|
||
|
rescue StandardError
|
||
|
already_had_cartodb_id = true
|
||
|
end
|
||
|
unless already_had_cartodb_id
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
|
||
|
user_database.run(%{
|
||
|
UPDATE #{qualified_table_name}
|
||
|
SET cartodb_id = CAST(#{aux_cartodb_id_column} AS INTEGER)
|
||
|
})
|
||
|
|
||
|
cartodb_id_sequence_name = user_database[%{
|
||
|
SELECT pg_get_serial_sequence('#{owner.database_schema}.#{name}', 'cartodb_id')
|
||
|
}].first[:pg_get_serial_sequence]
|
||
|
max_cartodb_id = user_database[%{SELECT max(cartodb_id) FROM #{qualified_table_name}}].first[:max]
|
||
|
# only reset the sequence on real imports.
|
||
|
|
||
|
if max_cartodb_id
|
||
|
user_database.run("ALTER SEQUENCE #{cartodb_id_sequence_name} RESTART WITH #{max_cartodb_id + 1}")
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
|
||
|
user_database.run(%{ALTER TABLE #{qualified_table_name} DROP COLUMN #{aux_cartodb_id_column}})
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def before_create
|
||
|
raise CartoDB::QuotaExceeded if owner.over_table_quota?
|
||
|
|
||
|
# The Table model only migrates now, never imports
|
||
|
if migrate_existing_table.present?
|
||
|
if @user_table.data_import_id.nil? #needed for non ui-created tables
|
||
|
@data_import = DataImport.new(:user_id => self.user_id)
|
||
|
@data_import.updated_at = Time.now
|
||
|
@data_import.save
|
||
|
else
|
||
|
@data_import = DataImport.find(:id=>@user_table.data_import_id)
|
||
|
end
|
||
|
|
||
|
importer_result_name = import_to_cartodb(name)
|
||
|
|
||
|
@data_import.reload
|
||
|
@data_import.table_name = importer_result_name
|
||
|
@data_import.save
|
||
|
|
||
|
self[:name] = importer_result_name
|
||
|
|
||
|
set_the_geom_column!
|
||
|
|
||
|
import_cleanup
|
||
|
self.cartodbfy
|
||
|
|
||
|
@data_import.save
|
||
|
else
|
||
|
if !register_table_only.present?
|
||
|
create_table_in_database!
|
||
|
set_the_geom_column!(self.the_geom_type)
|
||
|
self.cartodbfy
|
||
|
end
|
||
|
end
|
||
|
|
||
|
self.schema(reload:true)
|
||
|
set_table_id
|
||
|
rescue StandardError => e
|
||
|
self.handle_creation_error(e)
|
||
|
end
|
||
|
|
||
|
def after_create
|
||
|
grant_select_to_tiler_user
|
||
|
|
||
|
@force_schema = nil
|
||
|
self.new_table = true
|
||
|
|
||
|
# finally, close off the data import
|
||
|
if @user_table.data_import_id && !register_table_only.present?
|
||
|
@data_import = DataImport.find(id: @user_table.data_import_id)
|
||
|
@data_import.table_id = id
|
||
|
@data_import.table_name = name
|
||
|
@data_import.save
|
||
|
|
||
|
if !@data_import.privacy.nil?
|
||
|
if !self.owner.valid_privacy?(@data_import.privacy)
|
||
|
raise "Error: User '#{self.owner.username}' doesn't have private tables enabled"
|
||
|
end
|
||
|
@user_table.privacy = @data_import.privacy
|
||
|
end
|
||
|
|
||
|
@user_table.save
|
||
|
|
||
|
decorator = CartoDB::Datasources::Decorators::Factory.decorator_for(@data_import.service_name)
|
||
|
if !decorator.nil? && decorator.decorates_layer?
|
||
|
self.map.layers.each do |layer|
|
||
|
decorator.decorate_layer!(layer)
|
||
|
layer.save if decorator.layer_eligible?(layer) # skip .save if nothing changed
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
add_table_to_stats
|
||
|
|
||
|
update_table_pg_stats
|
||
|
|
||
|
rescue StandardError => e
|
||
|
handle_creation_error(e)
|
||
|
end
|
||
|
|
||
|
def before_save
|
||
|
@user_table.updated_at = table_visualization.updated_at if table_visualization
|
||
|
end
|
||
|
|
||
|
def after_save
|
||
|
manage_tags
|
||
|
update_name_changes
|
||
|
|
||
|
CartoDB::TablePrivacyManager.new(@user_table).apply_privacy_change(self, previous_privacy, privacy_changed?)
|
||
|
update_cdb_tablemetadata if privacy_changed? || !@name_changed_from.nil?
|
||
|
end
|
||
|
|
||
|
def propagate_namechange_to_table_vis
|
||
|
table_visualization.name = name
|
||
|
table_visualization.store
|
||
|
end
|
||
|
|
||
|
def grant_select_to_tiler_user
|
||
|
owner.in_database(:as => :superuser).run(%Q{GRANT SELECT ON #{qualified_table_name} TO #{CartoDB::TILE_DB_USER};})
|
||
|
end
|
||
|
|
||
|
def optimize
|
||
|
owner.db_service.in_database_direct_connection({statement_timeout: STATEMENT_TIMEOUT}) do |user_direct_conn|
|
||
|
user_direct_conn.run(%Q{
|
||
|
VACUUM ANALYZE #{qualified_table_name}
|
||
|
})
|
||
|
end
|
||
|
rescue StandardError => e
|
||
|
CartoDB::notify_exception(e, { user: owner })
|
||
|
false
|
||
|
end
|
||
|
|
||
|
def handle_creation_error(e)
|
||
|
log_info(message: 'table#create error', exception: e)
|
||
|
# Remove the table, except if it already exists
|
||
|
unless self.name.blank? || e.message =~ /relation .* already exists/
|
||
|
@data_import.log.append ("Import ERROR: Dropping table #{qualified_table_name}") if @data_import
|
||
|
|
||
|
self.remove_table_from_user_database unless keep_user_database_table
|
||
|
end
|
||
|
@data_import.log.append ("Import ERROR: #{e.message} Trace: #{e.backtrace}") if @data_import
|
||
|
raise e
|
||
|
end
|
||
|
|
||
|
def after_destroy
|
||
|
Carto::Tag.where(user_id: user_id, table_id: id).each(&:destroy)
|
||
|
remove_table_from_stats
|
||
|
|
||
|
cache.del geometry_types_key
|
||
|
|
||
|
update_cdb_tablemetadata if real_table_exists?
|
||
|
remove_table_from_user_database unless keep_user_database_table
|
||
|
|
||
|
related_templates.each { |template| template.destroy }
|
||
|
end
|
||
|
|
||
|
def remove_table_from_user_database
|
||
|
owner.in_database(:as => :superuser) do |user_database|
|
||
|
user_database.transaction do
|
||
|
# Give up if it cannot get ExclusiveLocks for DDL operations in a reasonable time
|
||
|
user_database.run(%{SET LOCAL lock_timeout = '1s'})
|
||
|
Carto::OverviewsService.new(user_database).delete_overviews qualified_table_name
|
||
|
user_database.run(%{DROP TABLE IF EXISTS #{qualified_table_name}})
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def real_table_exists?
|
||
|
!get_table_id.nil?
|
||
|
end
|
||
|
|
||
|
def name=(value)
|
||
|
value = value.downcase if value
|
||
|
return if value == @user_table.name || value.blank?
|
||
|
|
||
|
new_name = register_table_only ? value : get_valid_name(value)
|
||
|
|
||
|
# Do not keep track of name changes until table has been saved
|
||
|
unless new_record?
|
||
|
@name_changed_from = @user_table.name if @user_table.name.present?
|
||
|
update_cdb_tablemetadata
|
||
|
end
|
||
|
|
||
|
@user_table.name = new_name
|
||
|
end
|
||
|
|
||
|
def privacy_changed?
|
||
|
@user_table.privacy_changed?
|
||
|
end
|
||
|
|
||
|
def redis_key
|
||
|
"rails:table:#{id}"
|
||
|
end
|
||
|
|
||
|
# TODO: change name and refactor for ActiveRecord
|
||
|
def sequel
|
||
|
owner.in_database.from(sequel_qualified_table_name)
|
||
|
end
|
||
|
|
||
|
def rows_estimated(user=nil)
|
||
|
user ||= self.owner
|
||
|
user.in_database["SELECT reltuples::integer FROM pg_class WHERE oid = '#{self.name}'::regclass"].first[:reltuples]
|
||
|
end
|
||
|
|
||
|
# Preferred: `actual_row_count`
|
||
|
def rows_counted
|
||
|
actual_row_count
|
||
|
end
|
||
|
|
||
|
# Returns table size in bytes
|
||
|
def table_size(user=nil)
|
||
|
user ||= self.owner
|
||
|
@table_size ||= Table.table_size(name, connection: user.in_database)
|
||
|
end
|
||
|
|
||
|
def self.table_size(name, options)
|
||
|
options[:connection]['SELECT pg_total_relation_size(?) AS size', name].first[:size] / 2
|
||
|
rescue Sequel::DatabaseError
|
||
|
nil
|
||
|
end
|
||
|
|
||
|
def schema(options = {})
|
||
|
first_columns = []
|
||
|
middle_columns = []
|
||
|
last_columns = []
|
||
|
owner.in_database.schema(name, schema: owner.database_schema, reload: options.fetch(:reload, true)).each do |column|
|
||
|
next if column[0] == THE_GEOM_WEBMERCATOR
|
||
|
|
||
|
calculate_the_geom_type if column[0] == :the_geom
|
||
|
|
||
|
col_db_type = column[1][:db_type].starts_with?('geometry') ? 'geometry' : column[1][:db_type]
|
||
|
col = [
|
||
|
column[0],
|
||
|
# Default/unset or set to true means we want cartodb types
|
||
|
(options.include?(:cartodb_types) && options[:cartodb_types] == false ? col_db_type : col_db_type.convert_to_cartodb_type),
|
||
|
col_db_type == 'geometry' ? 'geometry' : nil,
|
||
|
col_db_type == 'geometry' ? the_geom_type : nil
|
||
|
].compact
|
||
|
|
||
|
# Make sensible sorting for UI
|
||
|
case column[0]
|
||
|
when :cartodb_id
|
||
|
first_columns.insert(0,col)
|
||
|
when :the_geom
|
||
|
first_columns.insert(1,col)
|
||
|
when :created_at, :updated_at
|
||
|
last_columns.insert(-1,col)
|
||
|
else
|
||
|
middle_columns << col
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# sort middle columns alphabetically
|
||
|
middle_columns.sort! {|x,y| x[0].to_s <=> y[0].to_s}
|
||
|
|
||
|
# group columns together and return
|
||
|
(first_columns + middle_columns + last_columns).compact
|
||
|
end
|
||
|
|
||
|
def insert_row!(raw_attributes)
|
||
|
primary_key = nil
|
||
|
owner.in_database do |user_database|
|
||
|
schema = user_database.schema(name, schema: owner.database_schema, reload: true).map{|c| c.first}
|
||
|
raw_attributes.delete(:id) unless schema.include?(:id)
|
||
|
attributes = raw_attributes.dup.select{ |k,v| schema.include?(k.to_sym) }
|
||
|
if attributes.keys.size != raw_attributes.keys.size
|
||
|
raise CartoDB::InvalidAttributes.new("Invalid rows: #{(raw_attributes.keys - attributes.keys).join(',')}")
|
||
|
end
|
||
|
begin
|
||
|
primary_key = user_database.from(name).insert(make_sequel_compatible(attributes))
|
||
|
rescue Sequel::DatabaseError => e
|
||
|
message = e.message.split("\n")[0]
|
||
|
raise message if message =~ /Quota exceeded by/
|
||
|
|
||
|
invalid_column = nil
|
||
|
|
||
|
# If the type don't match the schema of the table is modified for the next valid type
|
||
|
invalid_value = (m = message.match(/"([^"]+)"$/)) ? m[1] : nil
|
||
|
if invalid_value
|
||
|
invalid_column = attributes.invert[invalid_value] # which is the column of the name that raises error
|
||
|
else
|
||
|
m = message.match(/PGError: ERROR: value too long for type (.+)$/)
|
||
|
if m
|
||
|
candidate = schema(cartodb_types: false).select{ |c| c[1].to_s == m[1].to_s }.first
|
||
|
if candidate
|
||
|
invalid_column = candidate[0]
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
if invalid_column.nil?
|
||
|
raise e
|
||
|
else
|
||
|
new_column_type = get_new_column_type(invalid_column)
|
||
|
user_database.set_column_type(self.name, invalid_column.to_sym, new_column_type)
|
||
|
retry
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
update_the_geom!(raw_attributes, primary_key)
|
||
|
primary_key
|
||
|
end
|
||
|
|
||
|
MAX_UPDATE_ROW_RETRIES = 3
|
||
|
|
||
|
def update_row!(row_id, raw_attributes)
|
||
|
retries = 0
|
||
|
|
||
|
rows_updated = 0
|
||
|
owner.in_database do |user_database|
|
||
|
schema = user_database.schema(name, schema: owner.database_schema, reload: true).map{|c| c.first}
|
||
|
raw_attributes.delete(:id) unless schema.include?(:id)
|
||
|
|
||
|
attributes = raw_attributes.dup.select{ |k,v| schema.include?(k.to_sym) }
|
||
|
if attributes.keys.size != raw_attributes.keys.size
|
||
|
raise CartoDB::InvalidAttributes, "Invalid rows: #{(raw_attributes.keys - attributes.keys).join(',')}"
|
||
|
end
|
||
|
|
||
|
if attributes.except(THE_GEOM).empty?
|
||
|
if attributes.size == 1 && attributes.keys == [THE_GEOM]
|
||
|
rows_updated = 1
|
||
|
end
|
||
|
else
|
||
|
begin
|
||
|
# update row
|
||
|
rows_updated = user_database.from(name).filter(:cartodb_id => row_id).update(make_sequel_compatible(attributes))
|
||
|
rescue Sequel::DatabaseError => e
|
||
|
# If the type don't match the schema of the table is modified for the next valid type
|
||
|
# TODO: STOP THIS MADNESS
|
||
|
message = e.message.split("\n")[0]
|
||
|
|
||
|
invalid_value = (m = message.match(/"([^"]+)"$/)) ? m[1] : nil
|
||
|
if invalid_value
|
||
|
invalid_column = attributes.invert[invalid_value] # which is the column of the name that raises error
|
||
|
new_column_type = get_new_column_type(invalid_column)
|
||
|
if new_column_type
|
||
|
user_database.set_column_type self.name, invalid_column.to_sym, new_column_type
|
||
|
if (retries += 1) > MAX_UPDATE_ROW_RETRIES
|
||
|
log_error(message: 'Max update_row! retries reached',
|
||
|
user_id: user_id,
|
||
|
qualified_table_name: qualified_table_name,
|
||
|
row_id: row_id,
|
||
|
raw_attributes: raw_attributes,
|
||
|
exception: e)
|
||
|
else
|
||
|
retry
|
||
|
end
|
||
|
end
|
||
|
else
|
||
|
raise e
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
update_the_geom!(raw_attributes, row_id)
|
||
|
rows_updated
|
||
|
end
|
||
|
|
||
|
# make all identifiers SEQUEL Compatible
|
||
|
# https://github.com/Vizzuality/cartodb/issues/331
|
||
|
def make_sequel_compatible(attributes)
|
||
|
attributes.except(THE_GEOM).convert_nulls.each_with_object({}) { |(k, v), h| h[Sequel.identifier(k)] = v }
|
||
|
end
|
||
|
|
||
|
def add_column!(options)
|
||
|
raise CartoDB::InvalidColumnName if CartoDB::Importer2::Column.rejected?(options[:name])
|
||
|
type = options[:type].convert_to_db_type
|
||
|
cartodb_type = options[:type].convert_to_cartodb_type
|
||
|
# FIXME: consider CartoDB::Importer2::Column.get_valid_column_name with CURRENT_COLUMN_SANITIZATION_VERSION
|
||
|
column_name = CartoDB::Importer2::Column.sanitize_name(options[:name].to_s)
|
||
|
owner.in_database.add_column name, column_name, type
|
||
|
|
||
|
update_cdb_tablemetadata
|
||
|
return {:name => column_name, :type => type, :cartodb_type => cartodb_type}
|
||
|
rescue StandardError => e
|
||
|
if e.message =~ /^(PG::Error|PGError|PG::UndefinedObject)/
|
||
|
raise CartoDB::InvalidType, e.message
|
||
|
else
|
||
|
raise e
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def drop_column!(options)
|
||
|
raise if CARTODB_COLUMNS.include?(options[:name].to_s)
|
||
|
owner.in_database.drop_column name, options[:name].to_s
|
||
|
|
||
|
update_cdb_tablemetadata
|
||
|
end
|
||
|
|
||
|
def modify_column!(options)
|
||
|
old_name = options.fetch(:name, '').to_s
|
||
|
new_name = options.fetch(:new_name, '').to_s
|
||
|
|
||
|
# FIXME: consider CartoDB::Importer2::Column.get_valid_column_name with CURRENT_COLUMN_SANITIZATION_VERSION
|
||
|
old_sanitized_name = CartoDB::Importer2::Column.sanitize_name(old_name)
|
||
|
raise 'This column cannot be modified' if CARTODB_COLUMNS.include?(old_sanitized_name.to_s)
|
||
|
|
||
|
if new_name.present? && new_name != old_name
|
||
|
new_sanitized_name = CartoDB::Importer2::Column.sanitize_name(new_name)
|
||
|
rename_column(old_sanitized_name, new_sanitized_name)
|
||
|
end
|
||
|
|
||
|
column_name = (new_name.present? ? new_sanitized_name : old_sanitized_name)
|
||
|
convert_column_datatype(owner.in_database, name, column_name, options[:type])
|
||
|
column_type = column_type_for(column_name)
|
||
|
|
||
|
update_cdb_tablemetadata
|
||
|
{ name: column_name, type: column_type, cartodb_type: column_type.convert_to_cartodb_type }
|
||
|
end #modify_column!
|
||
|
|
||
|
def column_type_for(column_name)
|
||
|
schema(cartodb_types: false, reload: true).select { |c|
|
||
|
c[0] == column_name.to_sym
|
||
|
}.first[1]
|
||
|
end #column_type_for
|
||
|
|
||
|
def self.column_names_for(db, table_name, owner)
|
||
|
db.schema(table_name, schema: owner.database_schema, reload: true).map{ |s| s[0].to_s }
|
||
|
end #column_names
|
||
|
|
||
|
def rename_column(old_name, new_name='')
|
||
|
raise 'Please provide a column name' if new_name.empty?
|
||
|
raise 'This column cannot be renamed' if CARTODB_COLUMNS.include?(old_name.to_s)
|
||
|
|
||
|
if CartoDB::Importer2::Column.reserved_or_unsupported?(new_name) || CARTODB_COLUMNS.include?(new_name)
|
||
|
raise CartoDB::InvalidColumnName, 'That column name is reserved, please choose a different one'
|
||
|
end
|
||
|
|
||
|
self.owner.in_database do |user_database|
|
||
|
if Table.column_names_for(user_database, name, self.owner).include?(new_name)
|
||
|
raise 'Column already exists'
|
||
|
end
|
||
|
user_database.execute %{
|
||
|
ALTER TABLE "#{name}" RENAME COLUMN "#{old_name}" TO "#{new_name}"
|
||
|
}
|
||
|
end
|
||
|
end #rename_column
|
||
|
|
||
|
def convert_column_datatype(database, table_name, column_name, new_type)
|
||
|
CartoDB::ColumnTypecaster.new(
|
||
|
user_database: database,
|
||
|
schema: self.owner.database_schema,
|
||
|
table_name: table_name,
|
||
|
column_name: column_name,
|
||
|
new_type: new_type
|
||
|
).run
|
||
|
end
|
||
|
|
||
|
def records(options = {})
|
||
|
rows = []
|
||
|
records_count = 0
|
||
|
page, per_page = CartoDB::Pagination.get_page_and_per_page(options)
|
||
|
order_by_column = options[:order_by] || 'cartodb_id'
|
||
|
mode = (options[:mode] || 'asc').downcase == 'asc' ? 'ASC' : 'DESC NULLS LAST'
|
||
|
|
||
|
filters = options.slice(:filter_column, :filter_value).reject{|k,v| v.blank?}.values
|
||
|
where = filters.present? ? "WHERE (#{filters.first})|| '' ILIKE '%#{filters.second}%'" : ''
|
||
|
|
||
|
owner.in_database do |user_database|
|
||
|
columns_sql_builder = <<-SQL
|
||
|
SELECT array_to_string(ARRAY(SELECT '"#{name}"' || '.' || quote_ident(c.column_name)
|
||
|
FROM information_schema.columns As c
|
||
|
WHERE table_name = '#{name}'
|
||
|
AND c.column_name <> 'the_geom_webmercator'
|
||
|
), ',') AS column_names
|
||
|
SQL
|
||
|
|
||
|
column_names = user_database[columns_sql_builder].first[:column_names].split(',')
|
||
|
the_geom_index = column_names.index("\"#{name}\".the_geom")
|
||
|
if the_geom_index
|
||
|
column_names[the_geom_index] = <<-STR
|
||
|
CASE
|
||
|
WHEN GeometryType(the_geom) = 'POINT' THEN
|
||
|
ST_AsGeoJSON(the_geom,8)
|
||
|
WHEN (the_geom IS NULL) THEN
|
||
|
NULL
|
||
|
ELSE
|
||
|
'GeoJSON'
|
||
|
END the_geom
|
||
|
STR
|
||
|
end
|
||
|
select_columns = column_names.join(',')
|
||
|
|
||
|
# Counting results can be really expensive, so we estimate
|
||
|
#
|
||
|
# See https://github.com/Vizzuality/cartodb/issues/716
|
||
|
#
|
||
|
max_countable_rows = 65535 # up to this number we accept to count
|
||
|
rows_count_is_estimated = true
|
||
|
rows_count = rows_estimated
|
||
|
if rows_count <= max_countable_rows
|
||
|
rows_count = rows_counted
|
||
|
rows_count_is_estimated = false
|
||
|
end
|
||
|
|
||
|
# If we force to get the name from an schema, we avoid the problem of having as
|
||
|
# table name a reserved word, such 'as'
|
||
|
#
|
||
|
# NOTE: we fetch one more row to verify estimated rowcount is not short
|
||
|
#
|
||
|
rows = user_database[%Q{
|
||
|
SELECT #{select_columns} FROM #{qualified_table_name} #{where} ORDER BY "#{order_by_column}" #{mode} LIMIT #{per_page}+1 OFFSET #{page}
|
||
|
}].all
|
||
|
|
||
|
# Tweak estimation if needed
|
||
|
fetched = rows.length
|
||
|
fetched += page if page
|
||
|
|
||
|
have_more = rows.length > per_page
|
||
|
rows.pop if have_more
|
||
|
|
||
|
records_count = rows_count
|
||
|
if rows_count_is_estimated
|
||
|
if have_more
|
||
|
records_count = fetched > rows_count ? fetched : rows_count
|
||
|
else
|
||
|
records_count = fetched
|
||
|
end
|
||
|
end
|
||
|
|
||
|
# TODO: cache row count !!
|
||
|
# See https://github.com/Vizzuality/cartodb/issues/459
|
||
|
end
|
||
|
|
||
|
{
|
||
|
:id => id,
|
||
|
:name => name,
|
||
|
:total_rows => records_count,
|
||
|
:rows => rows
|
||
|
}
|
||
|
end
|
||
|
|
||
|
def record(identifier)
|
||
|
row = nil
|
||
|
owner.in_database do |user_database|
|
||
|
select_sql = schema.map { |column|
|
||
|
name, type = column
|
||
|
if name == THE_GEOM
|
||
|
"ST_AsGeoJSON(the_geom,8) as the_geom"
|
||
|
elsif type == DATATYPE_DATE
|
||
|
%{CAST("#{name}" AS text) AS "#{name}"}
|
||
|
else
|
||
|
%{"#{name}"}
|
||
|
end
|
||
|
}.join(',')
|
||
|
# If we force to get the name from an schema, we avoid the problem of having as
|
||
|
# table name a reserved word, such 'as'
|
||
|
row = user_database["SELECT #{select_sql} FROM #{qualified_table_name} WHERE cartodb_id = #{identifier}"].first
|
||
|
end
|
||
|
raise if row.nil?
|
||
|
|
||
|
# `.schema` returns [name, type] pairs, except for geometry types where it returns additional data we don't need
|
||
|
db_schema = schema.map { |col_data| col_data.first(2) }.to_h
|
||
|
row.map { |name, value|
|
||
|
parsed_value = db_schema[name] == DATATYPE_DATE && value ? DateTime.parse(value) : value
|
||
|
[name, parsed_value]
|
||
|
}.to_h
|
||
|
end
|
||
|
|
||
|
def run_query(query)
|
||
|
owner.db_service.run_pg_query(query)
|
||
|
end
|
||
|
|
||
|
def georeference_from!(options = {})
|
||
|
if !options[:latitude_column].blank? && !options[:longitude_column].blank?
|
||
|
set_the_geom_column!('point')
|
||
|
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_conn|
|
||
|
CartoDB::InternalGeocoder::LatitudeLongitude.new(user_conn).geocode(owner.database_schema, self.name, options[:latitude_column], options[:longitude_column])
|
||
|
end
|
||
|
schema(reload: true)
|
||
|
else
|
||
|
raise InvalidArgument
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def the_geom_type
|
||
|
self.the_geom_type_value
|
||
|
end
|
||
|
|
||
|
def the_geom_type=(value)
|
||
|
self.the_geom_type_value = case value.downcase
|
||
|
when 'geometry'
|
||
|
'geometry'
|
||
|
when 'point'
|
||
|
'point'
|
||
|
when 'line'
|
||
|
'multilinestring'
|
||
|
when 'multipoint'
|
||
|
'point'
|
||
|
else
|
||
|
value !~ /^multi/ ? "multi#{value.downcase}" : value.downcase
|
||
|
end
|
||
|
raise CartoDB::InvalidGeomType.new(self.the_geom_type_value) unless VALID_GEOMETRY_TYPES.include?(self.the_geom_type_value)
|
||
|
end
|
||
|
|
||
|
# if the table is already renamed, we just need to update the name attribute
|
||
|
def synchronize_name(name)
|
||
|
self[:name] = name
|
||
|
save
|
||
|
end
|
||
|
|
||
|
def oid
|
||
|
@oid ||= owner.in_database["SELECT '#{qualified_table_name}'::regclass::oid"].first[:oid]
|
||
|
end
|
||
|
|
||
|
# DB Triggers and things
|
||
|
|
||
|
def has_trigger?(trigger_name)
|
||
|
owner.in_database(:as => :superuser).select('trigger_name').from(:information_schema__triggers)
|
||
|
.where(:event_object_catalog => owner.database_name,
|
||
|
:event_object_table => self.name,
|
||
|
:trigger_name => trigger_name).count > 0
|
||
|
end
|
||
|
|
||
|
def has_index?(column_name)
|
||
|
pg_indexes.any? { |i| i[:column] == column_name }
|
||
|
end
|
||
|
|
||
|
def pg_indexes
|
||
|
owner.in_database(as: :superuser).fetch(%{
|
||
|
SELECT
|
||
|
a.attname as column, i.relname as name, ix.indisvalid as valid
|
||
|
FROM
|
||
|
pg_class t, pg_class i, pg_index ix, pg_attribute a, pg_namespace n
|
||
|
WHERE
|
||
|
t.oid = ix.indrelid
|
||
|
AND i.oid = ix.indexrelid
|
||
|
AND a.attrelid = t.oid
|
||
|
AND a.attnum = ANY(ix.indkey)
|
||
|
AND t.relkind = 'r'
|
||
|
AND t.relname = '#{name}'
|
||
|
AND n.nspname = '#{owner.database_schema}'
|
||
|
AND t.relnamespace = n.oid;
|
||
|
}).all
|
||
|
end
|
||
|
|
||
|
def create_index(column, prefix = '', concurrent: false)
|
||
|
concurrently = concurrent ? 'CONCURRENTLY' : ''
|
||
|
owner.in_database.execute(%{CREATE INDEX #{concurrently} "#{index_name(column, prefix)}" ON "#{name}"("#{column}")})
|
||
|
end
|
||
|
|
||
|
def drop_index(column, prefix = '', concurrent: false)
|
||
|
concurrently = concurrent ? 'CONCURRENTLY' : ''
|
||
|
owner.in_database.execute(%{DROP INDEX #{concurrently} "#{index_name(column, prefix)}"})
|
||
|
end
|
||
|
|
||
|
def cartodbfy
|
||
|
start = Time.now
|
||
|
schema_name = owner.database_schema
|
||
|
table_name = "#{owner.database_schema}.#{self.name}"
|
||
|
|
||
|
importer_stats.timing('cartodbfy') do
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT) do |user_conn|
|
||
|
user_conn.run(%Q{
|
||
|
SELECT cartodb.CDB_CartodbfyTable('#{schema_name}'::TEXT,'#{table_name}'::REGCLASS);
|
||
|
})
|
||
|
end
|
||
|
end
|
||
|
|
||
|
elapsed = Time.now - start
|
||
|
if @data_import
|
||
|
CartoDB::Importer2::CartodbfyTime::instance(@data_import.id).add(elapsed)
|
||
|
end
|
||
|
rescue StandardError => exception
|
||
|
if !!(exception.message =~ /Error: invalid cartodb_id/)
|
||
|
raise CartoDB::CartoDBfyInvalidID
|
||
|
else
|
||
|
raise exception
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def update_table_pg_stats
|
||
|
owner.in_database.execute(%{ANALYZE #{qualified_table_name};})
|
||
|
rescue StandardError => exception
|
||
|
if exception.message =~ /canceling statement due to statement timeout/i
|
||
|
log_info(exception: exception, message: 'Analyze in import raised statement timeout')
|
||
|
else
|
||
|
raise exception
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def update_table_geom_pg_stats
|
||
|
owner.in_database.execute(%{ANALYZE #{qualified_table_name}(the_geom);})
|
||
|
rescue StandardError => exception
|
||
|
if exception.message =~ /canceling statement due to statement timeout/i
|
||
|
log_info(exception: exception, message: 'Analyze in import raised statement timeout')
|
||
|
elsif exception.cause.is_a?(PG::UndefinedColumn)
|
||
|
log_info(exception: exception, message: 'Analyze in import raised column does not exist')
|
||
|
else
|
||
|
raise exception
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def owner
|
||
|
@owner ||= ::User[self.user_id]
|
||
|
end
|
||
|
|
||
|
def table_style
|
||
|
self.map.data_layers.first.options['tile_style']
|
||
|
end
|
||
|
|
||
|
def data_last_modified
|
||
|
owner.in_database
|
||
|
.select(:updated_at)
|
||
|
.from(Sequel.qualify(:cartodb, :cdb_tablemetadata))
|
||
|
.where(tabname: Sequel.lit("'#{name}'::regclass")).first[:updated_at]
|
||
|
rescue StandardError
|
||
|
nil
|
||
|
end
|
||
|
|
||
|
# Simplify certain privacy values for the vizjson
|
||
|
def privacy_text_for_vizjson
|
||
|
privacy == UserTable::PRIVACY_LINK ? 'PUBLIC' : @user_table.privacy_text
|
||
|
end
|
||
|
|
||
|
def relator
|
||
|
@relator ||= CartoDB::TableRelator.new(SequelRails.connection, self)
|
||
|
end
|
||
|
|
||
|
def set_table_id
|
||
|
@user_table.table_id = self.get_table_id
|
||
|
end
|
||
|
|
||
|
def get_table_id
|
||
|
record = owner.in_database.select(:pg_class__oid)
|
||
|
.from(:pg_class)
|
||
|
.join_table(:inner, :pg_namespace, :oid => :relnamespace)
|
||
|
.where(:relkind => 'r', :nspname => owner.database_schema, :relname => name).first
|
||
|
record.nil? ? nil : record[:oid]
|
||
|
end # get_table_id
|
||
|
|
||
|
def changing_name?
|
||
|
@name_changed_from.present?
|
||
|
end
|
||
|
|
||
|
def update_name_changes
|
||
|
if @name_changed_from.present? && @name_changed_from != name
|
||
|
reload
|
||
|
|
||
|
unless register_table_only
|
||
|
begin
|
||
|
# Underscore prefixes have a special meaning in PostgreSQL, hence the ugly hack
|
||
|
Carto::OverviewsService.new(owner.in_database).rename_overviews @name_changed_from, name
|
||
|
if name.start_with?('_')
|
||
|
temp_name = "t" + "#{9.times.map { rand(9) }.join}" + name
|
||
|
owner.in_database.rename_table(@name_changed_from, temp_name)
|
||
|
owner.in_database.rename_table(temp_name, name)
|
||
|
else
|
||
|
owner.in_database.rename_table(@name_changed_from, name)
|
||
|
end
|
||
|
rescue StandardError => exception
|
||
|
exception_to_raise = CartoDB::BaseCartoDBError.new(
|
||
|
"Table update_name_changes(): '#{@name_changed_from}' doesn't exist", exception)
|
||
|
CartoDB::notify_exception(exception_to_raise, user: owner)
|
||
|
raise exception_to_raise
|
||
|
end
|
||
|
end
|
||
|
|
||
|
begin
|
||
|
propagate_name_change_to_analyses
|
||
|
propagate_namechange_to_table_vis
|
||
|
if @user_table.layers.blank?
|
||
|
exception_to_raise = CartoDB::TableError.new("Attempt to rename table without layers #{qualified_table_name}")
|
||
|
CartoDB::notify_exception(exception_to_raise, user: owner)
|
||
|
else
|
||
|
@user_table.layers.each do |layer|
|
||
|
layer.rename_table(@name_changed_from, name).save
|
||
|
end
|
||
|
end
|
||
|
rescue StandardError => exception
|
||
|
log_error(
|
||
|
exception: exception, message: 'Error renaming visualization',
|
||
|
current_user: owner, error_detail: "Renaming from #{@name_changed_from} to #{name}"
|
||
|
)
|
||
|
raise exception
|
||
|
end
|
||
|
end
|
||
|
@name_changed_from = nil
|
||
|
end
|
||
|
|
||
|
# @see https://github.com/jeremyevans/sequel#qualifying-identifiers-columntable-names
|
||
|
def sequel_qualified_table_name
|
||
|
Sequel.qualify(owner.database_schema, @user_table.name)
|
||
|
end
|
||
|
|
||
|
def qualified_table_name
|
||
|
safe_schema_and_table_quoting(owner.database_schema, @user_table.name)
|
||
|
end
|
||
|
|
||
|
def database_schema
|
||
|
owner.database_schema
|
||
|
end
|
||
|
|
||
|
# INFO: Qualified but without double quotes
|
||
|
def self.is_qualified_name_valid?(name)
|
||
|
(name =~ /^[a-z\-_0-9]+\.[a-z\-_0-9]+?$/) == 0
|
||
|
end
|
||
|
|
||
|
############################### Sharing tables ##############################
|
||
|
|
||
|
# @param [::User] organization_user Gives read permission to this user
|
||
|
def add_read_permission(organization_user)
|
||
|
perform_table_permission_change('CDB_Organization_Add_Table_Read_Permission', organization_user)
|
||
|
end
|
||
|
|
||
|
# @param [::User] organization_user Gives read and write permission to this user
|
||
|
def add_read_write_permission(organization_user)
|
||
|
perform_table_permission_change('CDB_Organization_Add_Table_Read_Write_Permission', organization_user)
|
||
|
end
|
||
|
|
||
|
# @param [::User] organization_user Removes all permissions to this user
|
||
|
def remove_access(organization_user)
|
||
|
perform_table_permission_change('CDB_Organization_Remove_Access_Permission', organization_user)
|
||
|
end
|
||
|
|
||
|
def add_organization_read_permission
|
||
|
perform_organization_table_permission_change('CDB_Organization_Add_Table_Organization_Read_Permission')
|
||
|
end
|
||
|
|
||
|
def add_organization_read_write_permission
|
||
|
perform_organization_table_permission_change('CDB_Organization_Add_Table_Organization_Read_Write_Permission')
|
||
|
end
|
||
|
|
||
|
def remove_organization_access
|
||
|
perform_organization_table_permission_change('CDB_Organization_Remove_Organization_Access_Permission')
|
||
|
end
|
||
|
|
||
|
# Estimated row count and size
|
||
|
def row_count_and_size
|
||
|
begin
|
||
|
# Keep in sync with lib/sql/scripts-available/CDB_Quota.sql -> CDB_UserDataSize()
|
||
|
size_calc = is_raster? ? "pg_total_relation_size('\"' || ? || '\".\"' || relname || '\"')"
|
||
|
: "pg_total_relation_size('\"' || ? || '\".\"' || relname || '\"') / 2"
|
||
|
|
||
|
data = owner.in_database.fetch(
|
||
|
%{
|
||
|
SELECT
|
||
|
#{size_calc} AS size,
|
||
|
reltuples::integer AS row_count
|
||
|
FROM pg_class
|
||
|
JOIN pg_catalog.pg_namespace n on n.oid = pg_class.relnamespace
|
||
|
WHERE relname = ?
|
||
|
AND n.nspname = ?
|
||
|
},
|
||
|
owner.database_schema,
|
||
|
name,
|
||
|
database_schema
|
||
|
).first
|
||
|
rescue StandardError => exception
|
||
|
data = nil
|
||
|
# INFO: we don't want code to fail because of SQL error
|
||
|
CartoDB.notify_exception(exception)
|
||
|
end
|
||
|
data = { size: nil, row_count: nil } if data.nil?
|
||
|
|
||
|
data
|
||
|
end
|
||
|
|
||
|
def estimated_row_count
|
||
|
row_count_and_size = self.row_count_and_size
|
||
|
row_count_and_size.nil? ? nil : row_count_and_size[:row_count]
|
||
|
end
|
||
|
|
||
|
def actual_row_count
|
||
|
sequel.count
|
||
|
end
|
||
|
|
||
|
def pg_stats
|
||
|
owner.in_database.fetch('SELECT * FROM pg_stats where schemaname = ? AND tablename = ?',
|
||
|
owner.database_schema, name).all
|
||
|
end
|
||
|
|
||
|
def beautify_name(name)
|
||
|
return name unless name
|
||
|
name.tr('_', ' ').split.map(&:capitalize).join(' ')
|
||
|
end
|
||
|
|
||
|
def update_cdb_tablemetadata
|
||
|
owner.in_database(as: :superuser).run(%{ SELECT CDB_TableMetadataTouch(#{table_id}::oid::regclass) })
|
||
|
rescue StandardError => e
|
||
|
log_error(
|
||
|
message: 'update_cdb_tablemetadata failed', exception: e, current_user: owner,
|
||
|
table: { id: table_id, name: name, oid: get_table_id }
|
||
|
)
|
||
|
end
|
||
|
|
||
|
def propagate_attribution_change(attributions)
|
||
|
# This includes both the canonical and derived visualizations
|
||
|
@user_table.layers.select(&:data_layer?).each do |layer|
|
||
|
if layer.options['table_name'] == name
|
||
|
layer.options['attribution'] = attributions
|
||
|
layer.save
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def table_visualization
|
||
|
@user_table.table_visualization
|
||
|
end
|
||
|
|
||
|
def update_bounding_box
|
||
|
update_table_geom_pg_stats
|
||
|
bounds = Carto::BoundingBoxService.new(owner, name).table_bounds || Carto::BoundingBoxUtils::DEFAULT_BOUNDS
|
||
|
|
||
|
polygon_sql = Carto::BoundingBoxUtils.to_polygon(bounds[:minx], bounds[:miny], bounds[:maxx], bounds[:maxy])
|
||
|
update_sql = %{UPDATE visualizations SET bbox = #{polygon_sql} WHERE id = '#{table_visualization.id}';}
|
||
|
SequelRails.connection.run(update_sql)
|
||
|
end
|
||
|
|
||
|
# Apply the sanitization defined for this table.
|
||
|
# If the table_name parameter is passed, the sanitization which was originally applied to self
|
||
|
# is applied to the table so named.
|
||
|
# If no table_name parameters is paseed the normalization is applied to self.
|
||
|
# This allows re-applying the same normalization to imported tables.
|
||
|
def sanitize_columns(table_name: name, **options)
|
||
|
self.class.sanitize_columns(table_name, column_sanitization_version, options)
|
||
|
end
|
||
|
|
||
|
def visualizations
|
||
|
Carto::Visualization.where(id: user_table.affected_visualizations.map(&:id))
|
||
|
end
|
||
|
|
||
|
private
|
||
|
|
||
|
def valid_cartodb_id_candidate?(col_name)
|
||
|
return false unless column_names.include?(col_name)
|
||
|
owner.transaction_with_timeout(statement_timeout: STATEMENT_TIMEOUT, as: :superuser) do |db|
|
||
|
return db["SELECT 1 FROM #{qualified_table_name} WHERE #{col_name} IS NULL LIMIT 1"].first.nil?
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def column_names
|
||
|
schema.map(&:first)
|
||
|
end
|
||
|
|
||
|
def related_visualizations
|
||
|
carto_layers = layers.map do |layer|
|
||
|
Carto::Layer.find(layer.id) if layer.persisted?
|
||
|
end
|
||
|
|
||
|
carto_layers.flatten.compact.uniq.map(&:visualization).compact.uniq
|
||
|
end
|
||
|
|
||
|
def propagate_name_change_to_analyses
|
||
|
related_visualizations.each do |visualization|
|
||
|
visualization.analyses.each do |analysis|
|
||
|
analysis.update_table_name(@name_changed_from, name)
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def index_name(column, prefix)
|
||
|
"#{prefix}#{name}_#{column}"
|
||
|
end
|
||
|
|
||
|
def previous_privacy
|
||
|
# INFO: @user_table.initial_value(:privacy) weirdly returns incorrect value so using changes index instead
|
||
|
privacy_changed? ? @user_table.privacy_was : nil
|
||
|
end
|
||
|
|
||
|
def importer_stats
|
||
|
@importer_stats ||= CartoDB::Stats::Importer.instance
|
||
|
end
|
||
|
|
||
|
def calculate_the_geom_type
|
||
|
return self.the_geom_type if self.the_geom_type.present?
|
||
|
|
||
|
calculated = geometry_types.first
|
||
|
calculated = calculated.present? ? calculated.downcase.sub('st_', '') : DEFAULT_THE_GEOM_TYPE
|
||
|
self.the_geom_type = calculated
|
||
|
end
|
||
|
|
||
|
def query_geometry_types
|
||
|
# We do not query the DB, if the_geom does not exist we just recover
|
||
|
owner.in_database[distinct_geometry_sql].all.map { |r| r[:st_geometrytype] }
|
||
|
rescue StandardError
|
||
|
[]
|
||
|
end
|
||
|
|
||
|
def distinct_geometry_sql
|
||
|
%{
|
||
|
SELECT DISTINCT ST_GeometryType(the_geom) FROM (
|
||
|
SELECT the_geom
|
||
|
FROM (
|
||
|
SELECT the_geom
|
||
|
FROM #{qualified_table_name}
|
||
|
LIMIT 10000
|
||
|
) as limited
|
||
|
WHERE (the_geom is not null) LIMIT 10
|
||
|
) as not_null
|
||
|
}
|
||
|
end
|
||
|
|
||
|
def cache
|
||
|
@cache ||= $tables_metadata
|
||
|
end
|
||
|
|
||
|
# Gets a valid postgresql table name for a given database
|
||
|
# See http://www.postgresql.org/docs/9.5/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS
|
||
|
def get_valid_name(contendent)
|
||
|
user_table_names = owner.tables.map(&:name)
|
||
|
|
||
|
# We only want to check for UserTables names
|
||
|
Carto::ValidTableNameProposer.new.propose_valid_table_name(contendent, taken_names: user_table_names)
|
||
|
end
|
||
|
|
||
|
def column_sanitization_version
|
||
|
data_import&.column_sanitization_version || CartoDB::Importer2::Column::NO_COLUMN_SANITIZATION_VERSION
|
||
|
end
|
||
|
|
||
|
def self.sanitize_columns(table_name, column_sanitization_version, options={})
|
||
|
|
||
|
connection = options.fetch(:connection)
|
||
|
database_schema = options.fetch(:database_schema, 'public')
|
||
|
|
||
|
connection.schema(table_name, schema: database_schema, reload: true).each do |column|
|
||
|
column_name = column[0].to_s
|
||
|
column_type = column[1][:db_type]
|
||
|
column_name = ensure_column_has_valid_name(table_name, column_name, column_sanitization_version, options)
|
||
|
if column_type == 'unknown'
|
||
|
CartoDB::ColumnTypecaster.new(
|
||
|
user_database: connection,
|
||
|
schema: database_schema,
|
||
|
table_name: table_name,
|
||
|
column_name: column_name,
|
||
|
new_type: 'text'
|
||
|
).run
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def self.ensure_column_has_valid_name(table_name, column_name, column_sanitization_version, options={})
|
||
|
connection = options.fetch(:connection)
|
||
|
database_schema = options.fetch(:database_schema, 'public')
|
||
|
|
||
|
valid_column_name = get_valid_column_name(table_name, column_name, column_sanitization_version, options)
|
||
|
if valid_column_name != column_name
|
||
|
connection.run(%Q{ALTER TABLE "#{database_schema}"."#{table_name}" RENAME COLUMN "#{column_name}" TO "#{valid_column_name}";})
|
||
|
end
|
||
|
|
||
|
valid_column_name
|
||
|
end
|
||
|
|
||
|
def self.get_valid_column_name(table_name, candidate_column_name, column_sanitization_version, options={})
|
||
|
CartoDB::Importer2::Column.get_valid_column_name(candidate_column_name, column_sanitization_version, get_column_names(table_name, options))
|
||
|
end
|
||
|
|
||
|
def self.get_column_names(table_name, options={})
|
||
|
connection = options.fetch(:connection)
|
||
|
database_schema = options.fetch(:database_schema, 'public')
|
||
|
table_schema = connection.schema(table_name, schema: database_schema, reload: true)
|
||
|
table_schema.map { |column| column[0].to_s }
|
||
|
end
|
||
|
|
||
|
def get_new_column_type(invalid_column)
|
||
|
next_cartodb_type = {
|
||
|
"number" => "double precision",
|
||
|
"string" => "text"
|
||
|
}
|
||
|
|
||
|
flatten_cartodb_schema = schema.flatten
|
||
|
cartodb_column_type = flatten_cartodb_schema[flatten_cartodb_schema.index(invalid_column.to_sym) + 1]
|
||
|
flatten_schema = schema(cartodb_types: false).flatten
|
||
|
flatten_schema[flatten_schema.index(invalid_column.to_sym) + 1]
|
||
|
next_cartodb_type[cartodb_column_type]
|
||
|
end
|
||
|
|
||
|
def set_the_geom_column!(type = nil)
|
||
|
if type.nil?
|
||
|
if self.schema(reload: true).flatten.include?(THE_GEOM)
|
||
|
if self.schema.select{ |k| k[0] == THE_GEOM }.first[1] == 'geometry'
|
||
|
row = owner.in_database["select GeometryType(#{THE_GEOM}) FROM #{qualified_table_name} where #{THE_GEOM} is not null limit 1"].first
|
||
|
if row
|
||
|
type = row[:geometrytype]
|
||
|
else
|
||
|
type = DEFAULT_THE_GEOM_TYPE
|
||
|
end
|
||
|
else
|
||
|
owner.in_database.execute %{
|
||
|
ALTER TABLE #{qualified_table_name} RENAME COLUMN "#{THE_GEOM}" TO "the_geom_str"
|
||
|
}
|
||
|
end
|
||
|
else # Ensure a the_geom column, of type point by default
|
||
|
type = DEFAULT_THE_GEOM_TYPE
|
||
|
end
|
||
|
end
|
||
|
return if type.nil?
|
||
|
|
||
|
# if the geometry is MULTIPOINT we convert it to POINT
|
||
|
if type.to_s.downcase == 'multipoint'
|
||
|
owner.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
|
||
|
user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_GeometryN(the_geom,1);")
|
||
|
end
|
||
|
type = 'point'
|
||
|
end
|
||
|
|
||
|
# if the geometry is LINESTRING or POLYGON we convert it to MULTILINESTRING and MULTIPOLYGON resp.
|
||
|
if %w(linestring polygon).include?(type.to_s.downcase)
|
||
|
owner.db_service.in_database_direct_connection(statement_timeout: STATEMENT_TIMEOUT) do |user_database|
|
||
|
user_database.run("UPDATE #{qualified_table_name} SET the_geom = ST_Multi(the_geom);")
|
||
|
type = user_database["select GeometryType(#{THE_GEOM}) FROM #{qualified_table_name} where #{THE_GEOM} is not null limit 1"].first[:geometrytype]
|
||
|
end
|
||
|
end
|
||
|
|
||
|
raise "Error: unsupported geometry type #{type.to_s.downcase} in CARTO" unless VALID_GEOMETRY_TYPES.include?(type.to_s.downcase)
|
||
|
|
||
|
type = type.to_s.upcase
|
||
|
|
||
|
self.the_geom_type = type.downcase
|
||
|
@user_table.save_changes unless @user_table.new_record?
|
||
|
end
|
||
|
|
||
|
def create_table_in_database!
|
||
|
self.name ||= get_valid_name(self.name)
|
||
|
|
||
|
owner.in_database do |user_database|
|
||
|
if force_schema.blank?
|
||
|
user_database.create_table sequel_qualified_table_name do
|
||
|
column :cartodb_id, 'SERIAL PRIMARY KEY'
|
||
|
String :name
|
||
|
String :description, :text => true
|
||
|
end
|
||
|
else
|
||
|
sanitized_force_schema = force_schema.split(',').map do |column|
|
||
|
# Convert existing primary key into a unique key
|
||
|
if column =~ /\A\s*\"([^\"]+)\"(.*)\z/
|
||
|
"#{CartoDB::Importer2::Column.sanitize_name $1} #{$2.gsub(/primary\s+key/i,'UNIQUE')}"
|
||
|
else
|
||
|
column.gsub(/primary\s+key/i,'UNIQUE')
|
||
|
end
|
||
|
end
|
||
|
sanitized_force_schema.unshift('cartodb_id SERIAL PRIMARY KEY')
|
||
|
user_database.run(<<-SQL
|
||
|
CREATE TABLE #{qualified_table_name} (#{sanitized_force_schema.join(', ')});
|
||
|
SQL
|
||
|
)
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def update_the_geom!(attributes, primary_key)
|
||
|
return unless attributes[THE_GEOM].present? && attributes[THE_GEOM] != 'GeoJSON'
|
||
|
geojson = attributes[THE_GEOM]
|
||
|
|
||
|
begin
|
||
|
obj = JSON.parse(geojson)
|
||
|
unless obj[:crs].present?
|
||
|
obj[:crs] = JSON.parse('{"type":"name","properties":{"name":"EPSG:4326"}}');
|
||
|
end
|
||
|
geojson = JSON.generate(obj);
|
||
|
|
||
|
owner.in_database(:as => :superuser).run(%Q{UPDATE #{qualified_table_name} SET the_geom =
|
||
|
ST_Transform(ST_GeomFromGeoJSON('#{geojson}'),4326) where cartodb_id =
|
||
|
#{primary_key}})
|
||
|
rescue StandardError => e
|
||
|
raise CartoDB::InvalidGeoJSONFormat, "Invalid geometry: #{e.message}"
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def manage_tags
|
||
|
if @user_table[:tags].blank?
|
||
|
Carto::Tag.where(user_id: user_id, table_id: id).each(&:destroy)
|
||
|
else
|
||
|
tag_names = @user_table.tags.split(',')
|
||
|
table_tags = Carto::Tag.where(user_id: user_id, table_id: id).all
|
||
|
unless table_tags.empty?
|
||
|
# Remove tags that are not in the new names list
|
||
|
table_tags.each do |tag|
|
||
|
if tag_names.include?(tag.name)
|
||
|
tag_names.delete(tag.name)
|
||
|
else
|
||
|
tag.destroy
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
# Create the new tags in the this table
|
||
|
tag_names.each do |new_tag_name|
|
||
|
new_tag = Carto::Tag.new(name: new_tag_name)
|
||
|
new_tag.user_id = user_id
|
||
|
new_tag.table_id = id
|
||
|
new_tag.save
|
||
|
end
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def add_table_to_stats
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter(1)
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter_per_user(1, self.owner.username)
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter_per_host(1)
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter_per_plan(1, self.owner.account_type)
|
||
|
end
|
||
|
|
||
|
def remove_table_from_stats
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter(-1)
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter_per_user(-1, self.owner.username)
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter_per_host(-1)
|
||
|
CartoDB::Stats::UserTables.instance.update_tables_counter_per_plan(-1, self.owner.account_type)
|
||
|
end
|
||
|
|
||
|
############################### Sharing tables ##############################
|
||
|
|
||
|
# @param [String] cartodb_pg_func
|
||
|
# @param [::User] organization_user
|
||
|
def perform_table_permission_change(cartodb_pg_func, organization_user)
|
||
|
from_schema = self.owner.database_schema
|
||
|
table_name = self.name
|
||
|
to_role_user = organization_user.database_username
|
||
|
Carto::TableAndFriends.apply(owner.in_database, from_schema, table_name) do |schema, name|
|
||
|
perform_cartodb_function(cartodb_pg_func, schema, name, to_role_user)
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def perform_organization_table_permission_change(cartodb_pg_func)
|
||
|
from_schema = self.owner.database_schema
|
||
|
table_name = self.name
|
||
|
Carto::TableAndFriends.apply(owner.in_database, from_schema, table_name) do |schema, name|
|
||
|
perform_cartodb_function(cartodb_pg_func, schema, name)
|
||
|
end
|
||
|
end
|
||
|
|
||
|
def perform_cartodb_function(cartodb_pg_func, *args)
|
||
|
self.owner.in_database do |user_database|
|
||
|
query_args = args.join("','")
|
||
|
user_database.run("SELECT cartodb.#{cartodb_pg_func}('#{query_args}');")
|
||
|
end
|
||
|
end
|
||
|
end
|