lib/canvas_sync/job_batches/sidekiq.rb
begin require 'sidekiq/batch' rescue LoadError end module CanvasSync module JobBatches module Sidekiq module WorkerExtension def bid Thread.current[CURRENT_BATCH_THREAD_KEY].bid end def batch Thread.current[CURRENT_BATCH_THREAD_KEY] end def batch_context batch&.context || {} end def valid_within_batch? batch.valid? end end class SidekiqCallbackWorker include ::Sidekiq::Worker include WorkerExtension include Batch::Callback::CallbackWorkerCommon def self.enqueue_all(args, queue) return if args.empty? ::Sidekiq::Client.push_bulk( 'class' => self, 'args' => args, 'queue' => queue ) end end class ClientMiddleware def call(_worker, msg, _queue, _redis_pool = nil) if (batch = Thread.current[CURRENT_BATCH_THREAD_KEY]) && should_handle_batch?(msg) batch.increment_job_queue(msg['jid']) if (msg[:bid] = batch.bid) end yield end def should_handle_batch?(msg) return false if JobBatches::Sidekiq.is_activejob_job?(msg) true end end class ServerMiddleware def call(_worker, msg, _queue) if (bid = msg['bid']) prev_batch = Thread.current[CURRENT_BATCH_THREAD_KEY] begin Thread.current[CURRENT_BATCH_THREAD_KEY] = Batch.new(bid) yield Thread.current[CURRENT_BATCH_THREAD_KEY].save_context_changes Batch.process_successful_job(bid, msg['jid']) rescue Batch.process_failed_job(bid, msg['jid']) raise ensure Thread.current[CURRENT_BATCH_THREAD_KEY] = prev_batch end else yield end end end def self.is_activejob_job?(msg) return false unless defined?(::ActiveJob) msg['class'] == 'ActiveJob::QueueAdapters::SidekiqAdapter::JobWrapper' && (msg['wrapped'].to_s).constantize < JobBatches::ActiveJob::BatchAwareJob end def self.switch_tenant(job) if defined?(::Apartment) ::Apartment::Tenant.switch(job['apartment'] || 'public') do yield end else yield end end def self.configure if defined?(::Sidekiq::Batch) && ::Sidekiq::Batch != JobBatches::Batch print "WARNING: Detected Sidekiq Pro or sidekiq-batch. CanvasSync JobBatches may not be fully compatible!" end ::Sidekiq.configure_client do |config| config.client_middleware do |chain| chain.remove ::Sidekiq::Batch::Middleware::ClientMiddleware if defined?(::Sidekiq::Batch::Middleware::ClientMiddleware) chain.add JobBatches::Sidekiq::ClientMiddleware end end ::Sidekiq.configure_server do |config| config.client_middleware do |chain| chain.remove ::Sidekiq::Batch::Middleware::ClientMiddleware if defined?(::Sidekiq::Batch::Middleware::ClientMiddleware) chain.add JobBatches::Sidekiq::ClientMiddleware end config.server_middleware do |chain| chain.remove ::Sidekiq::Batch::Middleware::ServerMiddleware if defined?(::Sidekiq::Batch::Middleware::ServerMiddleware) chain.add JobBatches::Sidekiq::ServerMiddleware end config.death_handlers << ->(job, ex) do switch_tenant(job) do if is_activejob_job?(job) JobBatches::ActiveJob.handle_job_death(job["args"][0], ex) elsif job['bid'].present? ::Sidekiq::Batch.process_dead_job(job['bid'], job['jid']) end end end end ::Sidekiq.const_set(:Batch, CanvasSync::JobBatches::Batch) # This alias helps apartment-sidekiq set itself up correctly ::Sidekiq::Batch.const_set(:Server, CanvasSync::JobBatches::Sidekiq::ServerMiddleware) ::Sidekiq::Worker.send(:include, JobBatches::Sidekiq::WorkerExtension) Batch::Callback.worker_class = SidekiqCallbackWorker end end end end require_relative 'sidekiq/web'