96 lines
2.6 KiB
Ruby
96 lines
2.6 KiB
Ruby
|
module Carto
|
||
|
class Bolt
|
||
|
|
||
|
include ::LoggerHelper
|
||
|
|
||
|
DEFAULT_TTL_MS = 10000
|
||
|
DEFAULT_RETRY_ATTEMPTS = 1
|
||
|
DEFAULT_RETRY_TIMEOUT = 10000 # in_ms
|
||
|
|
||
|
def initialize(bolt_key, redis_object: $users_metadata, ttl_ms: DEFAULT_TTL_MS)
|
||
|
@bolt_key = add_namespace_to_key(bolt_key)
|
||
|
@redis_object = redis_object
|
||
|
@ttl_ms = ttl_ms
|
||
|
end
|
||
|
|
||
|
# Run a block of code with the lock acquired.
|
||
|
# It will retry acquiring the lock up to `attempts` times and
|
||
|
# for up to `timeout` milliseconds.
|
||
|
# If an executable (lambda/Proc) object is passed through `fail_function`
|
||
|
# it will be executed if the lock is not acquired and another such proc hasn't
|
||
|
# been executed during the lock period (before a new locked execution).
|
||
|
# This can be used to reschedule execution while avoiding to reschedule
|
||
|
# additional executions while one is pending.
|
||
|
def run_locked(attempts: DEFAULT_RETRY_ATTEMPTS,
|
||
|
timeout: DEFAULT_RETRY_TIMEOUT,
|
||
|
fail_function: nil)
|
||
|
raise 'no code block given' unless block_given?
|
||
|
raise 'no proc/lambda passed as fail_function' if fail_function.present? && !proc?(fail_function)
|
||
|
|
||
|
lock_acquired = acquire_lock(attempts, timeout)
|
||
|
|
||
|
begin
|
||
|
if lock_acquired
|
||
|
retried
|
||
|
yield
|
||
|
true
|
||
|
else
|
||
|
if fail_function && !set_retry
|
||
|
fail_function.call
|
||
|
end
|
||
|
false
|
||
|
end
|
||
|
ensure
|
||
|
unlock if lock_acquired
|
||
|
end
|
||
|
end
|
||
|
|
||
|
private
|
||
|
|
||
|
def acquire_lock(attempts, timeout)
|
||
|
attempts.times do |index|
|
||
|
lock_acquired = get_lock
|
||
|
# With only 1 attempt, the default value, we dont sleep
|
||
|
# even if false
|
||
|
if lock_acquired || (attempts == index + 1)
|
||
|
return lock_acquired
|
||
|
end
|
||
|
sleep((timeout / 1000.0).second)
|
||
|
end
|
||
|
log_warning(message: "Couldn't acquire bolt", attempts: attempts, timeout: timeout)
|
||
|
false
|
||
|
end
|
||
|
|
||
|
def proc?(proc)
|
||
|
proc.respond_to?(:call)
|
||
|
end
|
||
|
|
||
|
def unlock
|
||
|
removed_keys = @redis_object.del(@bolt_key)
|
||
|
|
||
|
# This may happen due to Redis failure. Highly unlikely, still nice to know.
|
||
|
if removed_keys > 1
|
||
|
CartoDB.notify_error('Removed bolt key was duplicated', bolt_key: @bolt_key, amount: removed_keys)
|
||
|
end
|
||
|
|
||
|
removed_keys > 0
|
||
|
end
|
||
|
|
||
|
def get_lock
|
||
|
@redis_object.set(@bolt_key, true, px: @ttl_ms, nx: true)
|
||
|
end
|
||
|
|
||
|
def retried
|
||
|
@redis_object.del("#{@bolt_key}:retry")
|
||
|
end
|
||
|
|
||
|
def set_retry
|
||
|
@redis_object.getset("#{@bolt_key}:retry", true)
|
||
|
end
|
||
|
|
||
|
def add_namespace_to_key(key)
|
||
|
"rails:bolt:#{key}"
|
||
|
end
|
||
|
end
|
||
|
end
|