require 'open3' require 'nokogiri' require 'csv' require 'active_support/core_ext/numeric' require_relative '../../../lib/carto/http/client' require_relative 'hires_geocoder_interface' require_relative 'geocoder_config' module CartoDB class HiresBatchGeocoder < HiresGeocoderInterface DEFAULT_TIMEOUT = 5.hours POLLING_SLEEP_TIME = 5.seconds LOGGING_TIME = 5.minutes DOWNLOAD_RETRIES = 5 DOWLOAD_RETRY_SLEEP = 5.seconds # Generous timeouts, overriden for big files upload/download HTTP_CONNECTION_TIMEOUT = 60 HTTP_REQUEST_TIMEOUT = 600 # Options for the csv upload endpoint of the Batch Geocoder API UPLOAD_OPTIONS = { action: 'run', indelim: ',', outdelim: ',', header: false, outputCombined: false, outcols: "displayLatitude,displayLongitude" } # INFO: the request_id is the most important thing to care for batch requests # INFO: it is called remote_id in upper layers attr_reader :base_url, :request_id, :app_id, :token, :mailto, :status, :processed_rows, :processed_rows, :successful_processed_rows, :failed_processed_rows, :empty_processed_rows, :total_rows, :dir, :input_file class ServiceDisabled < StandardError; end def initialize(input_csv_file, working_dir, log, geocoding_model) @input_file = input_csv_file @dir = working_dir @log = log @geocoding_model = geocoding_model @base_url = config.fetch('base_url') @app_id = config.fetch('app_id') @token = config.fetch('token') @mailto = config.fetch('mailto') @used_batch_request = true begin @batch_api_disabled = config['batch_api_disabled'] == true rescue StandardError @batch_api_disabled = false end end def run init_rows_count @log.append_and_store "Started batched Here geocoding job" @started_at = Time.now change_status('running') upload # INFO: this loop polls for the state of the table_geocoder batch process update_status until ['completed', 'cancelled'].include? @geocoding_model.state do if timeout? begin change_status('timeout') cancel ensure @log.append_and_store "Proceding to cancel job due timeout" end end break if ['failed', 'timeout'].include? @geocoding_model.state sleep polling_sleep_time # We don't want to change the status if the job has been cancelled by the user update_status update_log_stats end update_status update_log_stats change_status('completed') @log.append_and_store "Geocoding Hires job has finished" ensure # Processed data at the end of the job update_status update_log_stats(false) end def upload assert_batch_api_enabled @used_batch_request = true response = http_client.post( api_url(UPLOAD_OPTIONS), body: File.open(input_file, "r").read, headers: { "Content-Type" => "text/plain" }, timeout: 5.hours # more than generous timeout for big file upload ) handle_api_error(response) @request_id = extract_response_field(response.body, '//Response/MetaInfo/RequestId') # TODO: this is a critical error, deal with it appropriately raise 'Could not get the request ID' unless @request_id # Update geocodings model with needed data @geocoding_model.remote_id = @request_id @geocoding_model.batched = true @geocoding_model.save @log.append_and_store "Job sent to HERE, job id: #{@request_id}" @request_id end def used_batch_request? @used_batch_request end def cancel if @geocoding_model.remote_id.nil? @log.append_and_store "Can't cancel a HERE geocoder job without the request id" else @log.append_and_store "Trying to cancel a batch job sent to HERE" assert_batch_api_enabled response = http_client.put(api_url(action: 'cancel'), connecttimeout: HTTP_CONNECTION_TIMEOUT, timeout: HTTP_REQUEST_TIMEOUT) if is_cancellable?(response) @log.append_and_store "Job was already cancelled" else handle_api_error(response) update_stats(response) @log.append_and_store "Job sent to HERE has been cancelled" end change_status('cancelled') end end def update_status assert_batch_api_enabled response = http_client.get(api_url(action: 'status'), connecttimeout: HTTP_CONNECTION_TIMEOUT, timeout: HTTP_REQUEST_TIMEOUT) handle_api_error(response) update_stats(response) end def assert_batch_api_enabled raise ServiceDisabled if @batch_api_disabled end def result return @result unless @result.nil? raise 'No request_id provided' unless @geocoding_model.remote_id results_filename = File.join(dir, "#{@geocoding_model.remote_id}.zip") download_url = api_url({}, 'result') download_status_code = nil retries = 0 while true if(!download_status_code.nil? && download_status_code == 200) break elsif !download_status_code.nil? && download_status_code == 404 # 404 means that the results file is not ready yet sleep DOWLOAD_RETRY_SLEEP retries += 1 elsif retries >= DOWNLOAD_RETRIES raise 'Download request failed: Too many retries, should be a problem with HERE servers' elsif !download_status_code.nil? && download_status_code > 200 && download_status_code != 404 raise "Download request failed: Http status code #{download_status_code}" end download_status_code = execute_results_request(download_url, results_filename) end @result = results_filename end private def execute_results_request(download_url, results_filename) download_status_code = nil # generous timeout for download of results request = http_client.request(download_url, method: :get, timeout: 5.hours) File.open(results_filename, 'wb') do |download_file| request.on_headers do |response| download_status_code = response.response_code end request.on_body do |chunk| if download_status_code == 200 download_file.write(chunk) end end request.on_complete do |response| download_status_code = response.response_code end request.run end return download_status_code end def config GeocoderConfig.instance.get end def http_client @http_client ||= Carto::Http::Client.get('hires_batch_geocoder', log_requests: true) end def api_url(arguments, extra_components = nil) arguments.merge!(app_id: app_id, token: token, mailto: mailto) components = [base_url] # We use the persisted remote_id because we don't have request_id # in the cancel case due is an instance variable components << @geocoding_model.remote_id unless @geocoding_model.remote_id.nil? components << extra_components unless extra_components.nil? components << '?' + URI.encode_www_form(arguments) components.join('/') end def extract_response_field(response, query) Nokogiri::XML(response).xpath("#{query}").first.content rescue NoMethodError => e CartoDB.notify_exception(e) nil end def extract_numeric_response_field(response, query) value = extract_response_field(response, query) return nil if value.blank? Integer(value) rescue ArgumentError => e CartoDB.notify_error("Batch geocoder value error", error: e.message, value: value) nil end def handle_api_error(response) if response.success? == false message = extract_response_field(response.body, '//Details') @failed_processed_rows = number_of_input_file_rows if not input_file.nil? change_status('failed') raise "Geocoding API communication failure: #{message}" end end def default_timeout DEFAULT_TIMEOUT end def polling_sleep_time POLLING_SLEEP_TIME end def number_of_input_file_rows stdout, _status = Open3.capture2('wc', '-l', input_file) stdout.to_i end def update_stats(response) @status = extract_response_field(response.body, '//Response/Status') change_status(@status) @processed_rows = extract_numeric_response_field(response.body, '//Response/ProcessedCount') @successful_processed_rows = extract_numeric_response_field(response.body, '//Response/SuccessCount') # addresses that could not be matched @empty_processed_rows = extract_numeric_response_field(response.body, '//Response/ErrorCount') # invalid input that could not be processed @failed_processed_rows = extract_numeric_response_field(response.body, '//Response/InvalidCount') @total_rows = extract_numeric_response_field(response.body, '//Response/TotalCount') end def init_rows_count @processed_rows = 0 @successful_processed_rows = 0 @empty_processed_rows = 0 @failed_processed_rows = 0 @total_rows = 0 end def update_log_stats(spaced_by_time=true) @last_logging_time ||= Time.now # We don't want to log every few seconds because this kind # of jobs could last for hours if (not spaced_by_time) || (Time.now - @last_logging_time) > LOGGING_TIME @log.append_and_store "Geocoding job status update. "\ "Status: #{@geocoding_model.state} --- Processed rows: #{@processed_rows} "\ "--- Success: #{@successful_processed_rows} --- Empty: #{@empty_processed_rows} "\ "--- Failed: #{@failed_processed_rows}" @last_logging_time = Time.now end end def timeout? (Time.now - @started_at) > default_timeout end def change_status(status) @status = status # The cancelled status should prevail to abort the job @geocoding_model.refresh if status != @geocoding_model.state && (not (@geocoding_model.cancelled? || @geocoding_model.timeout?)) @geocoding_model.state = status @geocoding_model.save end end def is_cancellable?(response) message = extract_response_field(response.body, '//Details') response.response_code == 400 && message =~ /CANNOT CANCEL THE COMPLETED, DELETED, FAILED OR ALREADY CANCELLED JOB/ end end end