1203 lines
40 KiB
Ruby
1203 lines
40 KiB
Ruby
require 'sequel'
|
|
require 'fileutils'
|
|
require 'uuidtools'
|
|
require_relative './user'
|
|
require_relative './table'
|
|
require_relative './log'
|
|
require_relative './visualization/member'
|
|
require_relative './table_registrar'
|
|
require_relative './quota_checker'
|
|
require_relative '../../lib/cartodb/errors'
|
|
require_relative '../../lib/cartodb/import_error_codes'
|
|
require_relative '../../lib/cartodb/metrics'
|
|
require_relative '../../lib/cartodb/stats/importer'
|
|
require_relative '../../config/initializers/redis'
|
|
require_relative '../../services/importer/lib/importer'
|
|
require_relative '../connectors/importer'
|
|
require_relative '../../services/importer/lib/importer/datasource_downloader'
|
|
require_relative '../../services/datasources/lib/datasources'
|
|
require_relative '../../services/importer/lib/importer/unp'
|
|
require_relative '../../services/importer/lib/importer/post_import_handler'
|
|
require_relative '../../services/importer/lib/importer/mail_notifier'
|
|
require_relative '../../services/importer/lib/importer/cartodbfy_time'
|
|
require_relative '../../services/platform-limits/platform_limits'
|
|
require_relative '../../services/importer/lib/importer/overviews'
|
|
require_relative '../../services/importer/lib/importer/connector_runner'
|
|
require_relative '../../services/importer/lib/importer/exceptions'
|
|
|
|
require_dependency 'carto/tracking/events'
|
|
require_dependency 'carto/valid_table_name_proposer'
|
|
require_dependency 'carto/configuration'
|
|
require_dependency 'carto/db/user_schema'
|
|
|
|
include CartoDB::Datasources
|
|
|
|
class DataImport < Sequel::Model
  include Carto::DataImportConstants
  include Carto::Configuration

  # Matches PostgreSQL errors raised when merging tables whose column
  # types are incompatible (see from_table error handling, code 8004).
  MERGE_WITH_UNMATCHING_COLUMN_TYPES_RE = /No .*matches.*argument type.*/
  # Statement timeout in milliseconds for long-running direct-connection
  # queries (CREATE TABLE AS ... in import_from_query).
  DIRECT_STATEMENT_TIMEOUT = 1.hour * 1000

  # log: CartoDB::Log instance used throughout the import run.
  # results: Array of CartoDB::Importer2::Result built while importing.
  attr_accessor :log, :results

  one_to_many :external_data_imports
  many_to_one :user

  # Attribute names serialized by public_values (values come from `send`).
  # @see store_results() method also when adding new fields
  PUBLIC_ATTRIBUTES = [
    'id',
    'user_id',
    'table_id',
    'data_type',
    'table_name',
    'state',
    'error_code',
    'queue_id',
    'get_error_text',
    'get_error_source',
    'tables_created_count',
    'synchronization_id',
    'service_name',
    'service_item_id',
    'type_guessing',
    'quoted_fields_guessing',
    'content_guessing',
    'server',
    'host',
    'upload_host',
    'resque_ppid',
    'create_visualization',
    'visualization_id',
    # String field containing a json, format:
    # {
    #   twitter_credits: Integer
    # }
    # No automatic conversion coded
    'user_defined_limits',
    'original_url',
    'privacy',
    'http_response_code',
    'rejected_layers',
    'runner_warnings'
  ]

  # This attributes will get removed from public_values upon calling api_public_values
  # (internal infrastructure details the editor UI does not need).
  NON_API_VISIBLE_ATTRIBUTES = [
    'service_item_id',
    'service_name',
    'server',
    'host',
    'upload_host',
    'resque_ppid',
  ]

  # Not all constants are used, but so that we keep track of available states
  STATE_ENQUEUED = 'enqueued'  # Default state for imports whose files are not yet at "import source"
  STATE_PENDING = 'pending'    # Default state for files already at "import source" (e.g. S3 bucket)
  STATE_UNPACKING = 'unpacking'
  STATE_IMPORTING = 'importing'
  STATE_COMPLETE = 'complete'
  STATE_UPLOADING = 'uploading'
  STATE_FAILURE = 'failure'
  STATE_STUCK = 'stuck'

  # Kinds of data source this import can ingest (see data_source=).
  TYPE_EXTERNAL_TABLE = 'external_table'
  TYPE_FILE = 'file'
  TYPE_URL = 'url'
  TYPE_QUERY = 'query'
  TYPE_DATASOURCE = 'datasource'
|
|
|
|
# Sequel hook: runs on every instantiation. Ensures a log object exists,
# starts with an empty results array, and defaults the state to pending.
def after_initialize
  instantiate_log
  self.results = []
  self.state ||= STATE_PENDING
end
|
|
|
|
# This before_create should be only necessary to track old dashboard data imports.
|
|
# New ones are already tracked during the data_import create inside the controller
|
|
# For the old dashboard
|
|
# Sequel hook: flags imports that pull from the common-data account by
# recording the :common_data extra option before the row is created.
def before_create
  return unless from_common_data?

  self.extra_options = extra_options.merge(common_data: true)
end
|
|
|
|
# Sequel hook: persists the in-memory log and links its id the first time
# the record is saved, and refreshes updated_at on every save.
def before_save
  if logger.blank?
    log.save
    self.logger = log.id
  end
  self.updated_at = Time.now
end
|
|
|
|
# The objective of this after_create method is to track in the logs every started
|
|
# import process.
|
|
# The objective of this after_create method is to track in the logs every
# started import process. New-dashboard imports are already tracked in the
# controller; this covers the old dashboard path.
def after_create
  notify(results)
end
|
|
|
|
# True when this import pulls from the configured common-data account
# (data_source references "<username>.<host>") and has not already been
# flagged with the 'common_data' extra option.
def from_common_data?
  common_data = Cartodb.config[:common_data]
  return false unless common_data
  return false if common_data['username'].blank? || common_data['host'].blank?
  return false if extra_options.has_key?('common_data')
  return false unless data_source

  data_source.include?("#{common_data['username']}.#{common_data['host']}")
end
|
|
|
|
# Deserializes the import_extra_options JSON column into a symbol-keyed
# hash; an unset column yields an empty hash.
def extra_options
  raw = self.import_extra_options
  return {} if raw.nil?

  ::JSON.parse(raw).symbolize_keys
end
|
|
|
|
# Serializes +value+ into the import_extra_options JSON column. A nil
# value is ignored (the stored options are left untouched).
def extra_options=(value)
  self.import_extra_options = ::JSON.dump(value) unless value.nil?
