781 lines
29 KiB
Ruby
781 lines
29 KiB
Ruby
require 'pg'
|
|
require 'redis'
|
|
require 'yaml'
|
|
require 'json'
|
|
require 'logger'
|
|
require 'optparse'
|
|
require 'digest'
|
|
require 'securerandom'
|
|
|
|
require_relative 'config'
|
|
require_relative 'utils'
|
|
require_relative 'legacy_functions'
|
|
|
|
module CartoDB
|
|
module DataMover
|
|
class ImportJob
|
|
include CartoDB::DataMover::Utils
|
|
include CartoDB::DataMover::LegacyFunctions
|
|
attr_reader :logger
|
|
|
|
def initialize(options)
|
|
default_options = { data: true, metadata: true, set_banner: true, update_metadata: true }
|
|
@options = default_options.merge(options)
|
|
@config = CartoDB::DataMover::Config.config
|
|
@logger = @options[:logger] || default_logger
|
|
@@importjob_logger = @options[:import_job_logger]
|
|
|
|
@start = Time.now
|
|
@logger.debug "Starting import job with options: #{@options}"
|
|
|
|
@target_dbport = ENV['USER_DB_PORT'] || @config[:dbport]
|
|
@target_dbhost = @options[:host] || @config[:dbhost]
|
|
|
|
raise "File #{@options[:file]} does not exist!" unless File.exists?(@options[:file])
|
|
# User(s) metadata json
|
|
@pack_config = JSON::parse File.read(@options[:file])
|
|
|
|
@path = File.expand_path(File.dirname(@options[:file])) + "/"
|
|
|
|
job_uuid = @options[:job_uuid] || SecureRandom.uuid
|
|
@import_log = { job_uuid: job_uuid,
|
|
id: nil,
|
|
type: 'import',
|
|
path: @path,
|
|
start: @start,
|
|
end: nil,
|
|
elapsed_time: nil,
|
|
server: `hostname`.strip,
|
|
pid: Process.pid,
|
|
db_target: @target_dbhost,
|
|
status: nil,
|
|
trace: nil
|
|
}
|
|
|
|
@target_dbname = target_dbname
|
|
end
|
|
|
|
def run!
|
|
if @pack_config['organization']
|
|
process_org
|
|
else
|
|
process_user
|
|
end
|
|
end
|
|
|
|
def rollback!
|
|
close_all_database_connections
|
|
if @pack_config['organization']
|
|
rollback_org
|
|
else
|
|
rollback_user
|
|
end
|
|
end
|
|
|
|
def terminate_connections
|
|
@user_conn && @user_conn.close
|
|
@user_conn = nil
|
|
|
|
@superuser_user_conn && @superuser_user_conn.close
|
|
@superuser_user_conn = nil
|
|
|
|
@superuser_conn && @superuser_conn.close
|
|
@superuser_conn = nil
|
|
end
|
|
|
|
def db_exists?
|
|
superuser_pg_conn.query("select 1 from pg_database where datname = '#{@target_dbname}'").count > 0
|
|
end
|
|
|
|
private
|
|
|
|
def for_each_oauth_app_user(user_id)
|
|
Carto::User.find(user_id).oauth_app_users.each do |oau|
|
|
yield superuser_user_pg_conn, oau
|
|
end
|
|
rescue PG::Error => e
|
|
# Ignore role already exists errors
|
|
if e.message =~ /already exists/
|
|
@logger.warn "Warning: Oauth app user role already exists"
|
|
else
|
|
throw e
|
|
end
|
|
end
|
|
|
|
def process_user
|
|
@target_username = @pack_config['user']['username']
|
|
@target_userid = @pack_config['user']['id']
|
|
@import_log[:id] = @pack_config['user']['username']
|
|
@target_port = ENV['USER_DB_PORT'] || @config[:dbport]
|
|
|
|
if org_import?
|
|
@target_dbuser = database_username(@target_userid)
|
|
@target_schema = @pack_config['user']['database_schema']
|
|
@target_org_id = nil
|
|
else
|
|
organization_data = get_org_info(@options[:target_org])
|
|
@target_dbuser = database_username(@target_userid)
|
|
@target_schema = @pack_config['user']['database_schema']
|
|
@target_org_id = organization_data['id']
|
|
|
|
if owner?(organization_data)
|
|
# If the user being imported into an org is the owner of the org, we make the import as it were a single-user
|
|
@target_is_owner = true
|
|
else
|
|
# We fill the missing configuration data for the owner
|
|
organization_owner_data = get_user_info(organization_data['owner_id'])
|
|
@target_dbhost = @options[:host] || organization_owner_data['database_host']
|
|
@target_is_owner = false
|
|
end
|
|
end
|
|
|
|
if @options[:mode] == :import
|
|
import_user
|
|
elsif @options[:mode] == :rollback
|
|
rollback_user
|
|
end
|
|
end
|
|
|
|
def process_org
|
|
@organization_id = @pack_config['organization']['id']
|
|
@owner_id = @pack_config['organization']['owner_id']
|
|
@import_log[:id] = @organization_id
|
|
|
|
if @options[:mode] == :import
|
|
import_org
|
|
elsif @options[:mode] == :rollback
|
|
rollback_org
|
|
end
|
|
end
|
|
|
|
def rollback_user
|
|
if @options[:metadata]
|
|
rollback_metadata("user_#{@target_userid}_metadata_undo.sql")
|
|
rollback_redis("user_#{@target_userid}_metadata_undo.redis")
|
|
end
|
|
if @options[:data]
|
|
drop_database(@target_dbname) if @options[:drop_database] && !@options[:schema_mode]
|
|
drop_role(@target_dbuser) if @options[:drop_roles]
|
|
end
|
|
end
|
|
|
|
def import_user
|
|
begin
|
|
if @options[:metadata]
|
|
check_user_exists_redis
|
|
check_user_exists_postgres
|
|
end
|
|
rescue => e
|
|
@logger.error "Error in sanity checks: #{e}"
|
|
log_error(e)
|
|
remove_user_mover_banner(@pack_config['user']['id']) if @options[:set_banner]
|
|
throw e
|
|
end
|
|
|
|
if @options[:data]
|
|
# Password should be passed here too
|
|
create_user(@target_dbuser)
|
|
create_org_role(@target_dbname) # Create org role for the original org
|
|
create_org_owner_role(@target_dbname)
|
|
if org_import?
|
|
grant_user_org_role(@target_dbuser, @target_dbname)
|
|
end
|
|
|
|
if @target_schema != 'public'
|
|
set_user_search_path(@target_dbuser, @pack_config['user']['database_schema'])
|
|
create_public_db_user(@target_userid, @pack_config['user']['database_schema'])
|
|
end
|
|
|
|
@pack_config['roles'].each do |user, roles|
|
|
roles.each { |role| grant_user_role(user, role) }
|
|
end
|
|
begin
|
|
if @target_org_id && @target_is_owner && File.exists?("#{@path}org_#{@target_org_id}.dump")
|
|
create_db(@target_dbname, true)
|
|
create_org_oauth_app_user_roles(@target_org_id)
|
|
create_org_api_key_roles(@target_org_id)
|
|
import_pgdump("org_#{@target_org_id}.dump")
|
|
grant_org_oauth_app_user_roles(@target_org_id)
|
|
grant_org_api_key_roles(@target_org_id)
|
|
elsif File.exists? "#{@path}user_#{@target_userid}.dump"
|
|
create_db(@target_dbname, true)
|
|
create_user_oauth_app_user_roles(@target_userid)
|
|
create_user_api_key_roles(@target_userid)
|
|
import_pgdump("user_#{@target_userid}.dump")
|
|
grant_user_oauth_app_user_roles(@target_userid)
|
|
grant_user_api_key_roles(@target_userid)
|
|
elsif File.exists? "#{@path}#{@target_username}.schema.sql"
|
|
create_db(@target_dbname, false)
|
|
run_file_restore_schema("#{@target_username}.schema.sql")
|
|
end
|
|
rescue => e
|
|
begin
|
|
remove_user_mover_banner(@pack_config['user']['id'])
|
|
ensure
|
|
log_error(e)
|
|
throw e
|
|
end
|
|
end
|
|
end
|
|
|
|
if @options[:metadata]
|
|
begin
|
|
import_redis("user_#{@target_userid}_metadata.redis")
|
|
import_metadata("user_#{@target_userid}_metadata.sql")
|
|
rescue => e
|
|
rollback_metadata("user_#{@target_userid}_metadata_undo.sql")
|
|
rollback_redis("user_#{@target_userid}_metadata_undo.redis")
|
|
log_error(e)
|
|
remove_user_mover_banner(@pack_config['user']['id']) if @options[:set_banner]
|
|
throw e
|
|
end
|
|
end
|
|
|
|
if @options[:data]
|
|
configure_database(@target_dbhost)
|
|
end
|
|
|
|
if @options[:update_metadata]
|
|
update_metadata_user(@target_dbhost)
|
|
end
|
|
|
|
log_success
|
|
remove_user_mover_banner(@pack_config['user']['id']) if @options[:set_banner]
|
|
end
|
|
|
|
def import_org
|
|
import_metadata("org_#{@organization_id}_metadata.sql") if @options[:metadata]
|
|
create_org_role(@pack_config['users'][0]['database_name']) # Create org role for the original org
|
|
@pack_config['groups'].each do |group|
|
|
create_role(group['database_role'])
|
|
end
|
|
@pack_config['users'].each do |user|
|
|
# Password should be passed here too
|
|
create_user(database_username(user['id']))
|
|
create_public_db_user(user['id'], user['database_schema'])
|
|
grant_user_org_role(database_username(user['id']), user['database_name'])
|
|
end
|
|
|
|
org_user_ids = @pack_config['users'].map{|u| u['id']}
|
|
# We set the owner to be imported first (if schemas are not split, this will also import the whole org database)
|
|
org_user_ids = org_user_ids.insert(0, org_user_ids.delete(@owner_id))
|
|
|
|
org_user_ids.each do |user|
|
|
@logger.info("Importing org user #{user}..")
|
|
i = ImportJob.new(file: @path + "user_#{user}.json",
|
|
mode: @options[:mode],
|
|
host: @target_dbhost,
|
|
target_org: @pack_config['organization']['name'],
|
|
logger: @logger, data: @options[:data], metadata: @options[:metadata],
|
|
update_metadata: @options[:update_metadata])
|
|
i.run!
|
|
end
|
|
rescue => e
|
|
rollback_metadata("org_#{@organization_id}_metadata_undo.sql") if @options[:metadata]
|
|
log_error(e)
|
|
raise e
|
|
else
|
|
log_success
|
|
ensure
|
|
@pack_config['users'].each do |user|
|
|
remove_user_mover_banner(user['id']) if @options[:set_banner]
|
|
end
|
|
end
|
|
|
|
def rollback_org
|
|
db = @pack_config['users'][0]['database_name']
|
|
@pack_config['users'].each do |user|
|
|
@logger.info("Rolling back metadata for org user #{user['id']}..")
|
|
ImportJob.new(file: @path + "user_#{user['id']}.json",
|
|
mode: :rollback,
|
|
host: @target_dbhost,
|
|
target_org: @pack_config['organization']['name'],
|
|
logger: @logger,
|
|
metadata: @options[:metadata],
|
|
data: false).run!
|
|
end
|
|
rollback_metadata("org_#{@organization_id}_metadata_undo.sql") if @options[:metadata]
|
|
if @options[:data]
|
|
drop_database(db) if @options[:drop_database]
|
|
if @options[:drop_roles]
|
|
drop_role(org_role_name(db))
|
|
@pack_config['users'].each { |u| drop_role(database_username(u['id'])) }
|
|
@pack_config['groups'].each { |g| drop_role(g['database_role']) }
|
|
end
|
|
end
|
|
end
|
|
|
|
def drop_role(role)
|
|
superuser_pg_conn.query("DROP ROLE IF EXISTS \"#{role}\"")
|
|
end
|
|
|
|
def get_org_info(orgname)
|
|
result = metadata_pg_conn.query('SELECT * FROM organizations WHERE name = $1', [orgname])
|
|
raise "Organization #{orgname} not found" if result.cmd_tuples == 0
|
|
result[0]
|
|
end
|
|
|
|
def get_user_info(user_id)
|
|
result = metadata_pg_conn.query('SELECT * FROM users WHERE id = $1', [user_id])
|
|
raise "User with ID #{user_id} not found" if result.cmd_tuples == 0
|
|
result[0]
|
|
end
|
|
|
|
def set_user_statement_timeout(user, timeout)
|
|
superuser_pg_conn.query("ALTER USER #{superuser_pg_conn.quote_ident(user)} SET statement_timeout = #{timeout}")
|
|
end
|
|
|
|
def set_db_statement_timeout(db, timeout)
|
|
superuser_pg_conn.query("ALTER DATABASE #{superuser_pg_conn.quote_ident(db)} SET statement_timeout = #{timeout}")
|
|
end
|
|
|
|
def close_all_database_connections(database_name = @target_dbname)
|
|
superuser_pg_conn.query("SELECT pg_terminate_backend(pg_stat_activity.pid)
|
|
FROM pg_stat_activity
|
|
WHERE pg_stat_activity.datname = '#{database_name}'
|
|
AND pid <> pg_backend_pid();")
|
|
terminate_connections
|
|
end
|
|
|
|
def user_pg_conn
|
|
@user_conn ||= PG.connect(host: @target_dbhost,
|
|
user: @target_dbuser,
|
|
dbname: @target_dbname,
|
|
port: @config[:dbport],
|
|
connect_timeout: CartoDB::DataMover::Config.config[:connect_timeout])
|
|
end
|
|
|
|
def superuser_user_pg_conn
|
|
@superuser_user_conn ||= PG.connect(host: @target_dbhost,
|
|
user: @config[:dbuser],
|
|
dbname: @target_dbname,
|
|
port: @target_dbport,
|
|
connect_timeout: CartoDB::DataMover::Config.config[:connect_timeout])
|
|
end
|
|
|
|
def superuser_pg_conn
|
|
@superuser_conn ||= PG.connect(host: @target_dbhost,
|
|
user: @config[:dbuser],
|
|
dbname: 'postgres',
|
|
port: @target_dbport,
|
|
connect_timeout: CartoDB::DataMover::Config.config[:connect_timeout])
|
|
end
|
|
|
|
def drop_database(db_name)
|
|
close_all_database_connections(db_name)
|
|
superuser_pg_conn.query("DROP DATABASE IF EXISTS \"#{db_name}\"")
|
|
end
|
|
|
|
def clean_oids(user_id, user_schema)
|
|
tables = superuser_user_pg_conn.query("SELECT pg_class.oid, pg_class.relname
|
|
FROM pg_class inner join pg_namespace on pg_namespace.oid=pg_class.relnamespace where relkind='r' and nspname=$1;", [user_schema])
|
|
tables.each do |row|
|
|
metadata_pg_conn.query('UPDATE user_tables SET table_id=$1 where user_id = $2 and name=$3', [row['oid'], user_id, row['relname']])
|
|
end
|
|
end
|
|
|
|
def check_user_exists_postgres
|
|
@logger.debug 'Checking if user does not exist on Postgres metadata...'
|
|
result = metadata_pg_conn.query('SELECT * FROM USERS WHERE id = $1', [@target_userid])
|
|
raise "User already exists in PostgreSQL metadata" if result.cmd_tuples != 0
|
|
end
|
|
|
|
def check_user_exists_redis
|
|
@logger.debug 'Checking if user does not exist on Redis metadata...'
|
|
result = metadata_redis_conn.hgetall("rails:users:#{@target_dbname}")
|
|
raise "User already exists in Redis metadata" if result != {}
|
|
end
|
|
|
|
def conn_string(user, host, port, name)
|
|
%{#{!user ? '' : '-U ' + user} -h #{host} -p #{port} -d #{name} }
|
|
end
|
|
|
|
def run_redis_command(config)
|
|
"redis-cli -p #{config[:redis_port]} -h #{config[:redis_host]} --pipe"
|
|
end
|
|
|
|
def run_file_redis(file)
|
|
run_command("cat #{@path}#{file} | #{run_redis_command(@config)}")
|
|
end
|
|
|
|
def run_file_metadata_postgres(file)
|
|
run_command("cat #{@path}#{file} | psql -v ON_ERROR_STOP=1 #{conn_string(@config[:dbuser], @config[:dbhost], @config[:dbport], @config[:dbname])}")
|
|
end
|
|
|
|
def run_file_restore_postgres(file, sections = nil)
|
|
file_path = "#{@path}#{file}"
|
|
command = "#{pg_restore_bin_path(file_path)} -e --verbose -j4 --disable-triggers -Fc #{file_path} #{conn_string(
|
|
@config[:dbuser],
|
|
@target_dbhost,
|
|
@config[:user_dbport],
|
|
@target_dbname)}"
|
|
command += " --section=#{sections}" if sections
|
|
command += " --use-list=\"#{@toc_file}\""
|
|
run_command(command)
|
|
end
|
|
|
|
def run_file_restore_schema(file)
|
|
run_command("cat #{@path}#{file} | psql -v ON_ERROR_STOP=1 #{conn_string(
|
|
@config[:dbuser],
|
|
@target_dbhost,
|
|
@config[:user_dbport],
|
|
@target_dbname)}")
|
|
end
|
|
|
|
def import_redis(file)
|
|
@logger.info("Importing Redis keys from #{file}..")
|
|
run_file_redis(file)
|
|
end
|
|
|
|
def import_metadata(file)
|
|
@logger.info("Importing PostgreSQL metadata from #{file}..")
|
|
run_file_metadata_postgres(file)
|
|
end
|
|
|
|
def rollback_redis(file)
|
|
@logger.info("Rolling back Redis keys from #{file}..")
|
|
run_file_redis(file)
|
|
end
|
|
|
|
def rollback_metadata(file)
|
|
@logger.info("Rolling back PostgreSQL metadata from #{file}..")
|
|
run_file_metadata_postgres(file)
|
|
end
|
|
|
|
def clean_toc_file(file)
|
|
tmp = Tempfile.new("extract_#{@target_username}.txt")
|
|
File.open(file, 'r').each do |l|
|
|
tmp << l unless remove_line?(l)
|
|
end
|
|
|
|
tmp.close
|
|
FileUtils.mv(tmp.path, file)
|
|
ensure
|
|
tmp.delete
|
|
end
|
|
|
|
def toc_file(file)
|
|
toc_file = "#{@path}user_#{@target_username}.list"
|
|
command = "#{pg_restore_bin_path(file)} -l #{file} --file='#{toc_file}'"
|
|
run_command(command)
|
|
clean_toc_file(toc_file)
|
|
toc_file
|
|
end
|
|
|
|
# It would be a good idea to "disable" the invalidation trigger during the load
|
|
# This way, the restore will be much faster and won't also cause a big overhead
|
|
# in the old database while the process is ongoing
|
|
# Disabling it may be hard. Maybe it's easier to just exclude it in the export.
|
|
def import_pgdump(dump)
|
|
@logger.info("Importing dump from #{dump} using pg_restore..")
|
|
@toc_file = toc_file("#{@path}#{dump}")
|
|
|
|
run_file_restore_postgres(dump, 'pre-data')
|
|
run_file_restore_postgres(dump, 'data')
|
|
run_file_restore_postgres(dump, 'post-data')
|
|
end
|
|
|
|
def create_user(username, password = nil)
|
|
@logger.info "Creating user #{username} on target db.."
|
|
begin
|
|
superuser_pg_conn.query("CREATE ROLE \"#{username}\";
|
|
ALTER ROLE \"#{username}\" WITH NOSUPERUSER INHERIT NOCREATEROLE NOCREATEDB LOGIN NOREPLICATION;
|
|
GRANT publicuser TO \"#{username}\";")
|
|
superuser_pg_conn.query("ALTER ROLE #{username} WITH PASSWORD '#{password}'") unless password.nil?
|
|
rescue PG::Error => e
|
|
@logger.info "Target Postgres role already exists: #{e.inspect}"
|
|
end
|
|
end
|
|
|
|
def create_org_api_key_roles(org_id)
|
|
Carto::Organization.find(org_id).users.each { |u| create_user_api_key_roles(u.id) }
|
|
end
|
|
|
|
def create_user_api_key_roles(user_id)
|
|
Carto::User.find(user_id).api_keys.select(&:needs_setup?).each do |k|
|
|
begin
|
|
k.role_creation_queries.each { |q| superuser_user_pg_conn.query(q) }
|
|
rescue PG::Error => e
|
|
# Ignore role already exists errors
|
|
throw e unless e.message =~ /already exists/
|
|
end
|
|
end
|
|
end
|
|
|
|
def grant_org_api_key_roles(org_id)
|
|
Carto::Organization.find(org_id).users.each { |u| grant_user_api_key_roles(u.id) }
|
|
end
|
|
|
|
def grant_user_api_key_roles(user_id)
|
|
Carto::User.find(user_id).api_keys.select(&:needs_setup?).each do |k|
|
|
k.role_permission_queries.each { |q| superuser_user_pg_conn.query(q) }
|
|
k.grant_ownership_role_privileges
|
|
end
|
|
end
|
|
|
|
def create_org_oauth_app_user_roles(org_id)
|
|
Carto::Organization.find(org_id).users.each { |u| create_user_oauth_app_user_roles(u.id) }
|
|
end
|
|
|
|
def create_user_oauth_app_user_roles(user_id)
|
|
for_each_oauth_app_user(user_id) do |conn, oau|
|
|
conn.query(oau.create_dataset_role_query)
|
|
end
|
|
|
|
# different loops to avoid failing to create ownership role due to an error in the dataset role
|
|
for_each_oauth_app_user(user_id) do |conn, oau|
|
|
conn.query(oau.create_ownership_role_query)
|
|
end
|
|
end
|
|
|
|
def grant_org_oauth_app_user_roles(org_id)
|
|
Carto::Organization.find(org_id).users.each { |u| grant_user_oauth_app_user_roles(u.id) }
|
|
end
|
|
|
|
def grant_user_oauth_app_user_roles(user_id)
|
|
Carto::User.find(user_id).oauth_app_users.each do |oau|
|
|
begin
|
|
oau.grant_dataset_role_privileges
|
|
rescue Carto::OauthProvider::Errors::InvalidScope => e
|
|
# Ignore managed oauth_app_user errors
|
|
@logger.error "Error granting permissions to dataset role: #{e}"
|
|
end
|
|
|
|
begin
|
|
oau.grant_ownership_role_privileges
|
|
rescue Carto::OauthProvider::Errors::InvalidScope => e
|
|
# Ignore managed oauth_app_user errors
|
|
@logger.error "Error granting permissions to ownership role: #{e}"
|
|
end
|
|
end
|
|
end
|
|
|
|
def org_role_name(database_name)
|
|
"cdb_org_member_#{Digest::MD5.hexdigest(database_name)}"
|
|
end
|
|
|
|
def org_owner_role_name(database_name)
|
|
"#{database_name}_a"
|
|
end
|
|
|
|
def grant_user_role(user, role)
|
|
superuser_pg_conn.query("GRANT \"#{role}\" TO \"#{user}\"")
|
|
end
|
|
|
|
def grant_user_org_role(user, database_name)
|
|
grant_user_role(user, org_role_name(database_name))
|
|
end
|
|
|
|
def create_role(role, createrole = false)
|
|
@logger.info "Creating role #{role} on target db.."
|
|
begin
|
|
superuser_pg_conn.query("CREATE ROLE \"#{role}\" #{createrole ? 'CREATEROLE' : ''} NOLOGIN;")
|
|
rescue PG::Error => e
|
|
@logger.info "Target org role already exists: #{e.inspect}"
|
|
end
|
|
end
|
|
|
|
def create_org_role(database_name)
|
|
create_role(org_role_name(database_name))
|
|
end
|
|
|
|
def create_org_owner_role(database_name)
|
|
create_role(org_owner_role_name(database_name), true)
|
|
end
|
|
|
|
def create_public_db_user(user_id, schema)
|
|
user = "cartodb_publicuser_#{user_id}"
|
|
create_user(user)
|
|
superuser_pg_conn.query("GRANT publicuser TO \"#{user}\"")
|
|
set_user_search_path(user, schema)
|
|
end
|
|
|
|
def set_user_search_path(user, search_path_prefix)
|
|
search_path = CartoDB::UserModule::DBService.build_search_path(search_path_prefix)
|
|
superuser_pg_conn.query("ALTER USER \"#{user}\" SET search_path= #{search_path}")
|
|
end
|
|
|
|
def create_db(dbname, blank)
|
|
# Blank is when the database should be created empty (will receive a pg_dump).
|
|
# blank = false: it should have postgis, cartodb/cdb_importer/cdb schemas
|
|
# connect as superuser (postgres)
|
|
@logger.info "Creating user DB #{dbname}..."
|
|
begin
|
|
if blank
|
|
superuser_pg_conn.query("CREATE DATABASE \"#{dbname}\"")
|
|
else
|
|
superuser_pg_conn.query("CREATE DATABASE \"#{dbname}\" WITH TEMPLATE template_postgis")
|
|
end
|
|
# This rescue can be improved a little bit. The way it is it assumes that the error
|
|
# will always be that the db already exists
|
|
rescue PG::Error => e
|
|
if blank
|
|
@logger.error "Error: Database already exists"
|
|
raise e
|
|
else
|
|
@logger.warn "Warning: Database already exists"
|
|
end
|
|
end
|
|
|
|
setup_db(dbname) unless blank
|
|
end
|
|
|
|
def setup_db(_dbname)
|
|
['plpythonu', 'postgis'].each do |extension|
|
|
superuser_user_pg_conn.query("CREATE EXTENSION IF NOT EXISTS #{extension}")
|
|
end
|
|
cartodb_schema = superuser_user_pg_conn.query("SELECT nspname FROM pg_catalog.pg_namespace where nspname = 'cartodb'")
|
|
superuser_user_pg_conn.query("CREATE SCHEMA cartodb") if cartodb_schema.count == 0
|
|
cdb_importer_schema = superuser_user_pg_conn.query("SELECT nspname FROM pg_catalog.pg_namespace where nspname = 'cdb_importer'")
|
|
superuser_user_pg_conn.query("CREATE SCHEMA cdb_importer") if cdb_importer_schema.count == 0
|
|
cdb_schema = superuser_user_pg_conn.query("SELECT nspname FROM pg_catalog.pg_namespace where nspname = 'cdb'")
|
|
superuser_user_pg_conn.query("CREATE SCHEMA cdb") if cdb_schema.count == 0
|
|
superuser_user_pg_conn.query("CREATE EXTENSION IF NOT EXISTS cartodb WITH SCHEMA cartodb")
|
|
superuser_user_pg_conn.query("SELECT CDB_DisableGhostTablesTrigger()")
|
|
rescue PG::Error => e
|
|
@logger.error "Error: Cannot setup DB"
|
|
raise e
|
|
end
|
|
|
|
def update_database_retries(userid, username, db_host, db_name, retries = 1)
|
|
update_database(userid, username, db_host, db_name)
|
|
rescue => e
|
|
@logger.error "Error updating database"
|
|
if retries > 0
|
|
@logger.info "Retrying..."
|
|
update_database_retries(userid, username, db_host, db_name, retries - 1)
|
|
else
|
|
@logger.info "No more retries"
|
|
throw e
|
|
end
|
|
end
|
|
|
|
def update_database(userid, username, db_host, db_name)
|
|
update_postgres_database_host(userid, db_host)
|
|
update_redis_database_host(username, db_host)
|
|
|
|
update_postgres_database_name(userid, db_name)
|
|
update_redis_database_name(username, db_name)
|
|
end
|
|
|
|
def update_postgres_database_host(userid, db)
|
|
@logger.info "Updating PostgreSQL database_host..."
|
|
metadata_pg_conn.exec("UPDATE users SET database_host = $1 WHERE id = $2", [db, userid])
|
|
end
|
|
|
|
def update_postgres_organization(userid, org_id)
|
|
@logger.info "Updating PostgreSQL organization..."
|
|
metadata_pg_conn.exec("UPDATE users SET organization_id = $1 WHERE id = $2", [org_id, userid])
|
|
end
|
|
|
|
def update_redis_database_host(user, db)
|
|
@logger.info "Updating Redis database_host..."
|
|
metadata_redis_conn.hset("rails:users:#{user}", 'database_host', db)
|
|
end
|
|
|
|
def update_postgres_database_name(userid, db)
|
|
@logger.info "Updating PostgreSQL database_name..."
|
|
metadata_pg_conn.exec("UPDATE users SET database_name = $1 WHERE id = $2", [db, userid])
|
|
end
|
|
|
|
def update_redis_database_name(user, db)
|
|
@logger.info "Updating Redis database_name..."
|
|
metadata_redis_conn.hset("rails:users:#{user}", 'database_name', db)
|
|
end
|
|
|
|
def configure_database(target_dbhost)
|
|
# Note: this will change database_host on the user model to perform configuration but will not actually store
|
|
# the change
|
|
user_model = ::User.find(username: @target_username)
|
|
user_model.database_host = target_dbhost
|
|
user_model.database_name = @target_dbname
|
|
user_model.organization_id = @target_org_id if !@target_org_id.nil?
|
|
|
|
user_model.db_service.setup_organization_owner if @target_is_owner
|
|
user_model.db_service.monitor_user_notification # Used to inform the database_server
|
|
user_model.db_service.configure_database
|
|
user_model.db_service.monitor_user_notification
|
|
end
|
|
|
|
def update_metadata_user(target_dbhost)
|
|
user_model = ::User.find(username: @target_username)
|
|
orig_dbhost = user_model.database_host
|
|
changed_metadata = false
|
|
begin
|
|
clean_oids(@target_userid, @target_schema)
|
|
if @target_org_id
|
|
update_postgres_organization(@target_userid, @target_org_id)
|
|
else
|
|
update_postgres_organization(@target_userid, nil)
|
|
end
|
|
begin
|
|
update_database_retries(@target_userid, @target_username, target_dbhost, @target_dbname, 1)
|
|
changed_metadata = true
|
|
user_model.reload
|
|
end
|
|
rescue => e
|
|
if changed_metadata
|
|
update_database_retries(@target_userid, @target_username, orig_dbhost, @target_dbname, 1)
|
|
end
|
|
log_error(e)
|
|
remove_user_mover_banner(@pack_config['user']['id']) if @options[:set_banner]
|
|
throw e
|
|
end
|
|
end
|
|
|
|
def importjob_logger
|
|
@@importjob_logger ||= CartoDB.unformatted_logger("#{Rails.root}/log/datamover.log")
|
|
end
|
|
|
|
def log_error(e)
|
|
@logger.error e
|
|
@import_log[:end] = Time.now
|
|
@import_log[:elapsed_time] = (@import_log[:end] - @import_log[:start]).ceil
|
|
@import_log[:status] = 'failure'
|
|
@import_log[:trace] = e.to_s
|
|
importjob_logger.info(@import_log.to_json)
|
|
end
|
|
|
|
def log_success
|
|
@import_log[:end] = Time.now
|
|
@import_log[:elapsed_time] = (@import_log[:end] - @import_log[:start]).ceil
|
|
@import_log[:status] = 'success'
|
|
importjob_logger.info(@import_log.to_json)
|
|
end
|
|
|
|
def pg_restore_bin_path(dump)
|
|
get_pg_restore_bin_path(superuser_pg_conn, dump)
|
|
end
|
|
|
|
def target_dbname
|
|
return @pack_config['users'][0]['database_name'] if @pack_config['organization']
|
|
|
|
@target_userid = @pack_config['user']['id']
|
|
if org_import?
|
|
user_database(@target_userid)
|
|
else
|
|
organization_data = get_org_info(@options[:target_org])
|
|
if owner?(organization_data)
|
|
user_database(@target_userid)
|
|
else
|
|
organization_owner_data = get_user_info(organization_data['owner_id'])
|
|
organization_owner_data['database_name']
|
|
end
|
|
end
|
|
end
|
|
|
|
def owner?(organization_data)
|
|
@pack_config['user']['id'] == organization_data['owner_id']
|
|
end
|
|
|
|
def org_import?
|
|
@options[:target_org] == nil
|
|
end
|
|
|
|
def organization_import?
|
|
@pack_config['organization'] != nil
|
|
end
|
|
end
|
|
end
|
|
end
|