class MarjAdapter
See github.com/nicholasdower/marj<br><br>the {Marj} module.
Although it is possible to access the adapter directly in order to query, discard or delete, it is recommended to use
- A delete method which can be used to delete enqueued jobs.
- A discard method which can be used to discard enqueued jobs.
- A query method which can be used to query enqueued jobs
In addition to the standard ActiveJob queue adapter API, this adapter provides:
ActiveJob queue adapter for Marj.
def delete(job)
-
(ActiveJob::Base)- the deleted job
def delete(job) job.tap { destroy_record(job) } end
def destroy_record(job)
-
(ActiveRecord::Base, NilClass)- the destroyed record or +nil+ if no such record exists
def destroy_record(job) record = job.singleton_class.instance_variable_get(:@record) record ||= Marj::Record.find_by(job_id: job.job_id)&.tap { _1.send(:register_callbacks, job) } record&.destroy end
def discard(job, run_callbacks: true)
-
(ActiveJob::Base)- the discarded job
Parameters:
-
run_callbacks(Boolean) -- whether to run the +after_discard+ callbacks -
job(ActiveJob::Base) -- the job being discarded
def discard(job, run_callbacks: true) job.tap do @discard_proc.call(job) run_after_discard_callbacks(job) if run_callbacks end end
def enqueue(job)
-
(ActiveJob::Base)- the enqueued job
Parameters:
-
job(ActiveJob::Base) -- the job to enqueue
def enqueue(job) enqueue_at(job) end
def enqueue_after_transaction_commit?
Returns whether jobs enqueued during a transaction should actually be enqueued only after the
def enqueue_after_transaction_commit? false end
def enqueue_at(job, timestamp = nil)
-
(ActiveJob::Base)- the enqueued job
Parameters:
-
timestamp(Numeric, NilClass) -- optional number of seconds since Unix epoch at which to execute the job -
job(ActiveJob::Base) -- the job to enqueue
def enqueue_at(job, timestamp = nil) job.scheduled_at = timestamp ? Time.at(timestamp).utc : nil # Argument serialization is done by ActiveJob. ActiveRecord expects deserialized arguments. serialized = job.serialize.symbolize_keys!.without(:provider_job_id).merge(arguments: job.arguments) # Serialize sets locale to I18n.locale.to_s and enqueued_at to Time.now.utc.iso8601(9). # Update the job to reflect what is being enqueued. job.locale = serialized[:locale] job.enqueued_at = Time.iso8601(serialized[:enqueued_at]).utc # When a job is enqueued, we must create/update the corresponding database record. We also must ensure callbacks # are registered on the job instance so that when the job is executed, the database record is deleted or updated # (depending on the result). # # We keep track of whether callbacks have been registered by setting the @record instance variable on the job's # singleton class. This holds a reference to the record. This ensures that if execute is called on a record # instance, any updates to the database are reflected on that record instance. if (existing_record = job.singleton_class.instance_variable_get(:@record)) # This job instance has already been associated with a database row. if record_class.exists?(job_id: job.job_id) # The database row still exists, we simply need to update it. existing_record.update!(serialized) else # Someone else deleted the database row, we need to recreate and reload the existing record instance. We don't # want to register the new instance because someone might still have a reference to the existing one. record_class.create!(serialized) existing_record.reload end else # This job instance has not been associated with a database row. if (new_record = record_class.find_by(job_id: job.job_id)) # The database row already exists. Update it. new_record.update!(serialized) else # The database row does not exist. Create it. new_record = record_class.create!(serialized) end new_record.send(:register_callbacks, job) end job end
def initialize(record_class: 'Marj::Record', discard: proc { |job| delete(job) })
-
discard(Proc) -- the proc to use to discard jobs, defaults to delegating to {delete} -
record_class(Class, String) -- the +ActiveRecord+ class (or its name) to use, defaults to +Marj::Record+
def initialize(record_class: 'Marj::Record', discard: proc { |job| delete(job) }) @record_class = record_class @discard_proc = discard end
def query(*args, **kwargs)
query(job_id: '123') # Returns the job with job_id '123' or nil if no such job exists
query(queue_name: 'foo') # Returns all jobs in the 'foo' queue
query(:due, job_class: Foo) # Returns jobs which are due to be executed with job_class Foo
query(job_class: Foo) # Returns all jobs with job_class Foo
query(:due, limit: 10) # Returns at most 10 jobs which are due to be executed
query(:due) # Returns jobs which are due to be executed
query(:all) # Returns all jobs
query # Returns all jobs
Example usage:
By default jobs are ordered by when they should be executed.
- If +:order+ is specified, the jobs are ordered by the given attribute.
- If +:limit+ is specified, the maximum number of jobs is limited.
- If only a job ID is specified, the corresponding job is returned.
- Symbol arguments are treated as +ActiveRecord+ scopes.
Queries enqueued jobs. Similar to +ActiveRecord.where+ with a few additional features:
def query(*args, **kwargs) args, kwargs = args.dup, kwargs.dup.symbolize_keys kwargs = kwargs.merge(job_id: kwargs.delete(:id)) if kwargs.key?(:id) kwargs[:job_id] = args.shift if args.size == 1 && args.first.is_a?(String) && args.first.match(JOB_ID_REGEX) if args.empty? && kwargs.size == 1 && kwargs.key?(:job_id) return record_class.find_by(job_id: kwargs[:job_id])&.to_job end symbol_args, args = args.partition { _1.is_a?(Symbol) } symbol_args.delete(:all) limit = kwargs.delete(:limit) relation = record_class.all relation = relation.order(kwargs.delete(:order)) if kwargs.key?(:order) relation = relation.where(*args, **kwargs) if args.any? || kwargs.any? relation = relation.limit(limit) if limit relation = relation.send(symbol_args.shift) while symbol_args.any? relation = relation.by_due_date if relation.is_a?(ActiveRecord::Relation) && relation.order_values.empty? if relation.is_a?(Enumerable) relation.map(&:to_job) elsif relation.is_a?(record_class) relation.to_job else relation end end
def record_class
-
(Class)- the +ActiveRecord+ class
def record_class @record_class = @record_class.is_a?(String) ? @record_class.constantize : @record_class end
def run_after_discard_callbacks(job)
-
(NilClass)- the given job
Parameters:
-
job(ActiveJob::Base) -- the job being discarded
def run_after_discard_callbacks(job) # Copied from ActiveJob::Exceptions#run_after_discard_procs exceptions = [] job.after_discard_procs.each do |blk| instance_exec(job, nil, &blk) rescue StandardError => e exceptions << e end raise exceptions.last if exceptions.any? nil end