class SidekiqUniqueJobs::Locksmith
rubocop:disable Metrics/ClassLength
@author Mikael Henriksson <mikael@zoolutions.se>
Lock manager class that handles all the various locks
def ==(other)
-
(true, false)
-
Parameters:
-
other
(Locksmith
) -- the locksmith to compare with
def ==(other) key == other.key && job_id == other.job_id end
def argv
def argv [job_id, config.pttl, config.type, config.limit] end
def brpoplpush(conn)
- Api: - private
def brpoplpush(conn) # passing timeout 0 to brpoplpush causes it to block indefinitely conn.brpoplpush(key.queued, key.primed, timeout: config.timeout || 0) end
def delete
Deletes the lock unless it has a pttl set
def delete return if config.pttl.positive? delete! end
def delete!
Deletes the lock regardless of if it has a pttl set
def delete! call_script(:delete, key.to_a, [job_id, config.pttl, config.type, config.limit]).positive? end
def drift(val)
-
(Integer)
- a computed drift value
Parameters:
-
val
(Integer
) -- the value to compute drift for
def drift(val) # Add 2 milliseconds to the drift to account for Redis expires # precision, which is 1 millisecond, plus 1 millisecond min drift # for small TTLs. (val.to_i * CLOCK_DRIFT_FACTOR).to_i + 2 end
def enqueue(conn)
-
(yield
- when successfully enqueued) -
(nil)
- when redis was already prepared for this lock
Parameters:
-
conn
(Redis
) -- a redis connection
def enqueue(conn) queued_token, elapsed = timed do call_script(:queue, key.to_a, argv, conn) end validity = config.pttl - elapsed - drift(config.pttl) return unless queued_token && (validity >= 0 || config.pttl.zero?) write_lock_info(conn) yield end
def initialize(item, redis_pool = nil)
-
redis_pool
(Sidekiq::RedisConnection, ConnectionPool
) -- the redis connection -
item
(Hash
) -- a Sidekiq job hash
Options Hash:
(**item)
-
:unique_digest
(String
) -- the unique digest (See: {LockDigest#lock_digest}) -
:jid
(String
) -- the sidekiq job id -
:lock_ttl
(Integer
) -- the configured expiration
def initialize(item, redis_pool = nil) @item = item @key = Key.new(item[LOCK_DIGEST] || item[UNIQUE_DIGEST]) # fallback until can be removed @job_id = item[JID] @config = LockConfig.new(item) @redis_pool = redis_pool end
def inspect
- See: to_s -
def inspect to_s end
def lock(&block)
-
(String)
- the Sidekiq job_id that was locked/queued
def lock(&block) redis(redis_pool) do |conn| return lock_async(conn, &block) if block_given? lock_sync(conn) do return job_id end end end
def lock_async(conn)
- Yieldparam: job_id - a Sidekiq JID
Returns:
-
(Object)
- whatever the block returns when lock was acquired -
(nil)
- when lock was not possible
Parameters:
-
conn
(Redis
) -- a redis connection
def lock_async(conn) return yield job_id if locked?(conn) enqueue(conn) do primed_async(conn) do locked_token = call_script(:lock, key.to_a, argv, conn) return yield if locked_token == job_id end end ensure unlock!(conn) end
def lock_info
def lock_info @lock_info ||= dump_json( WORKER => item[CLASS], QUEUE => item[QUEUE], LIMIT => item[LOCK_LIMIT], TIMEOUT => item[LOCK_TIMEOUT], TTL => item[LOCK_TTL], LOCK => config.type, LOCK_ARGS => item[LOCK_ARGS], TIME => now_f, ) end
def lock_sync(conn)
- Yieldparam: job_id - a Sidekiq JID
Returns:
-
(Object)
- whatever the block returns when lock was acquired -
(nil)
- when lock was not possible
Parameters:
-
conn
(Redis
) -- a redis connection
def lock_sync(conn) return yield if locked?(conn) enqueue(conn) do primed_sync(conn) do locked_token = call_script(:lock, key.to_a, argv, conn) return yield if locked_token end end end
def locked?(conn = nil)
-
(true, false)
- true when the :LOCKED hash contains the job_id
def locked?(conn = nil) return taken?(conn) if conn redis { |rcon| taken?(rcon) } end
def pop_queued(conn)
-
(String)
- a previously enqueued token (now taken off the queue)
Parameters:
-
conn
(Redis
) -- a redis connection
def pop_queued(conn) if config.wait_for_lock? brpoplpush(conn) else rpoplpush(conn) end end
def primed_async(conn)
-
(Object)
- whatever the block returns when lock was acquired -
(nil)
- when lock was not possible
Parameters:
-
conn
(Redis
) -- a redis connection
Other tags:
- Note: - Used for runtime locks to avoid problems with blocking commands
def primed_async(conn) return yield if Concurrent::Promises .future(conn) { |red_con| pop_queued(red_con) } .value(drift(config.ttl)) warn_about_timeout end
def primed_sync(conn)
-
(Object)
- whatever the block returns when lock was acquired -
(nil)
- when lock was not possible
Parameters:
-
conn
(Redis
) -- a redis connection
Other tags:
- Note: - Used for non-runtime locks
def primed_sync(conn) return yield if pop_queued(conn) warn_about_timeout end
def rpoplpush(conn)
- Api: - private
def rpoplpush(conn) conn.rpoplpush(key.queued, key.primed) end
def taken?(conn)
-
(true, false)
-
Parameters:
-
conn
(Redis
) -- a redis connection
def taken?(conn) conn.hexists(key.locked, job_id) end
def to_s
-
(String)
-
def to_s "Locksmith##{object_id}(digest=#{key} job_id=#{job_id}, locked=#{locked?})" end
def unlock(conn = nil)
-
(String)
- Sidekiq job_id (jid) if successful -
(false)
- unless locked?
def unlock(conn = nil) return false unless locked?(conn) unlock!(conn) end
def unlock!(conn = nil)
-
(String)
- Sidekiq job_id (jid) if successful -
(false)
- unless locked?
def unlock!(conn = nil) call_script(:unlock, key.to_a, argv, conn) end
def warn_about_timeout
def warn_about_timeout log_warn("Timed out after #{config.timeout}s while waiting for primed token (digest: #{key}, job_id: #{job_id})") end
def write_lock_info(conn)
-
(void)
-
def write_lock_info(conn) return unless config.lock_info conn.set(key.info, lock_info) end