You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
344 lines
13 KiB
344 lines
13 KiB
namespace :cartodb do
|
|
# This rake retrieves all sync tables that should get synchronized, and puts the synchronization tasks at Resque
|
|
# NOTE: This version does not mark the tables as "enqueued", should be done if planning to run multiple instances
|
|
desc 'Runs the sync tables process'
|
|
task :sync_tables, [:force_all_arg] => [:environment] do |task, args|
|
|
puts '> Sync tables started' if ENV['VERBOSE']
|
|
|
|
require_relative '../../services/synchronizer/lib/synchronizer/collection'
|
|
collection = CartoDB::Synchronizer::Collection.new
|
|
|
|
# This fetches and enqueues
|
|
collection.fetch_and_enqueue(args[:force_all_arg].present? ? args[:force_all_arg] : false)
|
|
|
|
puts '> Sync tables finished' if ENV['VERBOSE']
|
|
end
|
|
|
|
|
|
desc 'Adds visualization_id to every Synchronization'
|
|
task :populate_synchronization_visualization_ids => [:environment] do |task, args|
|
|
require_relative '../../services/synchronizer/lib/synchronizer/collection'
|
|
collection = CartoDB::Synchronizer::Collection.new
|
|
|
|
collection.fetch_all.each { |record|
|
|
begin
|
|
synchronization = CartoDB::Synchronization::Member.new(id: record[:id]).fetch
|
|
rescue KeyError
|
|
synchronization = nil
|
|
end
|
|
if synchronization
|
|
begin
|
|
table = UserTable.where({
|
|
name: synchronization.name,
|
|
user_id: synchronization.user_id
|
|
}).first
|
|
if table.nil?
|
|
puts "\nSync id '#{record[:id]}' related table not found"
|
|
else
|
|
table = table.service
|
|
end
|
|
rescue StandardError => exception
|
|
table = nil
|
|
puts "\nSync id '#{record[:id]}' errored: #{exception.inspect}"
|
|
end
|
|
unless table.nil?
|
|
if synchronization.visualization_id.nil?
|
|
begin
|
|
synchronization.visualization_id = table.table_visualization.id
|
|
rescue StandardError => exception
|
|
puts "\nSync id '#{record[:id]}' errored, canonical visualization not found"
|
|
end
|
|
begin
|
|
synchronization.store
|
|
printf '.'
|
|
rescue StandardError => exception
|
|
puts "\nSync id '#{record[:id]}' errored: #{exception.inspect}"
|
|
end
|
|
else
|
|
printf 'S'
|
|
end
|
|
end
|
|
else
|
|
puts "\nSync id '#{record[:id]}' errored: missing synchronization entry"
|
|
end
|
|
}
|
|
puts "\nFINISHED"
|
|
|
|
end
|
|
|
|
def report_incompatible_bq_parameters(parameters)
|
|
valid_params = %w(provider connection table sql_query import_as project dataset billing_project)
|
|
valid_conn_params = %w(billing_project service_account access_token refresh_token default_project default_dataset)
|
|
invalid_params = parameters.keys - valid_params
|
|
invalid_conn_params = (parameters['connection'] || {}).keys - valid_conn_params
|
|
puts " Invalid parameters: #{invalid_params.inspect}" if invalid_params.present?
|
|
puts " Invalid connection parameters: #{invalid_conn_params.inspect}" if invalid_conn_params.present?
|
|
end
|
|
|
|
def replicate_bq_config
|
|
puts 'Replicating configuration'
|
|
bigquery = Carto::ConnectorProvider.find_by(name: 'bigquery')
|
|
bigquery_beta = Carto::ConnectorProvider.find_by(name: 'bigquery-beta')
|
|
Carto::ConnectorConfiguration.where(connector_provider_id: bigquery.id).find_each do |bq_config|
|
|
unless Carto::ConnectorConfiguration.where(
|
|
connector_provider_id: bigquery_beta.id,
|
|
user_id: bq_config.user_id,
|
|
organization_id: bq_config.organization_id
|
|
).exists?
|
|
Carto::ConnectorConfiguration.create!(
|
|
connector_provider: bigquery_beta,
|
|
user_id: bq_config.user_id,
|
|
organization_id: bq_config.organization_id,
|
|
enabled: bq_config.enabled,
|
|
max_rows: bq_config.max_rows
|
|
)
|
|
puts " configuration for #{bq_config.user&.username || bq_config.configuration.name} replicated"
|
|
end
|
|
end
|
|
end
|
|
|
|
def bq_syncs_query(user: nil, sync: nil)
|
|
condition = ''
|
|
condition += "AND user_id = '#{user.id}'" if user.present?
|
|
condition += "AND id = '#{sync.id}'" if sync.present?
|
|
%{
|
|
service_name = 'connector'
|
|
#{condition}
|
|
AND (state IN (
|
|
'#{Carto::Synchronization::STATE_SUCCESS}', '#{Carto::Synchronization::STATE_SYNCING}',
|
|
'#{Carto::Synchronization::STATE_QUEUED}', '#{Carto::Synchronization::STATE_CREATED}')
|
|
OR (state = '#{Carto::Synchronization::STATE_FAILURE}'
|
|
AND retried_times < #{CartoDB::Synchronization::Member::MAX_RETRIES}))
|
|
AND ((service_item_id::JSON)#>>'{provider}') = 'bigquery-beta'
|
|
}
|
|
end
|
|
|
|
def bq_sync_blocked_states
|
|
[
|
|
Carto::Synchronization::STATE_CREATED,
|
|
Carto::Synchronization::STATE_QUEUED,
|
|
Carto::Synchronization::STATE_SYNCING
|
|
]
|
|
end
|
|
|
|
desc 'Port BQ syncs to beta connector'
|
|
task port_bq_syncs_to_beta: [:environment] do
|
|
dry_mode = ENV['DRY_RUN'] != 'NO'
|
|
|
|
if dry_mode
|
|
puts 'running in "dry" mode; set DRY_RUN=NO to make actual changes'
|
|
else
|
|
Rake::Task['cartodb:connectors:create_providers'].invoke
|
|
replicate_bq_config
|
|
end
|
|
|
|
number_of_pending_syncs = 0
|
|
Carto::Synchronization.where(bq_syncs_query).find_each do |synchronization|
|
|
next unless synchronization.user.state == 'active'
|
|
|
|
sleep 0.2
|
|
synchronization.transaction do
|
|
synchronization.reload
|
|
parameters = JSON.parse(synchronization.service_item_id)
|
|
if synchronization.state.in? bq_sync_blocked_states
|
|
puts "Synchronization #{synchronization.id} could not be modifed; state: #{synchronization.state}"
|
|
number_of_pending_syncs += 1
|
|
elsif dry_mode
|
|
puts "Synchronization #{synchronization.id} would be modified to use bigquery-beta"
|
|
report_incompatible_bq_parameters(parameters)
|
|
else
|
|
begin
|
|
puts "Modifying #{synchronization.id} to use bigquery-beta"
|
|
run_at = synchronization.run_at
|
|
synchronization.update! run_at: nil
|
|
|
|
# Change the provider id
|
|
parameters['provider'] = 'bigquery-beta'
|
|
synchronization.update! service_item_id: parameters.to_json
|
|
report_incompatible_bq_parameters(parameters)
|
|
rescue
|
|
raise
|
|
ensure
|
|
synchronization.update! run_at: run_at
|
|
end
|
|
end
|
|
end
|
|
end
|
|
if number_of_pending_syncs.positive?
|
|
puts "#{number_of_pending_syncs} syncs could not be modified. . Please try again later."
|
|
end
|
|
end
|
|
|
|
desc 'Port BQ beta syncs to new connector'
|
|
task :port_beta_bq_syncs_to_new, [:username_or_sync_id] => :environment do |_task, args|
|
|
dry_mode = ENV['DRY_RUN'] != 'NO'
|
|
|
|
puts 'running in "dry" mode; set DRY_RUN=NO to make actual changes' if dry_mode
|
|
|
|
if args.username_or_sync_id != 'all-the-users'
|
|
user = Carto::User.find_by(username: args.username_or_sync_id)
|
|
if user.blank?
|
|
sync = Carto::Synchronization.find_by(id: args.username_or_sync_id)
|
|
raise "User/Sync not found: #{args.username_or_sync_id}" unless sync
|
|
end
|
|
end
|
|
|
|
sql = bq_syncs_query(user: user, sync: sync)
|
|
|
|
number_of_pending_syncs = 0
|
|
Carto::Synchronization.where(sql).find_each do |synchronization|
|
|
next unless synchronization.user.state == 'active'
|
|
|
|
sleep 0.2
|
|
synchronization.transaction do
|
|
synchronization.reload
|
|
parameters = JSON.parse(synchronization.service_item_id)
|
|
if synchronization.state.in? bq_sync_blocked_states
|
|
puts "Synchronization #{synchronization.id} could not be modifed; state: #{synchronization.state}"
|
|
number_of_pending_syncs += 1
|
|
elsif dry_mode
|
|
puts "Synchronization #{synchronization.id} would be modified to use bigquery"
|
|
puts ' parameter billing_project would be moved to connection' if parameters['billing_project'].present?
|
|
report_incompatible_bq_parameters(parameters)
|
|
else
|
|
begin
|
|
puts "Modifying #{synchronization.id} to use bigquery"
|
|
run_at = synchronization.run_at
|
|
synchronization.update! run_at: nil
|
|
|
|
# Change the provider id
|
|
parameters['provider'] = 'bigquery'
|
|
# If passing the billing project out of the connection move it inside
|
|
if parameters['billing_project'].present?
|
|
puts ' Moving billing_project inside the connection parameter'
|
|
billing_project = parameters.delete('billing_project')
|
|
parameters['connection'] ||= {}
|
|
parameters['connection']['billing_project'] = billing_project
|
|
end
|
|
synchronization.update! service_item_id: parameters.to_json
|
|
report_incompatible_bq_parameters(parameters)
|
|
rescue
|
|
raise
|
|
ensure
|
|
synchronization.update! run_at: run_at
|
|
end
|
|
end
|
|
end
|
|
end
|
|
if number_of_pending_syncs.positive?
|
|
puts "#{number_of_pending_syncs} syncs could not be modified. . Please try again later."
|
|
end
|
|
end
|
|
|
|
desc 'Migrate legacy sync tables to new connections'
|
|
task :migrate_legacy_sync_tables_to_new_connections, [:username, :synchronization_ids, :flexible_password] => :environment do |_task, args|
|
|
|
|
# NOTE: Managing user
|
|
user = Carto::User.find_by(username: args[:username])
|
|
if user.blank?
|
|
puts "> ERROR: Missing user with username '#{args[:username]}'"
|
|
next
|
|
end
|
|
|
|
# NOTE: Managing synchronizations
|
|
sync_ids = (args[:synchronization_ids].try(:split, ' ') || []).uniq
|
|
synchronizations = user.synchronizations
|
|
|
|
if sync_ids.empty?
|
|
puts "> INFO: No synchronizations were provided, so all '#{user.username}' synchronizations will be analyzed"
|
|
else
|
|
synchronizations = synchronizations.where(id: sync_ids)
|
|
|
|
if synchronizations.count != sync_ids.count
|
|
puts "> ERROR: #{sync_ids.count} synchronization/s provided, but only #{synchronizations.count} found"
|
|
next
|
|
end
|
|
puts "> INFO: List of #{synchronizations.count} synchronization/s provided"
|
|
end
|
|
|
|
# NOTE: Looking for legacy synchronizations
|
|
legacy_syncs = synchronizations.where(url: ['', nil]).select do |sync|
|
|
data = JSON.parse(sync.service_item_id)
|
|
data['provider'].present? && data['connection'].present? && data['connection_id'].blank?
|
|
end
|
|
|
|
puts "> INFO: #{legacy_syncs.count} legacy synchronization/s were found"
|
|
next if legacy_syncs.count.zero?
|
|
|
|
# NOTE: Finding/creating required connections
|
|
connection_groups = legacy_syncs.group_by do |sync|
|
|
data = JSON.parse(sync.service_item_id)
|
|
[data['provider'], data['connection']]
|
|
end
|
|
|
|
puts "> INFO: #{connection_groups.count} unique connections needed"
|
|
if args[:flexible_password] == 'true'
|
|
puts "> INFO: Password parameter will be omitted while looking for existent connections"
|
|
end
|
|
|
|
connection_manager = Carto::ConnectionManager.new(user)
|
|
syncs_with_connection = connection_groups.map.with_index do |((provider, connection_info), syncs), index|
|
|
puts "> INFO: Connection #{index} - [#{provider}] #{connection_info}"
|
|
|
|
connections = user.db_connections
|
|
connection_info.each do |key, value|
|
|
next if key == 'password' && args[:flexible_password] == 'true'
|
|
connections = connections.where("parameters->>'#{key}' = ?", value)
|
|
end
|
|
connection = connections.find_by(connector: provider)
|
|
|
|
if connection.present?
|
|
puts "> INFO: Connection found [#{connection.id}]"
|
|
else
|
|
connection = connection_manager.create_db_connection(
|
|
name: "#{provider} [#{SecureRandom.hex(6)}]",
|
|
provider: provider,
|
|
parameters: connection_info
|
|
)
|
|
puts "> INFO: Connection created [#{connection.id}]"
|
|
end
|
|
|
|
{
|
|
connection: connection,
|
|
syncs: syncs,
|
|
password_changed: connection_info['password'] != connection.parameters['password']
|
|
}
|
|
rescue StandardError => exception
|
|
puts "> WARNING: Error finding/creating connection #{index} - #{exception.message}"
|
|
puts "> INFO: Skipping #{syncs.count} synchronization/s - Sample sync [#{syncs.first.id}]"
|
|
|
|
{ connection: nil, syncs: [] }
|
|
end
|
|
|
|
# NOTE: Updating and synchronizations
|
|
syncs_with_connection.each.with_index do |data, index|
|
|
next if data[:connection].nil?
|
|
|
|
puts "> INFO: Linking #{data[:syncs].count} synchronization/s to connection #{index}"
|
|
ActiveRecord::Base.transaction do
|
|
data[:syncs].each do |sync|
|
|
service_item_id = JSON.parse(sync.service_item_id).except('connection')
|
|
service_item_id['connection_id'] = data[:connection].id
|
|
|
|
sync.update!(service_item_id: service_item_id.to_json)
|
|
end
|
|
end
|
|
|
|
# NOTE: Queuing synchronizations
|
|
next unless data[:password_changed]
|
|
|
|
data[:syncs].each do |sync|
|
|
next if sync.state != 'failure' || sync.retried_times < CartoDB::Synchronization::Member::MAX_RETRIES
|
|
|
|
puts "> INFO: Queueing failed sync [#{sync.id}]"
|
|
CartoDB::Synchronization::Member.new(sync).enqueue
|
|
end
|
|
rescue StandardError => exception
|
|
puts "> WARNING: Error updating synchronizations for connection #{index} - #{exception.message}"
|
|
puts "> INFO: Skipping updating #{syncs.count} synchronization/s - Sample sync [#{data[:syncs].first.id}]"
|
|
end
|
|
rescue StandardError => exception
|
|
puts "> ERROR: Something went wrong while migrating legacy synchronizations | #{exception.inspect}"
|
|
end
|
|
end
|