|
|
|
@ -125,7 +125,15 @@ class DataImport < Sequel::Model
|
|
|
|
|
log.append("Running on server #{server} with PID: #{Process.pid}")
|
|
|
|
|
|
|
|
|
|
begin
|
|
|
|
|
success = !!dispatch
|
|
|
|
|
self.state = STATE_UPLOADING
|
|
|
|
|
|
|
|
|
|
success = if table_copy.present? || from_query.present?
|
|
|
|
|
from_table
|
|
|
|
|
elsif service_name == 'connector'
|
|
|
|
|
!!execute_importer(*new_importer_with_connector)
|
|
|
|
|
else
|
|
|
|
|
!!execute_importer(*new_importer)
|
|
|
|
|
end
|
|
|
|
|
rescue TokenExpiredOrInvalidError => ex
|
|
|
|
|
success = false
|
|
|
|
|
begin
|
|
|
|
@ -345,18 +353,6 @@ class DataImport < Sequel::Model
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def dispatch
|
|
|
|
|
self.state = STATE_UPLOADING
|
|
|
|
|
return from_table if table_copy.present? || from_query.present?
|
|
|
|
|
|
|
|
|
|
if service_name == 'connector'
|
|
|
|
|
importer, runner, datasource_provider, manual_fields = new_importer_with_connector
|
|
|
|
|
else
|
|
|
|
|
importer, runner, datasource_provider, manual_fields = new_importer
|
|
|
|
|
end
|
|
|
|
|
execute_importer importer, runner, datasource_provider, manual_fields
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def running_import_ids
|
|
|
|
|
Resque::Worker.all.map do |worker|
|
|
|
|
|
next unless worker.job['queue'] == 'imports'
|
|
|
|
@ -827,13 +823,13 @@ class DataImport < Sequel::Model
|
|
|
|
|
synchronization = CartoDB::Synchronization::Member.new(id: synchronization_id).fetch
|
|
|
|
|
synchronization.name = self.table_name
|
|
|
|
|
synchronization.log_id = log.id
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
if importer.success?
|
|
|
|
|
imported_table = ::Table.get_by_table_id(self.table_id)
|
|
|
|
|
if !imported_table.nil? && imported_table.table_visualization
|
|
|
|
|
synchronization.visualization_id = imported_table.table_visualization.id
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
synchronization.state = 'success'
|
|
|
|
|
synchronization.error_code = nil
|
|
|
|
|
synchronization.error_message = nil
|
|
|
|
|