lib/marj/record.rb



# frozen_string_literal: true

require 'active_job'
require 'active_record'

module Marj
  # The default +ActiveRecord+ class.
  class Record < ActiveRecord::Base
    self.table_name = :jobs

    # Order by +enqueued_at+ rather than +job_id+ (the default).
    self.implicit_order_column = 'enqueued_at'

    # Using a custom serializer for exception_executions so that we can interact with it as a hash rather than a
    # string.
    serialize(:exception_executions, coder: JSON)

    # Using a custom serializer for arguments so that we can interact with as an array rather than a string.
    # This enables code like:
    #   Marj::Record.next.arguments.first
    #   Marj::Record.next.update!(arguments: ['foo', 1, Time.now])
    serialize(:arguments, coder: Class.new do
      def self.dump(arguments)
        return ActiveJob::Arguments.serialize(arguments).to_json if arguments.is_a?(Array)
        return arguments if arguments.is_a?(String) || arguments.nil?

        raise "invalid arguments: #{arguments}"
      end

      def self.load(arguments)
        arguments ? ActiveJob::Arguments.deserialize(JSON.parse(arguments)) : nil
      end
    end)

    # Using a custom serializer for job_class so that we can interact with it as a class rather than a string.
    serialize(:job_class, coder: Class.new do
      def self.dump(clazz)
        return clazz.name if clazz.is_a?(Class)
        return clazz if clazz.is_a?(String) || clazz.nil?

        raise "invalid class: #{clazz}"
      end

      def self.load(str)
        str&.constantize
      end
    end)

    # Returns a job object for this record which will update the database when successfully executed, enqueued or
    # discarded.
    #
    # @return [ActiveJob::Base]
    def to_job
      # See register_callbacks for details on how callbacks are used.
      job = job_class.new.tap { register_callbacks(_1) }

      # ActiveJob::Base#deserialize expects serialized arguments. But the record arguments have already been
      # deserialized by a custom ActiveRecord serializer (see below). So instead we use the raw arguments string.
      job_data = attributes.merge('arguments' => JSON.parse(read_attribute_before_type_cast(:arguments)))

      # ActiveJob::Base#deserialize expects dates to be strings rather than Time objects.
      job_data = job_data.to_h { |k, v| [k, %w[enqueued_at scheduled_at].include?(k) ? v&.iso8601 : v] }

      job.deserialize(job_data)

      # ActiveJob deserializes arguments on demand when a job is performed. Until then they are empty. That's strange.
      # Instead, deserialize them now. Also, clear `serialized_arguments` to prevent ActiveJob from overwriting changes
      # to arguments when serializing later.
      job.arguments = arguments
      job.serialized_arguments = nil

      job
    end

    # Registers callbacks for the given job which destroy this record when the job succeeds or is discarded.
    #
    # @param job [ActiveJob::Base]
    # @return [ActiveJob::Base]
    def register_callbacks(job)
      raise 'callbacks already registered' if job.singleton_class.instance_variable_get(:@record)

      record = self
      # We need to detect three cases:
      #  - If a job succeeds, after_perform will be called.
      #  - If a job fails and should be retried, enqueue will be called. This is handled by the queue adapter.
      #  - If a job exceeds its max attempts, after_discard will be called.
      job.singleton_class.after_perform { |_j| job.queue_adapter.delete(job) }
      job.singleton_class.after_discard { |_j, _exception| job.queue_adapter.discard(job, run_callbacks: false) }
      job.singleton_class.instance_variable_set(:@record, record)

      job
    end
    private :register_callbacks

    class << self
      # Returns an +ActiveRecord::Relation+ scope for enqueued jobs with a +scheduled_at+ that is either +null+ or in
      # the past.
      #
      # @return [ActiveRecord::Relation]
      def due
        where('scheduled_at IS NULL OR scheduled_at <= ?', Time.now.utc)
      end

      # Returns an +ActiveRecord::Relation+ scope for jobs ordered by due date.
      #
      # Jobs are ordered by the following criteria, in order:
      # 1. past or null scheduled_at before future scheduled_at
      # 2. ascending priority, nulls last
      # 3. ascending scheduled_at, nulls last
      # 4. ascending enqueued_at
      #
      # @return [ActiveRecord::Relation]
      def by_due_date
        order(
          Arel.sql(<<~SQL.squish, Time.now.utc)
            CASE WHEN scheduled_at IS NULL OR scheduled_at <= ? THEN 0 ELSE 1 END,
            CASE WHEN priority IS NULL THEN 1 ELSE 0 END, priority,
            CASE WHEN scheduled_at IS NULL THEN 1 ELSE 0 END, scheduled_at,
            enqueued_at
          SQL
        )
      end
    end
  end
end