end
|
|
|
|
# Process-wide logger for import events. Class variable (@@) on purpose so
# a single "imports.log" handle is shared across all DataImport instances.
def dataimport_logger
  @@dataimport_logger ||= CartoDB.unformatted_logger(log_file_path("imports.log"))
end
|
|
|
|
# Meant to be used when calling from API endpoints (hides some fields not needed at editor scope)
|
|
# Meant to be used when calling from API endpoints: hides internal fields
# (queue/host/service details) not needed at editor scope.
#
# @return [Hash] public_values minus NON_API_VISIBLE_ATTRIBUTES
def api_public_values
  # Hash#reject yields |key, value| pairs. The previous single-parameter
  # block received the whole [key, value] array, so `include?` never
  # matched and nothing was actually filtered out.
  public_values.reject { |key, _value|
    DataImport::NON_API_VISIBLE_ATTRIBUTES.include?(key)
  }
end
|
|
|
|
# Serializable snapshot of this import for API/editor consumers: every
# PUBLIC_ATTRIBUTES entry plus queue_id, and the success flag once the
# import has reached a terminal state.
def public_values
  attrs = PUBLIC_ATTRIBUTES.each_with_object({}) do |attribute, acc|
    acc[attribute] = send(attribute)
  end
  attrs['queue_id'] = id
  attrs[:success] = success if [STATE_COMPLETE, STATE_FAILURE, STATE_STUCK].include?(state)
  attrs
end
|
|
|
|
# Executes the whole import pipeline for this record: dispatches to the
# right importer, stores results, and transitions the record to a
# complete/failure state. Returns self on handled outcomes; re-raises
# unexpected errors and map-quota errors after recording the failure.
def run_import!
  self.resque_ppid = Process.ppid
  self.server = Socket.gethostname
  log.append "Running on server #{server} with PID: #{Process.pid}"

  begin
    success = !!dispatch
  rescue TokenExpiredOrInvalidError => ex
    success = false
    # Best effort: drop the expired OAuth token so the user can re-auth.
    begin
      current_user.oauths.remove(ex.service_name)
    rescue => ex2
      log.append "Exception removing OAuth: #{ex2.message}"
      log.append ex2.backtrace
    end
  end

  log.append 'After dispatch'

  # No results means nothing was imported: either the collision strategy
  # skipped everything (soft code 1022) or it is a real failure (1002).
  if results.empty?
    if collision_strategy == COLLISION_STRATEGY_SKIP
      self.error_code = 1022
      self.state = STATE_COMPLETE
    else
      self.error_code = 1002
      self.state = STATE_FAILURE
    end
    save
  end

  self.cartodbfy_time = CartoDB::Importer2::CartodbfyTime::instance(id).get
  success ? handle_success : handle_failure
  log.store
  Rails.logger.debug log.to_s

  self
rescue CartoDB::QuotaExceeded => quota_exception
  current_user_id = current_user.id
  Carto::Tracking::Events::ExceededQuota.new(current_user_id, user_id: current_user_id).report

  CartoDB::notify_warning_exception(quota_exception)
  handle_failure(quota_exception)
  self
rescue CartoDB::CartoDBfyInvalidID
  invalid_cartodb_id_exception = CartoDB::Importer2::CartoDBfyInvalidID.new
  log.append "Exception: #{invalid_cartodb_id_exception}"
  CartoDB::notify_warning_exception(invalid_cartodb_id_exception)
  handle_failure(invalid_cartodb_id_exception)
  self
rescue Carto::UnauthorizedError => e
  log.append "Exception: #{e.message}"
  log.append e.backtrace, truncate = false
  stacktrace = e.message + e.backtrace.join
  CartoDB.report_exception(e, 'Public map quota exceeded', error_info: stacktrace)
  error = CartoDB::Importer2::MapQuotaExceededError.new
  handle_failure(error)
  raise error
rescue => exception
  log.append "Exception: #{exception.to_s}"
  log.append exception.backtrace, truncate = false
  stacktrace = exception.to_s + exception.backtrace.join
  CartoDB.report_exception(exception, 'Import error', error_info: stacktrace)
  handle_failure(exception)
  # Fixed: removed an unreachable `self` that followed this raise.
  raise exception
end
|
|
|
|
# Notice that this returns the entire error hash, not just the text
|
|
# It seems that it's only used for the rollbar reporting
|
|
# Notice that this returns the entire error entry (a hash), not just the
# text — it seems that it's only used for rollbar reporting. Unknown or
# blank codes map to the generic 99999 entry.
def get_error_text
  return CartoDB::NO_ERROR_CODE if self.error_code == CartoDB::NO_ERROR_CODE

  code = self.error_code.blank? ? 99999 : self.error_code
  CartoDB::IMPORTER_ERROR_CODES[code]
end
|
|
|
|
# The :source field of the current error entry (or NO_ERROR_CODE when
# there is no error). Unknown or blank codes use the generic 99999 entry.
def get_error_source
  return CartoDB::NO_ERROR_CODE if self.error_code == CartoDB::NO_ERROR_CODE

  code = self.error_code.blank? ? 99999 : self.error_code
  CartoDB::IMPORTER_ERROR_CODES[code][:source]
end
|
|
|
|
# Records a table-quota failure (error 8002), persists it and aborts the
# import by raising CartoDB::QuotaExceeded.
def raise_over_table_quota_error
  log.append 'Over account table limit, please upgrade'
  self.error_code = 8002
  self.state = STATE_FAILURE
  save
  raise CartoDB::QuotaExceeded, 'More tables required'
end
|
|
|
|
# Records a public-map-quota failure (error 8003), persists it and aborts
# the import by raising CartoDB::QuotaExceeded.
def raise_over_map_quota_error
  log.append 'Over account public maps limit, please upgrade'
  self.error_code = 8003
  self.state = STATE_FAILURE
  save
  raise CartoDB::QuotaExceeded, 'More public maps required'
end
|
|
|
|
# Detects a stuck job (see #stuck?) and, if so, records the failure and
# reports it. Returns true when the job was stuck and handled, false
# otherwise.
def mark_as_failed_if_stuck!
  return false unless stuck?

  log.append "Import timed out. Id:#{self.id} State:#{self.state} Created at:#{self.created_at} Running imports:#{running_import_ids}"

  handle_failure(CartoDB::Importer2::StuckImportJobError.new)

  CartoDB::notify_exception(
    CartoDB::Importer2::GenericImportError.new('Import timed out or got stuck'),
    user: current_user
  )
  true
