class SidekiqUniqueJobs::Locksmith

rubocop:disable Metrics/ClassLength
@author Mikael Henriksson <mikael@zoolutions.se>
Lock manager class that handles all the various locks

def ==(other)

Returns:
  • (true, false) -

Parameters:
  • other (Locksmith) -- the locksmith to compare with
# Compares this locksmith with another one.
#
# Both the key and the job_id have to compare equal; the job_id is only
# looked at when the keys already match.
#
# @param other [Locksmith] the locksmith to compare with
#
# @return [true, false]
def ==(other)
  same_key = key == other.key
  same_key && job_id == other.job_id
end

def argv

# Argument list that is passed along with the keys to call_script.
#
# @return [Array] job_id, config.pttl, config.type and config.limit, in that order
def argv
  [
    job_id,
    config.pttl,
    config.type,
    config.limit,
  ]
end

def brpoplpush(conn)

Other tags:
    Api: - private
# Issues brpoplpush on the connection, from key.queued to key.primed.
#
# @api private
#
# @param conn [Redis] a redis connection
#
# @return [Object] whatever conn.brpoplpush returns
def brpoplpush(conn)
  source = key.queued
  destination = key.primed
  # Without a configured timeout, 0 is sent; a timeout of 0 makes
  # brpoplpush block indefinitely.
  conn.brpoplpush(source, destination, timeout: config.timeout || 0)
end

def delete



Deletes the lock unless it has a pttl set
# Deletes the lock unless a positive pttl is configured.
#
# @return [nil] when config.pttl is positive (nothing is deleted)
# @return [Object] otherwise, the result of {#delete!}
def delete
  delete! unless config.pttl.positive?
end

def delete!


Deletes the lock regardless of whether it has a pttl set
# Deletes the lock regardless of whether it has a pttl set.
#
# @return [true, false] whether the value returned by the :delete script is positive
def delete!
  # Use #argv rather than rebuilding the identical argument list by hand, so
  # this call cannot drift apart from the other script invocations.
  call_script(:delete, key.to_a, argv).positive?
end

def drift(val)

Returns:
  • (Integer) - a computed drift value

Parameters:
  • val (Integer) -- the value to compute drift for
# Computes the drift for a value.
#
# @param val [Integer] the value to compute drift for (converted with to_i)
#
# @return [Integer] a computed drift value
def drift(val)
  scaled = (val.to_i * CLOCK_DRIFT_FACTOR).to_i
  # 2 extra milliseconds: 1 for the precision of Redis expires and
  # 1 as the minimum drift for small TTLs.
  scaled + 2
end

def enqueue(conn)

Returns:
  • (yield) - when successfully enqueued
  • (nil) - when redis was already prepared for this lock

Parameters:
  • conn (Redis) -- a redis connection
# Runs the :queue script and, when that produced a token that is still
# valid, writes the lock info and yields.
#
# @param conn [Redis] a redis connection
#
# @return [Object] the value of the block when successfully enqueued
# @return [nil] when no token was returned or the validity has run out
def enqueue(conn)
  queued_token, elapsed = timed { call_script(:queue, key.to_a, argv, conn) }
  validity = config.pttl - elapsed - drift(config.pttl)

  return unless queued_token
  # A pttl of zero is let through even when the computed validity is negative.
  return unless validity >= 0 || config.pttl.zero?

  write_lock_info(conn)
  yield
end

def initialize(item, redis_pool = nil)

Parameters:
  • redis_pool (Sidekiq::RedisConnection, ConnectionPool) -- the redis connection
  • item (Hash) -- a Sidekiq job hash

