class Sidekiq::Stats

Retrieve runtime statistics from Redis regarding this Sidekiq cluster.

    stat = Sidekiq::Stats.new
    stat.processed
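
A minimal usage sketch reading the commonly inspected counters and gauges (the values shown are illustrative):

    require "sidekiq/api"

    stats = Sidekiq::Stats.new
    stats.processed   # => 1000  lifetime processed counter
    stats.failed      # => 3     lifetime failed counter
    stats.enqueued    # => 12    jobs waiting across all queues
    stats.queues      # => {"default" => 10, "mailers" => 2}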

def dead_size

The number of jobs in the Dead set.

def dead_size
  stat :dead_size
end

def default_queue_latency

The latency, in seconds, of the "default" queue: how long its oldest job has been waiting to be processed.

def default_queue_latency
  stat :default_queue_latency
end

def enqueued

The total number of jobs enqueued across all queues.

def enqueued
  stat :enqueued
end

def failed

The total number of failed job executions recorded by this cluster.

def failed
  stat :failed
end

def fetch_stats!

Refresh all statistics, both the fast and the slow ones.

Other tags:
    Api: - private
def fetch_stats!
  fetch_stats_fast!
  fetch_stats_slow!
end
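
A long-lived instance can be refreshed in place rather than re-created; a small sketch (the sleep interval is arbitrary):

    stats = Sidekiq::Stats.new
    sleep 60
    stats.fetch_stats!    # re-reads both fast and slow stats into the same object
    stats.processed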

def fetch_stats_fast!

Fetch the statistics that can be gathered with O(1) Redis calls.

Other tags:
    Api: - private
def fetch_stats_fast!
  pipe1_res = Sidekiq.redis { |conn|
    conn.pipelined do |pipeline|
      pipeline.get("stat:processed")
      pipeline.get("stat:failed")
      pipeline.zcard("schedule")
      pipeline.zcard("retry")
      pipeline.zcard("dead")
      pipeline.scard("processes")
      # the oldest entry in the default queue, used to compute latency
      pipeline.lindex("queue:default", -1)
    end
  }
  default_queue_latency = if (entry = pipe1_res[6])
    job = begin
      Sidekiq.load_json(entry)
    rescue
      {}
    end
    enqueued_at = job["enqueued_at"]
    if enqueued_at
      if enqueued_at.is_a?(Float)
        # old format
        now = Time.now.to_f
        now - enqueued_at
      else
        now = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
        (now - enqueued_at) / 1000.0
      end
    else
      0.0
    end
  else
    0.0
  end
  @stats = {
    processed: pipe1_res[0].to_i,
    failed: pipe1_res[1].to_i,
    scheduled_size: pipe1_res[2],
    retry_size: pipe1_res[3],
    dead_size: pipe1_res[4],
    processes_size: pipe1_res[5],
    default_queue_latency: default_queue_latency
  }
end
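
The latency branch above handles two payload formats: legacy jobs store enqueued_at as epoch seconds in a Float, while newer jobs store epoch milliseconds. A standalone sketch of the same conversion (latency_from is a hypothetical helper, not part of Sidekiq):

    # Hypothetical helper mirroring the latency math above.
    def latency_from(enqueued_at)
      return 0.0 unless enqueued_at
      if enqueued_at.is_a?(Float)
        # legacy format: epoch seconds
        Time.now.to_f - enqueued_at
      else
        # current format: epoch milliseconds
        now_ms = ::Process.clock_gettime(::Process::CLOCK_REALTIME, :millisecond)
        (now_ms - enqueued_at) / 1000.0
      end
    end

    latency_from(1_700_000_000.5)     # seconds payload
    latency_from(1_700_000_000_500)   # milliseconds payload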

def fetch_stats_slow!

Fetch the statistics that require O(number of processes + number of queues) Redis calls.

Other tags:
    Api: - private
def fetch_stats_slow!
  processes = Sidekiq.redis { |conn|
    conn.sscan("processes").to_a
  }
  queues = Sidekiq.redis { |conn|
    conn.sscan("queues").to_a
  }
  pipe2_res = Sidekiq.redis { |conn|
    conn.pipelined do |pipeline|
      processes.each { |key| pipeline.hget(key, "busy") }
      queues.each { |queue| pipeline.llen("queue:#{queue}") }
    end
  }
  # The first `processes.size` replies are per-process busy counts;
  # the remainder are per-queue lengths.
  s = processes.size
  workers_size = pipe2_res[0...s].sum(&:to_i)
  enqueued = pipe2_res[s..].sum(&:to_i)
  @stats[:workers_size] = workers_size
  @stats[:enqueued] = enqueued
  @stats
end
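
Because these values are not gathered in the constructor, the first access to a slow stat triggers this method via #stat; a brief sketch:

    stats = Sidekiq::Stats.new   # fast stats only at this point
    stats.enqueued               # nil in the cache, so fetch_stats_slow! runs here
    stats.workers_size           # already cached, no further Redis calls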

def initialize

Eagerly fetches the fast statistics; the slow statistics are fetched lazily on first access.

def initialize
  fetch_stats_fast!
end

def processed

The total number of job executions processed by this cluster.

def processed
  stat :processed
end

def processes_size

The number of Sidekiq processes currently reporting a heartbeat.

def processes_size
  stat :processes_size
end

def queues

A Hash of queue name to queue size, sorted by size, largest first.

def queues
  Sidekiq.redis do |conn|
    queues = conn.sscan("queues").to_a
    lengths = conn.pipelined { |pipeline|
      queues.each do |queue|
        pipeline.llen("queue:#{queue}")
      end
    }
    array_of_arrays = queues.zip(lengths).sort_by { |_, size| -size }
    array_of_arrays.to_h
  end
end
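
A usage sketch; the queue names and sizes are illustrative:

    Sidekiq::Stats.new.queues
    # => {"default" => 10, "mailers" => 2}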

def reset(*stats)

Reset the named counters ("processed" and/or "failed") to zero; with no arguments, resets both.

Other tags:
    Api: - private
def reset(*stats)
  all = %w[failed processed]
  stats = stats.empty? ? all : all & stats.flatten.compact.map(&:to_s)
  mset_args = []
  stats.each do |stat|
    mset_args << "stat:#{stat}"
    mset_args << 0
  end
  Sidekiq.redis do |conn|
    conn.mset(*mset_args)
  end
end
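
A usage sketch; names outside the allowed list are filtered out by the intersection above:

    stats = Sidekiq::Stats.new
    stats.reset              # zeroes both stat:processed and stat:failed
    stats.reset("failed")    # zeroes only stat:failed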

def retry_size

The number of jobs in the Retry set.

def retry_size
  stat :retry_size
end

def scheduled_size

The number of jobs in the Scheduled set.

def scheduled_size
  stat :scheduled_size
end

def stat(s)

Look up a single statistic, falling back to the slow fetch if it has not been loaded yet; raises ArgumentError for an unknown stat name.

def stat(s)
  fetch_stats_slow! if @stats[s].nil?
  @stats[s] || raise(ArgumentError, "Unknown stat #{s}")
end
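
A sketch of the lookup behavior:

    stats = Sidekiq::Stats.new
    stats.stat(:enqueued)   # not in the fast set, so fetch_stats_slow! runs first
    stats.stat(:bogus)      # raises ArgumentError: Unknown stat bogus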

def workers_size

The number of worker threads currently busy executing jobs, summed across all processes.

def workers_size
  stat :workers_size
end