cartodb/services/table-geocoder/lib/geocoder_cache.rb

178 lines
6.6 KiB
Ruby
Raw Normal View History

2020-06-15 10:58:47 +08:00
require_relative 'exceptions'
require_relative '../../../lib/carto/http/client'
module CartoDB
class GeocoderCache
DEFAULT_BATCH_SIZE = 5000
DEFAULT_MAX_ROWS = 1000000
HTTP_CONNECT_TIMEOUT = 60
HTTP_DEFAULT_TIMEOUT = 600
attr_reader :connection, :working_dir, :table_name, :hits, :misses,
:max_rows, :sql_api, :formatter, :cache_results
def initialize(arguments)
@sql_api = arguments.fetch(:sql_api)
@connection = arguments.fetch(:connection)
@table_name = arguments.fetch(:table_name)
@qualified_table_name = arguments.fetch(:qualified_table_name)
@working_dir = arguments[:working_dir] || Dir.mktmpdir
`chmod 777 #{@working_dir}`
@formatter = arguments.fetch(:formatter)
@max_rows = arguments[:max_rows] || DEFAULT_MAX_ROWS
@cache_results = nil
@batch_size = arguments[:batch_size] || DEFAULT_BATCH_SIZE
@cache_results = File.join(working_dir, "#{temp_table_name}_results.csv")
@usage_metrics = arguments.fetch(:usage_metrics)
@log = arguments.fetch(:log)
init_rows_count
end
def run
@log.append_and_store "Started searching previous geocoded results in geocoder cache"
get_cache_results
create_temp_table
load_results_to_temp_table
@hits = connection.select.from(temp_table_name).where('longitude is not null and latitude is not null').count.to_i
copy_results_to_table
@log.append_and_store "Finished geocoder cache job"
rescue => e
@log.append_and_store "Error getting results from geocoder cache: #{e.inspect}"
handle_cache_exception e
ensure
@usage_metrics.incr(:geocoder_cache, :total_requests, @total_rows)
@usage_metrics.incr(:geocoder_cache, :success_responses, @hits)
@usage_metrics.incr(:geocoder_cache, :empty_responses, (@total_rows - @hits - @failed_rows))
@usage_metrics.incr(:geocoder_cache, :failed_responses, @failed_rows)
update_log_stats
end
def get_cache_results
begin
count = count + 1 rescue 0
limit = [@batch_size, @max_rows - (count * @batch_size)].min
rows = connection.fetch(%Q{
SELECT DISTINCT(md5(#{formatter})) AS searchtext
FROM #{@qualified_table_name}
WHERE cartodb_georef_status IS NULL
LIMIT #{limit} OFFSET #{count * @batch_size}
}).all
@total_rows += rows.size
sql = "WITH addresses(address) AS (VALUES "
sql << rows.map { |r| "('#{r[:searchtext]}')" }.join(',')
sql << ") SELECT DISTINCT ON(geocode_string) st_x(g.the_geom) longitude, st_y(g.the_geom) latitude,g.geocode_string FROM addresses a INNER JOIN #{sql_api[:table_name]} g ON md5(g.geocode_string)=a.address"
response = run_query(sql, 'csv').gsub(/\A.*/, '').gsub(/^$\n/, '')
File.open(cache_results, 'a') { |f| f.write(response.force_encoding("UTF-8")) } unless response == "\n"
end while rows.size >= @batch_size && (count * @batch_size) + rows.size < @max_rows
end
def store
begin
count = count + 1 rescue 0
sql = %Q{
WITH
-- write the new values
n(searchtext, the_geom) AS (
VALUES %%VALUES%%
),
-- update existing rows
upsert AS (
UPDATE #{sql_api[:table_name]} o
SET updated_at = NOW()
FROM n WHERE o.geocode_string = n.searchtext
RETURNING o.geocode_string
)
-- insert missing rows
INSERT INTO #{sql_api[:table_name]} (geocode_string,the_geom)
SELECT n.searchtext, n.the_geom FROM n
WHERE n.searchtext NOT IN (
SELECT geocode_string FROM upsert
);
}
rows = connection.fetch(%Q{
SELECT DISTINCT(quote_nullable(#{formatter})) AS searchtext, the_geom
FROM #{@qualified_table_name} AS orig
WHERE orig.cartodb_georef_status IS TRUE AND the_geom IS NOT NULL
LIMIT #{@batch_size} OFFSET #{count * @batch_size}
}).all
sql.gsub! '%%VALUES%%', rows.map { |r| "(#{r[:searchtext]}, '#{r[:the_geom]}')" }.join(',')
run_query(sql) if rows && rows.size > 0
end while rows.size >= @batch_size
rescue => e
handle_cache_exception e
ensure
drop_temp_table
end
def create_temp_table
connection.run(%Q{
CREATE TABLE #{temp_table_name} (
longitude text, latitude text, geocode_string text
);
})
end
def load_results_to_temp_table
connection.copy_into(Sequel.lit(temp_table_name), data: File.read(cache_results), format: :csv)
end
def copy_results_to_table
connection.run(%Q{
UPDATE #{@qualified_table_name} AS dest
SET the_geom = ST_GeomFromText(
'POINT(' || orig.longitude || ' ' || orig.latitude || ')', 4326
),
cartodb_georef_status = TRUE
FROM #{temp_table_name} AS orig
WHERE #{formatter} = orig.geocode_string
})
end
def drop_temp_table
connection.run("DROP TABLE IF EXISTS #{temp_table_name}")
end
def temp_table_name
@temp_table_name ||= "geocoding_cache_#{Time.now.to_i}"
end
def run_query(query, format = '')
params = { q: query, api_key: sql_api[:api_key], format: format }
http_client = Carto::Http::Client.get('geocoder_cache',
log_requests: true)
response = http_client.post(sql_api[:base_url],
body: URI.encode_www_form(params),
connecttimeout: HTTP_CONNECT_TIMEOUT,
timeout: HTTP_DEFAULT_TIMEOUT)
response.body
end
# It handles in such a way that the caching is silently stopped
def handle_cache_exception(exception)
drop_temp_table
if exception.class == Sequel::DatabaseError && exception.message =~ /canceling statement due to statement timeout/
# for the moment we just wrap the exception to get a specific error in rollbar
exception = Carto::GeocoderErrors::GeocoderCacheDbTimeoutError.new(exception)
end
# In case we get some error we are going to pass all the rows as failed
@failed_rows = @total_rows
CartoDB.notify_exception(exception)
end
private
def init_rows_count
@hits = 0
@total_rows = 0
@failed_rows = 0
end
def update_log_stats
@log.append_and_store "Geocoding cache stats update. "\
"Total rows: #{@total_rows} "\
"--- Hits: #{@hits} --- Failed: #{@failed_rows}"
end
end
end