class SidekiqUniqueJobs::Orphans::RubyReaper

@author Mikael Henriksson <mikael@zoolutions.se>
@note This is a much slower version of the Lua script, but it does not crash Redis.

Class RubyReaper provides deletion of orphaned digests.
def active?(digest)

Returns:
- (true) -- when the digest has an active job

Parameters:
- digest (String) -- the current digest

def active?(digest)
  Sidekiq.redis do |conn|
    # Every live Sidekiq process registers itself in the "processes" set
    procs = conn.sscan_each("processes").to_a.sort

    # Fetch each process "info" hash in a single round trip
    result = conn.pipelined do
      procs.map do |key|
        conn.hget(key, "info")
      end
    end

    result.flatten.compact.any? { |job| load_json(job)[LOCK_DIGEST] == digest }
  end
end
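For illustration, a minimal standalone sketch of the same check using redis-rb and JSON directly, assuming a local Redis; the helper name, the example digest, and the "lock_digest" payload key (standing in for LOCK_DIGEST) are assumptions:

require "json"
require "redis"

# Hypothetical helper mirroring the active? check with plain redis-rb.
def digest_active?(redis, digest)
  # Every live Sidekiq process registers itself in the "processes" set
  procs = redis.sscan_each("processes").to_a.sort

  # Fetch each process "info" hash in one round trip
  infos = redis.pipelined do |pipe|
    procs.each { |key| pipe.hget(key, "info") }
  end

  infos.compact.any? { |json| JSON.parse(json)["lock_digest"] == digest }
end

digest_active?(Redis.new, "uniquejobs:24e237dd52d0cbf863c05e8b")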
def belongs_to_job?(digest)
Returns:
- (false) -- when no job was found for this digest
- (true) -- when either of the checks returns true

Parameters:
- digest (String) -- the digest to search for

def belongs_to_job?(digest)
  scheduled?(digest) || retried?(digest) || enqueued?(digest) || active?(digest)
end
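The checks short-circuit on the first hit, with the cheap sorted-set scans running before the full queue scan. A hedged usage sketch, assuming the method is publicly callable, that the gem is required as "sidekiq-unique-jobs", and using a made-up digest:

require "sidekiq"
require "sidekiq-unique-jobs"

Sidekiq.redis do |conn|
  reaper = SidekiqUniqueJobs::Orphans::RubyReaper.new(conn)
  # => false when the digest no longer belongs to any job, i.e. it is orphaned
  reaper.belongs_to_job?("uniquejobs:24e237dd52d0cbf863c05e8b")
end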
def call
Returns:
- (Integer) -- the number of reaped locks

def call
  BatchDelete.call(orphans, conn)
end
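A hedged sketch of triggering one reaping pass by hand (normally the orphan manager schedules this); the require paths are assumptions:

require "sidekiq"
require "sidekiq-unique-jobs"

Sidekiq.redis do |conn|
  reaped = SidekiqUniqueJobs::Orphans::RubyReaper.new(conn).call
  puts "reaped #{reaped} orphaned locks"
end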
def enqueued?(digest)
Returns:
- (true) -- when the digest exists in any queue

Parameters:
- digest (String) -- the current digest

def enqueued?(digest)
  Sidekiq.redis do |conn|
    queues(conn) do |queue|
      entries(conn, queue) do |entry|
        return true if entry.include?(digest)
      end
    end

    false
  end
end
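For illustration, a naive standalone version of the same substring check against a single queue with plain redis-rb; unlike the paged entries helper below, it loads the whole list in one LRANGE call, so it only suits small queues. The helper name is hypothetical:

require "redis"

# Does any payload in queue:<queue> mention the digest?
def digest_enqueued_in?(redis, queue, digest)
  redis.lrange("queue:#{queue}", 0, -1).any? { |entry| entry.include?(digest) }
end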
def entries(conn, queue)

def entries(conn, queue) # rubocop:disable Metrics/MethodLength
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50

  loop do
    # Shift the window left by however many entries workers have
    # consumed since the scan started, so no entry is skipped
    range_start = page * page_size - deleted_size
    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1

    break if entries.empty?

    entries.each do |entry|
      yield entry
    end

    deleted_size = initial_size - conn.llen(queue_key)
  end
end
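The deleted_size bookkeeping slides the LRANGE window left when workers pop entries mid-scan: if 10 entries are consumed while page 0 is processed, page 1 starts at 1 * 50 - 10 = 40 rather than 50, which is where the original index 50 now sits. A minimal sketch of the plain paging loop without that correction, over a static demo list and assuming a local Redis:

require "redis"

redis = Redis.new
redis.del("queue:demo")
redis.rpush("queue:demo", (1..120).map(&:to_s))

page      = 0
page_size = 50
loop do
  range_start = page * page_size
  entries     = redis.lrange("queue:demo", range_start, range_start + page_size - 1)
  break if entries.empty?

  entries.each { |entry| puts entry } # pages of 50, 50, 20
  page += 1
end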
def in_sorted_set?(key, digest)
Returns:
- (false) -- when missing
- (true) -- when found

Parameters:
- key (String) -- the key for the sorted set
- digest (String) -- the digest to scan for

def in_sorted_set?(key, digest)
  conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a.any?
end
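Note that count: 1 is only a per-iteration hint to Redis SCAN, not a result cap. A standalone sketch with redis-rb that stops at the first match instead of draining the scan; the helper name and example digest are assumptions:

require "redis"

# Hypothetical helper mirroring in_sorted_set? with plain redis-rb.
def digest_in_sorted_set?(redis, key, digest)
  # zscan_each yields [member, score] pairs; first is nil when nothing matches
  !redis.zscan_each(key, match: "*#{digest}*", count: 1).first.nil?
end

digest_in_sorted_set?(Redis.new, "schedule", "uniquejobs:24e237dd52d0cbf863c05e8b")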
def initialize(conn)
Parameters:
- conn (Redis) -- a connection to redis

def initialize(conn)
  super(conn)
  @digests   = SidekiqUniqueJobs::Digests.new
  @scheduled = Redis::SortedSet.new(SCHEDULE)
  @retried   = Redis::SortedSet.new(RETRY)
end
def orphans
Returns:
- (Array<String>) -- an array of orphaned digests

def orphans
  conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, result|
    next if belongs_to_job?(digest)

    result << digest
    # break with the accumulator so the digests collected so far are
    # returned even when the reaper_count cap stops the scan early
    break result if result.size >= reaper_count
  end
end
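Note the break result: inside each_with_object, a bare break discards the accumulator and would make orphans return nil whenever the reaper_count cap is hit. A quick demonstration of the difference in plain Ruby:

# bare break throws the accumulator away:
(1..10).each_with_object([]) { |i, acc| acc << i; break if acc.size >= 3 }
# => nil

# break with a value returns the partial result:
(1..10).each_with_object([]) { |i, acc| acc << i; break acc if acc.size >= 3 }
# => [1, 2, 3]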
def queues(conn, &block)
Yields:
- each queue name, one at a time

Returns:
- (void)

Parameters:
- conn (Redis) -- the connection to use for fetching queues

def queues(conn, &block)
  conn.sscan_each("queues", &block)
end
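Sidekiq tracks every queue it has seen in the "queues" set; SSCAN walks it incrementally instead of pulling all members in one O(N) call. The same iteration directly with redis-rb, assuming a local Redis:

require "redis"

Redis.new.sscan_each("queues") { |queue| puts "queue:#{queue}" }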
def retried?(digest)
Returns:
- (true) -- when the digest exists in the retry set

Parameters:
- digest (String) -- the current digest

def retried?(digest)
  in_sorted_set?(RETRY, digest)
end
def scheduled?(digest)
Returns:
- (true) -- when the digest exists in the scheduled set

Parameters:
- digest (String) -- the current digest

def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end
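Both predicates are thin wrappers over in_sorted_set? against Sidekiq's two sorted sets, assuming the SCHEDULE and RETRY constants resolve to the standard "schedule" and "retry" keys. Checking one hypothetical digest against both with plain redis-rb:

require "redis"

redis  = Redis.new
digest = "uniquejobs:24e237dd52d0cbf863c05e8b" # hypothetical

scheduled = redis.zscan_each("schedule", match: "*#{digest}*", count: 1).to_a.any?
retried   = redis.zscan_each("retry",    match: "*#{digest}*", count: 1).to_a.any?
puts "scheduled=#{scheduled} retried=#{retried}"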