lib/active_job/queue_adapters/sidekiq_adapter.rb
# frozen_string_literal: true begin gem "activejob", ">= 7.0" require "active_job" module Sidekiq module ActiveJob # @api private class Wrapper include Sidekiq::Job def perform(job_data) ::ActiveJob::Base.execute(job_data.merge("provider_job_id" => jid)) end end end end unless ActiveJob::Base.respond_to?(:sidekiq_options) # By including the Options module, we allow AJs to directly control sidekiq features # via the *sidekiq_options* class method and, for instance, not use AJ's retry system. # AJ retries don't show up in the Sidekiq UI Retries tab, don't save any error data, can't be # manually retried, don't automatically die, etc. # # class SomeJob < ActiveJob::Base # queue_as :default # sidekiq_options retry: 3, backtrace: 10 # def perform # end # end ActiveJob::Base.include Sidekiq::Job::Options end # Patch the ActiveJob module module ActiveJob module QueueAdapters # Explicitly remove the implementation existing in older Rails. remove_const(:SidekiqAdapter) if const_defined?(:SidekiqAdapter) # Sidekiq adapter for Active Job # # To use Sidekiq set the queue_adapter config to +:sidekiq+. # # Rails.application.config.active_job.queue_adapter = :sidekiq class SidekiqAdapter # Defines whether enqueuing should happen implicitly to after commit when called # from inside a transaction. # @api private def enqueue_after_transaction_commit? true end # @api private def enqueue(job) job.provider_job_id = Sidekiq::ActiveJob::Wrapper.set( wrapped: job.class, queue: job.queue_name ).perform_async(job.serialize) end # @api private def enqueue_at(job, timestamp) job.provider_job_id = Sidekiq::ActiveJob::Wrapper.set( wrapped: job.class, queue: job.queue_name ).perform_at(timestamp, job.serialize) end # @api private def enqueue_all(jobs) enqueued_count = 0 jobs.group_by(&:class).each do |job_class, same_class_jobs| same_class_jobs.group_by(&:queue_name).each do |queue, same_class_and_queue_jobs| immediate_jobs, scheduled_jobs = same_class_and_queue_jobs.partition { |job| job.scheduled_at.nil? } if immediate_jobs.any? jids = Sidekiq::Client.push_bulk( "class" => Sidekiq::ActiveJob::Wrapper, "wrapped" => job_class, "queue" => queue, "args" => immediate_jobs.map { |job| [job.serialize] } ) enqueued_count += jids.compact.size end if scheduled_jobs.any? jids = Sidekiq::Client.push_bulk( "class" => Sidekiq::ActiveJob::Wrapper, "wrapped" => job_class, "queue" => queue, "args" => scheduled_jobs.map { |job| [job.serialize] }, "at" => scheduled_jobs.map { |job| job.scheduled_at&.to_f } ) enqueued_count += jids.compact.size end end end enqueued_count end # Defines a class alias for backwards compatibility with enqueued Active Job jobs. # @api private class JobWrapper < Sidekiq::ActiveJob::Wrapper end end end end rescue Gem::LoadError # ActiveJob not available or version requirement not met end