class ActiveJob::LogSubscriber
:nodoc:
def args_info(job)
def args_info(job) if job.class.log_arguments? && job.arguments.any? " with arguments: " + job.arguments.map { |arg| format(arg).inspect }.join(", ") else "" end end
def discard(event)
def discard(event) job = event.payload[:job] ex = event.payload[:error] error do "Discarded #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message})." end end
def enqueue(event)
def enqueue(event) job = event.payload[:job] ex = event.payload[:exception_object] || job.enqueue_error if ex error do "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})" end elsif event.payload[:aborted] info do "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." end else info do "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)}" + args_info(job) end end end
def enqueue_all(event)
def enqueue_all(event) info do jobs = event.payload[:jobs] adapter = event.payload[:adapter] enqueued_count = event.payload[:enqueued_count].to_i if enqueued_count == jobs.size enqueued_jobs_message(adapter, jobs) elsif jobs.any?(&:successfully_enqueued?) enqueued_jobs = jobs.select(&:successfully_enqueued?) failed_enqueue_count = jobs.size - enqueued_count if failed_enqueue_count == 0 enqueued_jobs_message(adapter, enqueued_jobs) else "#{enqueued_jobs_message(adapter, enqueued_jobs)}. "\ "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)}" end else failed_enqueue_count = jobs.size - enqueued_count "Failed enqueuing #{failed_enqueue_count} #{'job'.pluralize(failed_enqueue_count)} "\ "to #{ActiveJob.adapter_name(adapter)}" end end end
def enqueue_at(event)
def enqueue_at(event) job = event.payload[:job] ex = event.payload[:exception_object] || job.enqueue_error if ex error do "Failed enqueuing #{job.class.name} to #{queue_name(event)}: #{ex.class} (#{ex.message})" end elsif event.payload[:aborted] info do "Failed enqueuing #{job.class.name} to #{queue_name(event)}, a before_enqueue callback halted the enqueuing execution." end else info do "Enqueued #{job.class.name} (Job ID: #{job.job_id}) to #{queue_name(event)} at #{scheduled_at(event)}" + args_info(job) end end end
def enqueue_retry(event)
def enqueue_retry(event) job = event.payload[:job] ex = event.payload[:error] wait = event.payload[:wait] info do if ex "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds, due to a #{ex.class} (#{ex.message})." else "Retrying #{job.class} (Job ID: #{job.job_id}) after #{job.executions} attempts in #{wait.to_i} seconds." end end end
def enqueue_source_location
def enqueue_source_location Thread.each_caller_location do |location| frame = backtrace_cleaner.clean_frame(location) return frame if frame end nil end
def enqueued_jobs_message(adapter, enqueued_jobs)
def enqueued_jobs_message(adapter, enqueued_jobs) enqueued_count = enqueued_jobs.size job_classes_counts = enqueued_jobs.map(&:class).tally.sort_by { |_k, v| -v } "Enqueued #{enqueued_count} #{'job'.pluralize(enqueued_count)} to #{ActiveJob.adapter_name(adapter)}"\ " (#{job_classes_counts.map { |klass, count| "#{count} #{klass}" }.join(', ')})" end
def error(progname = nil, &block)
def error(progname = nil, &block) return unless super if ActiveJob.verbose_enqueue_logs log_enqueue_source end end
def format(arg)
def format(arg) case arg when Hash arg.transform_values { |value| format(value) } when Array arg.map { |value| format(value) } when GlobalID::Identification arg.to_global_id rescue arg else arg end end
def info(progname = nil, &block)
def info(progname = nil, &block) return unless super if ActiveJob.verbose_enqueue_logs log_enqueue_source end end
def log_enqueue_source
def log_enqueue_source source = enqueue_source_location if source logger.info("↳ #{source}") end end
def logger
def logger ActiveJob::Base.logger end
def perform(event)
def perform(event) job = event.payload[:job] ex = event.payload[:exception_object] if ex error do "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: #{ex.class} (#{ex.message}):\n" + Array(ex.backtrace).join("\n") end elsif event.payload[:aborted] error do "Error performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms: a before_perform callback halted the job execution" end else info do "Performed #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)} in #{event.duration.round(2)}ms" end end end
def perform_start(event)
def perform_start(event) info do job = event.payload[:job] enqueue_info = job.enqueued_at.present? ? " enqueued at #{job.enqueued_at.utc.iso8601(9)}" : "" "Performing #{job.class.name} (Job ID: #{job.job_id}) from #{queue_name(event)}" + enqueue_info + args_info(job) end end
def queue_name(event)
def queue_name(event) ActiveJob.adapter_name(event.payload[:adapter]) + "(#{event.payload[:job].queue_name})" end
def retry_stopped(event)
def retry_stopped(event) job = event.payload[:job] ex = event.payload[:error] error do "Stopped retrying #{job.class} (Job ID: #{job.job_id}) due to a #{ex.class} (#{ex.message}), which reoccurred on #{job.executions} attempts." end end
def scheduled_at(event)
def scheduled_at(event) Time.at(event.payload[:job].scheduled_at).utc end