56 lines
2.1 KiB
Ruby
56 lines
2.1 KiB
Ruby
|
require_relative './base_job'
|
||
|
|
||
|
module Resque
|
||
|
class DoSyncJobs < BaseJob
|
||
|
@queue = :imports
|
||
|
|
||
|
def self.perform(options = {})
|
||
|
run_action(options, @queue, lambda { |options|
|
||
|
data_import = DataImport[options.symbolize_keys[:job_id]]
|
||
|
|
||
|
user = Carto::User.find(data_import.user_id)
|
||
|
licensing_service = Carto::DoLicensingService.new(user.username)
|
||
|
subscription_info = get_subscription_info(data_import)
|
||
|
|
||
|
write_do_sync_status(subscription_info, data_import, licensing_service)
|
||
|
data_import.run_import!
|
||
|
write_do_sync_status(subscription_info, data_import, licensing_service)
|
||
|
})
|
||
|
end
|
||
|
|
||
|
def self.write_do_sync_status(subscription_info, data_import, licensing_service)
|
||
|
subscription = licensing_service.subscription(subscription_info['subscription_id'])
|
||
|
# Check if the subscription has been removed during the synchronization process:
|
||
|
if subscription.nil?
|
||
|
# Remove just-imported table...
|
||
|
Carto::UserTable.find(data_import.table_id).visualization.destroy
|
||
|
raise StandardError.new("Subscription not found after import! (tablename: #{data_import.table_name})")
|
||
|
end
|
||
|
|
||
|
status_name = 'syncing'
|
||
|
unsyncable_reason = nil
|
||
|
unsynced_errors = []
|
||
|
if data_import.state == 'complete' && data_import.success == true then
|
||
|
status_name = 'synced'
|
||
|
elsif data_import.state != 'pending' then
|
||
|
sync_info = licensing_service.get_sync_status(subscription[:dataset_id])
|
||
|
status_name, unsyncable_reason, unsynced_errors = sync_info.values_at(:sync_status, :unsyncable_reason, :unsynced_errors)
|
||
|
end
|
||
|
|
||
|
licensing_service.add_to_redis(subscription.merge({
|
||
|
sync_status: status_name,
|
||
|
unsyncable_reason: unsyncable_reason,
|
||
|
unsynced_errors: unsynced_errors,
|
||
|
sync_table: data_import.table_name,
|
||
|
sync_table_id: data_import.table_id,
|
||
|
synchronization_id: data_import.synchronization_id
|
||
|
}))
|
||
|
end
|
||
|
|
||
|
def self.get_subscription_info(data_import)
|
||
|
service_item_id = data_import.service_item_id
|
||
|
JSON.parse(service_item_id)
|
||
|
end
|
||
|
end
|
||
|
end
|