class Sidekiq::ProcessSet


#
# Enumerates the set of Sidekiq processes which are actively working
# right now. Each process sends a heartbeat to Redis every 5 seconds,
# so this set should be relatively accurate, barring network partitions.
#
# @yieldparam [Sidekiq::Process]

def self.[](identity)

# Looks up a single process by its identity.
#
# The membership check against the "processes" set and the read of the
# process hash are issued together inside one `multi` block.
#
# @param identity [String] key of the process hash in Redis
# @return [Process, nil] nil when the membership check returns 0 or the
#   hash has no "info" field
def self.[](identity)
  fields = %w[info busy beat quiet rss rtt_us]

  reply = Sidekiq.redis do |conn|
    conn.multi do |tx|
      tx.sismember("processes", identity)
      tx.hmget(identity, *fields)
    end
  end

  member, values = reply
  info, busy, beat, quiet, rss, rtt_us = values
  return nil if member == 0 || info.nil?

  # The live counters are stored as separate hash fields; layer them over
  # the decoded "info" payload.
  attrs = Sidekiq.load_json(info).merge(
    "busy" => busy.to_i,
    "beat" => beat.to_f,
    "quiet" => quiet,
    "rss" => rss.to_i,
    "rtt_us" => rtt_us.to_i
  )
  Process.new(attrs)
end

def cleanup

Other tags:
    Api: - private
# Prunes identities from the "processes" set whose process hash no longer
# has an "info" field.
#
# @api private
# @return [Integer] whatever SREM reports for the pruned identities, or 0
#   when nothing was pruned or the "process_cleanup" key could not be set
def cleanup
  # don't run cleanup more than once per minute
  acquired = Sidekiq.redis { |conn| conn.set("process_cleanup", "1", "NX", "EX", "60") }
  return 0 unless acquired

  removed = 0
  Sidekiq.redis do |conn|
    identities = conn.sscan("processes").to_a
    infos = conn.pipelined do |pipeline|
      identities.each { |identity| pipeline.hget(identity, "info") }
    end

    # the hash named by each identity has an expiry of 60 seconds.
    # if it's not found, that means the process has not reported
    # in to Redis and probably died.
    stale = identities.zip(infos)
      .select { |_identity, info| info.nil? }
      .map(&:first)

    removed = conn.srem("processes", stale) unless stale.empty?
  end
  removed
end

def each

# Yields one Process per entry of the "processes" set, in sorted identity
# order.
#
# @yieldparam process [Process] built from the decoded "info" payload
#   merged with the "busy", "beat", "quiet", "rss" and "rtt_us" fields
# @return [Enumerator] when called without a block
# @return [Array] the raw pipelined replies, when called with a block
def each
  # Without a block there is nothing to yield to; hand back an Enumerator
  # instead of querying Redis and then failing at `yield`.
  return to_enum(:each) unless block_given?

  rows = Sidekiq.redis { |conn|
    identities = conn.sscan("processes").to_a.sort
    # We're making a tradeoff here between consuming more memory instead of
    # making more roundtrips to Redis, but if you have hundreds or thousands of workers,
    # you'll be happier this way
    conn.pipelined do |pipeline|
      identities.each do |identity|
        pipeline.hmget(identity, "info", "busy", "beat", "quiet", "rss", "rtt_us")
      end
    end
  }

  rows.each do |info, busy, beat, quiet, rss, rtt_us|
    # If a process is stopped between when we query Redis for the
    # identities and when we query for `rows`, we will have an item in
    # `rows` that is composed of `nil` values.
    next if info.nil?

    attrs = Sidekiq.load_json(info).merge(
      "busy" => busy.to_i,
      "beat" => beat.to_f,
      "quiet" => quiet,
      "rss" => rss.to_i,
      "rtt_us" => rtt_us.to_i
    )
    yield Process.new(attrs)
  end
end

def initialize(clean_plz = true)

Other tags:
    Api: - private
# @api private
# @param clean_plz [Boolean] when truthy (the default), #cleanup is invoked
#   as part of construction
def initialize(clean_plz = true)
  return unless clean_plz

  cleanup
end

def leader

Returns:
  • (String) - empty string if no leader
  • (String) - Identity of cluster leader
# Reads the "dear-leader" key once and memoizes the answer on this
# instance.
#
# @return [String] the value stored under "dear-leader", or an empty
#   string when the key holds nothing
def leader
  return @leader if @leader

  identity = Sidekiq.redis { |conn| conn.get("dear-leader") }
  # "" is truthy in Ruby, so storing it lets the guard above memoize the
  # "no leader" answer as well.
  @leader = identity || ""
end

def size

Returns:
  • (Integer) - current number of registered Sidekiq processes
# Asks Redis for the cardinality of the "processes" set. Unlike #each this
# does not read the individual process hashes.
#
# @return [Integer] current number of registered Sidekiq processes
def size
  Sidekiq.redis do |redis|
    redis.scard("processes")
  end
end

def total_concurrency

Returns:
  • (Integer) - the sum of process concurrency
# Adds up the "concurrency" entry of every element yielded by #each,
# coercing each value with #to_i.
#
# @return [Integer] the sum of process concurrency
def total_concurrency
  sum do |process|
    process["concurrency"].to_i
  end
end

def total_rss_in_kb

Returns:
  • (Integer) - total amount of RSS memory consumed by Sidekiq processes
# Adds up the "rss" entry of every element yielded by #each, coercing each
# value with #to_i.
#
# @return [Integer] total amount of RSS memory consumed by Sidekiq processes
def total_rss_in_kb
  sum do |process|
    process["rss"].to_i
  end
end