lib/active_job/core.rb



# frozen_string_literal: true

module ActiveJob
  # = Active Job \Core
  #
  # Provides general behavior that will be included into every Active Job
  # object that inherits from ActiveJob::Base.
  module Core
    extend ActiveSupport::Concern

    # Job arguments
    attr_accessor :arguments
    attr_writer :serialized_arguments

    # Time when the job should be performed
    attr_reader :scheduled_at

    attr_reader :_scheduled_at_time # :nodoc:

    # Job Identifier
    attr_accessor :job_id

    # Queue in which the job will reside.
    attr_writer :queue_name

    # Priority that the job will have (lower is more priority).
    attr_writer :priority

    # ID optionally provided by adapter
    attr_accessor :provider_job_id

    # Number of times this job has been executed (which increments on every retry, like after an exception).
    attr_accessor :executions

    # Hash that contains the number of times this job handled errors for each specific retry_on declaration.
    # Keys are the string representation of the exceptions listed in the retry_on declaration,
    # while its associated value holds the number of executions where the corresponding retry_on
    # declaration handled one of its listed exceptions.
    attr_accessor :exception_executions

    # I18n.locale to be used during the job.
    attr_accessor :locale

    # Timezone to be used during the job.
    attr_accessor :timezone

    # Track when a job was enqueued
    attr_accessor :enqueued_at

    # Track whether the adapter received the job successfully.
    attr_writer :successfully_enqueued # :nodoc:

    def successfully_enqueued?
      @successfully_enqueued
    end

    # Track any exceptions raised by the backend so callers can inspect the errors.
    attr_accessor :enqueue_error

    # These methods will be included into any Active Job object, adding
    # helpers for de/serialization and creation of job instances.
    module ClassMethods
      # Creates a new job instance from a hash created with +serialize+
      def deserialize(job_data)
        job = job_data["job_class"].constantize.new
        job.deserialize(job_data)
        job
      end

      # Creates a job preconfigured with the given options. You can call
      # perform_later with the job arguments to enqueue the job with the
      # preconfigured options
      #
      # ==== Options
      # * <tt>:wait</tt> - Enqueues the job with the specified delay
      # * <tt>:wait_until</tt> - Enqueues the job at the time specified
      # * <tt>:queue</tt> - Enqueues the job on the specified queue
      # * <tt>:priority</tt> - Enqueues the job with the specified priority
      #
      # ==== Examples
      #
      #    VideoJob.set(queue: :some_queue).perform_later(Video.last)
      #    VideoJob.set(wait: 5.minutes).perform_later(Video.last)
      #    VideoJob.set(wait_until: Time.now.tomorrow).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait: 5.minutes).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait_until: Time.now.tomorrow).perform_later(Video.last)
      #    VideoJob.set(queue: :some_queue, wait: 5.minutes, priority: 10).perform_later(Video.last)
      def set(options = {})
        ConfiguredJob.new(self, options)
      end
    end

    # Creates a new job instance. Takes the arguments that will be
    # passed to the perform method.
    def initialize(*arguments)
      @arguments  = arguments
      @job_id     = SecureRandom.uuid
      @queue_name = self.class.queue_name
      @scheduled_at = nil
      @_scheduled_at_time = nil
      @priority   = self.class.priority
      @executions = 0
      @exception_executions = {}
      @timezone   = Time.zone&.name
    end
    ruby2_keywords(:initialize)

    # Returns a hash with the job data that can safely be passed to the
    # queuing adapter.
    def serialize
      {
        "job_class"  => self.class.name,
        "job_id"     => job_id,
        "provider_job_id" => provider_job_id,
        "queue_name" => queue_name,
        "priority"   => priority,
        "arguments"  => serialize_arguments_if_needed(arguments),
        "executions" => executions,
        "exception_executions" => exception_executions,
        "locale"     => I18n.locale.to_s,
        "timezone"   => timezone,
        "enqueued_at" => Time.now.utc.iso8601(9),
        "scheduled_at" => _scheduled_at_time ? _scheduled_at_time.utc.iso8601(9) : nil,
      }
    end

    # Attaches the stored job data to the current instance. Receives a hash
    # returned from +serialize+
    #
    # ==== Examples
    #
    #    class DeliverWebhookJob < ActiveJob::Base
    #      attr_writer :attempt_number
    #
    #      def attempt_number
    #        @attempt_number ||= 0
    #      end
    #
    #      def serialize
    #        super.merge('attempt_number' => attempt_number + 1)
    #      end
    #
    #      def deserialize(job_data)
    #        super
    #        self.attempt_number = job_data['attempt_number']
    #      end
    #
    #      rescue_from(Timeout::Error) do |exception|
    #        raise exception if attempt_number > 5
    #        retry_job(wait: 10)
    #      end
    #    end
    def deserialize(job_data)
      self.job_id               = job_data["job_id"]
      self.provider_job_id      = job_data["provider_job_id"]
      self.queue_name           = job_data["queue_name"]
      self.priority             = job_data["priority"]
      self.serialized_arguments = job_data["arguments"]
      self.executions           = job_data["executions"]
      self.exception_executions = job_data["exception_executions"]
      self.locale               = job_data["locale"] || I18n.locale.to_s
      self.timezone             = job_data["timezone"] || Time.zone&.name
      self.enqueued_at          = Time.iso8601(job_data["enqueued_at"]) if job_data["enqueued_at"]
      self.scheduled_at         = Time.iso8601(job_data["scheduled_at"]) if job_data["scheduled_at"]
    end

    # Configures the job with the given options.
    def set(options = {}) # :nodoc:
      self.scheduled_at = options[:wait].seconds.from_now if options[:wait]
      self.scheduled_at = options[:wait_until] if options[:wait_until]
      self.queue_name   = self.class.queue_name_from_part(options[:queue]) if options[:queue]
      self.priority     = options[:priority].to_i if options[:priority]

      self
    end

    def scheduled_at=(value)
      @_scheduled_at_time = if value&.is_a?(Numeric)
        ActiveJob.deprecator.warn(<<~MSG.squish)
          Assigning a numeric/epoch value to scheduled_at is deprecated. Use a Time object instead.
        MSG
        Time.at(value)
      else
        value
      end
      @scheduled_at = value
    end

    private
      def serialize_arguments_if_needed(arguments)
        if arguments_serialized?
          @serialized_arguments
        else
          serialize_arguments(arguments)
        end
      end

      def deserialize_arguments_if_needed
        if arguments_serialized?
          @arguments = deserialize_arguments(@serialized_arguments)
          @serialized_arguments = nil
        end
      end

      def serialize_arguments(arguments)
        Arguments.serialize(arguments)
      end

      def deserialize_arguments(serialized_args)
        Arguments.deserialize(serialized_args)
      end

      def arguments_serialized?
        defined?(@serialized_arguments) && @serialized_arguments
      end
  end
end