end
|
|
|
|
# Infers data_type from the given source and stores both directly in the
# Sequel values hash: nil means a datasource-driven import, an existing
# local (non-directory) file path sets TYPE_FILE, and a URL with a host
# sets TYPE_URL. Also captures original_url the first time a source is set.
def data_source=(data_source)
  if data_source.nil?
    values[:data_type] = TYPE_DATASOURCE
    values[:data_source] = ''
  else
    path = uploaded_file_path(data_source)
    if File.exist?(path) && !File.directory?(path)
      values[:data_type] = TYPE_FILE
      values[:data_source] = path
    elsif Addressable::URI.parse(data_source).host.present?
      values[:data_type] = TYPE_URL
      values[:data_source] = data_source
    end
  end

  self.original_url = self.values[:data_source] if (self.original_url.to_s.length == 0)

  # else SQL-based import
end
|
|
|
|
# Deletes the upload directory for this import's file (if any) from the
# configured uploads path. Called from before_destroy.
def remove_uploaded_resources
  return nil unless uploaded_file

  file_upload_helper = CartoDB::FileUpload.new(Cartodb.config[:importer].fetch("uploads_path", nil))
  path = file_upload_helper.get_uploads_path.join(uploaded_file[1])
  # Dir.exists? was deprecated and removed in Ruby 3.2; Dir.exist? is the
  # supported spelling.
  FileUtils.rm_rf(path) if Dir.exist?(path)
end
|
|
|
|
# Marks the import as successfully finished: records the created table
# names and count, persists state, releases the concurrent-imports slot in
# Redis, and notifies/tracks the results. Returns self.
def handle_success
  self.success = true
  self.state = STATE_COMPLETE
  table_names = results.map { |result| result.name }.select { |name| name != nil}.sort
  self.table_names = table_names.join(' ')
  self.tables_created_count = table_names.size
  log.append "Import finished\n"
  log.store
  save

  begin
    # Free this user's slot in the concurrent-imports platform limit.
    CartoDB::PlatformLimits::Importer::UserConcurrentImportsAmount.new({
      user: current_user,
      redis: {
        db: $users_metadata
      }
    })
    .decrement!
  rescue => exception
    # Best effort: failing to decrement must not fail the import itself.
    CartoDB::StdoutLogger.info('Error decreasing concurrent import limit',
                               "#{exception.message} #{exception.backtrace.inspect}")
  end
  notify(results)
  track_results(results, id)

  self
end
|
|
|
|
# Marks the import as failed, copying the error code from the supplied
# exception when it provides one, releases the concurrent-imports Redis
# slot and notifies. Deliberately never raises: any error inside failure
# handling is logged and swallowed so it cannot crash the job. Returns self.
def handle_failure(supplied_exception = nil)
  self.success = false
  self.state = STATE_FAILURE
  if !supplied_exception.nil? && supplied_exception.respond_to?(:error_code)
    self.error_code = supplied_exception.error_code
  end
  log.append "ERROR!\n"
  log.store
  self.save
  begin
    # Free this user's slot in the concurrent-imports platform limit.
    CartoDB::PlatformLimits::Importer::UserConcurrentImportsAmount.new({
      user: current_user,
      redis: {
        db: $users_metadata
      }
    })
    .decrement!
  rescue => exception
    CartoDB::StdoutLogger.info('Error decreasing concurrent import limit',
                               "#{exception.message} #{exception.backtrace.inspect}")
  end
  notify(results)
  self
rescue => exception
  log.append "Exception: #{exception.to_s}"
  log.append exception.backtrace, truncate = false
  log.store
  self
end
|
|
|
|
# The ::Table this import created/targets. We can assume the owner is
# always who imports the data, so a direct UserTable lookup is enough (no
# Visualization::Collection based load needed).
# TODO better to use an association for this
def table
  user_table = UserTable.where(id: table_id, user_id: user_id).first
  ::Table.new(user_table: user_table)
end
|
|
|
|
# ::Table services for every table name recorded on this import.
def tables
  table_names_array.map { |name| UserTable.where(name: name, user_id: user_id).first.service }
end
|
|
|
|
# The space-separated table_names column split into an array; empty array
# when unset.
def table_names_array
  return [] unless table_names.present?

  table_names.split(' ')
end
|
|
|
|
# True when the import stats (a JSON array of per-file entries) contain at
# least one GeoTIFF ('.tif') entry. `any?` replaces select+length — same
# result without building an intermediate array.
def is_raster?
  ::JSON.parse(self.stats).any? { |item| item['type'] == '.tif' }
end
|
|
|
|
# Calculates the maximum timeout in seconds for a given user, to be used when performing HTTP requests
|
|
# TODO: Candidate for being private if we join syncs and data imports someday
|
|
# TODO: Add timeout config (if we need to change this)
|
|
# Calculates the maximum timeout in seconds for a given user, to be used
# when performing HTTP requests: account quota divided by an assumed
# download speed (integer division, matching historical behavior).
# TODO: Candidate for being private if we join syncs and data imports someday
# TODO: Add timeout config (if we need to change this)
def self.http_timeout_for(user, assumed_kb_sec = 75*1024)
  unless user.respond_to?(:quota_in_bytes)
    raise ArgumentError, 'Need a User object to calculate its download speed'
  end
  raise ArgumentError, 'KB per second must be > 0' if assumed_kb_sec < 1

  (user.quota_in_bytes / assumed_kb_sec).round
end
|
|
|
|
# Sequel validation hook: viewer users may not create imports, and the
# collision strategy must be one of the allowed values.
def validate
  super
  errors.add(:user, "Viewer users can't create data imports") if user && user.viewer
  validate_collision_strategy
end
|
|
|
|
# True when the import has reached a terminal state (complete, failure or
# stuck) and will not progress further.
def final_state?
  case state
  when STATE_COMPLETE, STATE_FAILURE, STATE_STUCK then true
  else false
  end
end
|
|
|
|
private
|
|
|
|
# Routes the import to the proper execution path: table-copy / SQL-query
# imports short-circuit to from_table; 'connector' imports use a
# ConnectorRunner; everything else goes through the downloader pipeline.
def dispatch
  self.state = STATE_UPLOADING
  return from_table if table_copy.present? || from_query.present?

  if service_name == 'connector'
    importer, runner, datasource_provider, manual_fields = new_importer_with_connector
  else
    importer, runner, datasource_provider, manual_fields = new_importer
  end
  execute_importer importer, runner, datasource_provider, manual_fields
end
|
|
|
|
# IDs of import jobs currently being processed by Resque workers on the
# 'imports' queue. The inline rescue is deliberate best-effort: workers
# with unreadable payloads are simply skipped.
def running_import_ids
  Resque::Worker.all.map do |worker|
    next unless worker.job['queue'] == 'imports'
    worker.job['payload']['args'].first['job_id'] rescue nil
  end.compact
