class SidekiqUniqueJobs::Orphans::RubyReaper


@author Mikael Henriksson <mikael@zoolutions.se>
@note this is a much slower version of the lua script but does not crash redis
Class DeleteOrphans provides deletion of orphaned digests

def active?(digest)

def active?(digest)
  Sidekiq.redis do |conn|
    procs = conn.sscan_each("processes").to_a.sort
    result = conn.pipelined do
      procs.map do |key|
        conn.hget(key, "info")
      end
    end
    result.flatten.compact.any? { |job| load_json(job)[LOCK_DIGEST] == digest }
  end
end

def belongs_to_job?(digest)

Returns:
  • (false) - when no job was found for this digest
  • (true) - when either of the checks return true

Parameters:
  • digest (String) -- the digest to search for
def belongs_to_job?(digest)
  scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
end

def call

Returns:
  • (Integer) - the number of reaped locks
def call
  BatchDelete.call(orphans, conn)
end

def enqueued?(digest)

Returns:
  • (true) - when digest exists in any queue

Parameters:
  • digest (String) -- the current digest
def enqueued?(digest)
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      entries(conn, queue) do |entry|
        return true if entry.include?(digest)
      end
    end
    false
  end
end

def entries(conn, queue) # rubocop:disable Metrics/MethodLength

rubocop:disable Metrics/MethodLength
def entries(conn, queue) # rubocop:disable Metrics/MethodLength
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50
  loop do
    range_start = page * page_size - deleted_size
    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1
    break if entries.empty?
    entries.each do |entry|
      yield entry
    end
    deleted_size = initial_size - conn.llen(queue_key)
  end
end

def in_sorted_set?(key, digest)

Returns:
  • (false) - when missing
  • (true) - when found

Parameters:
  • digest (String) -- the digest to scan for
  • key (String) -- the key for the sorted set
def in_sorted_set?(key, digest)
  conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end

def initialize(conn)

Parameters:
  • conn (Redis) -- a connection to redis
def initialize(conn)
  super(conn)
  @digests   = SidekiqUniqueJobs::Digests.new
  @scheduled = Redis::SortedSet.new(SCHEDULE)
  @retried   = Redis::SortedSet.new(RETRY)
end

def orphans

Returns:
  • (Array) - an array of orphaned digests
def orphans
  conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, result|
    next if belongs_to_job?(digest)
    result << digest
    break if result.size >= reaper_count
  end
end

def queues(conn, &block)

Other tags:
    Yield: - queues one at a time

Returns:
  • (void) -

Parameters:
  • conn (Redis) -- the connection to use for fetching queues
def queues(conn, &block)
  conn.sscan_each("queues", &block)
end

def retried?(digest)

Returns:
  • (true) - when digest exists in retry set

Parameters:
  • digest (String) -- the current digest
def retried?(digest)
  in_sorted_set?(RETRY, digest)
end

def scheduled?(digest)

Returns:
  • (true) - when digest exists in scheduled set

Parameters:
  • digest (String) -- the current digest
def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end