lib/canvas_sync/job_batches/jobs/managed_batch_job.rb



require_relative './base_job'

module CanvasSync
  module JobBatches
    class ManagedBatchJob < BaseJob
      # Builds a root Batch for +sub_jobs+ and starts running them.
      #
      # When the effective concurrency is lower than the number of jobs, the
      # batch is "managed": the serialized jobs are parked in Redis (a List when
      # +ordered+, otherwise a Set) and only +concurrency+ of them are started
      # up front via perform_next_sequence_job. Otherwise every job is enqueued
      # directly inside the root batch.
      #
      # @param sub_jobs [Array<Hash>] job definitions; they are not modified
      # @param ordered [Boolean] whether managed jobs are released in list order
      # @param concurrency [Integer, Boolean, nil] nil, 0 and true mean
      #   "all at once"; false means one at a time
      # @param context [Object] assigned to the root batch's context
      # @param desc_prefix [String, nil] prefix for the root batch description
      # @yield [ManagedBatchProxy] proxy around the root batch for extra setup
      def self.make_batch(sub_jobs, ordered: true, concurrency: nil, context: nil, desc_prefix: nil, &blk)
        desc_prefix ||= ''

        concurrency =
          case concurrency
          when nil, 0, true then sub_jobs.count
          when false then 1
          else concurrency
          end

        root_batch = Batch.new
        managed = concurrency < sub_jobs.count

        if managed
          man_batch_id = SecureRandom.urlsafe_base64(10)
          meta_key = "MNGBID-#{man_batch_id}"
          jobs_key = "#{meta_key}-jobs"

          # Serialize before touching Redis, and work on a copy of each job so
          # the caller's hashes are left untouched.
          serialized_jobs = sub_jobs.each_with_index.map do |job, index|
            # The index keeps otherwise-identical jobs distinct when a Redis Set is used
            indexed_job = job.merge('_mngbid_index_' => index)
            JSON.generate(::ActiveJob::Arguments.serialize([indexed_job]))
          end

          Batch.redis do |r|
            r.multi do |tx|
              tx.hset(meta_key, "root_bid", root_batch.bid)
              # Redis only stores strings; write the flag explicitly instead of
              # relying on the client to stringify (or reject) a boolean.
              tx.hset(meta_key, "ordered", ordered ? 'true' : 'false')
              tx.hset(meta_key, "concurrency", concurrency)
              tx.expire(meta_key, Batch::BID_EXPIRE_TTL)

              if ordered
                tx.rpush(jobs_key, serialized_jobs)
              else
                tx.sadd(jobs_key, serialized_jobs)
              end
              tx.expire(jobs_key, Batch::BID_EXPIRE_TTL)
            end
          end

          root_batch.allow_context_changes = (concurrency == 1)
          root_batch.on(:success, "#{to_s}.cleanup_redis", managed_batch_id: man_batch_id)

          desc_prefix = "MGD(#{man_batch_id}): #{desc_prefix}"
        end

        root_batch.context = context

        blk.call(ManagedBatchProxy.new(root_batch)) if blk

        root_batch.description = "#{desc_prefix}: #{root_batch.description || 'Root'}"

        if managed
          # Open the root batch with no jobs, then start the initial jobs.
          root_batch.jobs {}
          concurrency.times { perform_next_sequence_job(man_batch_id) }
        else
          root_batch.jobs do
            sub_jobs.each { |job| ChainBuilder.enqueue_job(job) }
          end
        end
      end

      # Job entry point: forwards its arguments unchanged to
      # ManagedBatchJob.make_batch (no description prefix or setup block is
      # supplied on this path).
      #
      # @param sub_jobs [Array] job definitions to run inside the batch
      # @param context [Object] passed through as the batch context
      # @param ordered [Boolean] passed through to make_batch
      # @param concurrency [Integer, Boolean, nil] passed through to make_batch
      def perform(sub_jobs, context: nil, ordered: true, concurrency: nil)
        batch_options = { ordered: ordered, concurrency: concurrency, context: context }
        self.class.make_batch(sub_jobs, **batch_options)
      end

      # Batch :success callback registered by make_batch for managed batches.
      # Deletes both Redis keys belonging to the managed batch: the metadata
      # hash and the pending-jobs collection.
      #
      # @param status [Object] callback status; unused here
      # @param options [Hash] must carry the id under 'managed_batch_id'
      def self.cleanup_redis(status, options)
        meta_key = "MNGBID-#{options['managed_batch_id']}"
        Batch.redis { |conn| conn.del(meta_key, "#{meta_key}-jobs") }
      end

      # Batch :success callback attached to each per-job batch created by
      # perform_next_sequence_job. When one job's batch succeeds, it starts the
      # next pending job of the same managed batch.
      #
      # @param status [Object] callback status; unused here
      # @param options [Hash] must carry the id under 'managed_batch_id'
      def self.job_succeeded_callback(status, options)
        perform_next_sequence_job(options['managed_batch_id'])
      end

      protected

      # Pops the next pending job of a managed batch from Redis and enqueues it
      # inside its own child batch of the root batch. The child batch's :success
      # callback (job_succeeded_callback) calls back into this method, which is
      # what keeps the sequence going. Does nothing when no job is left.
      #
      # NOTE: the bare `protected` keyword above does not apply to methods
      # defined with `def self.`, so this method is publicly callable.
      #
      # @param man_batch_id [String] id of the managed batch
      # @return [nil] when there is no pending job
      def self.perform_next_sequence_job(man_batch_id)
        meta_key = "MNGBID-#{man_batch_id}"
        jobs_key = "#{meta_key}-jobs"

        root_bid, ordered = Batch.redis do |r|
          r.multi do |tx|
            tx.hget(meta_key, "root_bid")
            tx.hget(meta_key, "ordered")
          end
        end

        # Redis returns the flag as a String, and "false" is truthy in Ruby.
        # Decode it so that unordered batches (stored as a Set) use SPOP rather
        # than LPOP. A missing or empty value is treated as unordered.
        ordered = !['false', ''].include?(ordered.to_s)

        next_job_json = Batch.redis do |r|
          ordered ? r.lpop(jobs_key) : r.spop(jobs_key)
        end

        return unless next_job_json.present?

        next_job = ::ActiveJob::Arguments.deserialize(JSON.parse(next_job_json))[0]

        # Re-open the root batch and nest a one-job batch inside it.
        Batch.new(root_bid).jobs do
          Batch.new.tap do |batch|
            batch.description = "Managed Batch Fiber (#{man_batch_id})"
            batch.on(:success, "#{self.to_s}.job_succeeded_callback", managed_batch_id: man_batch_id)
            batch.jobs do
              ChainBuilder.enqueue_job(next_job)
            end
          end
        end
      end

      # Thin wrapper around the root batch that make_batch hands to its setup
      # block. Every message is forwarded to the wrapped batch except #jobs,
      # which always raises.
      class ManagedBatchProxy
        delegate_missing_to :real_batch

        # @param real_batch [Object] the batch that receives delegated calls
        def initialize(real_batch)
          @real_batch = real_batch
        end

        # Always raises: calling .jobs through the proxy is not supported.
        #
        # @raise [RuntimeError] always
        def jobs
          raise RuntimeError, "Managed Batches do not support calling .jobs directly!"
        end

        private

        # Delegation target used by delegate_missing_to.
        def real_batch
          @real_batch
        end
      end
    end
  end
end