lib/sidekiq/metrics/query.rb



# frozen_string_literal: true

require "date"
require "sidekiq"
require "sidekiq/metrics/shared"

module Sidekiq
  module Metrics
    # Allows caller to query for Sidekiq execution metrics within Redis.
    # Caller sets a set of attributes to act as filters. {#fetch} will call
    # Redis and return a Hash of results.
    #
    # NB: all metrics and times/dates are UTC only. We explicitly do not
    # support timezones.
    class Query
      def initialize(pool: nil, now: Time.now)
        @time = now.utc
        @pool = pool || Sidekiq.default_configuration.redis_pool
        @klass = nil
      end

      ROLLUPS = {
        # minutely aggregates per minute
        minutely: [60, ->(time) { time.strftime("j|%y%m%d|%-H:%M") }],
        # hourly aggregates every 10 minutes so we'll have six data points per hour
        hourly: [600, ->(time) {
          m = time.min
          mins = (m < 10) ? "0" : m.to_s[0]
          time.strftime("j|%y%m%d|%-H:#{mins}")
        }]
      }

      # Get metric data for all jobs from the last hour
      #  +class_filter+: return only results for classes matching filter
      #  +minutes+: the number of fine-grained minute buckets to retrieve
      #  +hours+: the number of coarser-grained 10-minute buckets to retrieve, in hours
      def top_jobs(class_filter: nil, minutes: nil, hours: nil)
        time = @time
        minutes = 60 unless minutes || hours

        # DoS protection, sanity check
        minutes = 60 if minutes && minutes > 480
        hours = 72 if hours && hours > 72

        granularity = hours ? :hourly : :minutely
        result = Result.new(granularity)
        result.ends_at = time
        count = hours ? hours * 6 : minutes
        stride, keyproc = ROLLUPS[granularity]

        redis_results = @pool.with do |conn|
          conn.pipelined do |pipe|
            count.times do |idx|
              key = keyproc.call(time)
              pipe.hgetall key
              time -= stride
            end
          end
        end

        result.starts_at = time
        time = @time
        redis_results.each do |hash|
          hash.each do |k, v|
            kls, metric = k.split("|")
            next if class_filter && !class_filter.match?(kls)
            result.job_results[kls].add_metric metric, time, v.to_i
          end
          time -= stride
        end

        result.marks = fetch_marks(result.starts_at..result.ends_at, granularity)
        result
      end

      def for_job(klass, minutes: nil, hours: nil)
        time = @time
        minutes = 60 unless minutes || hours

        # DoS protection, sanity check
        minutes = 60 if minutes && minutes > 480
        hours = 72 if hours && hours > 72

        granularity = hours ? :hourly : :minutely
        result = Result.new(granularity)
        result.ends_at = time
        count = hours ? hours * 6 : minutes
        stride, keyproc = ROLLUPS[granularity]

        redis_results = @pool.with do |conn|
          conn.pipelined do |pipe|
            count.times do |idx|
              key = keyproc.call(time)
              pipe.hmget key, "#{klass}|ms", "#{klass}|p", "#{klass}|f"
              time -= stride
            end
          end
        end

        result.starts_at = time
        time = @time
        @pool.with do |conn|
          redis_results.each do |(ms, p, f)|
            result.job_results[klass].add_metric "ms", time, ms.to_i if ms
            result.job_results[klass].add_metric "p", time, p.to_i if p
            result.job_results[klass].add_metric "f", time, f.to_i if f
            result.job_results[klass].add_hist time, Histogram.new(klass).fetch(conn, time).reverse if minutes
            time -= stride
          end
        end

        result.marks = fetch_marks(result.starts_at..result.ends_at, granularity)
        result
      end

      class Result < Struct.new(:granularity, :starts_at, :ends_at, :size, :job_results, :marks)
        def initialize(granularity = :minutely)
          super
          self.granularity = granularity
          self.marks = []
          self.job_results = Hash.new { |h, k| h[k] = JobResult.new(granularity) }
        end
      end

      class JobResult < Struct.new(:granularity, :series, :hist, :totals)
        def initialize(granularity = :minutely)
          super
          self.granularity = granularity
          self.series = Hash.new { |h, k| h[k] = Hash.new(0) }
          self.hist = Hash.new { |h, k| h[k] = [] }
          self.totals = Hash.new(0)
        end

        def add_metric(metric, time, value)
          totals[metric] += value
          series[metric][Query.bkt_time_s(time, granularity)] += value

          # Include timing measurements in seconds for convenience
          add_metric("s", time, value / 1000.0) if metric == "ms"
        end

        def add_hist(time, hist_result)
          hist[Query.bkt_time_s(time, granularity)] = hist_result
        end

        def total_avg(metric = "ms")
          completed = totals["p"] - totals["f"]
          return 0 if completed.zero?
          totals[metric].to_f / completed
        end

        def series_avg(metric = "ms")
          series[metric].each_with_object(Hash.new(0)) do |(bucket, value), result|
            completed = series.dig("p", bucket) - series.dig("f", bucket)
            result[bucket] = (completed == 0) ? 0 : value.to_f / completed
          end
        end
      end

      MarkResult = Struct.new(:time, :label, :bucket)

      def self.bkt_time_s(time, granularity)
        # truncate time to ten minutes ("8:40", not "8:43") or one minute
        truncation = (granularity == :hourly) ? 600 : 60
        Time.at(time.to_i - time.to_i % truncation).utc.iso8601
      end

      private

      def fetch_marks(time_range, granularity)
        [].tap do |result|
          marks = @pool.with { |c| c.hgetall("#{@time.strftime("%Y%m%d")}-marks") }

          marks.each do |timestamp, label|
            time = Time.parse(timestamp)
            if time_range.cover? time
              result << MarkResult.new(time, label, Query.bkt_time_s(time, granularity))
            end
          end
        end
      end
    end
  end
end