|
|
|
@ -11,24 +11,35 @@ module Carto
|
|
|
|
|
@ttl_ms = ttl_ms
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
# Run a block of code with the lock acquired.
|
|
|
|
|
# It will retry acquiring the lock up to `attempts` times and
|
|
|
|
|
# for up to `timeout` milliseconds.
|
|
|
|
|
# If an executable (lambda/Proc) object is passed through `fail_function`
|
|
|
|
|
# it will be executed if the lock is not acquired and another such proc hasn't
|
|
|
|
|
# been executed during the lock period (before a new locked execution).
|
|
|
|
|
# This can be used to reschedule execution while avoiding to reschedule
|
|
|
|
|
# additional executions while one is pending.
|
|
|
|
|
def run_locked(attempts: DEFAULT_RETRY_ATTEMPTS,
|
|
|
|
|
timeout: DEFAULT_RETRY_TIMEOUT,
|
|
|
|
|
rerun_func: nil)
|
|
|
|
|
fail_function: nil)
|
|
|
|
|
raise 'no code block given' unless block_given?
|
|
|
|
|
raise 'no proc/lambda passed as rerun_func' if rerun_func.present? && !proc?(rerun_func)
|
|
|
|
|
raise 'no proc/lambda passed as fail_function' if fail_function.present? && !proc?(fail_function)
|
|
|
|
|
|
|
|
|
|
locked_acquired = acquire_lock(attempts, timeout)
|
|
|
|
|
lock_acquired = acquire_lock(attempts, timeout)
|
|
|
|
|
|
|
|
|
|
begin
|
|
|
|
|
unless locked_acquired
|
|
|
|
|
set_rerun_after_finish
|
|
|
|
|
return !!locked_acquired
|
|
|
|
|
if lock_acquired
|
|
|
|
|
retried
|
|
|
|
|
yield
|
|
|
|
|
true
|
|
|
|
|
else
|
|
|
|
|
if fail_function && !set_retry
|
|
|
|
|
fail_function.call
|
|
|
|
|
end
|
|
|
|
|
false
|
|
|
|
|
end
|
|
|
|
|
yield
|
|
|
|
|
try_to_rerun(rerun_func)
|
|
|
|
|
!!locked_acquired
|
|
|
|
|
ensure
|
|
|
|
|
unlock if locked_acquired
|
|
|
|
|
unlock if lock_acquired
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
@ -48,14 +59,6 @@ module Carto
|
|
|
|
|
false
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def try_to_rerun(rerun_func)
|
|
|
|
|
return unless rerun_func.present?
|
|
|
|
|
while retry?
|
|
|
|
|
refresh_lock_timeout
|
|
|
|
|
rerun_func.call
|
|
|
|
|
end
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def proc?(proc)
|
|
|
|
|
proc.respond_to?(:call)
|
|
|
|
|
end
|
|
|
|
@ -75,16 +78,12 @@ module Carto
|
|
|
|
|
@redis_object.set(@bolt_key, true, px: @ttl_ms, nx: true)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def set_rerun_after_finish
|
|
|
|
|
@redis_object.set("#{@bolt_key}:retry", true, px: @ttl_ms, nx: true)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def refresh_lock_timeout
|
|
|
|
|
@redis_object.pexpire(@bolt_key, @ttl_ms)
|
|
|
|
|
def retried
|
|
|
|
|
@redis_object.del("#{@bolt_key}:retry")
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def retry?
|
|
|
|
|
@redis_object.del("#{@bolt_key}:retry") > 0
|
|
|
|
|
def set_retry
|
|
|
|
|
@redis_object.getset("#{@bolt_key}:retry", true)
|
|
|
|
|
end
|
|
|
|
|
|
|
|
|
|
def add_namespace_to_key(key)
|
|
|
|
|