lib/resque/dynamic_queues.rb



module Resque
  module DynamicQueues
    def filter_busy_queues qs
      busy_queues = Resque::Worker.working.map { |worker| worker.job["queue"] }.compact
      Array(qs.dup).compact - busy_queues
    end

    def rotated_queues
      @n ||= 0
      @n += 1
      rot_queues = queues # since we rely on the resque-dynamic-queues plugin, this is all the queues, expanded out
      if rot_queues.size > 0
        @n = @n % rot_queues.size
        rot_queues.rotate(@n)
      else
        rot_queues
      end
    end

    def queue_depth queuename
      busy_queues = Resque::Worker.working.map { |worker| worker.job["queue"] }.compact
      # find the queuename, count it.
      busy_queues.select {|q| q == queuename }.size
    end

    DEFAULT_QUEUE_DEPTH = 0
    def should_work_on_queue? queuename
      return true if @queues.include? '*'  # workers with QUEUES=* are special and are not subject to queue depth setting
      max = DEFAULT_QUEUE_DEPTH
      unless ENV["RESQUE_QUEUE_DEPTH"].nil? || ENV["RESQUE_QUEUE_DEPTH"] == ""
        max = ENV["RESQUE_QUEUE_DEPTH"].to_i
      end
      return true if max == 0 # 0 means no limiting
      cur_depth = queue_depth(queuename)
      log! "queue #{queuename} depth = #{cur_depth} max = #{max}"
      return true if cur_depth < max
      false
    end

    def reserve_with_round_robin
      grouped_queues = queues.sort.group_by{|u| /(\d{1,20})_.*/.match(u) ? /(\d{1,20})_.*/.match(u).captures.first : nil}

      #Instance queue grouping
      if !grouped_queues.keys.include?(nil) && grouped_queues.keys.size > 0
        @n ||= 0
        @n += 1
        @n = @n % grouped_queues.keys.size

        grouped_queues.keys.rotate(@n).each do |key|
          grouped_queues[key].each do |queue|
            log! "Checking #{queue}"
            if should_work_on_queue?(queue) && @job_in_progress = Resque::Job.reserve(queue)
              log! "Found job on #{queue}"
              return @job_in_progress
            end
          end
          @n += 1 # Start the next search at the queue after the one from which we pick a job.
        end
        nil
      else 
        return reserve_without_round_robin
      end
      
    rescue Exception => e
      log "Error reserving job: #{e.inspect}"
      log e.backtrace.join("\n")
      raise e
    end

    # Returns a list of queues to use when searching for a job.
    #
    # A splat ("*") means you want every queue (in alpha order) - this
    # can be useful for dynamically adding new queues.
    #
    # The splat can also be used as a wildcard within a queue name,
    # e.g. "*high*", and negation can be indicated with a prefix of "!"
    #
    # An @key can be used to dynamically look up the queue list for key from redis.
    # If no key is supplied, it defaults to the worker's hostname, and wildcards
    # and negations can be used inside this dynamic queue list.   Set the queue
    # list for a key with Resque.set_dynamic_queue(key, ["q1", "q2"]
    #
    def queues_with_dynamic
      queue_names = @queues.dup
      
      return queues_without_dynamic if queue_names.grep(/(^!)|(^@)|(\*)/).size == 0

      real_queues = Resque.queues
      matched_queues = []

      #Remove Queues under Api Limits
      Redis.current.zremrangebyscore("APILimits", "0", "(#{Time.now.to_i}")
      api_limit_instances = Redis.current.zrange("APILimits", 0, -1).map {|key| key.to_i}
      real_queues = real_queues.select {|key| key if !api_limit_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i)} ## 2
      
      #Queue Pausing 
      Redis.current.zremrangebyscore("resque:PauseQueue", "0", "(#{Time.now.to_i}")
      paused_instances = Redis.current.zrange("resque:PauseQueue", 0, -1).map {|key| key.split("__")[0].to_i}
      real_queues = real_queues.select {|key| key if !paused_instances.include?((key.match(/^(\d*)_.*/) || [])[1].to_i)}

      while q = queue_names.shift
        q = q.to_s

        if q =~ /^(!)?@(.*)/
          key = $2.strip
          key = hostname if key.size == 0

          add_queues = Resque.get_dynamic_queue(key)
          add_queues.map! { |q| q.gsub!(/^!/, '') || q.gsub!(/^/, '!') } if $1

          queue_names.concat(add_queues)
          next
        end

        if q =~ /^!/
          negated = true
          q = q[1..-1]
        end

        patstr = q.gsub(/\*/, '.*')
        pattern = /^#{patstr}$/
        if negated
          matched_queues -= matched_queues.grep(pattern)
        else
          matches = real_queues.grep(/^#{pattern}$/)
          matches = [q] if matches.size == 0 && q == patstr
          matched_queues.concat(matches.sort)
        end
      end
     
      return matched_queues.uniq
    end


    def self.included(receiver)
      receiver.class_eval do
        alias queues_without_dynamic queues
        alias queues queues_with_dynamic
        alias reserve_without_round_robin reserve
        alias reserve reserve_with_round_robin
      end
    end
  end
end