lib/sidekiq/monitor.rb



#!/usr/bin/env ruby
# frozen_string_literal: true

require "fileutils"
require "sidekiq/api"

class Sidekiq::Monitor
  class Status
    VALID_SECTIONS = %w[all version overview processes queues]
    COL_PAD = 2

    def display(section = nil)
      section ||= "all"
      unless VALID_SECTIONS.include? section
        puts "I don't know how to check the status of '#{section}'!"
        puts "Try one of these: #{VALID_SECTIONS.join(", ")}"
        return
      end
      send(section)
    end

    def all
      version
      puts
      overview
      puts
      processes
      puts
      queues
    end

    def version
      puts "Sidekiq #{Sidekiq::VERSION}"
      puts Time.now.utc
    end

    def overview
      puts "---- Overview ----"
      puts "  Processed: #{delimit stats.processed}"
      puts "     Failed: #{delimit stats.failed}"
      puts "       Busy: #{delimit stats.workers_size}"
      puts "   Enqueued: #{delimit stats.enqueued}"
      puts "    Retries: #{delimit stats.retry_size}"
      puts "  Scheduled: #{delimit stats.scheduled_size}"
      puts "       Dead: #{delimit stats.dead_size}"
    end

    def processes
      puts "---- Processes (#{process_set.size}) ----"
      process_set.each_with_index do |process, index|
        # Keep compatibility with legacy versions since we don't want to break sidekiqmon during rolling upgrades or downgrades.
        #
        # Before:
        #   ["default", "critical"]
        #
        # After:
        #   {"default" => 1, "critical" => 10}
        queues =
          if process["weights"]
            process["weights"].sort_by { |queue| queue[0] }.map { |capsule| capsule.map { |name, weight| (weight > 0) ? "#{name}: #{weight}" : name }.join(", ") }
          else
            process["queues"].sort
          end

        puts "#{process["identity"]} #{tags_for(process)}"
        puts "  Started: #{Time.at(process["started_at"])} (#{time_ago(process["started_at"])})"
        puts "  Threads: #{process["concurrency"]} (#{process["busy"]} busy)"
        puts "   Queues: #{split_multiline(queues, pad: 11)}"
        puts "  Version: #{process["version"] || "Unknown"}" if process["version"] != Sidekiq::VERSION
        puts "" unless (index + 1) == process_set.size
      end
    end

    def queues
      puts "---- Queues (#{queue_data.size}) ----"
      columns = {
        name: [:ljust, (["name"] + queue_data.map(&:name)).map(&:length).max + COL_PAD],
        size: [:rjust, (["size"] + queue_data.map(&:size)).map(&:length).max + COL_PAD],
        latency: [:rjust, (["latency"] + queue_data.map(&:latency)).map(&:length).max + COL_PAD]
      }
      columns.each { |col, (dir, width)| print col.to_s.upcase.public_send(dir, width) }
      puts
      queue_data.each do |q|
        columns.each do |col, (dir, width)|
          print q.send(col).public_send(dir, width)
        end
        puts
      end
    end

    private

    def delimit(number)
      number.to_s.reverse.scan(/.{1,3}/).join(",").reverse
    end

    def split_multiline(values, opts = {})
      return "none" unless values
      pad = opts[:pad] || 0
      max_length = opts[:max_length] || (80 - pad)
      out = []
      line = +""
      values.each do |value|
        if (line.length + value.length) > max_length
          out << line
          line = " " * pad
        end
        line << value + ", "
      end
      out << line[0..-3]
      out.join("\n")
    end

    def tags_for(process)
      tags = [
        process["tag"],
        process["labels"],
        ((process["quiet"] == "true") ? "quiet" : nil)
      ].flatten.compact
      tags.any? ? "[#{tags.join("] [")}]" : nil
    end

    def time_ago(timestamp)
      seconds = Time.now - Time.at(timestamp)
      return "just now" if seconds < 60
      return "a minute ago" if seconds < 120
      return "#{seconds.floor / 60} minutes ago" if seconds < 3600
      return "an hour ago" if seconds < 7200
      "#{seconds.floor / 60 / 60} hours ago"
    end

    QUEUE_STRUCT = Struct.new(:name, :size, :latency)
    def queue_data
      @queue_data ||= Sidekiq::Queue.all.map { |q|
        QUEUE_STRUCT.new(q.name, q.size.to_s, sprintf("%#.2f", q.latency))
      }
    end

    def process_set
      @process_set ||= Sidekiq::ProcessSet.new
    end

    def stats
      @stats ||= Sidekiq::Stats.new
    end
  end
end