require 'sequel'
require 'fileutils'
require 'uuidtools'
require_relative './user'
require_relative './table'
require_relative './log'
require_relative './visualization/member'
require_relative './table_registrar'
require_relative './quota_checker'
require_relative '../../lib/cartodb/errors'
require_relative '../../lib/cartodb/import_error_codes'
require_relative '../../lib/cartodb/metrics'
require_relative '../../lib/cartodb/stats/importer'
require_relative '../../config/initializers/redis'
require_relative '../../services/importer/lib/importer'
require_relative '../connectors/importer'
require_relative '../../services/importer/lib/importer/datasource_downloader'
require_relative '../../services/datasources/lib/datasources'
require_relative '../../services/importer/lib/importer/unp'
require_relative '../../services/importer/lib/importer/post_import_handler'
require_relative '../../services/importer/lib/importer/mail_notifier'
require_relative '../../services/importer/lib/importer/cartodbfy_time'
require_relative '../../services/platform-limits/platform_limits'
require_relative '../../services/importer/lib/importer/overviews'
require_relative '../../services/importer/lib/importer/connector_runner'
require_relative '../../services/importer/lib/importer/exceptions'
require_dependency 'carto/tracking/events'
require_dependency 'carto/valid_table_name_proposer'
require_dependency 'carto/configuration'
require_dependency 'carto/db/user_schema'

include CartoDB::Datasources

