class Sidekiq::ProcessSet
##
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds
# so this set should be relatively accurate, barring network partitions.
#
# @yieldparam [Sidekiq::Process]
#
# Fetch a single Sidekiq process by its identity string.
#
# @param identity [String] the process identity key in Redis
# @return [Sidekiq::Process, nil] nil when the identity is not registered
#   or its heartbeat hash has already expired
def self.[](identity)
  member, attrs = Sidekiq.redis do |conn|
    conn.multi do |txn|
      txn.sismember("processes", identity)
      txn.hmget(identity, "info", "busy", "beat", "quiet", "rss", "rtt_us")
    end
  end
  info, busy, beat, quiet, rss, rtt_us = attrs
  return nil if member == 0 || info.nil?

  fields = Sidekiq.load_json(info)
  Process.new(fields.merge(
    "busy" => busy.to_i,
    "beat" => beat.to_f,
    "quiet" => quiet,
    "rss" => rss.to_i,
    "rtt_us" => rtt_us.to_i
  ))
end
# Cleans up dead processes recorded in Redis.
# Returns the number of processes cleaned.
# @api private
def cleanup
  # Distributed guard: the NX/EX key ensures cleanup runs at most once
  # per minute across the whole cluster.
  return 0 unless Sidekiq.redis { |conn| conn.set("process_cleanup", "1", "NX", "EX", "60") }

  pruned = 0
  Sidekiq.redis do |conn|
    keys = conn.sscan("processes").to_a
    infos = conn.pipelined do |pipeline|
      keys.each { |key| pipeline.hget(key, "info") }
    end
    # Each heartbeat hash expires after 60 seconds; a missing hash means
    # the process stopped reporting in to Redis and probably died.
    dead = keys.each_with_index.select { |_key, idx| infos[idx].nil? }.map(&:first)
    pruned = conn.srem("processes", dead) unless dead.empty?
  end
  pruned
end
# Yields a Sidekiq::Process for every registered process.
#
# @yieldparam [Sidekiq::Process]
def each
  records = Sidekiq.redis do |conn|
    keys = conn.sscan("processes").to_a.sort
    # Trade memory for round trips: one pipelined fetch instead of an
    # HMGET per process — much happier with hundreds or thousands of workers.
    conn.pipelined do |pipeline|
      keys.each do |key|
        pipeline.hmget(key, "info", "busy", "beat", "quiet", "rss", "rtt_us")
      end
    end
  end

  records.each do |info, busy, beat, quiet, rss, rtt_us|
    # A process that stopped between the SSCAN and the pipelined HMGET
    # leaves a record of all-nil values; skip it.
    next if info.nil?

    attrs = Sidekiq.load_json(info)
    yield Process.new(attrs.merge(
      "busy" => busy.to_i,
      "beat" => beat.to_f,
      "quiet" => quiet,
      "rss" => rss.to_i,
      "rtt_us" => rtt_us.to_i
    ))
  end
end
# @api private
# @param clean_plz [Boolean] prune dead process records on construction
def initialize(clean_plz = true)
  return unless clean_plz

  cleanup
end
# Identity of the cluster leader.
#
# @return [String] identity of the cluster leader, or the empty string
#   if there is no leader
def leader
  # Fall back to "" so the memoization (||=) sticks even when Redis has
  # no leader key (nil would defeat it).
  @leader ||= Sidekiq.redis { |conn| conn.get("dear-leader") } || ""
end
# @return [Integer] current number of registered Sidekiq processes
def size
  Sidekiq.redis { |c| c.scard("processes") }
end
# @return [Integer] the sum of process concurrency
def total_concurrency
  reduce(0) { |total, process| total + process["concurrency"].to_i }
end
# @return [Integer] total amount of RSS memory consumed by Sidekiq processes
def total_rss_in_kb
  map { |process| process["rss"].to_i }.sum
end