class CanvasSync::JobBatches::Pool

def self.from_pid(pid)

def self.from_pid(pid)
  raise "PID must be given" unless pid.present?
  new(pid)
end

def self.job_checked_in(status, options)

def self.job_checked_in(status, options)
  pid = options['pool_id']
  from_pid(pid).job_checked_in(status, options)
end

def self.redis(&blk)

def self.redis(&blk)
  Batch.redis &blk
end

def <<(job_desc)

def <<(job_desc)
  add_job(job_desc)
end

def active_count

def active_count
  redis.hincrby(redis_key, "active_count", 0)
end

def add_job(job_desc)

def add_job(job_desc)
  add_jobs([job_desc])
end

def add_jobs(job_descs)

def add_jobs(job_descs)
  job_descs.each do |job_desc|
    wrapper = Batch.new
    wrapper.description = "Pool Job Wrapper (PID: #{pid})"
    checkin_event = (on_failed_job == :wait) ? :success : :complete
    wrapper.on(checkin_event, "#{self.class.to_s}.job_checked_in", pool_id: pid)
    wrapper.jobs {}
    job_desc = job_desc.symbolize_keys
    job_desc = job_desc.merge!(
      job: job_desc[:job].to_s,
      pool_wrapper_batch: wrapper.bid,
    )
    push_job_to_pool(job_desc)
  end
  refill_allotment
end

def cleanup_redis

def cleanup_redis
  Batch.logger.debug {"Cleaning redis of pool #{pid}"}
  redis do |r|
    r.zrem("pools", pid)
    r.unlink(
      "#{redis_key}",
      "#{redis_key}-jobs",
    )
  end
end

def flush_pending_attrs

def flush_pending_attrs
  super
  redis.zadd("pools", created_at, pid)
end

def initialize(pooolid = nil, **kwargs)

def initialize(pooolid = nil, **kwargs)
  if pooolid
    @existing = true
    @pid = pooolid
  else
    @pid = SecureRandom.urlsafe_base64(10)
    initialize_new(**kwargs)
  end
end

def initialize_new(concurrency: nil, order: :fifo, clean_when_empty: true, on_failed_job: :wait, description: nil)

def initialize_new(concurrency: nil, order: :fifo, clean_when_empty: true, on_failed_job: :wait, description: nil)
  self.created_at = Time.now.utc.to_f
  self.description = description
  self.order = order
  self.concurrency = concurrency
  self.clean_when_empty = clean_when_empty
  self.on_failed_job = on_failed_job
  flush_pending_attrs
end

def job_checked_in(status, options)

def job_checked_in(status, options)
  active_count, pending_count = redis do |r|
    return unless r.exists?(redis_key)
    # Make sure this is loaded outside of the pipeline
    self.order
    redis.multi do |r|
      r.hincrby(redis_key, "active_count", -1)
      self.pending_count(r)
    end
  end
  added_count = refill_allotment
  if active_count == 0 && added_count == 0 && pending_count == 0
    if clean_when_empty && redis.hget(redis_key, 'keep_open') != 'true'
      cleanup_redis
    end
  end
end

def keep_open!

def keep_open!
  if block_given?
    begin
      keep_open!
      yield
    ensure
      let_close!
    end
  else
    redis.hset(redis_key, 'keep_open', true)
  end
end

def let_close!

def let_close!
  _, active_count = redis.multi do |r|
    r.hset(redis_key, 'keep_open', false)
    r.hincrby(redis_key, "active_count", 0)
  end
  if active_count == 0 && pending_count == 0
    cleanup_redis if clean_when_empty
  end
end

def pending_count(r = redis)

def pending_count(r = redis)
  jobs_key = "#{redis_key}-jobs"
  order = self.order || 'fifo'
  case order.to_sym
  when :fifo, :lifo
    r.llen(jobs_key)
  when :random
    r.scard(jobs_key)
  when :priority
    r.zcard(jobs_key)
  end
end

def pop_job_from_pool

def pop_job_from_pool
  jobs_key = "#{redis_key}-jobs"
  order = self.order
  job_json = case order.to_sym
  when :fifo
    redis.lpop(jobs_key)
  when :lifo
    redis.rpop(jobs_key)
  when :random
    redis.spop(jobs_key)
  when :priority
    redis.zpopmax(jobs_key)
  end
  return nil unless job_json.present?
  ::ActiveJob::Arguments.deserialize(JSON.parse(job_json))[0]&.symbolize_keys
end

def push_job_to_pool(job_desc)

def push_job_to_pool(job_desc)
  jobs_key = "#{redis_key}-jobs"
  # This allows duplicate jobs when a Redis Set is used
  job_desc[:_pool_random_key_] = SecureRandom.urlsafe_base64(10)
  job_json = JSON.unparse(::ActiveJob::Arguments.serialize([job_desc]))
  order = self.order
  redis.multi do |r|
    case order.to_sym
    when :fifo, :lifo
      r.rpush(jobs_key, job_json)
    when :random
      r.sadd(jobs_key, job_json)
    when :priority
      r.zadd(jobs_key, job_desc[:priority] || 0, job_json)
    end
    r.expire(jobs_key, Batch::BID_EXPIRE_TTL)
  end
end

def redis_key

def redis_key
  "POOLID-#{pid}"
end

def refill_allotment

def refill_allotment
  jobs_added = 0
  limit = concurrency.to_i
  redis do |r|
    current_count = 0
    while true
      current_count = HINCR_MAX.call(r, [redis_key], ["active_count", limit]).to_i
      if current_count < limit
        job_desc = pop_job_from_pool
        if job_desc.present?
          Batch.new(job_desc[:pool_wrapper_batch]).jobs do
            ChainBuilder.enqueue_job(job_desc)
          end
          jobs_added += 1
        else
          r.hincrby(redis_key, "active_count", -1)
          break
        end
      else
        break
      end
    end
    r.expire(redis_key, Batch::BID_EXPIRE_TTL)
    r.expire("#{redis_key}-jobs", Batch::BID_EXPIRE_TTL)
  end
  jobs_added
end