Options Hash: (**item)
  • :unique_digest (String) -- the unique digest (See: {LockDigest#lock_digest})
  • :jid (String) -- the sidekiq job id
  • :lock_ttl (Integer) -- the configured expiration
# Sets up a locksmith for a Sidekiq job hash.
#
# @param item [Hash] a Sidekiq job hash
# @param redis_pool [Sidekiq::RedisConnection, ConnectionPool] the redis connection
def initialize(item, redis_pool = nil)
  # LOCK_DIGEST is preferred; UNIQUE_DIGEST is only a fallback until it can be removed.
  digest = item[LOCK_DIGEST] || item[UNIQUE_DIGEST]

  @item = item
  @key = Key.new(digest)
  @job_id = item[JID]
  @config = LockConfig.new(item)
  @redis_pool = redis_pool
end

def inspect

Other tags:
    See: to_s -
# Delegates to {#to_s}, so inspecting a locksmith gives the same text as
# converting it to a string.
#
# @see #to_s
#
# @return [String] the value returned by {#to_s}
def inspect
  to_s
end

def lock(&block)

Returns:
  • (String) - the Sidekiq job_id that was locked/queued
# Acquires the lock on a connection obtained through redis(redis_pool).
#
# With a block the work is handed to {#lock_async}; without one
# {#lock_sync} is used and the job_id is returned once it yields.
#
# @return [Object] the result of {#lock_async} when a block was given
# @return [String] the Sidekiq job_id that was locked/queued when no block was given
def lock(&block)
  redis(redis_pool) do |conn|
    return lock_async(conn, &block) unless block.nil?

    # Non-local return: leaves #lock with the job_id as soon as lock_sync yields.
    lock_sync(conn) { return job_id }
  end
end

def lock_async(conn)

Other tags:
    Yieldparam: job_id - a Sidekiq JID

Returns:
  • (Object) - whatever the block returns when lock was acquired
  • (nil) - when lock was not possible

Parameters:
  • conn (Redis) -- a redis connection
# Acquires the lock for the duration of the given block.
#
# The block is yielded to either straight away (lock already held) or after
# the enqueue -> primed -> :lock script sequence returned this job's id.
# In both cases the block receives the job_id.
#
# @param conn [Redis] a redis connection
#
# @yieldparam job_id [String] a Sidekiq JID
#
# @return [Object] whatever the block returns when lock was acquired
# @return [nil] when lock was not possible
def lock_async(conn)
  return yield job_id if locked?(conn)

  enqueue(conn) do
    primed_async(conn) do
      locked_token = call_script(:lock, key.to_a, argv, conn)
      # Pass the job_id here as well so the block gets the same argument on
      # every path; previously this path yielded nothing.
      return yield job_id if locked_token == job_id
    end
  end
ensure
  # Runs on every exit path, including the non-local returns above.
  unlock!(conn)
end

def lock_info

# Lock metadata built from the job hash, the lock type and the current time,
# passed through dump_json.
#
# The result is memoized in @lock_info, so the timestamp is taken once.
#
# @return [Object] whatever dump_json returns (presumably a JSON string; confirm against dump_json)
def lock_info
  return @lock_info if @lock_info

  @lock_info = dump_json(
    WORKER => item[CLASS],
    QUEUE => item[QUEUE],
    LIMIT => item[LOCK_LIMIT],
    TIMEOUT => item[LOCK_TIMEOUT],
    TTL => item[LOCK_TTL],
    LOCK => config.type,
    LOCK_ARGS => item[LOCK_ARGS],
    TIME => now_f,
  )
end

def lock_sync(conn)

Other tags:
    Yieldparam: job_id - a Sidekiq JID

Returns:
  • (Object) - whatever the block returns when lock was acquired
  • (nil) - when lock was not possible

Parameters:
  • conn (Redis) -- a redis connection
# Acquires the lock using {#primed_sync}.
#
# Yields straight away when the lock is already held; otherwise runs the
# enqueue -> primed -> :lock script sequence and yields when that script
# returned a truthy token.
#
# @param conn [Redis] a redis connection
#
# @return [Object] whatever the block returns when lock was acquired
# @return [nil] when lock was not possible
def lock_sync(conn)
  return yield if locked?(conn)

  enqueue(conn) do
    primed_sync(conn) do
      next unless call_script(:lock, key.to_a, argv, conn)

      # Non-local return: hands the block's value back to the caller of #lock_sync.
      return yield
    end
  end
end

def locked?(conn = nil)

Returns:
  • (true, false) - true when the :LOCKED hash contains the job_id
# Checks whether the lock is currently taken for this job_id.
#
# @param conn [Redis, nil] a redis connection; when omitted one is obtained through redis
#
# @return [true, false] true when the :LOCKED hash contains the job_id
def locked?(conn = nil)
  if conn
    taken?(conn)
  else
    redis { |rcon| taken?(rcon) }
  end
end

def pop_queued(conn)

Returns:
  • (String) - a previously enqueued token (now taken off the queue)

Parameters:
  • conn (Redis) -- a redis connection
# Pops a token via {#brpoplpush} when config.wait_for_lock? is truthy,
# otherwise via {#rpoplpush}.
#
# @param conn [Redis] a redis connection
#
# @return [String] a previously enqueued token (now taken off the queue)
def pop_queued(conn)
  config.wait_for_lock? ? brpoplpush(conn) : rpoplpush(conn)
end

def primed_async(conn)

Returns:
  • (Object) - whatever the block returns when lock was acquired
  • (nil) - when lock was not possible

Parameters:
  • conn (Redis) -- a redis connection

Other tags:
    Note: - Used for runtime locks to avoid problems with blocking commands
# Pops the queued token inside a Concurrent::Promises future and yields
# when a truthy value comes back.
#
# @note Used for runtime locks to avoid problems with blocking commands
#
# @param conn [Redis] a redis connection
#
# @return [Object] whatever the block returns when lock was acquired
# @return [Object] otherwise the result of {#warn_about_timeout}
def primed_async(conn)
  # drift(config.ttl) is handed to Future#value as its timeout argument.
  primed_token = Concurrent::Promises
                 .future(conn) { |red_con| pop_queued(red_con) }
                 .value(drift(config.ttl))
  return yield if primed_token

  warn_about_timeout
end

def primed_sync(conn)

Returns:
  • (Object) - whatever the block returns when lock was acquired
  • (nil) - when lock was not possible

Parameters:
  • conn (Redis) -- a redis connection

Other tags:
    Note: - Used for non-runtime locks
# Pops the queued token on the given connection and yields when a truthy
# value comes back.
#
# @note Used for non-runtime locks
#
# @param conn [Redis] a redis connection
#
# @return [Object] whatever the block returns when lock was acquired
# @return [Object] otherwise the result of {#warn_about_timeout}
def primed_sync(conn)
  if pop_queued(conn)
    yield
  else
    warn_about_timeout
  end
end

def rpoplpush(conn)

Other tags:
    Api: - private
# Issues rpoplpush on the connection, from key.queued to key.primed.
#
# @api private
#
# @param conn [Redis] a redis connection
#
# @return [Object] whatever conn.rpoplpush returns
def rpoplpush(conn)
  source = key.queued
  destination = key.primed
  conn.rpoplpush(source, destination)
end

def taken?(conn)

Returns:
  • (true, false) -

Parameters:
  • conn (Redis) -- a redis connection
# Asks redis whether the hash at key.locked has a field for this job_id.
#
# @param conn [Redis] a redis connection
#
# @return [true, false]
def taken?(conn)
  locked_hash = key.locked
  conn.hexists(locked_hash, job_id)
end

def to_s

Returns:
  • (String) -
# Text representation with the object id, digest key, job_id and lock state.
#
# Note that this calls {#locked?} without a connection.
#
# @return [String]
def to_s
  lock_state = locked?
  "Locksmith##{object_id}(digest=#{key} job_id=#{job_id}, locked=#{lock_state})"
end

def unlock(conn = nil)

Returns:
  • (String) - Sidekiq job_id (jid) if successful
  • (false) - unless locked?
# Unlocks only when the lock is currently held.
#
# @param conn [Redis, nil] a redis connection, passed on to {#locked?} and {#unlock!}
#
# @return [String] Sidekiq job_id (jid) if successful
# @return [false] unless locked?
def unlock(conn = nil)
  locked?(conn) ? unlock!(conn) : false
end

def unlock!(conn = nil)

Returns:
  • (String) - Sidekiq job_id (jid) if successful
  • (false) - unless locked?
# Runs the :unlock script without checking {#locked?} first.
#
# @param conn [Redis, nil] a redis connection, handed on to call_script
#
# @return [Object] whatever call_script returns for :unlock
def unlock!(conn = nil)
  keys = key.to_a
  call_script(:unlock, keys, argv, conn)
end

def warn_about_timeout

# Logs a warning that waiting for the primed token timed out, naming the
# configured timeout, the digest key and the job_id.
#
# @return [Object] whatever log_warn returns
def warn_about_timeout
  message = "Timed out after #{config.timeout}s while waiting for primed token (digest: #{key}, job_id: #{job_id})"
  log_warn(message)
end

def write_lock_info(conn)

Returns:
  • (void) -
# Stores {#lock_info} under key.info, but only when config.lock_info is truthy.
#
# @param conn [Redis] a redis connection
#
# @return [void]
def write_lock_info(conn)
  conn.set(key.info, lock_info) if config.lock_info
end