module Resque::DynamicQueues

def self.included(receiver)

def self.included(receiver)
  receiver.class_eval do
    alias queues_without_dynamic queues
    alias queues queues_with_dynamic
    alias reserve_without_round_robin reserve
    alias reserve reserve_with_round_robin
  end
end

def create_job(queue, payload)

def create_job(queue, payload)
  return unless payload
  Resque::Job.new(queue, payload)
end

def filter_busy_queues qs

def filter_busy_queues qs
  busy_queues = Resque::Worker.working.map { |worker| worker.job["queue"] }.compact
  Array(qs.dup).compact - busy_queues
end

def get_categorized_queues(queue_list)

def get_categorized_queues(queue_list)
  priority_map = {"Synchronous" => 0, "High" => 1, "Medium" => 2, "Low" => 3}
  categorized_queues = {}
  for queue in queue_list.uniq
    priority = queue.split("_")[1]
    priority = "Medium" if !["Synchronous", "High", "Medium", "Low"].include?(priority)
    categorized_queues[priority] ||= []
    categorized_queues[priority].push(queue)
  end
  return categorized_queues.transform_keys{ |key| priority_map[key.to_s]}.sort
end

def get_grouped_queues

def get_grouped_queues
  self.queues.sort.group_by{|u| /(\d{1,20})_.*/.match(u) ? /(\d{1,20})_.*/.match(u).captures.first : nil}
end

def get_job_from_queues(grouped_queues)

def get_job_from_queues(grouped_queues)
  Resque.redis.blpop(grouped_queues, :timeout => (ENV["BLPOP_TIMEOUT"].to_i || 30))
end

def get_next_job(grouped_queues)

def get_next_job(grouped_queues)
  @n ||= 0
  low_priority_queues = []
  high_priority_queues = []
  # Categorize queues as low_priority if its appinstance id is lower than current
  grouped_queues.sort_by { |app_instance_id, _| app_instance_id.to_i }.each do |app_instance_queues|
    app_instance_id, queues = app_instance_queues
    queues = get_categorized_queues(queues).to_h.values.flatten
    app_instance_id.to_i <= @n ? low_priority_queues.push(queues) : high_priority_queues.push(queues)
  end
  # Reorder queue groups by putting high priority queues in the front
  grouped_queues =
    (high_priority_queues + low_priority_queues).
    flatten.
    delete_if { |queue| !should_work_on_queue?(queue) }.
    map { |queue| "queue:#{queue}" }
  queue, payload = get_job_from_queues(grouped_queues)
  return nil if queue.blank?
  
  queue = queue.split("queue:")[1]
  # track last app instance id
  @n = queue.split('_').first.to_i
  return create_job(queue, Resque.decode(payload))      
end

def get_queued_job(grouped_queues)

def get_queued_job(grouped_queues)
  if defined?(Resque::Plugins::ConcurrentRestriction)
    # Bounded retry
    1.upto(Resque::Plugins::ConcurrentRestriction.reserve_queued_job_attempts) do |i|
      resque_job = get_next_job(grouped_queues)
      # Short-curcuit if a job was not found
      return if resque_job.nil?
      # If there is a job on regular queues, then only run it if its not restricted
      job_class = resque_job.payload_class
      job_args = resque_job.args
      # Return to work on job if not a restricted job
      return resque_job unless job_class.is_a?(Resque::Plugins::ConcurrentRestriction)
      # Keep trying if job is restricted. If job is runnable, we keep the lock until
      # done_working
      return resque_job unless job_class.stash_if_restricted(resque_job)
    end
    
    # Safety net, here in case we hit the upper bound and there are still queued items
    return nil        
  else
    return get_next_job(grouped_queues)
  end
end

def get_restricted_job

def get_restricted_job
  Resque::Plugins::ConcurrentRestrictionJob.next_runnable_job_random
end

def queue_depth queuename

def queue_depth queuename
  busy_queues = Resque::Worker.working.map { |worker| worker.job["queue"] }.compact
  # find the queuename, count it.
  busy_queues.select {|q| q == queuename }.size
end

def queues_with_dynamic


