lib/active_job/enqueuing.rb



# frozen_string_literal: true

module ActiveJob
  # Provides behavior for enqueuing jobs.

  # Can be raised by adapters if they wish to communicate to the caller a reason
  # why the adapter was unexpectedly unable to enqueue a job.
  class EnqueueError < StandardError; end

  class << self
    # Push many jobs onto the queue at once without running enqueue callbacks.
    # Queue adapters may communicate the enqueue status of each job by setting
    # successfully_enqueued and/or enqueue_error on the passed-in job instances.
    def perform_all_later(*jobs)
      jobs.flatten!
      jobs.group_by(&:queue_adapter).each do |queue_adapter, adapter_jobs|
        instrument_enqueue_all(queue_adapter, adapter_jobs) do
          if queue_adapter.respond_to?(:enqueue_all)
            queue_adapter.enqueue_all(adapter_jobs)
          else
            adapter_jobs.each do |job|
              job.successfully_enqueued = false
              if job.scheduled_at
                queue_adapter.enqueue_at(job, job.scheduled_at.to_f)
              else
                queue_adapter.enqueue(job)
              end
              job.successfully_enqueued = true
            rescue EnqueueError => e
              job.enqueue_error = e
            end
            adapter_jobs.count(&:successfully_enqueued?)
          end
        end
      end
      nil
    end
  end

  module Enqueuing
    extend ActiveSupport::Concern

    included do
      ##
      # :singleton-method:
      #
      # Defines if enqueueing this job from inside an Active Record transaction
      # automatically defers the enqueue to after the transaction commits.
      #
      # It can be set on a per job basis:
      #  - `:always` forces the job to be deferred.
      #  - `:never` forces the job to be queued immediately.
      #  - `:default` lets the queue adapter define the behavior (recommended).
      class_attribute :enqueue_after_transaction_commit, instance_accessor: false, instance_predicate: false, default: :never
    end

    # Includes the +perform_later+ method for job initialization.
    module ClassMethods
      # Push a job onto the queue. By default the arguments must be either String,
      # Integer, Float, NilClass, TrueClass, FalseClass, BigDecimal, Symbol, Date,
      # Time, DateTime, ActiveSupport::TimeWithZone, ActiveSupport::Duration,
      # Hash, ActiveSupport::HashWithIndifferentAccess, Array, Range, or
      # GlobalID::Identification instances, although this can be extended by adding
      # custom serializers.
      #
      # Returns an instance of the job class queued with arguments available in
      # Job#arguments or +false+ if the enqueue did not succeed.
      #
      # After the attempted enqueue, the job will be yielded to an optional block.
      #
      # If Active Job is used conjointly with Active Record, and #perform_later is called
      # inside an Active Record transaction, then the enqueue is implicitly deferred to after
      # the transaction is committed, or dropped if it's rolled back. In such case #perform_later
      # will return the job instance like if it was successfully enqueued, but will still return
      # +false+ if a callback prevented the job from being enqueued.
      #
      # This behavior can be changed on a per job basis:
      #
      #  class NotificationJob < ApplicationJob
      #    self.enqueue_after_transaction_commit = false
      #  end
      def perform_later(...)
        job = job_or_instantiate(...)
        enqueue_result = job.enqueue

        yield job if block_given?

        enqueue_result
      end

      private
        def job_or_instantiate(*args, &_) # :doc:
          args.first.is_a?(self) ? args.first : new(*args)
        end
        ruby2_keywords(:job_or_instantiate)
    end

    # Enqueues the job to be performed by the queue adapter.
    #
    # ==== Options
    # * <tt>:wait</tt> - Enqueues the job with the specified delay
    # * <tt>:wait_until</tt> - Enqueues the job at the time specified
    # * <tt>:queue</tt> - Enqueues the job on the specified queue
    # * <tt>:priority</tt> - Enqueues the job with the specified priority
    #
    # ==== Examples
    #
    #    my_job_instance.enqueue
    #    my_job_instance.enqueue wait: 5.minutes
    #    my_job_instance.enqueue queue: :important
    #    my_job_instance.enqueue wait_until: Date.tomorrow.midnight
    #    my_job_instance.enqueue priority: 10
    def enqueue(options = {})
      set(options)
      self.successfully_enqueued = false

      run_callbacks :enqueue do
        raw_enqueue
      end

      if successfully_enqueued?
        self
      else
        false
      end
    end

    private
      def raw_enqueue
        if scheduled_at
          queue_adapter.enqueue_at self, scheduled_at.to_f
        else
          queue_adapter.enqueue self
        end

        self.successfully_enqueued = true
      rescue EnqueueError => e
        self.enqueue_error = e
      end
  end
end