lib/active_job/logging.rb



# frozen_string_literal: true

require "active_support/core_ext/string/filters"
require "active_support/tagged_logging"
require "active_support/logger"

module ActiveJob
  module Logging #:nodoc:
    extend ActiveSupport::Concern

    included do
      # Class-level logger accessor; the default is a tagged logger that writes to STDOUT.
      cattr_accessor :logger, default: ActiveSupport::TaggedLogging.new(ActiveSupport::Logger.new(STDOUT))

      # Run the remainder of the enqueue callback chain inside tag_logger.
      around_enqueue { |_job, continue| tag_logger { continue.call } }

      # Tag log output with the job class name and job id, publish a
      # "perform_start" event, then wrap the remaining callbacks in a
      # "perform" event.
      around_perform do |job, continue|
        tag_logger(job.class.name, job.job_id) do
          event_payload = { adapter: job.class.queue_adapter, job: job }
          # perform_start is handed its own copy of the payload hash.
          ActiveSupport::Notifications.instrument("perform_start.active_job", event_payload.dup)
          ActiveSupport::Notifications.instrument("perform.active_job", event_payload) { continue.call }
        end
      end

      # Publish "enqueue_at" when the job carries a scheduled time and
      # "enqueue" otherwise; the rest of the callback chain runs as the
      # instrumented block.
      around_enqueue do |job, continue|
        event_name = job.scheduled_at ? "enqueue_at.active_job" : "enqueue.active_job"
        ActiveSupport::Notifications.instrument(event_name, adapter: job.class.queue_adapter, job: job, &continue)
      end
    end

    private
      # Yields with the given tags applied to the logger.
      #
      # Loggers that do not respond to +tagged+ simply run the block. Otherwise
      # "ActiveJob" is put in front of +tags+ unless the logger already carries
      # that tag, and the block runs inside +logger.tagged+.
      def tag_logger(*tags)
        return yield unless logger.respond_to?(:tagged)

        tags.unshift "ActiveJob" unless logger_tagged_by_active_job?
        logger.tagged(*tags) { yield }
      end

      # True when the logger formatter's current tags already contain "ActiveJob".
      def logger_tagged_by_active_job?
        active_tags = logger.formatter.current_tags
        active_tags.include?("ActiveJob")
      end

      # Log subscriber for Active Job events. Each public method is named after
      # an event and turns that event's payload into one log line written
      # through ActiveJob::Base.logger.
      class LogSubscriber < ActiveSupport::LogSubscriber #:nodoc:
        # Logs the outcome of an immediate enqueue.
        #
        # If the instrumented block raised, the payload carries the error under
        # :exception_object (the same key #perform reads). In that case a
        # failure is logged at error level instead of reporting the job as
        # enqueued.
        def enqueue(event)
          job = event.payload[:job]
          ex = event.payload[:exception_object]

          if ex
            error do
              "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})"
            end
          else
            info do
              "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job)
            end
          end
        end

        # Logs the outcome of a scheduled enqueue, including the UTC time the
        # job is scheduled for. Failures are reported the same way as in
        # #enqueue.
        def enqueue_at(event)
          job = event.payload[:job]
          ex = event.payload[:exception_object]

          if ex
            error do
              "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})"
            end
          else
            info do
              "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job)
            end
          end
        end

        # Logs that a job is about to be performed, with its queue, the
        # job's enqueued_at value, and its arguments.
        def perform_start(event)
          info do
            job = event.payload[:job]
            "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} enqueued at #{job.enqueued_at}" + args_info(job)
          end
        end

        # Logs the result of performing a job: an error line with the exception
        # and its backtrace when payload[:exception_object] is set, otherwise an
        # info line. Both include the event duration in milliseconds.
        def perform(event)
          job = event.payload[:job]
          ex = event.payload[:exception_object]
          if ex
            error do
              "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n")
            end
          else
            info do
              "Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms"
            end
          end
        end

        # Logs that a job will be retried after payload[:wait] seconds, naming
        # the error class when payload[:error] is present.
        def enqueue_retry(event)
          job = event.payload[:job]
          ex = event.payload[:error]
          wait = event.payload[:wait]

          info do
            if ex
              "Retrying #{job.class} in #{wait.to_i} seconds, due to a #{ex.class}."
            else
              "Retrying #{job.class} in #{wait.to_i} seconds."
            end
          end
        end

        # Logs, at error level, that retries for a job have stopped, with the
        # error class and the job's execution count.
        def retry_stopped(event)
          job = event.payload[:job]
          ex = event.payload[:error]

          error do
            "Stopped retrying #{job.class} due to a #{ex.class}, which reoccurred on #{job.executions} attempts."
          end
        end

        # Logs, at error level, that a job was discarded because of an error.
        def discard(event)
          job = event.payload[:job]
          ex = event.payload[:error]

          error do
            "Discarded #{job.class} due to a #{ex.class}."
          end
        end

        private
          # Builds the "Adapter(queue)" label: the adapter's class name without
          # its namespace and without the "Adapter" substring, followed by the
          # job's queue name in parentheses.
          def queue_name(event)
            event.payload[:adapter].class.name.demodulize.remove("Adapter") + "(#{event.payload[:job].queue_name})"
          end

          # Returns " with arguments: ..." listing the inspected job arguments,
          # or an empty string when the job has no arguments.
          def args_info(job)
            # Array#any? without a block ignores nil/false elements, so check
            # emptiness directly; otherwise arguments such as [nil] or [false]
            # would be dropped from the log line.
            return "" if job.arguments.empty?

            " with arguments: " + job.arguments.map { |arg| format(arg).inspect }.join(", ")
          end

          # Recursively prepares an argument for logging: hash values and array
          # elements are formatted in turn, and GlobalID-identifiable objects
          # are replaced by their global id.
          def format(arg)
            case arg
            when Hash
              arg.transform_values { |value| format(value) }
            when Array
              arg.map { |value| format(value) }
            when GlobalID::Identification
              # Best effort: if the global id cannot be built, log the object itself.
              arg.to_global_id rescue arg
            else
              arg
            end
          end

          # Converts the job's scheduled_at value to a UTC Time.
          def scheduled_at(event)
            Time.at(event.payload[:job].scheduled_at).utc
          end

          # All log lines go through the logger configured on ActiveJob::Base.
          def logger
            ActiveJob::Base.logger
          end
      end
  end
end

# Attach the subscriber to the :active_job namespace. The events instrumented
# in this file are named "<event>.active_job", and LogSubscriber defines a
# method per event name (enqueue, enqueue_at, perform_start, perform, ...).
ActiveJob::Logging::LogSubscriber.attach_to :active_job