list for a key with Resque.set_dynamic_queue(key, ["q1", "q2"]
and negations can be used inside this dynamic queue list. Set the queue
If no key is supplied, it defaults to the worker's hostname, and wildcards
An @key can be used to dynamically look up the queue list for key from redis.

e.g. "*high*", and negation can be indicated with a prefix of "!"
The splat can also be used as a wildcard within a queue name,

can be useful for dynamically adding new queues.
A splat ("*") means you want every queue (in alpha order) - this

Returns a list of queues to use when searching for a job.
def queues_with_dynamic
  queue_names = @queues.dup
  
  return queues_without_dynamic if queue_names.grep(/(^!)|(^@)|(\*)/).size == 0
  real_queues = Resque.queues
  matched_queues = []
  #Remove Queues under Api Limits
  Redis.current.zremrangebyscore("APILimits", "0", "(#{Time.now.to_i}")
  api_limit_instances = Redis.current.zrange("APILimits", 0, -1).map {|key| key.to_i if key.match(/^\d*$/)}.compact
  real_queues = real_queues.select {|key| key if !api_limit_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i)} ## 2
    
  #Queue Pausing 
  Resque.redis.zremrangebyscore("PauseQueue", "0", "(#{Time.now.to_i}")
  paused_instances = Resque.redis.zrange("PauseQueue", 0, -1).map {|key| key.split("__")[0].to_i if key.match(/^\d*__.*/)}.compact
  real_queues = real_queues.select {|key| key if !paused_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i)}
  while q = queue_names.shift
    q = q.to_s
    if q =~ /^(!)?@(.*)/
      key = $2.strip
      key = hostname if key.size == 0
      add_queues = Resque.get_dynamic_queue(key)
      add_queues.map! { |q| q.gsub!(/^!/, '') || q.gsub!(/^/, '!') } if $1
      queue_names.concat(add_queues)
      next
    end
    if q =~ /^!/
      negated = true
      q = q[1..-1]
    end
    patstr = q.gsub(/\*/, '.*')
    pattern = /^#{patstr}$/
    if negated
      matched_queues -= matched_queues.grep(pattern)
    else
      matches = real_queues.grep(/^#{pattern}$/)
      matches = [q] if matches.size == 0 && q == patstr
      matched_queues.concat(matches.sort)
    end
  end
 
  return matched_queues.uniq
end

def reserve_with_round_robin

def reserve_with_round_robin
  grouped_queues = self.get_grouped_queues
  #Instance queue grouping
  if !grouped_queues.keys.include?(nil) && grouped_queues.keys.size > 0
    if ZuoraConnect.configuration.blpop_queue
      @job_in_progress = get_restricted_job
      return @job_in_progress if @job_in_progress.present?
      return @job_in_progress = get_queued_job(grouped_queues)
    else
      @n ||= 0       
      @n += 1
      @n = @n % grouped_queues.keys.size
      grouped_queues.keys.rotate(@n).each do |key|
        self.get_categorized_queues(grouped_queues[key]).each do |key, queues|
          queues.each do |queue|
            log! "Checking #{queue}"
            if should_work_on_queue?(queue) && @job_in_progress = Resque::Job.reserve(queue)
              log! "Found job on #{queue}"
              return @job_in_progress
            end
          end
        end
        @n += 1 # Start the next search at the queue after the one from which we pick a job.
      end
      nil
    end
  else 
    return reserve_without_round_robin
  end
  
rescue Exception => ex
  if defined?(Ougai::Logger) && Resque.logger.is_a?(Ougai::Logger)
    log_with_severity :error, "Error reserving job", ex
  else
    log_with_severity :error, "Error reserving job: #{ex.inspect}"
    log_with_severity :error, ex.backtrace.join("\n")
  end
  raise ex
end

def rotated_queues

def rotated_queues
  @n ||= 0
  @n += 1
  rot_queues = queues # since we rely on the resque-dynamic-queues plugin, this is all the queues, expanded out
  if rot_queues.size > 0
    @n = @n % rot_queues.size
    rot_queues.rotate(@n)
  else
    rot_queues
  end
end

def should_work_on_queue? queuename

def should_work_on_queue? queuename
  return true if @queues.include? '*'  # workers with QUEUES=* are special and are not subject to queue depth setting
  max = DEFAULT_QUEUE_DEPTH
  unless ENV["RESQUE_QUEUE_DEPTH"].nil? || ENV["RESQUE_QUEUE_DEPTH"] == ""
    max = ENV["RESQUE_QUEUE_DEPTH"].to_i
  end
  return true if max == 0 # 0 means no limiting
  cur_depth = queue_depth(queuename)
  log! "queue #{queuename} depth = #{cur_depth} max = #{max}"
  return true if cur_depth < max
  false
end