lib/active_job/log_subscriber.rb



# frozen_string_literal: true

require "active_support/log_subscriber"

module ActiveJob
  class LogSubscriber < ActiveSupport::LogSubscriber # :nodoc:
    class_attribute :backtrace_cleaner, default: ActiveSupport::BacktraceCleaner.new

    def enqueue(event)
      job = event.payload[:job]
      ex = event.payload[:exception_object] || job.enqueue_error

      if ex
        error do
          "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})"
        end
      elsif event.payload[:aborted]
        info do
          "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution."
        end
      else
        info do
          "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
        end
      end
    end
    subscribe_log_level :enqueue, :info

    def enqueue_at(event)
      job = event.payload[:job]
      ex = event.payload[:exception_object] || job.enqueue_error

      if ex
        error do
          "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})"
        end
      elsif event.payload[:aborted]
        info do
          "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution."
        end
      else
        info do
          "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
        end
      end
    end
    subscribe_log_level :enqueue_at, :info

    def enqueue_all(event)
      info do
        jobs = event.payload[:jobs]
        adapter = event.payload[:adapter]
        enqueued_count = event.payload[:enqueued_count]

        if enqueued_count == jobs.size
          enqueued_jobs_message(adapter, jobs)
        elsif jobs.any?(&:successfully_enqueued?)
          enqueued_jobs = jobs.select(&:successfully_enqueued?)

          failed_enqueue_count = jobs.size - enqueued_count
          if failed_enqueue_count == 0
            enqueued_jobs_message(adapter, enqueued_jobs)
          else
            "#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\
              "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}"
          end
        else
          failed_enqueue_count = jobs.size - enqueued_count
          "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} "\
            "to #{ActiveJob.adapter_name(adapter)}"
        end
      end
    end
    subscribe_log_level :enqueue_all, :info

    def perform_start(event)
      info do
        job = event.payload[:job]
        enqueue_info = job.enqueued_at.present? ? " enqueued at #{job.enqueued_at.utc.iso8601(9)}" : ""

        "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)}" + enqueue_info + args_info(job)
      end
    end
    subscribe_log_level :perform_start, :info

    def perform(event)
      job = event.payload[:job]
      ex = event.payload[:exception_object]
      if ex
        error do
          "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
        end
      elsif event.payload[:aborted]
        error do
          "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: a before_perform callback halted the job execution"
        end
      else
        info do
          "Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
        end
      end
    end
    subscribe_log_level :perform, :info

    def enqueue_retry(event)
      job = event.payload[:job]
      ex = event.payload[:error]
      wait = event.payload[:wait]

      info do
        if ex
          "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds, due to a #{ex.class} (#{ex.message})."
        else
          "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds."
        end
      end
    end
    subscribe_log_level :enqueue_retry, :info

    def retry_stopped(event)
      job = event.payload[:job]
      ex = event.payload[:error]

      error do
        "Stopped retrying #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message}), which reoccurred on #{job.executions} attempts."
      end
    end
    subscribe_log_level :enqueue_retry, :error

    def discard(event)
      job = event.payload[:job]
      ex = event.payload[:error]

      error do
        "Discarded #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message})."
      end
    end
    subscribe_log_level :discard, :error

    private
      def queue_name(event)
        ActiveJob.adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})"
      end

      def args_info(job)
        if job.class.log_arguments? && job.arguments.any?
          " with arguments: " +
            job.arguments.map { |arg| format(arg).inspect }.join(", ")
        else
          ""
        end
      end

      def format(arg)
        case arg
        when Hash
          arg.transform_values { |value| format(value) }
        when Array
          arg.map { |value| format(value) }
        when GlobalID::Identification
          arg.to_global_id rescue arg
        else
          arg
        end
      end

      def scheduled_at(event)
        Time.at(event.payload[:job].scheduled_at).utc
      end

      def logger
        ActiveJob::Base.logger
      end

      def info(progname = nil, &block)
        return unless super

        if ActiveJob.verbose_enqueue_logs
          log_enqueue_source
        end
      end

      def error(progname = nil, &block)
        return unless super

        if ActiveJob.verbose_enqueue_logs
          log_enqueue_source
        end
      end

      def log_enqueue_source
        source = extract_enqueue_source_location(caller)

        if source
          logger.info("↳ #{source}")
        end
      end

      def extract_enqueue_source_location(locations)
        backtrace_cleaner.clean(locations.lazy).first
      end

      def enqueued_jobs_message(adapter, enqueued_jobs)
        enqueued_count = enqueued_jobs.size
        job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v }
        "Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{ActiveJob.adapter_name(adapter)}"\
          " (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})"
      end
  end
end

ActiveJob::LogSubscriber.attach_to :active_job