lib/canvas_sync/job_batches/active_job.rb



module CanvasSync
  module JobBatches
    module ActiveJob
      # Mixin that ties an ActiveJob job to the batch it was enqueued under.
      #
      # The batch id is stored in @bid. It is written into the serialized job
      # payload under the 'batch_id' key and read back on deserialization.
      # While a job carrying a batch id performs, a Batch built from that id is
      # published in Thread.current[CURRENT_BATCH_THREAD_KEY].
      module BatchAwareJob
        extend ActiveSupport::Concern

        included do
          # For jobs that carry a batch id: install the job's Batch as the
          # thread-current batch for the duration of the job, then report the
          # outcome to Batch. Jobs without a batch id just run.
          around_perform do |_job, block|
            # This _must_ be @bid - not just bid: the `bid` reader falls back
            # to the thread-current batch, which is not necessarily this job's.
            if @bid
              prev_batch = Thread.current[CURRENT_BATCH_THREAD_KEY]
              begin
                Thread.current[CURRENT_BATCH_THREAD_KEY] = Batch.new(@bid)
                block.call
                Thread.current[CURRENT_BATCH_THREAD_KEY].save_context_changes
                Batch.process_successful_job(@bid, job_id)
              rescue
                # Any StandardError raised above is recorded as a failure of
                # this job and then propagated unchanged.
                Batch.process_failed_job(@bid, job_id)
                raise
              ensure
                # Always restore whatever batch was current before this job.
                Thread.current[CURRENT_BATCH_THREAD_KEY] = prev_batch
              end
            else
              block.call
            end
          end

          # When a job is enqueued while a batch is current on this thread,
          # adopt that batch's id and register the job with the batch before
          # the enqueue proceeds.
          around_enqueue do |_job, block|
            if (batch = Thread.current[CURRENT_BATCH_THREAD_KEY])
              @bid = batch.bid
              batch.increment_job_queue(job_id) if @bid
            end
            block.call
          end
        end

        # @return the job's own batch id, or else the id of the thread-current
        #   batch, or nil when neither is present.
        def bid
          @bid || Thread.current[CURRENT_BATCH_THREAD_KEY]&.bid
        end

        # @return the thread-current batch, or nil when none is set.
        def batch
          Thread.current[CURRENT_BATCH_THREAD_KEY]
        end

        # @return the current batch's context, or an empty Hash when there is
        #   no current batch or its context is nil/false.
        def batch_context
          batch&.context || {}
        end

        # Reports whether the job may proceed with respect to its batch.
        #
        # A job running without a current batch is treated as valid instead of
        # raising NoMethodError on nil, matching the nil-safe behaviour of
        # #bid and #batch_context.
        #
        # @return true when there is no current batch; otherwise the result of
        #   the batch's `valid?`.
        def valid_within_batch?
          batch.nil? || batch.valid?
        end

        # Extends the serialized job payload with the batch id.
        #
        # @return the payload produced by super, with 'batch_id' added.
        def serialize
          # This _must_ be @bid - not just bid (see the note in around_perform).
          super.tap { |data| data['batch_id'] = @bid }
        end

        # Restores the batch id from a serialized job payload.
        #
        # @param data the serialized payload; 'batch_id' is read from it.
        def deserialize(data)
          super
          @bid = data['batch_id']
        end
      end

      # ::ActiveJob::Base subclass that mixes in
      # Batch::Callback::CallbackWorkerCommon.
      class ActiveJobCallbackWorker < ::ActiveJob::Base
        include Batch::Callback::CallbackWorkerCommon

        class << self
          # Enqueues one job on +queue+ for every entry of +args+.
          #
          # @param args a collection of argument lists; each entry is splatted
          #   into the positional arguments of one job.
          # @param queue the queue name passed to `set(queue:)`.
          # @return +args+ (the result of `each`).
          def enqueue_all(args, queue)
            args.each { |job_args| set(queue: queue).perform_later(*job_args) }
          end
        end
      end

      # Reports a permanently failed job to its batch.
      #
      # @param job either a serialized job Hash (string keys "job_id" and
      #   "batch_id"), or the argument Array of an ActiveSupport::Notifications
      #   callback, from which the job and error are taken out of the payload.
      # @param error the error associated with the death; accepted (and
      #   overwritten from the payload in the Array form) but not otherwise
      #   consulted by this method.
      # @return the result of Batch.process_dead_job, or nil when the job has
      #   no job id or no batch id.
      def self.handle_job_death(job, error = nil)
        if job.is_a?(Array)
          death_payload = ActiveSupport::Notifications::Event.new(*job).payload
          job = death_payload[:job].serialize
          error = death_payload[:error]
        end

        dead_job_id = job["job_id"]
        owning_batch_id = job["batch_id"]
        # Only jobs that belong to a batch need to be reported.
        return unless dead_job_id.present? && owning_batch_id.present?

        CanvasSync::JobBatches::Batch.process_dead_job(owning_batch_id, dead_job_id)
      end

      # Wires batch support into ActiveJob:
      # * includes BatchAwareJob into ::ActiveJob::Base,
      # * forwards "discard.active_job" and "retry_stopped.active_job"
      #   notifications to handle_job_death,
      # * sets Batch::Callback.worker_class to ActiveJobCallbackWorker unless
      #   a worker class is already set.
      def self.configure
        ::ActiveJob::Base.include BatchAwareJob

        begin
          # Subscribed in this order; a failure stops the remaining
          # subscriptions and is only logged as a warning.
          %w[discard.active_job retry_stopped.active_job].each do |event_name|
            ActiveSupport::Notifications.subscribe(event_name) do |*args|
              handle_job_death(args)
            end
          end
        rescue => err
          Rails.logger.warn(err)
        end

        Batch::Callback.worker_class ||= ActiveJobCallbackWorker
      end
    end
  end
end