end
|
|
|
|
# Public URL for the import source: the raw data_source unless it is an
# uploaded file, in which case a carto.com URL is built from the match.
def public_url
  upload_match = uploaded_file
  return data_source unless upload_match

  "https://#{current_user.username}.carto.com/#{upload_match[0]}"
end
|
|
|
|
# True when +text+ parses as a UUID. UUIDTools raises TypeError for
# non-strings and ArgumentError for malformed strings; both mean false.
def valid_uuid?(text)
  !!UUIDTools::UUID.parse(text)
rescue TypeError, ArgumentError
  false
end
|
|
|
|
# Sequel hook: clean up any uploaded file directory before the row goes away.
def before_destroy
  remove_uploaded_resources
end
|
|
|
|
# Attaches a CartoDB::Log to this import: reuses the persisted log when
# the logger column holds a valid UUID, otherwise creates a fresh
# data-import log for this user.
def instantiate_log
  log_uuid = logger

  self.log =
    if valid_uuid?(log_uuid)
      CartoDB::Log.where(id: log_uuid.to_s).first
    else
      CartoDB::Log.new(
        type: CartoDB::Log::TYPE_DATA_IMPORT,
        user_id: user_id
      )
    end
end
|
|
|
|
# MatchData for upload-style data sources ("uploads/<20-char token>/...");
# nil when the source is not an uploaded file. match[0] is the relative
# path, match[1] the token directory.
def uploaded_file
  data_source.to_s.match(/uploads\/([a-z0-9]{20})\/.*/)
end
|
|
|
|
# A stuck job should've started but not be finished, so it's state should not be complete nor failed, it should
|
|
# have been in the queue for more than 5 minutes and it shouldn't be currently processed by any active worker
|
|
# True when the job is explicitly flagged stuck, or when it is in an
# in-between state, was created more than 5 minutes ago, and no active
# worker is processing it.
def stuck?
  return true if state == STATE_STUCK

  in_progress = ![STATE_ENQUEUED, STATE_PENDING, STATE_COMPLETE, STATE_FAILURE].include?(state)
  in_progress &&
    created_at < 5.minutes.ago &&
    !running_import_ids.include?(id)
end
|
|
|
|
# Import path for table copies and SQL-query imports. Checks the table
# quota, creates the new table from the query, links it to the UI and
# records a successful result. Returns false (after recording a merge
# error) on database errors.
def from_table
  log.append 'from_table()'

  number_of_tables = 1
  quota_checker = CartoDB::QuotaChecker.new(current_user)
  if quota_checker.will_be_over_table_quota?(number_of_tables)
    raise_over_table_quota_error
  end

  query = table_copy ? "SELECT * FROM #{table_copy}" : from_query
  new_table_name = import_from_query(table_name, query)
  # Nothing more to do when the import was skipped (nil name) or when
  # overwriting (registration happened inside import_from_query).
  return true unless new_table_name && !overwrite_strategy?

  sanitize_columns(new_table_name)

  self.update(table_names: new_table_name, service_name: nil)
  migrate_existing(new_table_name)

  self.results.push CartoDB::Importer2::Result.new(success: true, error: nil)
rescue Sequel::DatabaseError => exception
  # 8004: merge with incompatible column types; 8003: other DB error.
  if exception.to_s =~ MERGE_WITH_UNMATCHING_COLUMN_TYPES_RE
    set_merge_error(8004, exception.to_s)
  else
    set_merge_error(8003, exception.to_s)
  end
  false
