class Sidekiq::Stats
# Retrieve runtime statistics from Redis regarding
# this Sidekiq cluster.
#
#   stat = Sidekiq::Stats.new
#   stat.processed
# Size recorded under :dead_size (the ZCARD of the "dead" sorted set
# captured by fetch_stats_fast!), looked up through #stat.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def dead_size
  stat(:dead_size)
end
# Latency, in seconds, recorded under :default_queue_latency. It is derived
# by fetch_stats_fast! from the "enqueued_at" of the entry at index -1 of
# "queue:default" and is 0.0 when that entry is absent.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def default_queue_latency
  stat(:default_queue_latency)
end
# Total recorded under :enqueued (the sum of LLEN over every known queue).
# This value is produced by fetch_stats_slow!, which #stat runs on demand
# when the key has not been populated yet.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def enqueued
  stat(:enqueued)
end
# Counter recorded under :failed (the "stat:failed" key converted with to_i
# by fetch_stats_fast!), looked up through #stat.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def failed
  stat(:failed)
end
# Reloads every statistic: first the single-pipeline fast set, then the
# per-process / per-queue slow set. The fast pass must run first because it
# replaces @stats and the slow pass writes into that hash.
#
# @api private
# @return the value returned by fetch_stats_slow! (the @stats hash)
def fetch_stats!
  fetch_stats_fast!
  fetch_stats_slow!
end
# Loads the cheap statistics with one pipelined Redis round trip and replaces
# @stats with a fresh hash holding :processed, :failed, :scheduled_size,
# :retry_size, :dead_size, :processes_size and :default_queue_latency.
#
# @api private
# @return [Hash] the newly assigned @stats
def fetch_stats_fast!
  replies = Sidekiq.redis { |conn|
    conn.pipelined do |pipeline|
      pipeline.get("stat:processed")
      pipeline.get("stat:failed")
      pipeline.zcard("schedule")
      pipeline.zcard("retry")
      pipeline.zcard("dead")
      pipeline.scard("processes")
      pipeline.lindex("queue:default", -1)
    end
  }

  # Replies come back in the order the commands were queued above.
  processed, failed, scheduled, retries, dead, processes, default_tail = replies

  @stats = {
    processed: processed.to_i,
    failed: failed.to_i,
    scheduled_size: scheduled,
    retry_size: retries,
    dead_size: dead,
    processes_size: processes,
    default_queue_latency: queue_entry_latency(default_tail)
  }
end

# Seconds elapsed since the given raw queue entry was enqueued. Returns 0.0
# when the entry is absent, cannot be parsed, is not a JSON object, or has no
# "enqueued_at" value.
private def queue_entry_latency(entry)
  return 0.0 unless entry

  job = begin
    Sidekiq.load_json(entry)
  rescue
    {}
  end
  # Valid JSON that is not an object (null, array, number) is treated like an
  # unparsable entry instead of raising from the lookup below.
  enqueued_at = job["enqueued_at"] if job.is_a?(Hash)
  return 0.0 unless enqueued_at

  if enqueued_at.is_a?(Float)
    # old format: float epoch seconds
    Time.now.to_f - enqueued_at
  else
    # otherwise the value is compared against the realtime clock in milliseconds
    now_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
    (now_ms - enqueued_at) / 1000.0
  end
end
# Fills in the statistics that cost one Redis command per process and per
# queue: :workers_size (sum of every process key's "busy" field) and
# :enqueued (sum of every queue's length). Writes into the existing @stats,
# so fetch_stats_fast! must have run beforehand.
#
# @api private
# @return the updated @stats
def fetch_stats_slow!
  process_keys = Sidekiq.redis { |conn| conn.sscan("processes").to_a }
  queue_names = Sidekiq.redis { |conn| conn.sscan("queues").to_a }

  replies = Sidekiq.redis { |conn|
    conn.pipelined do |pipeline|
      process_keys.each { |process_key| pipeline.hget(process_key, "busy") }
      queue_names.each { |name| pipeline.llen("queue:#{name}") }
    end
  }

  # Replies follow command order: one "busy" value per process first, then
  # one length per queue.
  process_count = process_keys.size
  busy_total = replies[0, process_count].sum { |busy| busy.to_i }
  queued_total = replies[process_count..-1].sum { |length| length.to_i }

  @stats[:workers_size] = busy_total
  @stats[:enqueued] = queued_total
  @stats
end
# Builds a snapshot by eagerly loading only the fast statistics; the slow
# ones are fetched later by #stat when first requested.
def initialize
  fetch_stats_fast!
end
# Counter recorded under :processed (the "stat:processed" key converted with
# to_i by fetch_stats_fast!), looked up through #stat.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def processed
  stat(:processed)
end
# Size recorded under :processes_size (the SCARD of the "processes" set
# captured by fetch_stats_fast!), looked up through #stat.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def processes_size
  stat(:processes_size)
end
# Reads the current length of every queue listed in the "queues" set.
#
# @return [Hash] queue name => length, ordered from the longest queue to the
#   shortest
def queues
  Sidekiq.redis do |conn|
    names = conn.sscan("queues").to_a
    lengths = conn.pipelined do |pipeline|
      names.each { |name| pipeline.llen("queue:#{name}") }
    end

    # Negating the length sorts in descending order.
    names.zip(lengths).sort_by { |_name, length| -length }.to_h
  end
end
# Resets the "failed" and/or "processed" counters to 0 in Redis.
#
# @api private
# @param stats counter names (strings or symbols, optionally nested in
#   arrays, nils allowed); with no arguments both counters are reset. Names
#   other than "failed" and "processed" are ignored.
# @return the value of the Sidekiq.redis block (the MSET reply), or nil when
#   none of the given names is a resettable counter
def reset(*stats)
  resettable = %w[failed processed]
  requested = stats.empty? ? resettable : resettable & stats.flatten.compact.map(&:to_s)
  # MSET with an empty argument list is rejected by Redis, so a request that
  # names only unknown counters is a no-op.
  return if requested.empty?

  mset_args = requested.flat_map { |name| ["stat:#{name}", 0] }
  Sidekiq.redis { |conn| conn.mset(*mset_args) }
end
# Size recorded under :retry_size (the ZCARD of the "retry" sorted set
# captured by fetch_stats_fast!), looked up through #stat.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def retry_size
  stat(:retry_size)
end
# Size recorded under :scheduled_size (the ZCARD of the "schedule" sorted set
# captured by fetch_stats_fast!), looked up through #stat.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def scheduled_size
  stat(:scheduled_size)
end
# Looks up one statistic in @stats. When the key has no value yet, the slow
# statistics are loaded first via fetch_stats_slow!, so the slow keys are
# only fetched once something asks for them.
#
# @param s key into @stats (the callers in this class pass Symbols)
# @return the stored value
# @raise [ArgumentError] when the value is still nil or false after the
#   lookup
def stat(s)
  fetch_stats_slow! if @stats[s].nil?

  value = @stats[s]
  raise ArgumentError, "Unknown stat #{s}" unless value

  value
end
# Total recorded under :workers_size (the sum of the "busy" field of every
# process key). This value is produced by fetch_stats_slow!, which #stat runs
# on demand when the key has not been populated yet.
#
# @raise [ArgumentError] propagated from #stat when the value is missing
def workers_size
  stat(:workers_size)
end