lib/sidekiq/rescue/server_middleware.rb



# frozen_string_literal: true

module Sidekiq
  module Rescue
    # Server middleware for sidekiq-rescue
    # It is responsible for catching the errors and rescheduling the job
    # according to the options provided
    # @api private
    class ServerMiddleware
      include Sidekiq::ServerMiddleware

      def call(job_instance, job_payload, _queue, &)
        job_class = job_instance.class
        if job_class.respond_to?(:sidekiq_rescue_options) && !job_class.sidekiq_rescue_options.nil?
          sidekiq_rescue(job_payload, job_class, &)
        else
          yield
        end
      end

      private

      def sidekiq_rescue(job_payload, job_class)
        yield
      rescue StandardError => e
        error_group, options = job_class.sidekiq_rescue_error_group_with_options_by(e)
        raise e unless error_group

        rescue_error(e, error_group, options, job_payload)
      end

      def rescue_error(error, error_group, options, job_payload)
        delay, limit, jitter = options.fetch_values(:delay, :limit, :jitter)
        queue = options.fetch(:queue, job_payload["queue"])

        rescue_counter = increment_rescue_counter_for(error_group, job_payload)
        raise error if rescue_counter > limit

        calculated_delay = calculate_delay(delay, rescue_counter, jitter)
        log_reschedule_info(rescue_counter, error, calculated_delay)
        reschedule_job(job_payload:, delay: calculated_delay, rescue_counter:,
                       error_group:, queue:)
      end

      def increment_rescue_counter_for(error_group, job_payload)
        rescue_counter = job_payload.dig("sidekiq_rescue_exceptions_counter", error_group.to_s) || 0
        rescue_counter += 1
        rescue_counter
      end

      def calculate_delay(delay, rescue_counter, jitter)
        delay = delay.call(rescue_counter) if delay.is_a?(Proc)
        jitter_delay = calculate_delay_jitter(jitter, delay)
        delay + jitter_delay
      end

      def calculate_delay_jitter(jitter, delay)
        return 0.0 if jitter.zero?

        jitter * Kernel.rand * delay
      end

      def log_reschedule_info(rescue_counter, error, delay)
        Sidekiq::Rescue.logger.info("[sidekiq_rescue] Job failed #{rescue_counter} times with error: " \
                                    "#{error.message}; rescheduling in #{delay} seconds")
      end

      def reschedule_job(job_payload:, delay:, rescue_counter:, error_group:, queue:)
        payload = job_payload.dup
        payload["at"] = Time.now.to_f + delay if delay.positive?
        payload["sidekiq_rescue_exceptions_counter"] = { error_group.to_s => rescue_counter }
        payload["queue"] = queue
        Sidekiq::Client.push(payload)
      end
    end
  end
end