# Sequel model tracking a single data import job: where the data comes from
# (uploaded file, URL, datasource/connector, or SQL query/table copy), its
# lifecycle state, error codes, and the tables/visualizations it produced.
class DataImport < Sequel::Model
  include Carto::DataImportConstants
  include Carto::Configuration

  # Matches Postgres errors raised when merging tables whose column types differ.
  MERGE_WITH_UNMATCHING_COLUMN_TYPES_RE = /No .*matches.*argument type.*/
  # Statement timeout (milliseconds) for CREATE TABLE AS queries on the direct connection.
  DIRECT_STATEMENT_TIMEOUT = 1.hour * 1000

  attr_accessor :log, :results

  one_to_many :external_data_imports
  many_to_one :user

  # @see store_results() method also when adding new fields
  PUBLIC_ATTRIBUTES = [
    'id',
    'user_id',
    'table_id',
    'data_type',
    'table_name',
    'state',
    'error_code',
    'queue_id',
    'get_error_text',
    'get_error_source',
    'tables_created_count',
    'synchronization_id',
    'service_name',
    'service_item_id',
    'type_guessing',
    'quoted_fields_guessing',
    'content_guessing',
    'server',
    'host',
    'upload_host',
    'resque_ppid',
    'create_visualization',
    'visualization_id',
    # String field containing a json, format:
    # {
    #   twitter_credits: Integer
    # }
    # No automatic conversion coded
    'user_defined_limits',
    'original_url',
    'privacy',
    'http_response_code',
    'rejected_layers',
    'runner_warnings'
  ]

  # These attributes are removed from public_values by api_public_values
  NON_API_VISIBLE_ATTRIBUTES = [
    'service_item_id',
    'service_name',
    'server',
    'host',
    'upload_host',
    'resque_ppid',
  ]

  # Not all constants are used, but so that we keep track of available states
  STATE_ENQUEUED  = 'enqueued'   # Default state for imports whose files are not yet at "import source"
  STATE_PENDING   = 'pending'    # Default state for files already at "import source" (e.g. S3 bucket)
  STATE_UNPACKING = 'unpacking'
  STATE_IMPORTING = 'importing'
  STATE_COMPLETE  = 'complete'
  STATE_UPLOADING = 'uploading'
  STATE_FAILURE   = 'failure'
  STATE_STUCK     = 'stuck'

  TYPE_EXTERNAL_TABLE = 'external_table'
  TYPE_FILE           = 'file'
  TYPE_URL            = 'url'
  TYPE_QUERY          = 'query'
  TYPE_DATASOURCE     = 'datasource'

  # Sequel hook: attaches (or creates) the import log, resets results and
  # defaults the state to STATE_PENDING when none is set.
  def after_initialize
    instantiate_log
    self.results = []
    self.state ||= STATE_PENDING
  end

  # This before_create should be only necessary to track old dashboard data imports.
  # New ones are already tracked during the data_import create inside the controller
  # For the old dashboard
  def before_create
    if self.from_common_data?
      self.extra_options = self.extra_options.merge({ :common_data => true })
    end
  end

  # Sequel hook: persists the log on first save (storing its id in `logger`)
  # and bumps updated_at.
  def before_save
    unless logger.present?
      log.save
      self.logger = log.id
    end
    self.updated_at = Time.now
  end

  # The objective of this after_create method is to track in the logs every started
  # import process.
  def after_create
    notify(results)
  end

  # Returns true when common data is configured and this import's data_source
  # points at the "<username>.<host>" common-data account.
  def from_common_data?
    common_data = Cartodb.config[:common_data]
    return false unless common_data && !common_data['username'].blank? && !common_data['host'].blank?

    # NOTE(review): extra_options symbolizes its keys, so this string-key lookup never matches.
    # Fixing it would make this method return false once before_create has set the flag, which
    # would change the origin reported by track_results — confirm intent before changing.
    return false if self.extra_options.has_key?('common_data')

    !!(self.data_source && self.data_source.include?("#{common_data['username']}.#{common_data['host']}"))
  end

  # Parsed import_extra_options JSON with symbolized keys ({} when unset).
  def extra_options
    return {} if self.import_extra_options.nil?
    ::JSON.parse(self.import_extra_options).symbolize_keys
  end

  # Serializes value into import_extra_options; nil is ignored.
  def extra_options=(value)
    self.import_extra_options = ::JSON.dump(value) unless value.nil?
  end

  # Shared (class-variable memoized) logger writing to imports.log.
  def dataimport_logger
    @@dataimport_logger ||= CartoDB.unformatted_logger(log_file_path("imports.log"))
  end

  # Meant to be used when calling from API endpoints (hides some fields not needed at editor scope)
  def api_public_values
    public_values.reject { |key|
      DataImport::NON_API_VISIBLE_ATTRIBUTES.include?(key)
    }
  end

  # Hash of PUBLIC_ATTRIBUTES (string keys) plus 'queue_id' (the import id);
  # a :success entry is added once the import reached a final state.
  def public_values
    result = PUBLIC_ATTRIBUTES.map { |attribute| [attribute, send(attribute)] }.to_h
    result.merge!('queue_id' => id)
    result.merge!(success: success) if final_state?
    result
  end

  # Runs the whole import (dispatch + success/failure handling) and returns self.
  # Quota and invalid-cartodb_id errors are handled and swallowed; unauthorized
  # errors are re-raised as MapQuotaExceededError; any other error is re-raised
  # after being logged, reported and recorded as a failure.
  def run_import!
    self.resque_ppid = Process.ppid
    self.server = Socket.gethostname
    log.append "Running on server #{server} with PID: #{Process.pid}"
    begin
      success = !!dispatch
    rescue TokenExpiredOrInvalidError => ex
      success = false
      begin
        current_user.oauths.remove(ex.service_name)
      rescue => ex2
        log.append "Exception removing OAuth: #{ex2.message}"
        log.append ex2.backtrace
      end
    end
    log.append 'After dispatch'

    if results.empty?
      if collision_strategy == COLLISION_STRATEGY_SKIP
        self.error_code = 1022
        self.state = STATE_COMPLETE
      else
        self.error_code = 1002
        self.state = STATE_FAILURE
      end
      save
    end

    self.cartodbfy_time = CartoDB::Importer2::CartodbfyTime.instance(id).get

    success ? handle_success : handle_failure

    log.store
    Rails.logger.debug log.to_s
    self
  rescue CartoDB::QuotaExceeded => quota_exception
    current_user_id = current_user.id
    Carto::Tracking::Events::ExceededQuota.new(current_user_id, user_id: current_user_id).report

    CartoDB.notify_warning_exception(quota_exception)
    handle_failure(quota_exception)
    self
  rescue CartoDB::CartoDBfyInvalidID
    invalid_cartodb_id_exception = CartoDB::Importer2::CartoDBfyInvalidID.new
    log.append "Exception: #{invalid_cartodb_id_exception}"
    CartoDB.notify_warning_exception(invalid_cartodb_id_exception)
    handle_failure(invalid_cartodb_id_exception)
    self
  rescue Carto::UnauthorizedError => e
    log.append "Exception: #{e.message}"
    log.append(e.backtrace, false) # truncate = false
    stacktrace = e.message + e.backtrace.join
    CartoDB.report_exception(e, 'Public map quota exceeded', error_info: stacktrace)
    error = CartoDB::Importer2::MapQuotaExceededError.new
    handle_failure(error)
    raise error
  rescue => exception
    log.append "Exception: #{exception}"
    log.append(exception.backtrace, false) # truncate = false
    stacktrace = exception.to_s + exception.backtrace.join
    CartoDB.report_exception(exception, 'Import error', error_info: stacktrace)
    handle_failure(exception)
    raise exception
  end

  # Notice that this returns the entire error hash, not just the text
  # It seems that it's only used for the rollbar reporting
  def get_error_text
    if self.error_code == CartoDB::NO_ERROR_CODE
      CartoDB::NO_ERROR_CODE
    else
      self.error_code.blank? ? CartoDB::IMPORTER_ERROR_CODES[99999] : CartoDB::IMPORTER_ERROR_CODES[self.error_code]
    end
  end

  # :source of the error entry for error_code (generic 99999 entry when blank).
  def get_error_source
    if self.error_code == CartoDB::NO_ERROR_CODE
      CartoDB::NO_ERROR_CODE
    else
      self.error_code.blank? ? CartoDB::IMPORTER_ERROR_CODES[99999][:source] : CartoDB::IMPORTER_ERROR_CODES[self.error_code][:source]
    end
  end

  # Marks the import failed with code 8002 and raises CartoDB::QuotaExceeded.
  def raise_over_table_quota_error
    log.append 'Over account table limit, please upgrade'
    self.error_code = 8002
    self.state = STATE_FAILURE
    save
    raise CartoDB::QuotaExceeded, 'More tables required'
  end

  # Marks the import failed with code 8003 and raises CartoDB::QuotaExceeded.
  def raise_over_map_quota_error
    log.append 'Over account public maps limit, please upgrade'
    self.error_code = 8003
    self.state = STATE_FAILURE
    save
    raise CartoDB::QuotaExceeded, 'More public maps required'
  end

  # If the import is stuck (see stuck?), records it as failed, notifies and returns true;
  # otherwise returns false.
  def mark_as_failed_if_stuck!
    return false unless stuck?

    log.append "Import timed out. Id:#{self.id} State:#{self.state} Created at:#{self.created_at} Running imports:#{running_import_ids}"

    handle_failure(CartoDB::Importer2::StuckImportJobError.new)

    CartoDB.notify_exception(
      CartoDB::Importer2::GenericImportError.new('Import timed out or got stuck'),
      user: current_user
    )
    true
  end

  # Classifies the source: nil -> datasource, an existing uploaded file -> file,
  # a value with a URL host -> url. Anything else leaves data_type/data_source untouched.
  # original_url is filled from the stored data_source if it was empty.
  def data_source=(data_source)
    if data_source.nil?
      values[:data_type] = TYPE_DATASOURCE
      values[:data_source] = ''
    else
      path = uploaded_file_path(data_source)
      if File.exist?(path) && !File.directory?(path)
        values[:data_type] = TYPE_FILE
        values[:data_source] = path
      elsif Addressable::URI.parse(data_source).host.present?
        values[:data_type] = TYPE_URL
        values[:data_source] = data_source
      end
    end

    self.original_url = self.values[:data_source] if self.original_url.to_s.length == 0

    # else SQL-based import
  end

  # Deletes the uploads directory referenced by data_source, if any.
  def remove_uploaded_resources
    return nil unless uploaded_file

    file_upload_helper = CartoDB::FileUpload.new(Cartodb.config[:importer].fetch("uploads_path", nil))
    path = file_upload_helper.get_uploads_path.join(uploaded_file[1])
    # Dir.exists? was a deprecated alias removed in Ruby 3.2; Dir.exist? is the supported name.
    FileUtils.rm_rf(path) if Dir.exist?(path)
  end

  # Marks the import complete, records the created table names, releases the
  # concurrent-import slot, then notifies and tracks the results. Returns self.
  def handle_success
    self.success = true
    self.state = STATE_COMPLETE
    names = results.map(&:name).compact.sort
    self.table_names = names.join(' ')
    self.tables_created_count = names.size
    log.append "Import finished\n"
    log.store
    save
    decrement_concurrent_imports_limit
    notify(results)
    track_results(results, id)
    self
  end

  # Marks the import failed (copying error_code from supplied_exception when it
  # exposes one), releases the concurrent-import slot and notifies. Never raises:
  # errors are appended to the log. Returns self.
  def handle_failure(supplied_exception = nil)
    self.success = false
    self.state = STATE_FAILURE
    if supplied_exception && supplied_exception.respond_to?(:error_code)
      self.error_code = supplied_exception.error_code
    end
    log.append "ERROR!\n"
    log.store
    self.save
    decrement_concurrent_imports_limit
    notify(results)
    self
  rescue => exception
    log.append "Exception: #{exception}"
    log.append(exception.backtrace, false) # truncate = false
    log.store
    self
  end

  def table
    # We can assume the owner is always who imports the data
    # so no need to change to a Visualization::Collection based load
    # TODO better to use an association for this
    ::Table.new(user_table: UserTable.where(id: table_id, user_id: user_id).first)
  end

  # Table services for every name in table_names owned by this import's user.
  def tables
    table_names_array.map do |table_name|
      UserTable.where(name: table_name, user_id: user_id).first.service
    end
  end

  # table_names (space separated) as an array; [] when blank.
  def table_names_array
    table_names.present? ? table_names.split(' ') : []
  end

  # True when any entry in the stats JSON has type '.tif'.
  def is_raster?
    ::JSON.parse(self.stats).any? { |item| item['type'] == '.tif' }
  end

  # Calculates the maximum timeout in seconds for a given user, to be used when performing HTTP requests
  # TODO: Candidate for being private if we join syncs and data imports someday
  # TODO: Add timeout config (if we need to change this)
  # @raise ArgumentError when user lacks quota_in_bytes or assumed_kb_sec < 1
  def self.http_timeout_for(user, assumed_kb_sec = 75 * 1024)
    if user.nil? || !user.respond_to?(:quota_in_bytes)
      raise ArgumentError, 'Need a User object to calculate its download speed'
    end
    raise ArgumentError, 'KB per second must be > 0' if assumed_kb_sec < 1

    (user.quota_in_bytes / assumed_kb_sec).round
  end

  # Sequel validation: viewer users cannot create imports; collision strategy must be valid.
  def validate
    super
    errors.add(:user, "Viewer users can't create data imports") if user && user.viewer
    validate_collision_strategy
  end

  def final_state?
    [STATE_COMPLETE, STATE_FAILURE, STATE_STUCK].include?(state)
  end

  private

  # Routes the import: table copy / SQL query -> from_table, connector -> ConnectorRunner,
  # anything else -> downloader-based Runner. Returns a truthy value on success.
  def dispatch
    self.state = STATE_UPLOADING
    return from_table if table_copy.present? || from_query.present?

    if service_name == 'connector'
      importer, runner, datasource_provider, manual_fields = new_importer_with_connector
    else
      importer, runner, datasource_provider, manual_fields = new_importer
    end
    execute_importer importer, runner, datasource_provider, manual_fields
  end

  # job_ids of imports currently being processed by Resque workers on the 'imports' queue.
  def running_import_ids
    Resque::Worker.all.map do |worker|
      next unless worker.job['queue'] == 'imports'
      worker.job['payload']['args'].first['job_id'] rescue nil
    end.compact
  end

  # data_source, or a carto.com URL for uploaded files.
  def public_url
    return data_source unless uploaded_file
    "https://#{current_user.username}.carto.com/#{uploaded_file[0]}"
  end

  def valid_uuid?(text)
    !!UUIDTools::UUID.parse(text)
  rescue TypeError
    false
  rescue ArgumentError
    false
  end

  def before_destroy
    self.remove_uploaded_resources
  end

  # Loads the existing log referenced by `logger`, or builds a new data-import log.
  def instantiate_log
    uuid = logger
    if valid_uuid?(uuid)
      self.log = CartoDB::Log.where(id: uuid.to_s).first
    else
      self.log = CartoDB::Log.new(
        type: CartoDB::Log::TYPE_DATA_IMPORT,
        user_id: user_id
      )
    end
  end

  # MatchData for "uploads/<20 char token>/..." in data_source, or nil.
  def uploaded_file
    data_source.to_s.match(/uploads\/([a-z0-9]{20})\/.*/)
  end

  # A stuck job should've started but not be finished, so its state should not be complete nor failed, it should
  # have been in the queue for more than 5 minutes and it shouldn't be currently processed by any active worker
  def stuck?
    state == STATE_STUCK ||
      (![STATE_ENQUEUED, STATE_PENDING, STATE_COMPLETE, STATE_FAILURE].include?(state) &&
        created_at < 5.minutes.ago &&
        !running_import_ids.include?(id))
  end

  # Imports from table_copy or from_query into a new table and registers it.
  # Returns false (with merge error set) on Sequel::DatabaseError.
  def from_table
    log.append 'from_table()'

    number_of_tables = 1
    quota_checker = CartoDB::QuotaChecker.new(current_user)
    if quota_checker.will_be_over_table_quota?(number_of_tables)
      raise_over_table_quota_error
    end

    # Use present? to agree with dispatch: an empty table_copy must fall back to from_query
    # instead of producing "SELECT * FROM ".
    query = table_copy.present? ? "SELECT * FROM #{table_copy}" : from_query
    new_table_name = import_from_query(table_name, query)
    return true unless new_table_name && !overwrite_strategy?

    sanitize_columns(new_table_name)

    self.update(table_names: new_table_name, service_name: nil)
    migrate_existing(new_table_name)

    self.results.push CartoDB::Importer2::Result.new(success: true, error: nil)
  rescue Sequel::DatabaseError => exception
    if exception.to_s =~ MERGE_WITH_UNMATCHING_COLUMN_TYPES_RE
      set_merge_error(8004, exception.to_s)
    else
      set_merge_error(8003, exception.to_s)
    end
    false
  end

  # Creates (or overwrites) a table from query. Returns the created table name,
  # or nil when skipped due to a name collision with the skip strategy.
  # Raises CartoDB::QuotaExceeded (after dropping the table) when over disk quota.
  def import_from_query(name, query)
    log.append 'import_from_query()'

    self.data_type   = TYPE_QUERY
    self.data_source = query
    save

    taken_names = Carto::Db::UserSchema.new(current_user).table_names
    if taken_names.include?(name) && collision_strategy == Carto::DataImportConstants::COLLISION_STRATEGY_SKIP
      log.append("Table with name #{name} already exists. Skipping")
      return
    end
    table_name = Carto::ValidTableNameProposer.new.propose_valid_table_name(name, taken_names: taken_names)

    if overwrite_strategy?
      overwrite_table_from_query(table_name, name, query)
      results.push CartoDB::Importer2::Result.new(success: true, error: nil)
    else
      current_user.db_service.in_database_direct_connection(
        statement_timeout: DIRECT_STATEMENT_TIMEOUT
      ) do |user_direct_conn|
        user_direct_conn.run(%{CREATE TABLE #{table_name} AS #{query}})
      end
    end

    if current_user.over_disk_quota?
      log.append "Over storage quota. Dropping table #{table_name}"
      current_user.in_database.run(%{DROP TABLE #{table_name}})
      self.error_code = 8001
      self.state      = STATE_FAILURE
      save
      raise CartoDB::QuotaExceeded, 'More storage required'
    end

    table_name
  end

  # Builds new_table_name from query, drops overwrite_table_name and renames the new table over it.
  def overwrite_table_from_query(new_table_name, overwrite_table_name, query)
    importer = new_importer_with_unused_runner
    importer.overwrite_register(
      CartoDB::Importer2::Result.new(tables: [new_table_name]), overwrite_table_name
    ) do |database, schema|
      database.execute(%{CREATE TABLE #{new_table_name} AS #{query}})
      importer.drop("\"#{schema}\".\"#{overwrite_table_name}\"")
      database.execute(%{
        ALTER TABLE "#{schema}"."#{new_table_name}" RENAME TO "#{overwrite_table_name}"
      })
    end
  end

  def overwrite_strategy?
    collision_strategy == Carto::DataImportConstants::COLLISION_STRATEGY_OVERWRITE
  end

  def sanitize_columns(table_name)
    Table.sanitize_columns(table_name, {
      connection: current_user.in_database,
      database_schema: current_user.database_schema,
      reserved_words: CartoDB::Importer2::Column::RESERVED_WORDS
    })
  end

  # Registers an already-created table (imported_name) as a ::Table for the user.
  # Returns true when registered, false when the table is invalid.
  # Raises CartoDB::QuotaExceeded (after destroying the table) when over storage quota.
  def migrate_existing(imported_name)
    log.append 'migrate_existing()'

    table = ::Table.new
    table.user_id = user_id
    table.name = imported_name
    table.migrate_existing_table = imported_name
    table.data_import_id = self.id

    if table.valid?
      log.append 'Table valid'
      table.save
      table.optimize
      table.map.recalculate_bounds!
      if current_user.remaining_quota < 0
        log.append 'Over storage quota, removing table'
        self.error_code = 8001
        self.state = STATE_FAILURE
        save
        table.destroy
        raise CartoDB::QuotaExceeded, 'More storage required'
      end
      refresh
      self.table_id = table.id
      self.table_name = table.name
      log.append "Table '#{table.name}' registered"
      save
      true
    else
      reload
      log.append "Table invalid: Error linking #{imported_name} to UI: " + table.errors.full_messages.join(' - ')
      false
    end
  end

  # Environment DB config overridden with the user's credentials; nil/empty user values keep the defaults.
  def pg_options
    SequelRails.configuration.environment_for(Rails.env)
      .merge(
        username: current_user.database_username,
        password: current_user.database_password,
        database: current_user.database_name,
        host: current_user.database_host
      ) { |_key, o, n| n.nil? || n.empty? ? o : n }
  end

  # ogr2ogr loader options from config; {} when binary or csv_guessing is not configured.
  def ogr2ogr_options
    options = Cartodb.config.fetch(:ogr2ogr, {})
    if options['binary'].nil? || options['csv_guessing'].nil?
      {}
    else
      ogr_options = {
        ogr2ogr_binary: options['binary'],
        ogr2ogr_csv_guessing: options['csv_guessing'] && self.type_guessing,
        quoted_fields_guessing: self.quoted_fields_guessing
      }
      if options['memory_limit'].present?
        ogr_options.merge!(ogr2ogr_memory_limit: options['memory_limit'])
      end
      ogr_options
    end
  end

  # Content-guessing options; disabled unless enabled in config, requested by the import
  # and a geocoder config exists.
  def content_guessing_options
    guessing_config = Cartodb.config.fetch(:importer, {}).deep_symbolize_keys.fetch(:content_guessing, {})
    geocoder_config = Cartodb.config.fetch(:geocoder, {}).deep_symbolize_keys
    if guessing_config[:enabled] && self.content_guessing && geocoder_config
      { guessing: guessing_config, geocoder: geocoder_config }
    else
      { guessing: { enabled: false } }
    end
  end

  # Create an Importer object (using a Runner to fetch the data).
  # This methods returns an array with four elements:
  #   * importer: the new importer (nil if download errors detected)
  #   * runner: the runner that the importer uses
  #   * datasource_provider: the DataSource used
  #   * manual_fields: error code and log in case of errors
  def new_importer
    manual_fields = {}
    log.append 'new_importer()'

    datasource_provider = get_datasource_provider

    # If retrieving metadata we get an error, fail early.
    # Clause order matters (first match wins); adjacent clauses sharing a code are merged.
    begin
      downloader = get_downloader(datasource_provider)
    rescue DataDownloadError, ExternalServiceError => ex
      manual_fields = { error_code: 1012, log_info: ex.to_s }
    rescue ExternalServiceTimeoutError, DataDownloadTimeoutError => ex
      manual_fields = { error_code: 1020, log_info: ex.to_s }
    rescue ResponseError => ex
      manual_fields = { error_code: 1011, log_info: ex.to_s }
    rescue InvalidServiceError => ex
      manual_fields = { error_code: 1013, log_info: ex.to_s }
    rescue InvalidInputDataError => ex
      manual_fields = { error_code: 1012, log_info: ex.to_s }
    rescue UnsupportedOperationError => ex
      manual_fields = { error_code: 1023, log_info: ex.to_s }
    rescue CartoDB::Importer2::FileTooBigError => ex
      manual_fields = { error_code: ex.error_code, log_info: CartoDB::IMPORTER_ERROR_CODES[ex.error_code] }
    rescue => ex
      manual_fields = { error_code: 99999, log_info: ex.to_s }
    end

    # Every rescue clause above fills manual_fields, so a non-empty hash means a download error.
    if manual_fields.any?
      importer = runner = datasource_provider = nil
    else
      post_import_handler = CartoDB::Importer2::PostImportHandler.new
      case datasource_provider.class::DATASOURCE_NAME
      when Url::ArcGIS::DATASOURCE_NAME
        post_import_handler.add_fix_geometries_task
      when Search::Twitter::DATASOURCE_NAME
        post_import_handler.add_transform_geojson_geom_column
      end

      database_options = pg_options
      self.host = database_options[:host]

      unp = CartoDB::Importer2::Unp.new(Cartodb.config[:importer], Cartodb.config[:ogr2ogr])

      importer, runner = new_importer_with_runner(downloader, unp, post_import_handler)
    end

    [importer, runner, datasource_provider, manual_fields]
  end

  # Create an Importer using a ConnectorRunner to fetch the data.
  # This methods returns an array with four elements:
  #   * importer: the new importer
  #   * connector: the connector that the importer uses
  #   * nil (no datasource provider)
  #   * nil (no manual fields)
  def new_importer_with_connector
    CartoDB::Importer2::ConnectorRunner.check_availability!(current_user)

    database_options = pg_options

    self.host = database_options[:host]

    connector = CartoDB::Importer2::ConnectorRunner.new(
      service_item_id,
      user: current_user,
      pg: database_options,
      log: log,
      collision_strategy: collision_strategy
    )

    registrar = CartoDB::TableRegistrar.new(current_user, ::Table)
    quota_checker = CartoDB::QuotaChecker.new(current_user)
    database = current_user.in_database
    destination_schema = current_user.database_schema
    public_user_roles = current_user.db_service.public_user_roles
    overviews_creator = CartoDB::Importer2::Overviews.new(connector, current_user)
    importer = CartoDB::Connector::Importer.new(
      runner: connector,
      table_registrar: registrar,
      quota_checker: quota_checker,
      database: database,
      data_import_id: id,
      overviews_creator: overviews_creator,
      destination_schema: destination_schema,
      public_user_roles: public_user_roles
    )
    [importer, connector, nil, nil]
  end

  # Create an Importer object (using a Runner to fetch the data).
  # This methods returns an array with two elements:
  #   * importer: the new importer (nil if download errors detected)
  #   * runner: the runner that the importer uses
  def new_importer_with_runner(downloader, unpacker, post_import_handler)
    runner = CartoDB::Importer2::Runner.new(
      pg: pg_options,
      downloader: downloader,
      log: log,
      user: user,
      unpacker: unpacker,
      post_import_handler: post_import_handler,
      importer_config: Cartodb.config[:importer],
      collision_strategy: collision_strategy
    )
    runner.loader_options = ogr2ogr_options.merge content_guessing_options
    runner.set_importer_stats_host_info(Socket.gethostname)
    registrar = CartoDB::TableRegistrar.new(current_user, ::Table)
    quota_checker = CartoDB::QuotaChecker.new(current_user)
    database = current_user.in_database
    destination_schema = current_user.database_schema
    public_user_roles = current_user.db_service.public_user_roles
    overviews_creator = CartoDB::Importer2::Overviews.new(runner, current_user)
    importer = CartoDB::Connector::Importer.new(
      runner: runner,
      table_registrar: registrar,
      quota_checker: quota_checker,
      database: database,
      data_import_id: id,
      overviews_creator: overviews_creator,
      destination_schema: destination_schema,
      public_user_roles: public_user_roles,
      collision_strategy: collision_strategy
    )
    [importer, runner]
  end

  # Create an Importer object with a runner that it's not able to fetch data.
  # This method is useful when you just need some logic from the Importer class.
  # Returns only the importer.
  def new_importer_with_unused_runner
    importer, = new_importer_with_runner(nil, nil, nil)
    importer
  end

  # Run importer, store results and return success state.
  # On error, records a failure (using manual_fields[:error_code] when available) and re-raises.
  def execute_importer(importer, runner, datasource_provider = nil, manual_fields = nil)
    if importer
      tracker = lambda do |state|
        self.state = state
        save
      end
      log.append 'Before importer run'
      importer.run(tracker)
      log.append 'After importer run'
    end

    store_results(importer, runner, datasource_provider, manual_fields)
    importer.nil? ? false : importer.success?
  rescue => e
    # Note: If this exception is not treated, results will not be defined
    # and the import will finish with a null error_code.
    # manual_fields is nil on the connector path (new_importer_with_connector), so guard it
    # to avoid masking the original exception with a NoMethodError.
    set_error((manual_fields || {}).fetch(:error_code, 99999))
    raise e
  end

  # Note: Assumes that if importer is nil an error happened
  # @param importer CartoDB::Connector::Importer|nil
  # @param runner CartoDB::Importer2::Runner|nil
  # @param datasource_provider mixed|nil
  # @param manual_fields Hash|nil
  def store_results(importer = nil, runner = nil, datasource_provider = nil, manual_fields = {})
    manual_fields ||= {}
    if importer.nil?
      set_error(manual_fields.fetch(:error_code, 99999), manual_fields.fetch(:log_info, nil))
    else
      self.results = importer.results
      self.error_code = importer.error_code
      self.rejected_layers = importer.rejected_layers.join(',') unless importer.rejected_layers.empty?
      self.runner_warnings = runner.warnings.to_json unless runner.warnings.empty?

      # http_response_code is only relevant if a direct download is performed
      if runner && datasource_provider && datasource_provider.providers_download_url?
        self.http_response_code = runner.downloader.http_response_code
      end

      # Table.after_create() setted fields that won't be saved to "final" data import unless specified here
      self.table_name = importer.table.name if importer.success? && importer.table
      self.table_id = importer.table.id if importer.success? && importer.table

      update_visualization_id(importer) if importer.success?

      update_synchronization(importer)

      if importer.success?
        set_datasource_audit_to_complete(datasource_provider, importer.table ? importer.table.id : nil)
      else
        set_datasource_audit_to_failed(datasource_provider)
      end
    end

    self.stats = ::JSON.dump(runner.stats) unless runner.nil?
  end

  def update_visualization_id(importer)
    if importer.data_import.create_visualization
      self.visualization_id = importer.data_import.visualization_id
    end
  end

  # When this import belongs to a synchronization, copies the outcome (state,
  # error, visualization) into that synchronization and stores it.
  def update_synchronization(importer)
    return unless synchronization_id

    log.type = CartoDB::Log::TYPE_SYNCHRONIZATION
    log.store
    log.append "synchronization_id: #{synchronization_id}"
    synchronization = CartoDB::Synchronization::Member.new(id: synchronization_id).fetch
    synchronization.name = self.table_name
    synchronization.log_id = log.id

    if importer.success?
      imported_table = ::Table.get_by_table_id(self.table_id)
      if !imported_table.nil? && imported_table.table_visualization
        synchronization.visualization_id = imported_table.table_visualization.id
      end
      synchronization.state = 'success'
      synchronization.error_code = nil
      synchronization.error_message = nil
    else
      synchronization.state = 'failure'
      synchronization.error_code = error_code.blank? ? 9999 : error_code
      synchronization.error_message = get_error_text[:title] + ' ' + get_error_text[:what_about]
    end
    log.append "importer.success? #{synchronization.state}"
    synchronization.store
  end

  # Datasource for service_name (public URL datasource when blank); nil for connectors.
  # Defaults service_item_id to data_source when blank.
  def get_datasource_provider
    return nil if service_name == 'connector'
    datasource_name = (service_name.nil? || service_name.size == 0) ? Url::PublicUrl::DATASOURCE_NAME : service_name
    if service_item_id.nil? || service_item_id.size == 0
      self.service_item_id = data_source
    end

    get_datasource(datasource_name, service_item_id)
  end

  # Builds a Downloader (direct URL download) or a DatasourceDownloader depending on the provider.
  # @raise CartoDB::Importer2::FileTooBigError when the resource exceeds the platform size limit
  def get_downloader(datasource_provider)
    log.append "Fetching datasource #{datasource_provider} metadata for item id #{service_item_id}"

    metadata = datasource_provider.get_resource_metadata(service_item_id)

    if hit_platform_limit?(datasource_provider, metadata, current_user)
      raise CartoDB::Importer2::FileTooBigError.new(metadata.inspect)
    end

    if datasource_provider.providers_download_url?
      metadata_url = metadata[:url]
      resource_url = metadata_url.present? ? metadata_url : data_source

      log.append "File will be downloaded from #{resource_url}"

      http_options = { http_timeout: ::DataImport.http_timeout_for(current_user) }
      CartoDB::Importer2::Downloader.new(current_user.id, resource_url, http_options, importer_config: Cartodb.config[:importer])
    else
      log.append 'Downloading file data from datasource'

      http_timeout = ::DataImport.http_timeout_for(current_user)
      options = {
        http_timeout: http_timeout,
        importer_config: Cartodb.config[:importer],
        user_id: current_user.id
      }

      CartoDB::Importer2::DatasourceDownloader.new(datasource_provider, metadata, options, log)
    end
  end

  # Delegates to the InputFileSize platform limit when the datasource reports a size; false otherwise.
  def hit_platform_limit?(datasource, metadata, user)
    if datasource.has_resource_size?(metadata)
      CartoDB::PlatformLimits::Importer::InputFileSize.new({ user: user })
                                                      .is_over_limit!(metadata[:size])
    else
      false
    end
  end

  def current_user
    @current_user ||= ::User[user_id]
  end

  # Writes a structured entry to imports.log, sends mail notifications and
  # reports metrics/tracking events for each result.
  def notify(results)
    owner = ::User.where(:id => self.user_id).first
    imported_tables = results.count(&:success)
    failed_tables = results.length - imported_tables

    # Calculate total size out of stats
    total_size = ::JSON.parse(stats).reduce(0) { |acc, stat| acc + (stat ? stat['size'] : 0) }
    importer_stats_aggregator.update_counter('total_size', total_size)

    import_time = self.updated_at - self.created_at

    cartodbfy_throughtput = (cartodbfy_time == 0.0 ? nil : (total_size / cartodbfy_time))

    import_log = { 'user'                   => owner.username,
                   'state'                  => self.state,
                   'tables'                 => results.length,
                   'imported_tables'        => imported_tables,
                   'failed_tables'          => failed_tables,
                   'error_code'             => self.error_code,
                   'import_timestamp'       => Time.now,
                   'queue_server'           => `hostname`.strip,
                   'database_host'          => owner.database_host,
                   'service_name'           => self.service_name,
                   'data_type'              => self.data_type,
                   'is_sync_import'         => !self.synchronization_id.nil?,
                   'import_time'            => import_time,
                   'file_stats'             => ::JSON.parse(self.stats),
                   'resque_ppid'            => self.resque_ppid,
                   'user_timeout'           => ::DataImport.http_timeout_for(current_user),
                   'error_source'           => get_error_source,
                   'id'                     => self.id,
                   'total_size'             => total_size,
                   'cartodbfy_time'         => self.cartodbfy_time,
                   'import_throughput'      => (total_size / import_time),
                   'cartodbfy_throughtput'  => cartodbfy_throughtput,
                   'cartodbfy_import_ratio' => (self.cartodbfy_time / import_time) }
    import_log['extra_options'] = self.extra_options unless self.extra_options.nil?
    import_log.merge!(decorate_log(self))

    dataimport_logger.info(import_log.to_json)
    CartoDB::Importer2::MailNotifier.new(self, results, ::Resque).notify_if_needed

    user_id = user.id

    properties = {
      user_id: user_id,
      connection: {
        imported_from: service_name,
        data_from: data_type,
        sync: sync?
      }
    }

    if results.any?
      results.each do |result|
        CartoDB::Metrics.new.report(:import, payload_for(result))

        properties[:connection][:file_type] = result.extension
        if result.success?
          Carto::Tracking::Events::CompletedConnection.new(user_id, properties).report
        else
          Carto::Tracking::Events::FailedConnection.new(user_id, properties).report
        end
      end
    elsif state == STATE_FAILURE
      Carto::Tracking::Events::FailedConnection.new(user_id, properties).report
    end
  end

  def importer_stats_aggregator
    @importer_stats_aggregator ||= CartoDB::Stats::Importer.instance
  end

  # Extra log fields: datasource audit stats for successful non-query imports whose
  # datasource persists state via the data import; { retrieved_items: 0 } otherwise.
  def decorate_log(data_import)
    decoration = { retrieved_items: 0 }
    if data_import.success && data_import.table_id && data_import.from_query.nil? && data_import.table_copy.nil?
      datasource = get_datasource_provider
      if datasource && datasource.persists_state_via_data_import?
        decoration = datasource.get_audit_stats
      end
    end
    decoration
  end

  # Metrics payload describing the user, the import and (optionally) one result.
  def payload_for(result = nil)
    log.store
    payload = {
      file_url:       public_url,
      distinct_id:    current_user.username,
      username:       current_user.username,
      account_type:   current_user.account_type,
      database:       current_user.database_name,
      email:          current_user.email,
      log:            log.to_s
    }
    if result
      payload.merge!(
        name:         result.name,
        extension:    result.extension,
        success:      result.success,
        error_code:   result.error_code
      )
    end
    # Best-effort: an unparsable URL must not prevent metrics from being reported.
    begin
      payload.merge!(file_url_hostname: URI.parse(public_url).hostname) if public_url
    rescue StandardError
      nil
    end
    payload.merge!(error_title: get_error_text[:title]) if state == STATE_FAILURE
    payload
  end

  # @param datasource_name String
  # @param service_item_id String|nil
  # @return mixed|nil
  # @throws DataSourceError
  def get_datasource(datasource_name, service_item_id)
    begin
      oauth = current_user.oauths.select(datasource_name)
      # Tables metadata DB also store resque data
      datasource = DatasourcesFactory.get_datasource(
        datasource_name,
        current_user,
        {
          http_timeout: ::DataImport.http_timeout_for(current_user),
          redis_storage: $tables_metadata,
          user_defined_limits: ::JSON.parse(user_defined_limits).symbolize_keys
        }
      )
      datasource.token = oauth.token unless oauth.nil?
    rescue => ex
      log.append "Exception: #{ex.message}"
      log.append(ex.backtrace, false) # truncate = false
      CartoDB.report_exception(ex, 'Import error: ', error_info: ex.message + ex.backtrace.join)
      raise CartoDB::DataSourceError.new("Datasource #{datasource_name} could not be instantiated")
    end
    if service_item_id.nil?
      raise CartoDB::DataSourceError.new("Datasource #{datasource_name} without item id")
    end

    datasource.data_import_item = self if datasource.persists_state_via_data_import?

    datasource
  end

  # Records a single failed result with error_code and moves to STATE_FAILURE.
  # log_info (if non-empty) is appended to the log; nil is accepted.
  def set_error(error_code, log_info = '')
    log.append("Additional error info: #{log_info}") unless log_info.nil? || log_info.empty?
    self.results = [CartoDB::Importer2::Result.new(
      success: false, error_code: error_code
    )]
    self.error_code = error_code
    self.state = STATE_FAILURE
  end

  def set_merge_error(error_code, log_info = '')
    log.append("Going to set merge error with code #{error_code}")
    set_error(error_code, log_info)
  end

  def set_datasource_audit_to_complete(datasource, table_id = nil)
    if datasource && datasource.persists_state_via_data_import?
      datasource.data_import_item = self
      datasource.set_audit_to_completed(table_id)
    end
  end

  def set_datasource_audit_to_failed(datasource)
    if datasource && datasource.persists_state_via_data_import?
      datasource.data_import_item = self
      datasource.set_audit_to_failed
    end
  end

  # Releases one slot of the user's concurrent-imports platform limit.
  # Best-effort: failures are logged and swallowed so they never mask the import outcome.
  def decrement_concurrent_imports_limit
    CartoDB::PlatformLimits::Importer::UserConcurrentImportsAmount.new({
      user: current_user,
      redis: {
        db: $users_metadata
      }
    }).decrement!
  rescue => exception
    CartoDB::StdoutLogger.info('Error decreasing concurrent import limit',
                               "#{exception.message} #{exception.backtrace.inspect}")
  end

  # Reports CreatedMap / CreatedDataset tracking events for successful results.
  # Any error is logged as a warning and swallowed.
  def track_results(results, import_id)
    current_user_id = current_user.id
    return unless current_user_id

    if visualization_id
      Carto::Tracking::Events::CreatedMap.new(current_user_id,
                                              user_id: current_user_id,
                                              visualization_id: visualization_id,
                                              origin: 'import').report
    end

    results.select(&:success?).each do |result|
      condition, origin = if result.name
                            [{ data_import_id: import_id, name: result.name },
                             from_common_data? ? 'common-data' : 'import']
                          else
                            [{ data_import_id: import_id }, 'copy']
                          end

      user_table = ::UserTable.where(condition).first
      map = user_table.map if user_table
      next unless map

      vis = Carto::Visualization.where(map_id: map.id).first
      # A map without visualization must not abort tracking of the remaining datasets.
      next unless vis

      Carto::Tracking::Events::CreatedDataset.new(current_user_id,
                                                  user_id: current_user_id,
                                                  visualization_id: vis.id,
                                                  origin: origin).report
    end
  rescue => exception
    CartoDB::Logger.warning(message: 'Carto::Tracking: Couldn\'t report event', exception: exception)
  end

  def sync?
    synchronization_id.present?
  end
end