1704 lines
66 KiB
Ruby
1704 lines
66 KiB
Ruby
require_relative 'db_queries'
|
|
require_dependency 'carto/db/database'
|
|
require_dependency 'carto/db/user_schema_mover'
|
|
require 'cartodb/sequel_connection_helper'
|
|
require 'carto/configuration'
|
|
|
|
# To avoid collisions with User model
|
|
module CartoDB
|
|
# To avoid collisions with User class
|
|
module UserModule
|
|
class DBService
|
|
include CartoDB::MiniSequel
|
|
include Carto::Configuration
|
|
include ::LoggerHelper
|
|
extend CartoDB::SequelConnectionHelper
|
|
|
|
# Also default schema for new users
|
|
SCHEMA_PUBLIC = 'public'.freeze
|
|
SCHEMA_CARTODB = 'cartodb'.freeze
|
|
SCHEMA_IMPORTER = 'cdb_importer'.freeze
|
|
SCHEMA_GEOCODING = 'cdb'.freeze
|
|
SCHEMA_CDB_DATASERVICES_API = 'cdb_dataservices_client'.freeze
|
|
SCHEMA_AGGREGATION_TABLES = 'aggregation'.freeze
|
|
CDB_DATASERVICES_CLIENT_VERSION = '0.30.0'.freeze
|
|
|
|
attr_accessor :user
|
|
|
|
def initialize(user)
|
|
raise "User nil" unless user
|
|
@user = user
|
|
@queries = CartoDB::UserModule::DBQueries.new(@user)
|
|
@pgversion = nil
|
|
@plpythonu = nil
|
|
end
|
|
|
|
def queries
|
|
@queries
|
|
end
|
|
|
|
# This method is used both upon user creation and by the UserMover
|
|
# All methods called inside should allow to be executed multiple times without errors
|
|
def configure_database
|
|
set_pgversion
|
|
|
|
set_database_search_path
|
|
|
|
grant_user_in_database
|
|
|
|
set_statement_timeouts
|
|
|
|
# INFO: Added to everyone because eases migration of normal users to org owners
|
|
# and non-org users just don't use it
|
|
set_user_as_organization_member
|
|
|
|
if @user.database_schema == SCHEMA_PUBLIC
|
|
setup_single_user_schema
|
|
else
|
|
setup_organization_user_schema
|
|
end
|
|
|
|
create_function_invalidate_varnish
|
|
grant_publicuser_in_database
|
|
end
|
|
|
|
# All methods called inside should allow to be executed multiple times without errors
|
|
def setup_single_user_schema
|
|
set_user_privileges_at_db
|
|
rebuild_quota_trigger
|
|
create_ghost_tables_event_trigger
|
|
end
|
|
|
|
# All methods called inside should allow to be executed multiple times without errors
|
|
def setup_organization_user_schema
|
|
# WIP: CartoDB/cartodb-management#4467
|
|
# Avoid mover reseting permissions. It's been moved to callers. Look for "WIP: CartoDB/cartodb-management#4467"
|
|
# reset_user_schema_permissions
|
|
reset_schema_owner
|
|
set_user_privileges_at_db
|
|
set_user_as_organization_member
|
|
rebuild_quota_trigger
|
|
|
|
# INFO: organization privileges are set for org_member_role, which is assigned to each org user
|
|
if @user.organization_owner?
|
|
setup_organization_owner
|
|
create_ghost_tables_event_trigger
|
|
end
|
|
|
|
if @user.organization_admin?
|
|
grant_admin_permissions
|
|
end
|
|
|
|
# Rebuild the geocoder api user config to reflect that is an organization user
|
|
install_and_configure_geocoder_api_extension
|
|
install_odbc_fdw
|
|
end
|
|
|
|
# INFO: main setup for non-org users
|
|
def new_non_organization_user_main_db_setup
|
|
Thread.new do
|
|
create_db_user
|
|
create_user_db
|
|
grant_owner_in_database
|
|
end.join
|
|
create_importer_schema
|
|
create_geocoding_schema
|
|
load_cartodb_functions
|
|
install_and_configure_geocoder_api_extension
|
|
install_odbc_fdw
|
|
# We reset the connections to this database to be sure the change in default search_path is effective
|
|
reset_pooled_connections
|
|
|
|
reset_database_permissions # Reset privileges
|
|
|
|
# WIP: CartoDB/cartodb-management#4467
|
|
# Added after commenting it in setup_organization_user_schema to avoid configure_database to reset permissions
|
|
reset_user_schema_permissions
|
|
|
|
configure_database
|
|
|
|
revoke_cdb_conf_access
|
|
end
|
|
|
|
# INFO: main setup for org users
|
|
def new_organization_user_main_db_setup
|
|
Thread.new do
|
|
create_db_user
|
|
end.join
|
|
create_own_schema
|
|
|
|
# WIP: CartoDB/cartodb-management#4467
|
|
# Added after commenting it in setup_organization_user_schema to avoid configure_database to reset permissions
|
|
reset_user_schema_permissions
|
|
|
|
setup_organization_user_schema
|
|
install_and_configure_geocoder_api_extension
|
|
# We reset the connections to this database to be sure the change in default search_path is effective
|
|
reset_pooled_connections
|
|
revoke_cdb_conf_access
|
|
end
|
|
|
|
def set_user_privileges_at_db # MU
|
|
# INFO: organization permission on public schema is handled through role assignment
|
|
unless @user.organization_user?
|
|
set_user_privileges_in_cartodb_schema
|
|
set_user_privileges_in_public_schema
|
|
end
|
|
|
|
set_user_privileges_in_own_schema
|
|
set_privileges_to_publicuser_in_own_schema
|
|
|
|
unless @user.organization_user?
|
|
set_user_privileges_in_importer_schema
|
|
set_user_privileges_in_geocoding_schema
|
|
set_geo_columns_privileges
|
|
set_raster_privileges
|
|
end
|
|
end
|
|
|
|
def disable_writes
|
|
# NOTE: This will not affect already opened connections. Run `terminate_database_conections` method after this
|
|
# to ensure no more writes are possible.
|
|
@user.in_database(as: :cluster_admin) do |database|
|
|
database.run(%{
|
|
ALTER DATABASE "#{@user.database_name}"
|
|
SET default_transaction_read_only = 'on'
|
|
})
|
|
end
|
|
end
|
|
|
|
def enable_writes
|
|
# NOTE: This will not affect already opened connections. Run `terminate_database_conections` method after this
|
|
# to ensure no more writes are possible.
|
|
@user.in_database(as: :cluster_admin) do |database|
|
|
database.run(%{
|
|
ALTER DATABASE "#{@user.database_name}"
|
|
SET default_transaction_read_only = default
|
|
})
|
|
end
|
|
end
|
|
|
|
def writes_enabled?
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.fetch(%{SHOW default_transaction_read_only}).first[:default_transaction_read_only] == 'off'
|
|
end
|
|
end
|
|
|
|
# Cartodb functions
|
|
def load_cartodb_functions(statement_timeout = nil, cdb_extension_target_version = nil)
|
|
add_python
|
|
|
|
# Install dependencies of cartodb extension
|
|
@user.in_database(as: :superuser, no_cartodb_in_schema: true) do |db|
|
|
db.transaction do
|
|
unless statement_timeout.nil?
|
|
old_timeout = db.fetch("SHOW statement_timeout;").first[:statement_timeout]
|
|
db.run("SET statement_timeout TO '#{statement_timeout}';")
|
|
end
|
|
|
|
db.run("CREATE EXTENSION IF NOT EXISTS #{@plpythonu}")
|
|
db.run("CREATE EXTENSION IF NOT EXISTS postgis")
|
|
|
|
unless statement_timeout.nil?
|
|
db.run("SET statement_timeout TO '#{old_timeout}';")
|
|
end
|
|
end
|
|
end
|
|
|
|
upgrade_cartodb_postgres_extension(statement_timeout, cdb_extension_target_version)
|
|
|
|
rebuild_quota_trigger
|
|
end
|
|
|
|
def rebuild_quota_trigger
|
|
@user.in_database(as: :superuser) do |db|
|
|
rebuild_quota_trigger_with_database(db)
|
|
end
|
|
end
|
|
|
|
def rebuild_quota_trigger_with_database(db)
|
|
if !cartodb_extension_version_pre_mu? && @user.has_organization?
|
|
db.run("DROP FUNCTION IF EXISTS public._CDB_UserQuotaInBytes();")
|
|
end
|
|
|
|
db.transaction do
|
|
# NOTE: this has been written to work for both databases that switched to "cartodb" extension
|
|
# and those before the switch.
|
|
# In the future we should guarantee that exntension lives in cartodb schema so we don't need to set
|
|
# a search_path before
|
|
search_path = db.fetch("SHOW search_path;").first[:search_path]
|
|
db.run("SET search_path TO #{SCHEMA_CARTODB}, #{SCHEMA_PUBLIC};")
|
|
if cartodb_extension_version_pre_mu?
|
|
db.run("SELECT CDB_SetUserQuotaInBytes(#{@user.quota_in_bytes});")
|
|
else
|
|
db.run("SELECT CDB_SetUserQuotaInBytes('#{@user.database_schema}', #{@user.quota_in_bytes});")
|
|
end
|
|
|
|
# _CDB_UserQuotaInBytes is called by the quota trigger, and need to be open so
|
|
# other users in the organization can run it (when updating shared datasets)
|
|
db_user = @user.organization_user? ? organization_member_group_role_member_name : @user.database_username
|
|
db.run(%{REVOKE ALL ON FUNCTION "#{@user.database_schema}"._CDB_UserQuotaInBytes() FROM PUBLIC;})
|
|
db.run(%{GRANT ALL ON FUNCTION "#{@user.database_schema}"._CDB_UserQuotaInBytes() TO "#{db_user}";})
|
|
|
|
db.run("SET search_path TO #{search_path};")
|
|
end
|
|
end
|
|
|
|
def build_search_path(user_schema = nil, quote_user_schema = true)
|
|
user_schema ||= @user.database_schema
|
|
DBService.build_search_path(user_schema, quote_user_schema)
|
|
end
|
|
|
|
# Centralized method to provide the (ordered) search_path
|
|
def self.build_search_path(user_schema, quote_user_schema = true)
|
|
quote_char = quote_user_schema ? "\"" : ""
|
|
"#{quote_char}#{user_schema}#{quote_char}, #{SCHEMA_CARTODB}, #{SCHEMA_CDB_DATASERVICES_API}, #{SCHEMA_PUBLIC}"
|
|
end
|
|
|
|
def set_pgversion
|
|
@user.in_database(as: :superuser) do |database|
|
|
@pgversion = database.fetch("select current_setting('server_version_num') as version").first[:version].to_i
|
|
end
|
|
if @pgversion >= 120000
|
|
@plpythonu = "plpython3u"
|
|
else
|
|
@plpythonu = "plpythonu"
|
|
end
|
|
end
|
|
|
|
def set_database_search_path
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.run(%{
|
|
ALTER USER "#{@user.database_username}"
|
|
SET search_path = #{build_search_path}
|
|
})
|
|
end
|
|
end
|
|
|
|
def create_importer_schema
|
|
create_schema('cdb_importer')
|
|
end
|
|
|
|
def create_geocoding_schema
|
|
create_schema('cdb')
|
|
end
|
|
|
|
def create_user_schema
|
|
create_schema(@user.database_schema, @user.database_username)
|
|
end
|
|
|
|
def create_schema(schema, role = nil)
|
|
@user.in_database(as: :superuser) do |db|
|
|
Carto::Db::Database.new(@user.database_name, db).create_schema(schema, role)
|
|
end
|
|
end
|
|
|
|
def grant_admin_permissions
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.run(%{ SELECT cartodb.CDB_Organization_AddAdmin('#{@user.username}') })
|
|
end
|
|
end
|
|
|
|
def revoke_admin_permissions
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.run(%{ SELECT cartodb.CDB_Organization_RemoveAdmin('#{@user.username}') })
|
|
end
|
|
end
|
|
|
|
def revoke_cdb_conf_access
|
|
errors = []
|
|
|
|
roles = [@user.database_username]
|
|
if @user.organization_owner?
|
|
begin
|
|
roles << organization_member_group_role_member_name
|
|
rescue StandardError => e
|
|
errors << "WARN: Error fetching org member role (does #{@user.organization.name} has that role?)"
|
|
end
|
|
end
|
|
roles << CartoDB::PUBLIC_DB_USER
|
|
|
|
queries = []
|
|
roles.map do |db_role|
|
|
queries.concat(@queries.revoke_permissions_on_cartodb_conf_queries(db_role))
|
|
end
|
|
|
|
queries.map do |query|
|
|
@user.in_database(as: :superuser) do |database|
|
|
begin
|
|
database.run(query)
|
|
rescue StandardError => e
|
|
# We can find organizations not yet upgraded for any reason or missing roles
|
|
errors << e.message
|
|
end
|
|
end
|
|
end
|
|
|
|
errors
|
|
rescue StandardError => e
|
|
# For broken organizations
|
|
["FATAL ERROR for #{name}: #{e.message}"]
|
|
end
|
|
|
|
def create_public_db_user
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.run(%{ CREATE USER "#{@user.database_public_username}" LOGIN INHERIT })
|
|
database.run(%{ GRANT publicuser TO "#{@user.database_public_username}" })
|
|
database.run(%{ ALTER USER "#{@user.database_public_username}" SET search_path = #{build_search_path} })
|
|
end
|
|
end
|
|
|
|
def tables_effective(schema = 'public')
|
|
@user.in_database do |user_database|
|
|
user_database.synchronize do |conn|
|
|
query = "select table_name::text from information_schema.tables where table_schema = '#{schema}'"
|
|
tables = user_database[query].all.map { |i| i[:table_name] }
|
|
return tables
|
|
end
|
|
end
|
|
end
|
|
|
|
def all_user_roles
|
|
roles = [@user.database_username]
|
|
if @user.organization_user?
|
|
roles << organization_member_group_role_member_name
|
|
roles += @user.groups.map(&:database_role)
|
|
end
|
|
|
|
roles
|
|
end
|
|
|
|
def all_tables_granted(role = nil)
|
|
roles_str = role ? "'#{role}'" : all_user_roles.map { |r| "'#{r}'" }.join(',')
|
|
query = %{
|
|
SELECT
|
|
s.nspname as schema,
|
|
c.relname as t,
|
|
string_agg(lower(acl.privilege_type), ',') as permission
|
|
FROM
|
|
pg_class c
|
|
JOIN pg_namespace s ON c.relnamespace = s.oid
|
|
JOIN LATERAL aclexplode(COALESCE(c.relacl, acldefault('r'::"char", c.relowner))) acl ON TRUE
|
|
JOIN pg_roles r ON acl.grantee = r.oid
|
|
WHERE
|
|
r.rolname IN (#{roles_str}) AND
|
|
s.nspname NOT IN ('cartodb', 'cdb', 'cdb_importer')
|
|
GROUP BY schema, t;
|
|
}
|
|
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.fetch(query)
|
|
end
|
|
end
|
|
|
|
def all_tables_granted_hashed(role = nil)
|
|
results = all_tables_granted(role)
|
|
privileges_hashed = {}
|
|
|
|
if !results.nil?
|
|
results.each do |row|
|
|
privileges_hashed[row[:schema]] = {} if privileges_hashed[row[:schema]].nil?
|
|
privileges_hashed[row[:schema]][row[:t]] = row[:permission].split(',')
|
|
end
|
|
end
|
|
|
|
privileges_hashed
|
|
end
|
|
|
|
def all_schemas_granted(role)
|
|
roles_str = role ? role : all_user_roles.join(',')
|
|
permissions = 'create,usage'
|
|
query = %{
|
|
WITH
|
|
roles AS (
|
|
SELECT unnest('{#{roles_str}}'::text[]) AS rname
|
|
),
|
|
permissions AS (
|
|
SELECT 'SCHEMA' AS ptype, unnest('{#{permissions}}'::text[]) AS pname
|
|
),
|
|
schemas AS (
|
|
SELECT schema_name AS sname
|
|
FROM information_schema.schemata
|
|
WHERE schema_name !~ '^pg_'
|
|
AND schema_name NOT IN ('cartodb', 'cdb', 'cdb_importer')
|
|
),
|
|
schemas_roles_permissions AS (
|
|
SELECT
|
|
permissions.ptype,
|
|
schemas.sname AS obj_name,
|
|
roles.rname,
|
|
permissions.pname,
|
|
has_schema_privilege(roles.rname, schemas.sname, permissions.pname) AS has_permission
|
|
FROM
|
|
schemas
|
|
CROSS JOIN roles
|
|
CROSS JOIN permissions
|
|
WHERE
|
|
permissions.ptype = 'SCHEMA'
|
|
),
|
|
schemas_and_grants AS (
|
|
SELECT obj_name AS object_name,
|
|
COALESCE(string_agg(DISTINCT CASE WHEN has_permission THEN pname END, ','), '') AS granted_permissions
|
|
FROM schemas_roles_permissions
|
|
GROUP BY 1
|
|
ORDER BY 1
|
|
)
|
|
SELECT
|
|
object_name, granted_permissions
|
|
FROM
|
|
schemas_and_grants
|
|
WHERE
|
|
granted_permissions is not null and granted_permissions <> '';
|
|
}
|
|
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.fetch(query)
|
|
end
|
|
end
|
|
|
|
def all_schemas_granted_hashed(role = nil)
|
|
results = all_schemas_granted(role)
|
|
return {} if results.nil?
|
|
|
|
results.map { |row| [row[:object_name], row[:granted_permissions].split(',')] }.to_h
|
|
end
|
|
|
|
def exists_role?(rolname)
|
|
query = %{
|
|
SELECT 1
|
|
FROM pg_catalog.pg_roles
|
|
WHERE rolname = '#{rolname}'
|
|
}
|
|
|
|
result = @user.in_database(as: :superuser).fetch(query)
|
|
result.count > 0
|
|
end
|
|
|
|
def drop_owned_by_user(conn, role)
|
|
conn.run("DROP OWNED BY \"#{role}\"")
|
|
end
|
|
|
|
def drop_user(conn = nil, username = nil)
|
|
conn ||= @user.in_database(as: :cluster_admin)
|
|
username ||= @user.database_username
|
|
database_with_conflicts = nil
|
|
retried = false
|
|
|
|
begin
|
|
conn.run("DROP USER IF EXISTS \"#{username}\"")
|
|
rescue StandardError => e
|
|
if !retried && e.message =~ /cannot be dropped because some objects depend on it/
|
|
retried = true
|
|
e.message =~ /object[s]? in database (.*)$/
|
|
e.message =~ /privileges for database (.*)$/ unless $1
|
|
if database_with_conflicts == $1
|
|
raise e
|
|
else
|
|
database_with_conflicts = $1
|
|
revoke_all_on_database_from(conn, database_with_conflicts, username)
|
|
revoke_all_memberships_on_database_to_role(conn, username)
|
|
drop_owned_by_user(conn, username)
|
|
|
|
conflict_database_conn = @user.in_database(
|
|
as: :cluster_admin,
|
|
'database' => database_with_conflicts
|
|
)
|
|
drop_owned_by_user(conflict_database_conn, username)
|
|
['cdb', 'cdb_importer', 'cartodb', 'public', @user.database_schema].each do |s|
|
|
drop_users_privileges_in_schema(s, [username], conn: conflict_database_conn)
|
|
end
|
|
DBService.close_sequel_connection(conflict_database_conn)
|
|
retry
|
|
end
|
|
else
|
|
raise e
|
|
end
|
|
end
|
|
end
|
|
|
|
# Org users share the same db, so must only delete the schema unless he's the owner
|
|
def drop_organization_user(org_id, is_owner: false, force_destroy: false)
|
|
raise CartoDB::BaseCartoDBError.new('Tried to delete an organization user without org id') if org_id.nil?
|
|
Thread.new do
|
|
@user.in_database(as: :superuser) do |database|
|
|
if is_owner
|
|
schemas = ['cdb', 'cdb_importer', 'cartodb', 'public', @user.database_schema] +
|
|
::User.select(:database_schema).where(organization_id: org_id).all.collect(&:database_schema)
|
|
schemas.uniq.each do |s|
|
|
drop_users_privileges_in_schema(
|
|
s,
|
|
[@user.database_username, @user.database_public_username, CartoDB::PUBLIC_DB_USER])
|
|
end
|
|
|
|
# To avoid "cannot drop function" errors
|
|
database.run("drop extension if exists plproxy cascade")
|
|
end
|
|
|
|
# If user is in an organization should never have public schema, but to be safe (& tests which stub stuff)
|
|
unless @user.database_schema == SCHEMA_PUBLIC
|
|
database.run(%{ DROP FUNCTION IF EXISTS "#{@user.database_schema}"._CDB_UserQuotaInBytes()})
|
|
drop_analysis_cache(@user)
|
|
drop_all_functions_from_schema(@user.database_schema)
|
|
|
|
cascade_drop = force_destroy ? 'CASCADE' : ''
|
|
database.run(%{ DROP SCHEMA IF EXISTS "#{@user.database_schema}" #{cascade_drop}})
|
|
end
|
|
end
|
|
|
|
conn = @user.in_database(as: :cluster_admin)
|
|
CartoDB::UserModule::DBService.terminate_database_connections(@user.database_name, @user.database_host)
|
|
|
|
# If user is in an organization should never have public schema, but to be safe (& tests which stub stuff)
|
|
unless @user.database_schema == SCHEMA_PUBLIC
|
|
drop_user(conn, @user.database_public_username)
|
|
end
|
|
|
|
if is_owner
|
|
conn.run("DROP DATABASE \"#{@user.database_name}\"")
|
|
end
|
|
drop_user(conn)
|
|
end.join
|
|
|
|
monitor_user_notification
|
|
end
|
|
|
|
def configure_extension_org_metadata_api_endpoint
|
|
config = Cartodb.config[:org_metadata_api]
|
|
host = config['host']
|
|
port = config['port']
|
|
username = config['username']
|
|
password = config['password']
|
|
timeout = config.fetch('timeout', 10)
|
|
|
|
if host.present? && port.present? && username.present? && password.present?
|
|
conf_sql = %{
|
|
SELECT cartodb.CDB_Conf_SetConf('groups_api',
|
|
'{ \"host\": \"#{host}\", \"port\": #{port}, \"timeout\": #{timeout}, \"username\": \"#{username}\",
|
|
\"password\": \"#{password}\"}'::json
|
|
)
|
|
}
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.fetch(conf_sql).first
|
|
end
|
|
else
|
|
CartoDB.notify_debug("org_metadata_api configuration missing", user_id: @user.id, config: config)
|
|
end
|
|
end
|
|
|
|
def install_and_configure_geocoder_api_extension
|
|
geocoder_api_config = Cartodb.get_config(:geocoder, 'api')
|
|
# If there's no config we assume there's no need to install the
|
|
# geocoder client as it is an independent API
|
|
return if geocoder_api_config.blank?
|
|
install_geocoder_api_extension
|
|
@user.in_database(as: :superuser) do |db|
|
|
db.transaction do
|
|
db.run(build_geocoder_server_config_sql(geocoder_api_config))
|
|
db.run(build_entity_config_sql)
|
|
db.run("ALTER USER \"#{@user.database_username}\" SET search_path TO #{build_search_path}")
|
|
db.run("ALTER USER \"#{@user.database_public_username}\" SET search_path TO #{build_search_path}")
|
|
end
|
|
end
|
|
return true
|
|
rescue StandardError => e
|
|
CartoDB.notify_error(
|
|
'Error installing and configuring geocoder api extension',
|
|
error: e.inspect, user: @user
|
|
)
|
|
return false
|
|
end
|
|
|
|
def install_geocoder_api_extension
|
|
@user.in_database(as: :superuser) do |db|
|
|
db.transaction do
|
|
db.run("CREATE EXTENSION IF NOT EXISTS cdb_dataservices_client VERSION '#{CDB_DATASERVICES_CLIENT_VERSION}' CASCADE")
|
|
db.run("ALTER EXTENSION cdb_dataservices_client UPDATE TO '#{CDB_DATASERVICES_CLIENT_VERSION}'")
|
|
end
|
|
end
|
|
end
|
|
|
|
def install_odbc_fdw
|
|
@user.in_database(as: :superuser) do |db|
|
|
db.run('CREATE EXTENSION IF NOT EXISTS odbc_fdw SCHEMA public')
|
|
end
|
|
rescue Sequel::DatabaseError => error
|
|
# For the time being we'll be resilient to the odbc_fdw not being available
|
|
# and just proceed without installing it.
|
|
log_error(message: "Could not install odbc_fdw", exception: error, error_detail: error.cause.inspect)
|
|
end
|
|
|
|
def setup_organization_owner
|
|
setup_organization_role_permissions
|
|
configure_extension_org_metadata_api_endpoint
|
|
end
|
|
|
|
# Use a direct connection to the db through the direct port specified
|
|
# in the database configuration and set up its statement timeout value. This
|
|
# allows to overpass the statement_timeout limit if a connection pooler is used.
|
|
# This method is supposed to receive a block that will be run with the created
|
|
# connection.
|
|
def in_database_direct_connection(statement_timeout:)
|
|
raise 'need block' unless block_given?
|
|
|
|
configuration = db_configuration_for
|
|
configuration[:port] = configuration.fetch(:direct_port, configuration["direct_port"]) || configuration[:port] || configuration["port"]
|
|
|
|
connection = @user.get_connection(_opts = {}, configuration)
|
|
|
|
begin
|
|
connection.run("SET statement_timeout TO #{statement_timeout}")
|
|
yield(connection)
|
|
ensure
|
|
connection.run("SET statement_timeout TO DEFAULT")
|
|
end
|
|
end
|
|
|
|
def reset_pooled_connections
|
|
# Only close connections to this users' database
|
|
$pool.close_connections!(@user.database_name)
|
|
end
|
|
|
|
# Upgrade the cartodb postgresql extension
|
|
def upgrade_cartodb_postgres_extension(statement_timeout = nil, cdb_extension_target_version = nil)
|
|
if cdb_extension_target_version.nil?
|
|
cdb_extension_target_version = '0.36.0'
|
|
end
|
|
|
|
@user.in_database(as: :superuser, no_cartodb_in_schema: true) do |db|
|
|
db.transaction do
|
|
unless statement_timeout.nil?
|
|
old_timeout = db.fetch("SHOW statement_timeout;").first[:statement_timeout]
|
|
db.run("SET statement_timeout TO '#{statement_timeout}';")
|
|
end
|
|
|
|
db.run(%{
|
|
DO LANGUAGE 'plpgsql' $$
|
|
DECLARE
|
|
ver TEXT;
|
|
BEGIN
|
|
BEGIN
|
|
SELECT cartodb.cdb_version() INTO ver;
|
|
EXCEPTION WHEN undefined_function OR invalid_schema_name THEN
|
|
RAISE NOTICE 'Got % (%)', SQLERRM, SQLSTATE;
|
|
BEGIN
|
|
CREATE EXTENSION cartodb VERSION '#{cdb_extension_target_version}' CASCADE;
|
|
EXCEPTION WHEN undefined_table THEN
|
|
RAISE NOTICE 'Got % (%)', SQLERRM, SQLSTATE;
|
|
CREATE EXTENSION cartodb CASCADE VERSION '#{cdb_extension_target_version}';
|
|
RETURN;
|
|
END;
|
|
RETURN;
|
|
END;
|
|
ver := '#{cdb_extension_target_version}';
|
|
EXECUTE 'ALTER EXTENSION cartodb UPDATE TO ''' || ver || 'next''';
|
|
EXECUTE 'ALTER EXTENSION cartodb UPDATE TO ''' || ver || '''';
|
|
END;
|
|
$$;
|
|
})
|
|
|
|
unless statement_timeout.nil?
|
|
db.run("SET statement_timeout TO '#{old_timeout}';")
|
|
end
|
|
|
|
obtained = db.fetch('SELECT cartodb.cdb_version() as v').first[:v]
|
|
|
|
unless cartodb_extension_semver(cdb_extension_target_version) == cartodb_extension_semver(obtained)
|
|
raise("Expected cartodb extension '#{cdb_extension_target_version}' obtained '#{obtained}'")
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
def cartodb_extension_version_pre_mu?
|
|
current_version = cartodb_extension_semver(cartodb_extension_version)
|
|
if current_version.size == 3
|
|
major, minor, = current_version
|
|
major == 0 && minor < 3
|
|
else
|
|
raise 'Current cartodb extension version does not match standard x.y.z format'
|
|
end
|
|
end
|
|
|
|
# Returns a tree elements array with [major, minor, patch] as in http://semver.org/
|
|
def cartodb_extension_semver(extension_version)
|
|
extension_version.split('.').take(3).map(&:to_i)
|
|
end
|
|
|
|
def cartodb_extension_version
|
|
@cartodb_extension_version ||= @user.in_database(as: :superuser)
|
|
.fetch('SELECT cartodb.cdb_version() AS v').first[:v]
|
|
end
|
|
|
|
def reset_user_schema_permissions
|
|
@user.in_database(as: :superuser) do |user_database|
|
|
user_database.transaction do
|
|
schemas = [@user.database_schema].uniq
|
|
schemas.each do |schema|
|
|
revoke_privileges(user_database, schema, 'PUBLIC')
|
|
end
|
|
yield(user_database) if block_given?
|
|
end
|
|
end
|
|
end
|
|
|
|
def reset_database_permissions
|
|
@user.in_database(as: :superuser) do |user_database|
|
|
user_database.transaction do
|
|
schemas = %w(public cdb_importer cdb cartodb)
|
|
['PUBLIC', CartoDB::PUBLIC_DB_USER].each do |user|
|
|
revoke_all_on_database_from(user_database, @user.database_name, user)
|
|
schemas.each do |schema|
|
|
revoke_privileges(user_database, schema, user)
|
|
end
|
|
end
|
|
yield(user_database) if block_given?
|
|
end
|
|
end
|
|
end
|
|
|
|
def set_statement_timeouts
|
|
@user.in_database(as: :superuser) do |user_database|
|
|
user_database["ALTER ROLE \"?\" SET statement_timeout to ?", Sequel.lit(@user.database_username),
|
|
@user.user_timeout].all
|
|
user_database["ALTER DATABASE \"?\" SET statement_timeout to ?", Sequel.lit(@user.database_name),
|
|
@user.database_timeout].all
|
|
end
|
|
@user.in_database.disconnect
|
|
@user.in_database.connect(db_configuration_for)
|
|
@user.in_database(as: :public_user).disconnect
|
|
@user.in_database(as: :public_user).connect(db_configuration_for(:public_user))
|
|
rescue Sequel::DatabaseConnectionError
|
|
end
|
|
|
|
def set_user_as_organization_member
|
|
@user.in_database(as: :superuser) do |user_database|
|
|
user_database.transaction do
|
|
user_database.run("SELECT cartodb.CDB_Organization_Create_Member('#{@user.database_username}');")
|
|
end
|
|
end
|
|
end
|
|
|
|
def reset_schema_owner
|
|
@user.in_database(as: :superuser) do |database|
|
|
database.run(%{ ALTER SCHEMA "#{@user.database_schema}" OWNER TO "#{@user.database_username}" })
|
|
end
|
|
end
|
|
|
|
def grant_user_in_database
|
|
@queries.run_in_transaction(
|
|
@queries.grant_connect_on_database_queries,
|
|
true
|
|
)
|
|
end
|
|
|
|
def grant_publicuser_in_database
|
|
@queries.run_in_transaction(
|
|
@queries.grant_connect_on_database_queries(CartoDB::PUBLIC_DB_USER),
|
|
true
|
|
)
|
|
@queries.run_in_transaction(
|
|
@queries.grant_read_on_schema_queries(SCHEMA_CARTODB, CartoDB::PUBLIC_DB_USER),
|
|
true
|
|
)
|
|
@queries.run_in_transaction(
|
|
[
|
|
"REVOKE SELECT ON cartodb.cdb_tablemetadata FROM #{CartoDB::PUBLIC_DB_USER} CASCADE",
|
|
"REVOKE SELECT ON cartodb.cdb_analysis_catalog FROM #{CartoDB::PUBLIC_DB_USER} CASCADE"
|
|
],
|
|
true
|
|
)
|
|
@queries.run_in_transaction(
|
|
[
|
|
"GRANT USAGE ON SCHEMA #{SCHEMA_PUBLIC} TO #{CartoDB::PUBLIC_DB_USER}",
|
|
"GRANT EXECUTE ON ALL FUNCTIONS IN SCHEMA #{SCHEMA_PUBLIC} TO #{CartoDB::PUBLIC_DB_USER}",
|
|
"GRANT SELECT ON spatial_ref_sys TO #{CartoDB::PUBLIC_DB_USER}"
|
|
],
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_user_privileges_in_own_schema # MU
|
|
@queries.run_in_transaction(
|
|
@queries.grant_all_on_user_schema_queries,
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_user_privileges_in_cartodb_schema(db_user = nil)
|
|
@queries.run_in_transaction(
|
|
(
|
|
@queries.grant_read_on_schema_queries(SCHEMA_CARTODB, db_user) +
|
|
@queries.grant_write_on_cdb_tablemetadata_queries(db_user) +
|
|
@queries.grant_write_on_cdb_analysis_catalog_queries(db_user)
|
|
),
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_privileges_to_publicuser_in_own_schema # MU
|
|
# Privileges in user schema for publicuser
|
|
@queries.run_in_transaction(
|
|
@queries.grant_usage_on_user_schema_to_other(CartoDB::PUBLIC_DB_USER),
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_user_privileges_in_public_schema(db_user = nil)
|
|
@queries.run_in_transaction(
|
|
@queries.grant_read_on_schema_queries(SCHEMA_PUBLIC, db_user),
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_user_privileges_in_importer_schema(db_user = nil) # MU
|
|
@queries.run_in_transaction(
|
|
@queries.grant_all_on_schema_queries(SCHEMA_IMPORTER, db_user),
|
|
true
|
|
)
|
|
end
|
|
|
|
def revoke_all_on_database_from(conn, database, role)
|
|
conn.run("REVOKE ALL ON DATABASE \"#{database}\" FROM \"#{role}\" CASCADE") if role_exists?(conn, role)
|
|
end
|
|
|
|
def grant_owner_in_database
|
|
@queries.run_in_transaction(
|
|
@queries.grant_all_on_database_queries,
|
|
true
|
|
)
|
|
end
|
|
|
|
def fix_table_permissions
|
|
tables_queries = []
|
|
@user.tables.each do |table|
|
|
Carto::TableAndFriends.apply(@user.in_database, @user.database_schema, table.name) do |schema, table_name|
|
|
if table.public? || table.public_with_link_only?
|
|
tables_queries << %{
|
|
GRANT SELECT ON \"#{schema}\".\"#{table_name}\" TO #{CartoDB::PUBLIC_DB_USER} }
|
|
end
|
|
tables_queries << %{
|
|
ALTER TABLE \"#{schema}\".\"#{table_name}\" OWNER TO \"#{@user.database_username}\" }
|
|
end
|
|
end
|
|
@queries.run_in_transaction(
|
|
tables_queries,
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_user_privileges_in_geocoding_schema(db_user = nil)
|
|
@queries.run_in_transaction(
|
|
@queries.grant_all_on_schema_queries(SCHEMA_GEOCODING, db_user),
|
|
true
|
|
)
|
|
end
|
|
|
|
def set_geo_columns_privileges(role_name = nil)
|
|
# Postgis lives at public schema, as do geometry_columns and geography_columns
|
|
catalogs_schema = SCHEMA_PUBLIC
|
|
target_user = role_name.nil? ? @user.database_public_username : role_name
|
|
queries = [
|
|
%{ GRANT SELECT ON "#{catalogs_schema}"."geometry_columns" TO "#{target_user}" },
|
|
%{ GRANT SELECT ON "#{catalogs_schema}"."geography_columns" TO "#{target_user}" }
|
|
]
|
|
@queries.run_in_transaction(queries, true)
|
|
end
|
|
|
|
# PG12_DEPRECATED in postgis 3+
|
|
def set_raster_privileges(role_name = nil)
|
|
database = @user.in_database(as: :superuser)
|
|
return unless database.table_exists?('raster_overviews')
|
|
# Postgis lives at public schema, so raster catalogs too
|
|
catalogs_schema = SCHEMA_PUBLIC
|
|
queries = [
|
|
"GRANT SELECT ON TABLE \"#{catalogs_schema}\".\"raster_overviews\" TO \"#{CartoDB::PUBLIC_DB_USER}\"",
|
|
"GRANT SELECT ON TABLE \"#{catalogs_schema}\".\"raster_columns\" TO \"#{CartoDB::PUBLIC_DB_USER}\""
|
|
]
|
|
target_user = role_name || @user.database_public_username
|
|
unless @user.organization.nil?
|
|
queries << "GRANT SELECT ON TABLE \"#{catalogs_schema}\".\"raster_overviews\" TO \"#{target_user}\""
|
|
queries << "GRANT SELECT ON TABLE \"#{catalogs_schema}\".\"raster_columns\" TO \"#{target_user}\""
|
|
end
|
|
@queries.run_in_transaction(queries, true)
|
|
end
|
|
|
|
def setup_organization_role_permissions
|
|
org_member_role = organization_member_group_role_member_name
|
|
set_user_privileges_in_public_schema(org_member_role)
|
|
@queries.run_in_transaction(
|
|
@queries.grant_connect_on_database_queries(org_member_role), true
|
|
)
|
|
set_geo_columns_privileges(org_member_role)
|
|
set_raster_privileges(org_member_role)
|
|
set_user_privileges_in_cartodb_schema(org_member_role)
|
|
set_user_privileges_in_importer_schema(org_member_role)
|
|
set_user_privileges_in_geocoding_schema(org_member_role)
|
|
end
|
|
|
|
def schema_exists?(schema, database = @user.in_database)
|
|
query = "SELECT 1 as schema_exist FROM information_schema.schemata WHERE schema_name = '#{schema}'"
|
|
!database.fetch(query).first.nil?
|
|
end
|
|
|
|
def drop_users_privileges_in_schema(schema, accounts, conn: nil)
|
|
user_database = conn || @user.in_database(as: :superuser, statement_timeout: 600000)
|
|
return unless schema_exists?(schema, user_database)
|
|
|
|
user_database.transaction do
|
|
accounts
|
|
.select { |role| role_exists?(user_database, role) }
|
|
.each { |role| revoke_privileges(user_database, schema, "\"#{role}\"") }
|
|
end
|
|
end
|
|
|
|
def revoke_all_memberships_on_database_to_role(conn, role)
|
|
conn.fetch(%{
|
|
select rolname from pg_user join pg_auth_members on (pg_user.usesysid=pg_auth_members.member)
|
|
join pg_roles on (pg_roles.oid=pg_auth_members.roleid) where pg_user.usename='#{role}'
|
|
}).each do |rolname|
|
|
conn.run("REVOKE \"#{rolname[:rolname]}\" FROM \"#{role}\" CASCADE")
|
|
end
|
|
end
|
|
|
|
def drop_analysis_cache(user)
|
|
# Filtering this query by tableowner should be enough but for security reasons I've added an additional
|
|
# filter by schema. Also we have add a regexp to matches the current analysis tables name format to avoid
|
|
# deleting user tables in the process (https://github.com/CartoDB/camshaft/blob/0.59.4/lib/node/node.js#L344-L348)
|
|
cache_tables_sql = "SELECT tablename FROM pg_tables WHERE schemaname = '#{user.database_schema}' and tableowner = '#{user.database_username}' and tablename ~* '^analysis_[0-9a-z]{10}_[0-9a-z]{40}$';"
|
|
delete_analysis_metadata_sql = "DELETE FROM cdb_analysis_catalog WHERE username = '#{user.username}'"
|
|
user.in_database(as: :superuser) do |database|
|
|
database.transaction do
|
|
database.fetch(cache_tables_sql).map(:tablename).each do |cache_table_name|
|
|
database.run(%{DROP TABLE "#{user.database_schema}"."#{cache_table_name}"})
|
|
end
|
|
database.run(delete_analysis_metadata_sql)
|
|
end
|
|
end
|
|
end
|
|
|
|
# Drops grants and functions in a given schema, avoiding by all means a CASCADE
|
|
# to not affect extensions or other users
|
|
def drop_all_functions_from_schema(schema_name)
|
|
recursivity_max_depth = 3
|
|
|
|
return if schema_name == SCHEMA_PUBLIC
|
|
|
|
@user.in_database(as: :superuser) do |database|
|
|
# Non-aggregate functions
|
|
drop_function_sqls = database.fetch(get_drop_functions_sql(database, schema_name))
|
|
# Simulate a controlled environment drop cascade contained to only functions
|
|
failed_sqls = []
|
|
recursivity_level = 0
|
|
begin
|
|
failed_sqls = []
|
|
drop_function_sqls.each do |sql_sentence|
|
|
begin
|
|
database.run(sql_sentence[:sql])
|
|
rescue Sequel::DatabaseError => e
|
|
if e.message =~ /depends on function /i
|
|
failed_sqls.push(sql_sentence)
|
|
else
|
|
raise
|
|
end
|
|
end
|
|
end
|
|
drop_function_sqls = failed_sqls
|
|
recursivity_level += 1
|
|
end while failed_sqls.count > 0 && recursivity_level < recursivity_max_depth
|
|
|
|
# If something remains, reattempt later after dropping aggregates
|
|
if drop_function_sqls.count > 0
|
|
aggregate_dependant_function_sqls = drop_function_sqls
|
|
else
|
|
aggregate_dependant_function_sqls = []
|
|
end
|
|
|
|
# And now aggregate functions
|
|
failed_sqls = []
|
|
drop_function_sqls = database.fetch(get_drop_functions_sql(database, schema_name, aggregated: true))
|
|
drop_function_sqls.each do |sql_sentence|
|
|
begin
|
|
database.run(sql_sentence[:sql])
|
|
rescue Sequel::DatabaseError
|
|
failed_sqls.push(sql_sentence)
|
|
end
|
|
end
|
|
|
|
if failed_sqls.count > 0
|
|
raise CartoDB::BaseCartoDBError.new('Cannot drop schema aggregate functions, dependencies remain')
|
|
end
|
|
|
|
# One final pass of normal functions, if left
|
|
if aggregate_dependant_function_sqls.count > 0
|
|
aggregate_dependant_function_sqls.each do |sql_sentence|
|
|
begin
|
|
database.run(sql_sentence[:sql])
|
|
rescue Sequel::DatabaseError
|
|
failed_sqls.push(sql_sentence)
|
|
end
|
|
end
|
|
end
|
|
|
|
if failed_sqls.count > 0
|
|
raise CartoDB::BaseCartoDBError.new('Cannot drop schema functions, dependencies remain')
|
|
end
|
|
end
|
|
end
|
|
|
|
def get_drop_functions_sql(database, schema_name, aggregated: false)
|
|
set_pgversion
|
|
|
|
if @pgversion >= 110000
|
|
agg_join_clause = "pg_proc.prokind #{aggregated ? ' = ' : ' <> '} 'a'"
|
|
else
|
|
agg_join_clause = "pg_proc.proisagg = #{aggregated ? 'TRUE' : 'FALSE'}"
|
|
end
|
|
|
|
drop_type = aggregated ? 'AGGREGATE' : 'FUNCTION'
|
|
|
|
%{SELECT 'DROP #{drop_type} "' || ns.nspname || '".' || proname || '(' || oidvectortypes(proargtypes) || ');'
|
|
AS sql
|
|
FROM pg_proc INNER JOIN pg_namespace ns ON (pg_proc.pronamespace = ns.oid AND #{agg_join_clause} )
|
|
WHERE ns.nspname = '#{schema_name}'}
|
|
end
|
|
|
|
# Create a "public.cdb_invalidate_varnish()" function to invalidate Varnish
|
|
#
|
|
# The function can only be used by the superuser, we expect
|
|
# security-definer triggers OR triggers on superuser-owned tables
|
|
# to call it with controlled set of parameters.
|
|
#
|
|
# The function is written in python because it needs to reach out
|
|
# to a Varnish server.
|
|
#
|
|
# Being unable to communicate with Varnish may or may not be critical
|
|
# depending on CartoDB configuration at time of function definition.
|
|
#
|
|
def create_function_invalidate_varnish
|
|
if Cartodb.get_config(:invalidation_service, 'enabled')
|
|
create_function_invalidate_varnish_invalidation_service
|
|
elsif Cartodb.get_config(:varnish_management, 'http_port')
|
|
create_function_invalidate_varnish_http
|
|
else
|
|
create_function_invalidate_varnish_telnet
|
|
end
|
|
end
|
|
|
|
# Add plpythonu pl handler
|
|
def add_python
|
|
set_pgversion
|
|
|
|
@user.in_database(
|
|
as: :superuser,
|
|
no_cartodb_in_schema: true
|
|
).run("CREATE EXTENSION IF NOT EXISTS #{@plpythonu};")
|
|
end
|
|
|
|
# Needed because in some cases it might not exist and failure ends transaction
|
|
def role_exists?(db, role)
|
|
!db.fetch("SELECT 1 FROM pg_roles WHERE rolname='#{role}'").first.nil?
|
|
end
|
|
|
|
def revoke_privileges(db, schema, user)
|
|
db.run("REVOKE ALL ON SCHEMA \"#{schema}\" FROM #{user} CASCADE")
|
|
db.run("REVOKE ALL ON ALL SEQUENCES IN SCHEMA \"#{schema}\" FROM #{user} CASCADE")
|
|
db.run("REVOKE ALL ON ALL FUNCTIONS IN SCHEMA \"#{schema}\" FROM #{user} CASCADE")
|
|
db.run("REVOKE ALL ON ALL TABLES IN SCHEMA \"#{schema}\" FROM #{user} CASCADE")
|
|
end
|
|
|
|
def organization_member_group_role_member_name
|
|
@user.in_database.fetch(
|
|
"SELECT cartodb.CDB_Organization_Member_Group_Role_Member_Name() as org_member_role;"
|
|
)[:org_member_role][:org_member_role]
|
|
end
|
|
|
|
def db_configuration_for(user_role = nil)
|
|
logger = (Rails.env.development? || Rails.env.test? ? ::Rails.logger : nil)
|
|
if user_role == :superuser
|
|
::SequelRails.configuration.environment_for(Rails.env).merge(
|
|
'database' => @user.database_name,
|
|
:logger => logger,
|
|
'host' => @user.database_host
|
|
) { |_, o, n| n.nil? ? o : n }
|
|
elsif user_role == :cluster_admin
|
|
::SequelRails.configuration.environment_for(Rails.env).merge(
|
|
'database' => 'postgres',
|
|
:logger => logger,
|
|
'host' => @user.database_host
|
|
) { |_, o, n| n.nil? ? o : n }
|
|
elsif user_role == :public_user
|
|
::SequelRails.configuration.environment_for(Rails.env).merge(
|
|
'database' => @user.database_name,
|
|
:logger => logger,
|
|
'username' => CartoDB::PUBLIC_DB_USER, 'password' => CartoDB::PUBLIC_DB_USER_PASSWORD,
|
|
'host' => @user.database_host
|
|
) { |_, o, n| n.nil? ? o : n }
|
|
elsif user_role == :public_db_user
|
|
::SequelRails.configuration.environment_for(Rails.env).merge(
|
|
'database' => @user.database_name,
|
|
:logger => logger,
|
|
'username' => @user.database_public_username, 'password' => CartoDB::PUBLIC_DB_USER_PASSWORD,
|
|
'host' => @user.database_host
|
|
) { |_, o, n| n.nil? ? o : n }
|
|
else
|
|
::SequelRails.configuration.environment_for(Rails.env).merge(
|
|
'database' => @user.database_name,
|
|
:logger => logger,
|
|
'username' => @user.database_username,
|
|
'password' => @user.database_password,
|
|
'host' => @user.database_host
|
|
) { |_, o, n| n.nil? ? o : n }
|
|
end
|
|
end
|
|
|
|
def monitor_user_notification
|
|
FileUtils.touch(log_file_path('users_modifications'))
|
|
enable_remote_db_user if Cartodb.get_config(:signups, 'service', 'port')
|
|
end
|
|
|
|
def enable_remote_db_user
|
|
request = http_client.request(
|
|
"#{@user.database_host}:#{Cartodb.get_config(:signups, 'service', 'port')}/scripts/activate_db_user",
|
|
method: :post,
|
|
headers: { "Content-Type" => "application/json" }
|
|
)
|
|
response = request.run
|
|
if response.code != 200
|
|
raise(response.body)
|
|
else
|
|
comm_response = JSON.parse(response.body)
|
|
if comm_response['retcode'].to_i != 0
|
|
raise(response['stderr'])
|
|
end
|
|
end
|
|
end
|
|
|
|
def create_own_schema
|
|
load_cartodb_functions
|
|
@user.database_schema = @user.username
|
|
@user.this.update(database_schema: @user.database_schema)
|
|
create_user_schema
|
|
set_database_search_path
|
|
create_public_db_user
|
|
end
|
|
|
|
def move_to_own_schema
|
|
new_schema_name = @user.username
|
|
old_database_schema_name = @user.database_schema
|
|
if @user.database_schema != new_schema_name
|
|
Carto::Db::UserSchemaMover.new(@user).move_objects(new_schema_name)
|
|
|
|
create_public_db_user
|
|
set_database_search_path
|
|
end
|
|
rescue StandardError => e
|
|
# Undo metadata changes if process fails
|
|
begin
|
|
@user.this.update database_schema: old_database_schema_name
|
|
|
|
# Defensive measure to avoid undesired table dropping
|
|
if schema_exists?(new_schema_name) && tables_effective(new_schema_name).count == 0
|
|
drop_all_functions_from_schema(new_schema_name)
|
|
@user.in_database.run(%{ DROP SCHEMA "#{new_schema_name}" })
|
|
end
|
|
rescue StandardError => ee
|
|
# Avoid shadowing the actual error
|
|
CartoDB.notify_exception(ee, user: @user)
|
|
end
|
|
raise e
|
|
end
|
|
|
|
def drop_database_and_user(conn = nil)
|
|
conn ||= @user.in_database(as: :cluster_admin)
|
|
|
|
if !@user.database_name.nil? && !@user.database_name.empty?
|
|
@user.in_database(as: :superuser).run("DROP SCHEMA \"#{@user.database_schema}\" CASCADE")
|
|
conn.run("UPDATE pg_database SET datallowconn = 'false' WHERE datname = '#{@user.database_name}'")
|
|
CartoDB::UserModule::DBService.terminate_database_connections(@user.database_name, @user.database_host)
|
|
conn.run("DROP DATABASE \"#{@user.database_name}\"")
|
|
end
|
|
|
|
if !@user.database_username.nil? && !@user.database_username.empty?
|
|
conn.run("DROP USER \"#{@user.database_username}\"")
|
|
end
|
|
end
|
|
|
|
def run_pg_query(query)
|
|
time = nil
|
|
res = nil
|
|
translation_proc = nil
|
|
@user.in_database do |user_database|
|
|
time = Benchmark.measure do
|
|
user_database.synchronize do |conn|
|
|
res = conn.exec query
|
|
end
|
|
translation_proc = user_database.conversion_procs
|
|
end
|
|
end
|
|
{
|
|
time: time.real,
|
|
total_rows: res.ntuples,
|
|
rows: pg_to_hash(res, translation_proc),
|
|
results: pg_results?(res),
|
|
modified: pg_modified?(res),
|
|
affected_rows: pg_size(res)
|
|
}
|
|
rescue StandardError => e
|
|
if e.is_a? PGError
|
|
if e.message.include?("does not exist")
|
|
if e.message.include?("column")
|
|
raise CartoDB::ColumnNotExists, e.message
|
|
else
|
|
raise CartoDB::TableNotExists, e.message
|
|
end
|
|
else
|
|
raise CartoDB::ErrorRunningQuery, e.message
|
|
end
|
|
else
|
|
raise e
|
|
end
|
|
end
|
|
|
|
def create_db_user
|
|
conn = @user.in_database(as: :cluster_admin)
|
|
begin
|
|
conn.transaction do
|
|
begin
|
|
conn.run("CREATE USER \"#{@user.database_username}\" PASSWORD '#{@user.database_password}'")
|
|
conn.run("GRANT publicuser to \"#{@user.database_username}\"")
|
|
rescue StandardError => e
|
|
puts "#{Time.now} USER SETUP ERROR (#{@user.database_username}): #{$!}"
|
|
raise e
|
|
end
|
|
end
|
|
end
|
|
end
|
|
|
|
def create_user_db
|
|
conn = @user.in_database(as: :cluster_admin)
|
|
begin
|
|
conn.run("CREATE DATABASE \"#{@user.database_name}\"
|
|
WITH TEMPLATE = template_postgis
|
|
OWNER = #{::SequelRails.configuration.environment_for(Rails.env)['username']}
|
|
ENCODING = 'UTF8'
|
|
CONNECTION LIMIT=-1")
|
|
rescue StandardError => e
|
|
puts "#{Time.now} USER SETUP ERROR WHEN CREATING DATABASE #{@user.database_name}: #{$!}"
|
|
raise e
|
|
end
|
|
end
|
|
|
|
def set_database_name
|
|
@user.database_name = case Rails.env
|
|
when 'development'
|
|
"cartodb_dev_user_#{@user.partial_db_name}_db"
|
|
when 'staging'
|
|
"cartodb_staging_user_#{@user.partial_db_name}_db"
|
|
when 'test'
|
|
"cartodb_test_user_#{@user.partial_db_name}_db"
|
|
else
|
|
"cartodb_user_#{@user.partial_db_name}_db"
|
|
end
|
|
if @user.has_organization_enabled?
|
|
if !@user.database_exists?
|
|
raise "Organization database #{@user.database_name} doesn't exist"
|
|
end
|
|
else
|
|
if @user.database_exists?
|
|
raise "Database #{@user.database_name} already exists"
|
|
end
|
|
end
|
|
@user.this.update database_name: @user.database_name
|
|
end
|
|
|
|
def public_user_roles
|
|
@user.organization_user? ? [CartoDB::PUBLIC_DB_USER, @user.database_public_username] : [CartoDB::PUBLIC_DB_USER]
|
|
end
|
|
|
|
def terminate_database_connections
|
|
CartoDB::UserModule::DBService.terminate_database_connections(@user.database_name, @user.database_host)
|
|
end
|
|
|
|
def self.terminate_database_connections(database_name, database_host)
|
|
connection_params = ::SequelRails.configuration.environment_for(Rails.env).merge(
|
|
'host' => database_host,
|
|
'database' => 'postgres'
|
|
) { |_, o, n| n.nil? ? o : n }
|
|
conn = ::Sequel.connect(connection_params)
|
|
conn.run("
|
|
DO language plpgsql $$
|
|
DECLARE
|
|
ver INT;
|
|
sql TEXT;
|
|
BEGIN
|
|
SELECT INTO ver setting from pg_settings where name='server_version_num';
|
|
sql := 'SELECT pg_terminate_backend(';
|
|
IF ver > 90199 THEN
|
|
sql := sql || 'pid';
|
|
ELSE
|
|
sql := sql || 'procpid';
|
|
END IF;
|
|
|
|
sql := sql || ') FROM pg_stat_activity WHERE datname = '
|
|
|| quote_literal('#{database_name}');
|
|
|
|
RAISE NOTICE '%', sql;
|
|
|
|
EXECUTE sql;
|
|
END
|
|
$$
|
|
")
|
|
close_sequel_connection(conn)
|
|
end
|
|
|
|
def triggers(schema = @user.database_schema)
|
|
Carto::Db::Database.build_with_user(@user).triggers(schema)
|
|
end
|
|
|
|
def functions(schema = @user.database_schema)
|
|
Carto::Db::Database.build_with_user(@user).functions(schema, @user.database_username)
|
|
end
|
|
|
|
def views(schema = @user.database_schema)
|
|
Carto::Db::Database.build_with_user(@user).views(schema, @user.database_username)
|
|
end
|
|
|
|
def materialized_views(schema = @user.database_schema)
|
|
Carto::Db::Database.build_with_user(@user).materialized_views(schema, @user.database_username)
|
|
end
|
|
|
|
def get_database_version
|
|
version_match = @user.in_database.fetch("SELECT version()").first[:version].match(/(PostgreSQL (([0-9]+\.?){2,3})).*/)
|
|
if version_match.nil?
|
|
return nil
|
|
else
|
|
return version_match[2]
|
|
end
|
|
end
|
|
|
|
def connect_to_aggregation_tables
|
|
config = Cartodb.get_config(:aggregation_tables)
|
|
@user.in_database(as: :superuser) do |db|
|
|
db.transaction do
|
|
db.run(build_aggregation_fdw_config_sql(config))
|
|
db.run("SELECT cartodb._CDB_Setup_FDW('aggregation');")
|
|
db.run("CREATE FOREIGN TABLE IF NOT EXISTS #{SCHEMA_AGGREGATION_TABLES}.agg_admin0 " \
|
|
"(cartodb_id integer, the_geom geometry(Geometry,4326), " \
|
|
"the_geom_webmercator geometry(Geometry,3857), " \
|
|
"population double precision OPTIONS (column_name 'pop_est')) SERVER aggregation OPTIONS " \
|
|
"(schema_name 'public', table_name '#{config['tables']['admin0']}', updatable 'false');")
|
|
db.run("CREATE FOREIGN TABLE IF NOT EXISTS #{SCHEMA_AGGREGATION_TABLES}.agg_admin1 " \
|
|
"(cartodb_id integer,the_geom geometry(Geometry,4326), " \
|
|
"the_geom_webmercator geometry(Geometry,3857)) " \
|
|
"SERVER aggregation OPTIONS (schema_name 'public', table_name '#{config['tables']['admin1']}', updatable 'false');")
|
|
db.run("GRANT SELECT ON TABLE #{SCHEMA_AGGREGATION_TABLES}.agg_admin0 TO \"#{@user.database_username}\";")
|
|
db.run("GRANT SELECT ON TABLE #{SCHEMA_AGGREGATION_TABLES}.agg_admin1 TO \"#{@user.database_username}\";")
|
|
end
|
|
end
|
|
end
|
|
|
|
# Execute a query in the user database
|
|
# @param [String] query, using $1, $2 ... for placeholders
|
|
# @param ... values for the placeholders
|
|
# @return [Array<Hash<String, Value>>] Something (an actual Array in this case) that behaves like an Array
|
|
# of Hashes that map column name (as string) to value
|
|
# @raise [PG::Error] if the query fails
|
|
def execute_in_user_database(query, *binds)
|
|
placeholder_query = query.gsub(/\$\d+/, '?')
|
|
@user.in_database.fetch(placeholder_query, *binds).all.map(&:stringify_keys)
|
|
rescue Sequel::DatabaseError => exception
|
|
raise exception.cause
|
|
end
|
|
|
|
def create_ghost_tables_event_trigger
|
|
return if @user.has_feature_flag?('ghost_tables_trigger_disabled')
|
|
configure_ghost_table_event_trigger
|
|
@user.in_database(as: :superuser).run('SELECT CDB_EnableGhostTablesTrigger()')
|
|
end
|
|
|
|
def configure_ghost_table_event_trigger
|
|
tis_config = Cartodb.config[:invalidation_service]
|
|
return unless tis_config
|
|
@user.in_database(as: :superuser)
|
|
.run("SELECT cartodb.CDB_Conf_SetConf('invalidation_service', '#{tis_config.to_json}')")
|
|
end
|
|
|
|
def drop_ghost_tables_event_trigger
|
|
@user.in_database(as: :superuser).run('SELECT CDB_DisableGhostTablesTrigger()')
|
|
end
|
|
|
|
def create_oauth_reassign_ownership_event_trigger
|
|
@user.in_database(as: :superuser).run('SELECT CDB_EnableOAuthReassignTablesTrigger()')
|
|
end
|
|
|
|
def drop_oauth_reassign_ownership_event_trigger
|
|
@user.in_database(as: :superuser).run('SELECT CDB_DisableOAuthReassignTablesTrigger()')
|
|
end
|
|
|
|
private
|
|
|
|
def http_client
|
|
@http_client ||= Carto::Http::Client.get('old_user', log_requests: true)
|
|
end
|
|
|
|
# Telnet invalidation works only for Varnish 2.x.
|
|
def create_function_invalidate_varnish_telnet
|
|
add_python
|
|
|
|
varnish_host = Cartodb.get_config(:varnish_management, 'host') || '127.0.0.1'
|
|
varnish_port = Cartodb.get_config(:varnish_management, 'port') || 6082
|
|
varnish_timeout = Cartodb.get_config(:varnish_management, 'timeout') || 5
|
|
varnish_critical = Cartodb.get_config(:varnish_management, 'critical') == true ? 1 : 0
|
|
varnish_retry = Cartodb.get_config(:varnish_management, 'retry') || 5
|
|
purge_command = Cartodb.get_config(:varnish_management, 'purge_command')
|
|
varnish_trigger_verbose = Cartodb.get_config(:varnish_management).fetch('trigger_verbose', true) == true ? 1 : 0
|
|
|
|
@user.in_database(as: :superuser).run(
|
|
<<-TRIGGER
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION public.cdb_invalidate_varnish(table_name text) RETURNS void AS
|
|
$$
|
|
critical = #{varnish_critical}
|
|
timeout = #{varnish_timeout}
|
|
retry = #{varnish_retry}
|
|
trigger_verbose = #{varnish_trigger_verbose}
|
|
|
|
client = GD.get('varnish', None)
|
|
for i in ('base64', 'hashlib'):
|
|
if not i in GD:
|
|
GD[i] = __import__(i)
|
|
|
|
while True:
|
|
|
|
if not client:
|
|
try:
|
|
import varnish
|
|
client = GD['varnish'] = varnish.VarnishHandler(('#{varnish_host}', #{varnish_port}, timeout))
|
|
except Exception as err:
|
|
# NOTE: we won't retry on connection error
|
|
if critical:
|
|
plpy.error('Varnish connection error: ' + str(err))
|
|
break
|
|
|
|
try:
|
|
cache_key = "t:" + GD['base64'].b64encode(GD['hashlib'].sha256('#{@user.database_name}:%s' % table_name).digest())[0:6]
|
|
# We want to say \b here, but the Varnish telnet interface expects \\b, we have to escape that on Python to \\\\b and double that for SQL
|
|
client.fetch('#{purge_command} obj.http.Surrogate-Key ~ "\\\\\\\\b%s\\\\\\\\b"' % cache_key)
|
|
break
|
|
except Exception as err:
|
|
if trigger_verbose:
|
|
plpy.warning('Varnish fetch error: ' + str(err))
|
|
client = GD['varnish'] = None # force reconnect
|
|
if not retry:
|
|
if critical:
|
|
plpy.error('Varnish fetch error: ' + str(err))
|
|
break
|
|
retry -= 1 # try reconnecting
|
|
$$
|
|
LANGUAGE '#{@plpythonu}' VOLATILE;
|
|
REVOKE ALL ON FUNCTION public.cdb_invalidate_varnish(TEXT) FROM PUBLIC;
|
|
COMMIT;
|
|
TRIGGER
|
|
)
|
|
end
|
|
|
|
def create_function_invalidate_varnish_http
|
|
add_python
|
|
|
|
varnish_host = Cartodb.get_config(:varnish_management, 'host') || '127.0.0.1'
|
|
varnish_port = Cartodb.get_config(:varnish_management, 'http_port') || 6081
|
|
varnish_timeout = Cartodb.get_config(:varnish_management, 'timeout') || 5
|
|
varnish_critical = Cartodb.get_config(:varnish_management, 'critical') == true ? 1 : 0
|
|
varnish_retry = Cartodb.get_config(:varnish_management, 'retry') || 5
|
|
varnish_trigger_verbose = Cartodb.get_config(:varnish_management).fetch('trigger_verbose', true) == true ? 1 : 0
|
|
|
|
# PG12_DEPRECATED remove support for python 2
|
|
@user.in_database(as: :superuser).run(
|
|
<<-TRIGGER
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION public.cdb_invalidate_varnish(table_name text) RETURNS void AS
|
|
$$
|
|
critical = #{varnish_critical}
|
|
timeout = #{varnish_timeout}
|
|
retry = #{varnish_retry}
|
|
trigger_verbose = #{varnish_trigger_verbose}
|
|
|
|
if not 'httplib' in GD:
|
|
try:
|
|
GD['httplib'] = __import__('httplib')
|
|
except:
|
|
from http import client
|
|
GD['httplib'] = client
|
|
|
|
for i in ('base64', 'hashlib'):
|
|
if not i in GD:
|
|
GD[i] = __import__(i)
|
|
|
|
while True:
|
|
|
|
try:
|
|
database_table = '#{@user.database_name}:%s' % table_name
|
|
try:
|
|
conn = GD['httplib'].HTTPConnection('#{varnish_host}', #{varnish_port}, False, timeout)
|
|
dbtable_hash = GD['hashlib'].sha256(database_table).digest()
|
|
dbtable_encoded = GD['base64'].b64encode(dbtable_hash)[0:6]
|
|
except Exception:
|
|
conn = GD['httplib'].HTTPConnection('#{varnish_host}', port=#{varnish_port}, timeout=timeout)
|
|
dbtable_hash = GD['hashlib'].sha256(database_table.encode()).digest()
|
|
dbtable_encoded = GD['base64'].b64encode(dbtable_hash)[0:6].decode()
|
|
raw_cache_key = 't:%s' % dbtable_encoded
|
|
cache_key = raw_cache_key.replace('+', r'\+')
|
|
conn.request('PURGE', '/key', '', {"Invalidation-Match": ('\\\\b%s\\\\b' % cache_key) })
|
|
response = conn.getresponse()
|
|
assert response.status == 204
|
|
break
|
|
except Exception as err:
|
|
if trigger_verbose:
|
|
plpy.warning('Varnish purge error: ' + str(err))
|
|
if not retry:
|
|
if critical:
|
|
plpy.error('Varnish purge error: ' + str(err))
|
|
break
|
|
retry -= 1 # try reconnecting
|
|
$$
|
|
LANGUAGE '#{@plpythonu}' VOLATILE;
|
|
REVOKE ALL ON FUNCTION public.cdb_invalidate_varnish(TEXT) FROM PUBLIC;
|
|
COMMIT;
|
|
TRIGGER
|
|
)
|
|
end
|
|
|
|
# Invalidate through external service
|
|
def create_function_invalidate_varnish_invalidation_service
|
|
add_python
|
|
|
|
invalidation_host = Cartodb.get_config(:invalidation_service, 'host') || '127.0.0.1'
|
|
invalidation_port = Cartodb.get_config(:invalidation_service, 'port') || 3142
|
|
invalidation_timeout = Cartodb.get_config(:invalidation_service, 'timeout') || 5
|
|
invalidation_critical = Cartodb.get_config(:invalidation_service, 'critical') ? 1 : 0
|
|
invalidation_retry = Cartodb.get_config(:invalidation_service, 'retry') || 5
|
|
invalidation_trigger_verbose =
|
|
Cartodb.get_config(:invalidation_service).fetch('trigger_verbose', true) == true ? 1 : 0
|
|
|
|
@user.in_database(as: :superuser).run(
|
|
<<-TRIGGER
|
|
BEGIN;
|
|
CREATE OR REPLACE FUNCTION public.cdb_invalidate_varnish(table_name text) RETURNS void AS
|
|
$$
|
|
critical = #{invalidation_critical}
|
|
timeout = #{invalidation_timeout}
|
|
retry = #{invalidation_retry}
|
|
trigger_verbose = #{invalidation_trigger_verbose}
|
|
|
|
client = GD.get('invalidation', None)
|
|
|
|
if 'syslog' not in GD:
|
|
import syslog
|
|
GD['syslog'] = syslog
|
|
else:
|
|
syslog = GD['syslog']
|
|
|
|
if 'time' not in GD:
|
|
import time
|
|
GD['time'] = time
|
|
else:
|
|
time = GD['time']
|
|
|
|
if 'json' not in GD:
|
|
import json
|
|
GD['json'] = json
|
|
else:
|
|
json = GD['json']
|
|
|
|
start = time.time()
|
|
retries = 0
|
|
termination_state = 1
|
|
error = ''
|
|
|
|
while True:
|
|
|
|
if not client:
|
|
try:
|
|
import redis
|
|
client = GD['invalidation'] = redis.Redis(host='#{invalidation_host}', port=#{invalidation_port}, socket_timeout=timeout)
|
|
except Exception as err:
|
|
error = "client_error - %s" % str(err)
|
|
# NOTE: we won't retry on connection error
|
|
if critical:
|
|
plpy.error('Invalidation Service connection error: ' + str(err))
|
|
break
|
|
|
|
try:
|
|
client.execute_command('TCH', '#{@user.database_name}', table_name)
|
|
termination_state = 0
|
|
error = ''
|
|
break
|
|
except Exception as err:
|
|
error = "request_error - %s" % str(err)
|
|
if trigger_verbose:
|
|
plpy.warning('Invalidation Service warning: ' + str(err))
|
|
client = GD['invalidation'] = None # force reconnect
|
|
if not retry:
|
|
if critical:
|
|
plpy.error('Invalidation Service error: ' + str(err))
|
|
break
|
|
retries = retries + 1
|
|
retry -= 1 # try reconnecting
|
|
|
|
end = time.time()
|
|
invalidation_duration = (end - start)
|
|
current_time = time.strftime("%Y-%m-%d %H:%M:%S %Z", time.localtime())
|
|
session_user = plpy.execute("SELECT session_user", 1)[0]["session_user"]
|
|
invalidation_result = {"timestamp": current_time, "duration": round(invalidation_duration, 8), "termination_state": termination_state, "retries": retries, "error": error, "database": "#{@user.database_name}", "table_name": table_name, "dbuser": session_user}
|
|
|
|
if trigger_verbose:
|
|
syslog.syslog(syslog.LOG_INFO, "invalidation: %s" % json.dumps(invalidation_result))
|
|
|
|
$$
|
|
LANGUAGE '#{@plpythonu}' VOLATILE;
|
|
REVOKE ALL ON FUNCTION public.cdb_invalidate_varnish(TEXT) FROM PUBLIC;
|
|
COMMIT;
|
|
TRIGGER
|
|
)
|
|
end
|
|
|
|
# Geocoder api extension related
|
|
def build_geocoder_server_config_sql(config)
|
|
host = config['host']
|
|
port = config['port']
|
|
user = config['user']
|
|
dbname = config['dbname']
|
|
%{
|
|
SELECT cartodb.CDB_Conf_SetConf('geocoder_server_config',
|
|
'{ \"connection_str\": \"host=#{host} port=#{port} dbname=#{dbname} user=#{user}\"}'::json
|
|
);
|
|
}
|
|
end
|
|
|
|
def build_entity_config_sql
|
|
# User configuration
|
|
entity_name = @user.organization_user? ? @user.organization.name : @user.username
|
|
%{
|
|
SELECT cartodb.CDB_Conf_SetConf('user_config',
|
|
'{"is_organization": #{@user.organization_user?}, "entity_name": "#{entity_name}"}'::json
|
|
);
|
|
}
|
|
end
|
|
|
|
def build_aggregation_fdw_config_sql(config)
|
|
%{
|
|
SELECT cartodb.CDB_Conf_SetConf('fdws',
|
|
'{"aggregation":{"server":{"extensions":"postgis", "dbname":"#{config['dbname']}",
|
|
"host":"#{config['host']}", "port":"#{config['port']}"}, "users":{"public":{"user":"#{config['username']}",
|
|
"password":"#{config['password']}"} } } }'::json
|
|
);
|
|
}
|
|
end
|
|
|
|
def log_context
|
|
super.merge(current_user: user)
|
|
end
|
|
end
|
|
end
|
|
end
|