class SidekiqUniqueJobs::Orphans::Reaper
  # @author Mikael Henriksson <mikael@zoolutions.se>
  #
  # @note This is a much slower version of the lua script but does not crash redis.
  #
  # Class Reaper provides deletion of orphaned digests.
#
# Runs the reaper over the given redis connection, checking one out
# from the pool when none is provided.
#
# @param [Redis] conn a connection to redis (nil to acquire one internally)
#
# @return [void]
#
def self.call(conn = nil)
  if conn
    new(conn).call
  else
    redis { |rcon| new(rcon).call }
  end
end
#
# Checks if the digest still maps to a live sidekiq job.
#
# @param [String] digest the digest to search for
#
# @return [true] when either of the checks return true
# @return [false] when no job was found for this digest
#
def belongs_to_job?(digest)
  # checks are ordered cheapest-first and short-circuit like the
  # original `||` chain: stop at the first set that knows the digest
  return true if scheduled?(digest)
  return true if retried?(digest)

  enqueued?(digest)
end
#
# Dispatches to the configured reaper implementation.
#
# @return [Integer] the number of reaped locks
#
def call
  case reaper
  when :ruby
    execute_ruby_reaper
  when :lua
    execute_lua_reaper
  else
    # BUGFIX: message previously said "SidekiqUnqiueJobs" (typo), pointing
    # the user at a config key that does not exist
    log_fatal(":#{reaper} is invalid for `SidekiqUniqueJobs.config.reaper`")
  end
end
#
# The gem-wide configuration.
#
# @return [SidekiqUniqueJobs::Config] the global configuration object
#
def config
  SidekiqUniqueJobs.config
end
#
# Checks if the digest exists in any sidekiq queue.
#
# @param [String] digest the current digest
#
# @return [true] when digest exists in any queue
# @return [false] otherwise
#
def enqueued?(digest)
  Sidekiq.redis do |redis|
    queues(redis) do |queue_name|
      entries(redis, queue_name) do |payload|
        # `return` (not `break`) so the first hit exits the whole method
        return true if payload.include?(digest)
      end
    end

    false
  end
end
#
# Yields every entry of the given queue, paging through it 50 items
# at a time to avoid loading huge queues into memory at once.
#
# @param [Redis] conn a connection to redis
# @param [String] queue the name of the queue to page through
#
# @yield [entry] each serialized job payload in the queue
#
# @return [void]
#
def entries(conn, queue) # rubocop:disable Metrics/MethodLength
  queue_key    = "queue:#{queue}"
  initial_size = conn.llen(queue_key)
  deleted_size = 0
  page         = 0
  page_size    = 50

  loop do
    range_start = page * page_size - deleted_size
    range_end   = range_start + page_size - 1
    entries     = conn.lrange(queue_key, range_start, range_end)
    page       += 1

    # BUGFIX: without this guard the loop never terminates once the queue
    # has been fully paged — lrange keeps returning [] past the end.
    break if entries.empty?

    entries.each { |entry| yield(entry) }

    # account for entries removed (by other actors) while paging, so the
    # next range does not skip over shifted elements
    deleted_size = initial_size - conn.llen(queue_key)
  end
end
#
# Reaps orphans using the lua script (fast, but can block redis).
#
# @return [Integer] the number of deleted locks
#
def execute_lua_reaper
  script_keys = [
    SidekiqUniqueJobs::DIGESTS,
    SidekiqUniqueJobs::SCHEDULE,
    SidekiqUniqueJobs::RETRY,
  ]

  call_script(:reap_orphans, conn, keys: script_keys, argv: [reaper_count])
end
#
# Reaps orphans using pure ruby (slower, but does not block redis).
#
# @return [Integer] the number of deleted locks
#
def execute_ruby_reaper
  orphaned_digests = orphans

  BatchDelete.call(orphaned_digests, conn)
end
#
# Checks a sorted set for a member matching the digest.
#
# @param [String] key the key for the sorted set
# @param [String] digest the digest to scan for
#
# @return [true] when found
# @return [false] when missing
#
def in_sorted_set?(key, digest)
  matches = conn.zscan_each(key, match: "*#{digest}*", count: 1).to_a

  matches.any?
end
#
# Initialize a new instance of the reaper.
#
# @param [Redis] conn a connection to redis
#
def initialize(conn)
  @conn      = conn
  @digests   = SidekiqUniqueJobs::Digests.new
  @scheduled = Redis::SortedSet.new(SCHEDULE)
  @retried   = Redis::SortedSet.new(RETRY)
end
#
# Collects at most reaper_count orphaned digests (newest first) that no
# longer belong to any scheduled, retried or enqueued job.
#
# @return [Array<String>] an array of orphaned digests
#
def orphans
  conn.zrevrange(digests.key, 0, -1).each_with_object([]) do |digest, result|
    next if belongs_to_job?(digest)

    result << digest
    # BUGFIX: `break` must carry `result` — a bare break makes
    # each_with_object (and therefore this method) return nil as soon as
    # the reaper_count limit is hit, crashing the downstream BatchDelete.
    break result if result.size >= reaper_count
  end
end
#
# Yields the name of every known sidekiq queue.
#
# @param [Redis] conn a connection to redis
#
# @yield [queue] each queue name found in the "queues" set
#
# @return [void]
#
def queues(conn, &each_queue)
  conn.sscan_each("queues", &each_queue)
end
#
# The configured reaper implementation.
#
# @return [Symbol] the reaper type (:ruby or :lua)
#
def reaper
  config.reaper
end
#
# The maximum number of orphans to reap in one run.
#
# @return [Integer] the configured reaper count
#
def reaper_count
  config.reaper_count
end
#
# Checks if the digest exists in the sidekiq retry set.
#
# @param [String] digest the current digest
#
# @return [true] when digest exists in retry set
# @return [false] otherwise
#
def retried?(digest)
  in_sorted_set?(RETRY, digest)
end
#
# Checks if the digest exists in the sidekiq schedule set.
#
# @param [String] digest the current digest
#
# @return [true] when digest exists in scheduled set
# @return [false] otherwise
#
def scheduled?(digest)
  in_sorted_set?(SCHEDULE, digest)
end