end
|
|
|
|
# Runs a SQL-query import: persists the query as the data source, proposes
# a free table name (honoring the skip/overwrite collision strategies),
# creates the table in the user database and enforces the storage quota.
# Returns the new table name, or nil when skipped.
def import_from_query(name, query)
  log.append 'import_from_query()'

  self.data_type = TYPE_QUERY
  self.data_source = query
  save

  taken_names = Carto::Db::UserSchema.new(current_user).table_names

  if taken_names.include?(name) && collision_strategy == Carto::DataImportConstants::COLLISION_STRATEGY_SKIP
    log.append("Table with name #{name} already exists. Skipping")
    return
  end

  table_name = Carto::ValidTableNameProposer.new.propose_valid_table_name(name, taken_names: taken_names)

  if overwrite_strategy?
    overwrite_table_from_query(table_name, name, query)
    results.push CartoDB::Importer2::Result.new(success: true, error: nil)
  else
    # Direct connection with an extended statement timeout: CREATE TABLE AS
    # can legitimately run for a long time.
    current_user.db_service.in_database_direct_connection(
      statement_timeout: DIRECT_STATEMENT_TIMEOUT
    ) do |user_direct_conn|
      user_direct_conn.run(%{CREATE TABLE #{table_name} AS #{query}})
    end
  end
  # Roll back (drop) when the new table pushed the user over disk quota.
  if current_user.over_disk_quota?
    log.append "Over storage quota. Dropping table #{table_name}"
    current_user.in_database.run(%{DROP TABLE #{table_name}})
    self.error_code = 8001
    self.state = STATE_FAILURE
    save
    raise CartoDB::QuotaExceeded, 'More storage required'
  end

  table_name
end
|
|
|
|
# Replaces +overwrite_table_name+ with the result of +query+: creates the
# new table under a temporary name, drops the old one and renames the new
# table into place, all inside the Importer's overwrite_register
# bookkeeping so metadata stays consistent.
def overwrite_table_from_query(new_table_name, overwrite_table_name, query)
  importer = new_importer_with_unused_runner
  importer.overwrite_register(
    CartoDB::Importer2::Result.new(tables: [new_table_name]),
    overwrite_table_name
  ) do |database, schema|
    database.execute(%{CREATE TABLE #{new_table_name} AS #{query}})
    importer.drop("\"#{schema}\".\"#{overwrite_table_name}\"")
    database.execute(%{
      ALTER TABLE "#{schema}"."#{new_table_name}" RENAME TO "#{overwrite_table_name}"
    })
  end
end
|
|
|
|
# True when this import should replace an existing table of the same name.
def overwrite_strategy?
  Carto::DataImportConstants::COLLISION_STRATEGY_OVERWRITE == collision_strategy
end
|
|
|
|
# Normalizes column names of +table_name+ in the user's schema, renaming
# importer-reserved words.
def sanitize_columns(table_name)
  options = {
    connection: current_user.in_database,
    database_schema: current_user.database_schema,
    reserved_words: CartoDB::Importer2::Column::RESERVED_WORDS
  }
  Table.sanitize_columns(table_name, options)
end
|
|
|
|
# Registers an already-imported physical table (+imported_name+) in the
# UI: creates the ::Table metadata record, optimizes it, recalculates map
# bounds and enforces the storage quota. Returns true on success, false
# when the table metadata is invalid.
def migrate_existing(imported_name)
  log.append 'migrate_existing()'

  table = ::Table.new
  table.user_id = user_id
  table.name = imported_name
  table.migrate_existing_table = imported_name
  table.data_import_id = self.id

  if table.valid?
    log.append 'Table valid'
    table.save
    table.optimize
    table.map.recalculate_bounds!
    # Quota is re-checked after registration; destroy the table when the
    # user ended up over their storage limit.
    if current_user.remaining_quota < 0
      log.append 'Over storage quota, removing table'
      self.error_code = 8001
      self.state = STATE_FAILURE
      save
      table.destroy
      raise CartoDB::QuotaExceeded, 'More storage required'
    end
    refresh
    self.table_id = table.id
    self.table_name = table.name
    log.append "Table '#{table.name}' registered"
    save
    true
  else
    reload
    log.append "Table invalid: Error linking #{imported_name} to UI: " + table.errors.full_messages.join(' - ')
    false
  end
end
|
|
|
|
# PostgreSQL connection options for the current user, based on the Rails
# environment configuration. The merge block keeps the environment value
# when the user-specific value is nil or empty.
def pg_options
  SequelRails.configuration.environment_for(Rails.env)
    .merge(
      username: current_user.database_username,
      password: current_user.database_password,
      database: current_user.database_name,
      host: current_user.database_host
    ) {|key, o, n| n.nil? || n.empty? ? o : n}
end
|
|
|
|
# ogr2ogr loader options derived from app config. Empty hash when the
# binary or csv_guessing settings are missing; otherwise the binary path,
# guessing flags, and (when configured) a memory limit.
def ogr2ogr_options
  config = Cartodb.config.fetch(:ogr2ogr, {})
  return {} if config['binary'].nil? || config['csv_guessing'].nil?

  result = {
    ogr2ogr_binary: config['binary'],
    ogr2ogr_csv_guessing: config['csv_guessing'] && self.type_guessing,
    quoted_fields_guessing: self.quoted_fields_guessing
  }
  result[:ogr2ogr_memory_limit] = config['memory_limit'] if config['memory_limit'].present?
  result
end
|
|
|
|
# Content-guessing settings for the runner: enabled only when the importer
# config allows it, this import requested it, and a geocoder is configured.
def content_guessing_options
  guessing_config = Cartodb.config.fetch(:importer, {}).deep_symbolize_keys.fetch(:content_guessing, {})
  geocoder_config = Cartodb.config.fetch(:geocoder, {}).deep_symbolize_keys
  # `&&` instead of `and`: same truth table here, but `and` has lower
  # precedence than assignment and is best reserved for control flow.
  if guessing_config[:enabled] && self.content_guessing && geocoder_config
    { guessing: guessing_config, geocoder: geocoder_config }
  else
    { guessing: { enabled: false } }
  end
end
|
|
|
|
# Create an Importer object (using a Runner to fetch the data).
|
|
# This methods returns an array with four elements:
|
|
# * importer: the new importer (nil if download errors detected)
|
|
# * runner: the runner that the importer uses
|
|
# * datasource_provider: the DataSource used
|
|
# * manual_fields: error code and log in case of errors
|
|
# Builds the regular (downloader-based) import pipeline. Any error while
# fetching datasource metadata is captured into manual_fields (error code
# plus log info) instead of raised, so callers can record a precise error.
def new_importer
  manual_fields = {}
  had_errors = false
  log.append 'new_importer()'

  datasource_provider = get_datasource_provider

  # If retrieving metadata we get an error, fail early
  begin
    downloader = get_downloader(datasource_provider)
  rescue DataDownloadError => ex
    had_errors = true
    manual_fields = {
      error_code: 1012,
      log_info: ex.to_s
    }
  rescue ExternalServiceError => ex
    had_errors = true
    manual_fields = {
      error_code: 1012,
      log_info: ex.to_s
    }
  rescue ExternalServiceTimeoutError => ex
    had_errors = true
    manual_fields = {
      error_code: 1020,
      log_info: ex.to_s
    }
  rescue DataDownloadTimeoutError => ex
    had_errors = true
    manual_fields = {
      error_code: 1020,
      log_info: ex.to_s
    }
  rescue ResponseError => ex
    had_errors = true
    manual_fields = {
      error_code: 1011,
      log_info: ex.to_s
    }
  rescue InvalidServiceError => ex
    had_errors = true
    manual_fields = {
      error_code: 1013,
      log_info: ex.to_s
    }
  rescue InvalidInputDataError => ex
    had_errors = true
    manual_fields = {
      error_code: 1012,
      log_info: ex.to_s
    }
  rescue UnsupportedOperationError => ex
    had_errors = true
    manual_fields = {
      error_code: 1023,
      log_info: ex.to_s
    }
  rescue CartoDB::Importer2::FileTooBigError => ex
    had_errors = true
    manual_fields = {
      error_code: ex.error_code,
      log_info: CartoDB::IMPORTER_ERROR_CODES[ex.error_code]
    }
  rescue => ex
    # Unknown failures map to the generic 99999 importer error.
    had_errors = true
    manual_fields = {
      error_code: 99999,
      log_info: ex.to_s
    }
  end

  if had_errors
    importer = runner = datasource_provider = nil
  else
    # Some datasources need post-load geometry fix-ups.
    post_import_handler = CartoDB::Importer2::PostImportHandler.new
    case datasource_provider.class::DATASOURCE_NAME
    when Url::ArcGIS::DATASOURCE_NAME
      post_import_handler.add_fix_geometries_task
    when Search::Twitter::DATASOURCE_NAME
      post_import_handler.add_transform_geojson_geom_column
    end

    database_options = pg_options
    self.host = database_options[:host]

    unp = CartoDB::Importer2::Unp.new(Cartodb.config[:importer], Cartodb.config[:ogr2ogr])

    importer, runner = new_importer_with_runner(downloader, unp, post_import_handler)
  end

  [importer, runner, datasource_provider, manual_fields]
end
|
|
|
|
# Create an Importer using a ConnectorRunner to fetch the data.
|
|
# This method returns an array with four elements (matching new_importer's shape):
# * importer: the new importer
# * connector: the ConnectorRunner that the importer uses
# * two trailing nils (datasource_provider and manual_fields do not apply here)
|
|
# Builds a connector-based import pipeline (database connectors instead of
# file downloads). Returns [importer, connector, nil, nil] so the shape
# matches new_importer's four-element return.
def new_importer_with_connector
  # Raises when connectors are not available for this user.
  CartoDB::Importer2::ConnectorRunner.check_availability!(current_user)

  database_options = pg_options

  self.host = database_options[:host]

  connector = CartoDB::Importer2::ConnectorRunner.new(
    service_item_id,
    user: current_user,
    pg: database_options,
    log: log,
    collision_strategy: collision_strategy
  )

  registrar = CartoDB::TableRegistrar.new(current_user, ::Table)
  quota_checker = CartoDB::QuotaChecker.new(current_user)
  database = current_user.in_database
  destination_schema = current_user.database_schema
  public_user_roles = current_user.db_service.public_user_roles
  overviews_creator = CartoDB::Importer2::Overviews.new(connector, current_user)
  importer = CartoDB::Connector::Importer.new(
    runner: connector,
    table_registrar: registrar,
    quota_checker: quota_checker,
    database: database,
    data_import_id: id,
    overviews_creator: overviews_creator,
    destination_schema: destination_schema,
    public_user_roles: public_user_roles
  )
  [importer, connector, nil, nil]
end
|
|
|
|
# Create an Importer object (using a Runner to fetch the data).
|
|
# This methods returns an array with two elements:
|
|
# * importer: the new importer (nil if download errors detected)
|
|
# * runner: the runner that the importer uses
|
|
# Wires a CartoDB::Importer2::Runner (download/unpack/load pipeline) to a
# CartoDB::Connector::Importer (registration/quota/overviews) and returns
# [importer, runner].
def new_importer_with_runner(downloader, unpacker, post_import_handler)
  runner = CartoDB::Importer2::Runner.new(
    pg: pg_options,
    downloader: downloader,
    log: log,
    user: user,
    unpacker: unpacker,
    post_import_handler: post_import_handler,
    importer_config: Cartodb.config[:importer],
    collision_strategy: collision_strategy
  )

  runner.loader_options = ogr2ogr_options.merge content_guessing_options
  runner.set_importer_stats_host_info(Socket.gethostname)
  registrar = CartoDB::TableRegistrar.new(current_user, ::Table)
  quota_checker = CartoDB::QuotaChecker.new(current_user)
  database = current_user.in_database
  destination_schema = current_user.database_schema
  public_user_roles = current_user.db_service.public_user_roles
  overviews_creator = CartoDB::Importer2::Overviews.new(runner, current_user)
  importer = CartoDB::Connector::Importer.new(
    runner: runner,
    table_registrar: registrar,
    quota_checker: quota_checker,
    database: database,
    data_import_id: id,
    overviews_creator: overviews_creator,
    destination_schema: destination_schema,
    public_user_roles: public_user_roles,
    collision_strategy: collision_strategy
  )

  [importer, runner]
end
|
|
|
|
# Create an Importer object with a runner that it's not able to fetch data.
|
|
# This method is useful when you just need some logic from the Importer class
|
|
# This methods returns an array with two elements:
|
|
# * importer: the new importer (nil if download errors detected)
|
|
# Importer whose runner cannot fetch data — useful when only Importer
# logic (e.g. overwrite_register) is needed, not the download pipeline.
def new_importer_with_unused_runner
  new_importer_with_runner(nil, nil, nil).first
end
|
|
|
|
# Run importer, store results and return success state.
|
|
# Runs +importer+ (when present), persisting state transitions through a
# tracker lambda, then stores results. Returns the importer's success
# flag, or false when there is no importer (upstream download error).
def execute_importer(importer, runner, datasource_provider = nil, manual_fields = nil)
  if importer
    # Persist every state change the importer reports.
    tracker = lambda do |state|
      self.state = state
      save
    end
    log.append 'Before importer run'
    importer.run(tracker)
    log.append 'After importer run'
  end

  store_results(importer, runner, datasource_provider, manual_fields)
  importer.nil? ? false : importer.success?
rescue => e
  # Note: If this exception is not treated, results will not be defined
  # and the import will finish with a null error_code
  set_error(manual_fields.fetch(:error_code, 99999))
  raise e
end
|
|
|
|
# Note: Assumes that if importer is nil an error happened
|
|
# @param importer CartoDB::Connector::Importer|nil
|
|
# @param runner CartoDB::Importer2::Runner|nil
|
|
# @param datasource_provider mixed|nil
|
|
# @param manual_fields Hash
|
|
# Note: Assumes that if importer is nil an error happened
# Copies the run outcome onto this record: results, error code, rejected
# layers, runner warnings, HTTP response code, created table references,
# visualization/synchronization updates, datasource audit state and stats.
# @param importer CartoDB::Connector::Importer|nil
# @param runner CartoDB::Importer2::Runner|nil
# @param datasource_provider mixed|nil
# @param manual_fields Hash
def store_results(importer=nil, runner=nil, datasource_provider=nil, manual_fields={})
  if importer.nil?
    # No importer means the download/setup failed: record manual error info.
    set_error(manual_fields.fetch(:error_code, 99999), manual_fields.fetch(:log_info, nil))
  else
    self.results = importer.results
    self.error_code = importer.error_code
    self.rejected_layers = importer.rejected_layers.join(',') if !importer.rejected_layers.empty?
    self.runner_warnings = runner.warnings.to_json if !runner.warnings.empty?

    # http_response_code is only relevant if a direct download is performed
    if runner && datasource_provider && datasource_provider.providers_download_url?
      self.http_response_code = runner.downloader.http_response_code
    end

    # Table.after_create() setted fields that won't be saved to "final" data import unless specified here
    self.table_name = importer.table.name if importer.success? && importer.table
    self.table_id = importer.table.id if importer.success? && importer.table

    if importer.success?
      update_visualization_id(importer)
    end
    update_synchronization(importer)

    # Datasource audit trail: complete (with the created table id) on
    # success, failed otherwise.
    importer.success? ? set_datasource_audit_to_complete(datasource_provider,
                                                         importer.success? && importer.table ? importer.table.id : nil)
                      : set_datasource_audit_to_failed(datasource_provider)
  end

  unless runner.nil?
    self.stats = ::JSON.dump(runner.stats)
  end
end
|
|
|
|
# Copies the visualization id created by the importer onto this record,
# but only when the import asked for a visualization to be created.
def update_visualization_id(importer)
  data_import = importer.data_import
  self.visualization_id = data_import.visualization_id if data_import.create_visualization
end
|
|
|
|
# When this import belongs to a synchronization, reflects the outcome on
# the CartoDB::Synchronization::Member: links the table visualization on
# success, or records the error code/message on failure.
def update_synchronization(importer)
  if synchronization_id
    # Re-tag the log so it shows up as a synchronization log.
    log.type = CartoDB::Log::TYPE_SYNCHRONIZATION
    log.store
    log.append "synchronization_id: #{synchronization_id}"
    synchronization = CartoDB::Synchronization::Member.new(id: synchronization_id).fetch
    synchronization.name = self.table_name
    synchronization.log_id = log.id

    if importer.success?
      imported_table = ::Table.get_by_table_id(self.table_id)
      if !imported_table.nil? && imported_table.table_visualization
        synchronization.visualization_id = imported_table.table_visualization.id
      end

      synchronization.state = 'success'
      synchronization.error_code = nil
      synchronization.error_message = nil
    else
      synchronization.state = 'failure'
      synchronization.error_code = error_code.blank? ? 9999 : error_code
      synchronization.error_message = get_error_text[:title] + ' ' + get_error_text[:what_about]
    end
    log.append "importer.success? #{synchronization.state}"
    synchronization.store
  end
end
|
|
|
|
# Resolves the datasource for this import. Connector imports have no
# datasource provider (nil); otherwise the service name is used, falling
# back to the public-URL datasource, and service_item_id defaults to the
# raw data source when unset.
def get_datasource_provider
  return nil if service_name == 'connector'

  datasource_name =
    if service_name.nil? || service_name.size == 0
      Url::PublicUrl::DATASOURCE_NAME
    else
      service_name
    end
  self.service_item_id = data_source if service_item_id.nil? || service_item_id.size == 0

  get_datasource(datasource_name, service_item_id)
end
|
|
|
|
# Builds the downloader for this import's datasource. Raises
# CartoDB::Importer2::FileTooBigError when the resource exceeds the user's
# platform input-size limit. Providers that expose a download URL get a
# plain HTTP Downloader; others stream through DatasourceDownloader.
def get_downloader(datasource_provider)
  log.append "Fetching datasource #{datasource_provider} metadata for item id #{service_item_id}"

  metadata = datasource_provider.get_resource_metadata(service_item_id)

  if hit_platform_limit?(datasource_provider, metadata, current_user)
    raise CartoDB::Importer2::FileTooBigError.new(metadata.inspect)
  end

  if datasource_provider.providers_download_url?
    # Prefer the provider-supplied URL; fall back to the raw data source.
    metadata_url = metadata[:url]
    resource_url = (metadata_url.present? && datasource_provider.providers_download_url?) ? metadata_url : data_source

    log.append "File will be downloaded from #{resource_url}"

    http_options = { http_timeout: ::DataImport.http_timeout_for(current_user) }
    CartoDB::Importer2::Downloader.new(current_user.id,
                                       resource_url,
                                       http_options,
                                       importer_config: Cartodb.config[:importer])
  else
    log.append 'Downloading file data from datasource'

    http_timeout = ::DataImport.http_timeout_for(current_user)
    options = {
      http_timeout: http_timeout,
      importer_config: Cartodb.config[:importer],
      user_id: current_user.id
    }

    CartoDB::Importer2::DatasourceDownloader.new(datasource_provider, metadata, options, log)
  end
end
|
|
|
|
# True when the resource described by +metadata+ exceeds the user's
# input-file-size platform limit. Datasources that cannot report a size
# are never considered over the limit.
def hit_platform_limit?(datasource, metadata, user)
  return false unless datasource.has_resource_size?(metadata)

  CartoDB::PlatformLimits::Importer::InputFileSize.new({ user: user })
                                                  .is_over_limit!(metadata[:size])
end
|
|
|
|
# The importing user (memoized Sequel lookup by user_id).
def current_user
  @current_user ||= ::User[user_id]
end
|
|
|
|
# Post-import reporting pipeline: aggregates per-file stats into the stats
# counter, writes one JSON summary line to the data-import logger, triggers
# the result email (via Resque) when needed, and reports per-result metrics
# plus connection tracking events.
#
# @param results [Array] importer result objects (respond to #success,
#   #success?, #extension)
def notify(results)
  owner = ::User.where(:id => self.user_id).first
  imported_tables = results.select {|r| r.success }.length
  failed_tables = results.length - imported_tables

  # Calculate total size out of stats
  total_size = 0
  ::JSON.parse(stats).each { |stat| total_size += stat ? stat['size'] : 0 }
  importer_stats_aggregator.update_counter('total_size', total_size)

  import_time = self.updated_at - self.created_at
  # nil (rather than 0) when no cartodbfy time was recorded, so the log
  # distinguishes "not measured" from "instant".
  cartodbfy_throughtput = (cartodbfy_time == 0.0 ? nil : (total_size / cartodbfy_time))

  # One flat JSON document per finished import, consumed by the
  # dataimport_logger sink.
  import_log = {'user' => owner.username,
                'state' => self.state,
                'tables' => results.length,
                'imported_tables' => imported_tables,
                'failed_tables' => failed_tables,
                'error_code' => self.error_code,
                'import_timestamp' => Time.now,
                # Shells out to record which queue server processed the job.
                'queue_server' => `hostname`.strip,
                'database_host' => owner.database_host,
                'service_name' => self.service_name,
                'data_type' => self.data_type,
                'is_sync_import' => !self.synchronization_id.nil?,
                'import_time' => import_time,
                'file_stats' => ::JSON.parse(self.stats),
                'resque_ppid' => self.resque_ppid,
                'user_timeout' => ::DataImport.http_timeout_for(current_user),
                'error_source' => get_error_source,
                'id' => self.id,
                'total_size' => total_size,
                'cartodbfy_time' => self.cartodbfy_time,
                'import_throughput' => (total_size / import_time),
                'cartodbfy_throughtput' => cartodbfy_throughtput,
                'cartodbfy_import_ratio' => (self.cartodbfy_time / import_time)
               }
  if !self.extra_options.nil?
    import_log['extra_options'] = self.extra_options
  end
  import_log.merge!(decorate_log(self))
  dataimport_logger.info(import_log.to_json)
  CartoDB::Importer2::MailNotifier.new(self, results, ::Resque).notify_if_needed

  user_id = user.id
  # Shared payload for connection tracking events; file_type is filled in
  # per result below.
  properties = {
    user_id: user_id,
    connection: {
      imported_from: service_name,
      data_from: data_type,
      sync: sync?
    }
  }

  if results.any?
    results.each do |result|
      CartoDB::Metrics.new.report(:import, payload_for(result))

      properties[:connection][:file_type] = result.extension

      if result.success?
        Carto::Tracking::Events::CompletedConnection.new(user_id, properties).report
      else
        Carto::Tracking::Events::FailedConnection.new(user_id, properties).report
      end
    end
  elsif state == STATE_FAILURE
    # No per-file results but the import failed as a whole: still report it.
    Carto::Tracking::Events::FailedConnection.new(user_id, properties).report
  end
end
|
|
|
|
# Memoized handle to the shared importer stats aggregator.
def importer_stats_aggregator
  @importer_stats_aggregator = CartoDB::Stats::Importer.instance if @importer_stats_aggregator.nil?
  @importer_stats_aggregator
end
|
|
|
|
# Extra fields to merge into the import log entry. Datasources that persist
# state contribute their audit stats, but only for successful table-creating
# imports that did not come from a query or a table copy; every other case
# reports zero retrieved items.
def decorate_log(data_import)
  fallback = { retrieved_items: 0 }

  audited_import = data_import.success && data_import.table_id &&
                   data_import.from_query.nil? && data_import.table_copy.nil?
  return fallback unless audited_import

  datasource = get_datasource_provider
  return fallback unless datasource && datasource.persists_state_via_data_import?

  datasource.get_audit_stats
end
|
|
|
|
# Builds the metrics payload for this import, optionally enriched with a
# single importer result. Stores the log first so +log.to_s+ is current.
#
# @param result [CartoDB::Importer2::Result, nil]
# @return [Hash]
def payload_for(result=nil)
  log.store

  payload = {
    file_url: public_url,
    distinct_id: current_user.username,
    username: current_user.username,
    account_type: current_user.account_type,
    database: current_user.database_name,
    email: current_user.email,
    log: log.to_s
  }

  if result
    payload[:name] = result.name
    payload[:extension] = result.extension
    payload[:success] = result.success
    payload[:error_code] = result.error_code
  end

  begin
    payload[:file_url_hostname] = URI.parse(public_url).hostname if public_url
  rescue StandardError
    # Best effort: an unparsable URL must not abort metrics reporting.
  end

  payload[:error_title] = get_error_text[:title] if state == STATE_FAILURE

  payload
end
|
|
|
|
# @param datasource_name String
|
|
# @param service_item_id String|nil
|
|
# @return mixed|nil
|
|
# @throws DataSourceError
|
|
# Instantiates a datasource by name for the current user, wiring in the
# user's OAuth token (when one exists) and, for stateful datasources, this
# data import as the audit item.
#
# @param datasource_name String
# @param service_item_id String|nil
# @return mixed|nil
# @throws DataSourceError when instantiation fails or no item id is given
def get_datasource(datasource_name, service_item_id)
  begin
    oauth = current_user.oauths.select(datasource_name)
    # Tables metadata DB also store resque data
    factory_options = {
      http_timeout: ::DataImport.http_timeout_for(current_user),
      redis_storage: $tables_metadata,
      user_defined_limits: ::JSON.parse(user_defined_limits).symbolize_keys
    }
    datasource = DatasourcesFactory.get_datasource(datasource_name, current_user, factory_options)
    datasource.token = oauth.token unless oauth.nil?
  rescue => ex
    log.append "Exception: #{ex.message}"
    log.append ex.backtrace, false
    CartoDB.report_exception(ex, 'Import error: ', error_info: ex.message + ex.backtrace.join)
    raise CartoDB::DataSourceError.new("Datasource #{datasource_name} could not be instantiated")
  end

  if service_item_id.nil?
    raise CartoDB::DataSourceError.new("Datasource #{datasource_name} without item id")
  end

  datasource.data_import_item = self if datasource.persists_state_via_data_import?

  datasource
end
|
|
|
|
# Marks this import as failed with the given error code, recording a single
# failed result and optional extra log information. Does not persist.
def set_error(error_code, log_info='')
  log.append("Additional error info: #{log_info}") unless log_info.empty?

  failed_result = CartoDB::Importer2::Result.new(success: false, error_code: error_code)
  self.results = [failed_result]
  self.error_code = error_code
  self.state = STATE_FAILURE
end
|
|
|
|
# Same as #set_error but logs that the failure came from a merge operation.
def set_merge_error(error_code, log_info='')
  log.append "Going to set merge error with code #{error_code}"
  set_error(error_code, log_info)
end
|
|
|
|
# For stateful datasources, records this import's audit entry as completed.
# No-op for datasources that do not persist state via data imports.
def set_datasource_audit_to_complete(datasource, table_id = nil)
  return unless datasource && datasource.persists_state_via_data_import?

  datasource.data_import_item = self
  datasource.set_audit_to_completed(table_id)
end
|
|
|
|
# For stateful datasources, records this import's audit entry as failed.
# No-op for datasources that do not persist state via data imports.
def set_datasource_audit_to_failed(datasource)
  return unless datasource && datasource.persists_state_via_data_import?

  datasource.data_import_item = self
  datasource.set_audit_to_failed
end
|
|
|
|
# Reports tracking events for a finished import: a CreatedMap event when a
# visualization was produced, and a CreatedDataset event for every
# successfully imported table. Tracking failures are logged and swallowed
# so they never break the import flow.
#
# @param results [Array] importer results (respond to #success?, #name)
# @param import_id [String] this import's id, used to locate the user tables
def track_results(results, import_id)
  current_user_id = current_user.id
  return unless current_user_id

  if visualization_id
    Carto::Tracking::Events::CreatedMap.new(current_user_id,
                                            user_id: current_user_id,
                                            visualization_id: visualization_id,
                                            origin: 'import').report
  end

  results.select(&:success?).each do |result|
    # Named results are file/table imports (common-data or regular); a
    # result without a name is treated as a table copy.
    condition, origin = if result.name
                          [{ data_import_id: import_id, name: result.name },
                           from_common_data? ? 'common-data' : 'import']
                        else
                          [{ data_import_id: import_id }, 'copy']
                        end

    user_table = ::UserTable.where(condition).first
    map = user_table.map if user_table
    if map
      # NOTE(review): vis could be nil if no visualization row exists for
      # this map; vis.id would then raise and be swallowed by the rescue
      # below — confirm whether that case can occur.
      vis = Carto::Visualization.where(map_id: map.id).first

      Carto::Tracking::Events::CreatedDataset.new(current_user_id,
                                                  user_id: current_user_id,
                                                  visualization_id: vis.id,
                                                  origin: origin).report
    end
  end
rescue => exception
  # Tracking is best effort: log and continue.
  CartoDB::Logger.warning(message: 'Carto::Tracking: Couldn\'t report event',
                          exception: exception)
end
|
|
|
|
# True when this import belongs to a synchronization (has a sync id).
def sync?
  !synchronization_id.blank?
end
|
|
end
|