|
|
|
@ -25,6 +25,8 @@ module CartoDB
|
|
|
|
|
HTTP_CONNECTION_TIMEOUT = 60
|
|
|
|
|
# In seconds, for the full request
|
|
|
|
|
HTTP_TIMEOUT = 60
|
|
|
|
|
# In seconds, for writting to logs
|
|
|
|
|
LOG_TIMEOUT = 120
|
|
|
|
|
|
|
|
|
|
# Amount to multiply or divide
|
|
|
|
|
BLOCK_FACTOR = 2
|
|
|
|
@ -32,12 +34,12 @@ module CartoDB
|
|
|
|
|
# GeoJSON can get too big in memory, or ArcGIS have mem problems, so keep reasonable number
|
|
|
|
|
MAX_BLOCK_SIZE = 100
|
|
|
|
|
# In seconds, use 0 to disable
|
|
|
|
|
BLOCK_SLEEP_TIME = 1
|
|
|
|
|
BLOCK_SLEEP_TIME = 0
|
|
|
|
|
|
|
|
|
|
# Each retry will be after SLEEP_REQUEST_TIME^(current_retries_count). Set to 0 to disable retrying
|
|
|
|
|
MAX_RETRIES = 0
|
|
|
|
|
SLEEP_REQUEST_TIME = 3
|
|
|
|
|
SKIP_FAILED_IDS = true
|
|
|
|
|
MAX_RETRIES = 2
|
|
|
|
|
SLEEP_REQUEST_TIME = 5
|
|
|
|
|
SKIP_FAILED_IDS = false
|
|
|
|
|
|
|
|
|
|
# Used to display more data only (for local debugging purposes)
|
|
|
|
|
DEBUG = false
|
|
|
|
@ -77,6 +79,7 @@ module CartoDB
|
|
|
|
|
@current_stream_status = true
|
|
|
|
|
@last_stream_status = true
|
|
|
|
|
@ids = nil
|
|
|
|
|
@log_timer = nil
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Factory method
|
|
|
|
@ -112,6 +115,14 @@ module CartoDB
|
|
|
|
|
raise 'Not supported by this datasource'
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def timed_log(s)
|
|
|
|
|
t2 = Time.now
|
|
|
|
|
if ((@log_timer == nil) || ((t2 - @log_timer) > LOG_TIMEOUT))
|
|
|
|
|
@logger.append_and_store(s) if @logger != nil
|
|
|
|
|
@log_timer = t2
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Initial stream, to be used for container creation (table usually)
|
|
|
|
|
# @param id string
|
|
|
|
|
# @return String
|
|
|
|
@ -126,8 +137,10 @@ module CartoDB
|
|
|
|
|
first_item = get_by_ids(@url, [@ids.slice!(0)], @metadata[:fields])
|
|
|
|
|
@ids_retrieved += 1
|
|
|
|
|
|
|
|
|
|
# Start optimistic
|
|
|
|
|
@block_size = [MAX_BLOCK_SIZE, @metadata[:max_records_per_query]].min
|
|
|
|
|
timed_log("Retrieved the first element (Total elements: #{@ids_total})")
|
|
|
|
|
|
|
|
|
|
# Start with a pesimistic setup
|
|
|
|
|
@block_size = [MIN_BLOCK_SIZE * BLOCK_FACTOR, MAX_BLOCK_SIZE, @metadata[:max_records_per_query]].min
|
|
|
|
|
|
|
|
|
|
::JSON.dump(first_item)
|
|
|
|
|
end
|
|
|
|
@ -144,7 +157,7 @@ module CartoDB
|
|
|
|
|
puts "#{@ids_retrieved}/#{@ids_total} (#{ids_block.length})" if DEBUG
|
|
|
|
|
|
|
|
|
|
items = get_by_ids(@url, ids_block, @metadata[:fields])
|
|
|
|
|
@last_stream_status = @current_stream_status
|
|
|
|
|
timed_log("Downloaded a chunk of #{ids_block.length} ids (#{@ids_retrieved + ids_block.length} so far)")
|
|
|
|
|
@current_stream_status = true
|
|
|
|
|
retries = 0
|
|
|
|
|
sleep(BLOCK_SLEEP_TIME) unless BLOCK_SLEEP_TIME == 0
|
|
|
|
@ -153,10 +166,11 @@ module CartoDB
|
|
|
|
|
if SKIP_FAILED_IDS
|
|
|
|
|
items = []
|
|
|
|
|
else
|
|
|
|
|
@logger.append_and_store("Too many download failures. Giving up.") if @logger != nil
|
|
|
|
|
raise exception
|
|
|
|
|
end
|
|
|
|
|
else
|
|
|
|
|
@last_stream_status = @current_stream_status
|
|
|
|
|
timed_log("FAILED to download a chunk of #{ids_block.length} ids (#{@ids_retrieved} ids already downloaded). Retrying...")
|
|
|
|
|
@current_stream_status = false
|
|
|
|
|
# Add back, next pass will get fewer items
|
|
|
|
|
@ids = ids_block + @ids
|
|
|
|
@ -482,17 +496,15 @@ module CartoDB
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# By default, will update the block size, incrementing or decrementing it according to stream operation results
|
|
|
|
|
# Block size only gets incremented after 2 successful streams to avoid scenario of:
|
|
|
|
|
# X items -> FAIL
|
|
|
|
|
# X/2 items -> PASS
|
|
|
|
|
# X items -> FAIL (again, because erroring item was at second half of X)
|
|
|
|
|
# We decrement faster (BLOCK_FACTOR ** BLOCK_FACTOR) then we increment on success (BLOCK_FACTOR) to
|
|
|
|
|
# reduce the load of the remote server faster
|
|
|
|
|
def block_size(update=true)
|
|
|
|
|
if update
|
|
|
|
|
if @current_stream_status && @last_stream_status && @block_size < MAX_BLOCK_SIZE
|
|
|
|
|
if @current_stream_status && (@block_size < MAX_BLOCK_SIZE)
|
|
|
|
|
@block_size = [@block_size * BLOCK_FACTOR, MAX_BLOCK_SIZE].min
|
|
|
|
|
end
|
|
|
|
|
if !@current_stream_status && @block_size > MIN_BLOCK_SIZE
|
|
|
|
|
@block_size = [[(@block_size / BLOCK_FACTOR).floor, 1].max, MAX_BLOCK_SIZE].min
|
|
|
|
|
@block_size = [[(@block_size / (BLOCK_FACTOR ** BLOCK_FACTOR)).floor, MIN_BLOCK_SIZE].max, MAX_BLOCK_SIZE].min
|
|
|
|
|
end
|
|
|
|
|
@block_size = [@block_size, @metadata[:max_records_per_query]].min
|
|
|
|
|
end
|
|
|
|
|